Skip to main content

haystack_client/
client.rs

1use std::collections::HashSet;
2
3use crate::error::ClientError;
4use crate::transport::Transport;
5use crate::transport::http::HttpTransport;
6use crate::transport::ws::WsTransport;
7use haystack_core::data::{HCol, HDict, HGrid};
8use haystack_core::kinds::{HRef, Kind, Number};
9
10/// A client for communicating with a Haystack HTTP API server.
11///
12/// Provides typed methods for all standard Haystack ops (about, read, hisRead,
13/// etc.) as well as a generic `call` method for custom ops.
14///
15/// Supports both HTTP and WebSocket transports.
16pub struct HaystackClient<T: Transport> {
17    transport: T,
18}
19
20impl HaystackClient<HttpTransport> {
21    /// Connect to a Haystack server via HTTP, performing SCRAM authentication.
22    ///
23    /// # Arguments
24    /// * `url` - The server API root (e.g. `http://localhost:8080/api`)
25    /// * `username` - The username to authenticate as
26    /// * `password` - The user's plaintext password
27    pub async fn connect(url: &str, username: &str, password: &str) -> Result<Self, ClientError> {
28        let client = reqwest::Client::new();
29        let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
30        let transport = HttpTransport::new(url, auth_token);
31        Ok(Self { transport })
32    }
33
34    /// Connect to a Haystack server via HTTP with mutual TLS (mTLS) client
35    /// certificate authentication, then perform SCRAM authentication.
36    ///
37    /// Builds a custom `reqwest::Client` configured with the provided TLS
38    /// identity (client certificate + key) and optional CA certificate, then
39    /// runs the standard SCRAM handshake over that client.
40    ///
41    /// # Arguments
42    /// * `url` - The server API root (e.g. `https://localhost:8443/api`)
43    /// * `username` - The username to authenticate as
44    /// * `password` - The user's plaintext password
45    /// * `tls` - The mTLS configuration (cert, key, optional CA)
46    pub async fn connect_with_tls(
47        url: &str,
48        username: &str,
49        password: &str,
50        tls: &crate::tls::TlsConfig,
51    ) -> Result<Self, ClientError> {
52        // Combine cert + key into a single PEM buffer for reqwest::Identity
53        let mut combined_pem = tls.client_cert_pem.clone();
54        combined_pem.extend_from_slice(&tls.client_key_pem);
55
56        let identity = reqwest::Identity::from_pem(&combined_pem)
57            .map_err(|e| ClientError::Connection(format!("invalid client certificate: {e}")))?;
58
59        let mut builder = reqwest::Client::builder().identity(identity);
60
61        if let Some(ref ca) = tls.ca_cert_pem {
62            let cert = reqwest::Certificate::from_pem(ca)
63                .map_err(|e| ClientError::Connection(format!("invalid CA certificate: {e}")))?;
64            builder = builder.add_root_certificate(cert);
65        }
66
67        let client = builder
68            .build()
69            .map_err(|e| ClientError::Connection(format!("TLS client build failed: {e}")))?;
70
71        let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
72        let transport = HttpTransport::new(url, auth_token);
73        Ok(Self { transport })
74    }
75}
76
77impl HaystackClient<WsTransport> {
78    /// Connect to a Haystack server via WebSocket.
79    ///
80    /// Performs SCRAM authentication over HTTP first to obtain an auth token,
81    /// then establishes a WebSocket connection using that token.
82    ///
83    /// # Arguments
84    /// * `url` - The server API root for HTTP auth (e.g. `http://localhost:8080/api`)
85    /// * `ws_url` - The WebSocket URL (e.g. `ws://localhost:8080/api/ws`)
86    /// * `username` - The username to authenticate as
87    /// * `password` - The user's plaintext password
88    pub async fn connect_ws(
89        url: &str,
90        ws_url: &str,
91        username: &str,
92        password: &str,
93    ) -> Result<Self, ClientError> {
94        // Authenticate via HTTP first to get the token
95        let client = reqwest::Client::new();
96        let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
97
98        // Connect WebSocket with the token
99        let transport = WsTransport::connect(ws_url, &auth_token).await?;
100        Ok(Self { transport })
101    }
102}
103
104impl<T: Transport> HaystackClient<T> {
105    /// Create a client with an already-configured transport.
106    ///
107    /// Useful for testing or when you have a custom [`Transport`] implementation.
108    pub fn from_transport(transport: T) -> Self {
109        Self { transport }
110    }
111
112    /// Call a raw Haystack op with a request grid.
113    ///
114    /// `op` is the operation name (e.g. `"read"`, `"hisRead"`). The server
115    /// returns an error grid (as [`ClientError::Grid`]) if the op fails.
116    pub async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, ClientError> {
117        self.transport.call(op, req).await
118    }
119
120    // -----------------------------------------------------------------------
121    // Standard ops
122    // -----------------------------------------------------------------------
123
124    /// Call the `about` op. Returns a single-row grid with server metadata
125    /// including `vendorName`, `productName`, `productVersion`, and `tz`.
126    pub async fn about(&self) -> Result<HGrid, ClientError> {
127        self.call("about", &HGrid::new()).await
128    }
129
130    /// Call the `ops` op. Returns a grid listing every operation the server
131    /// supports, with `name`, `summary`, and `doc` columns.
132    pub async fn ops(&self) -> Result<HGrid, ClientError> {
133        self.call("ops", &HGrid::new()).await
134    }
135
136    /// Call the `formats` op. Returns a grid of MIME types the server can
137    /// read/write, with `mime`, `receive`, and `send` columns.
138    pub async fn formats(&self) -> Result<HGrid, ClientError> {
139        self.call("formats", &HGrid::new()).await
140    }
141
142    /// Call the `libs` op. Returns a grid of Xeto library modules installed
143    /// on the server, including `name` and `version` for each library.
144    pub async fn libs(&self) -> Result<HGrid, ClientError> {
145        self.call("libs", &HGrid::new()).await
146    }
147
148    /// Call the `read` op with a Haystack filter expression and optional limit.
149    ///
150    /// `filter` is a Haystack filter string (e.g. `"site"`, `"equip and siteRef==@s"`,
151    /// `"temp > 72"`). Returns a grid of matching entity dicts. Pass `limit` to
152    /// cap the number of results. Returns an empty grid if nothing matches.
153    pub async fn read(&self, filter: &str, limit: Option<usize>) -> Result<HGrid, ClientError> {
154        let mut row = HDict::new();
155        row.set("filter", Kind::Str(filter.to_string()));
156        if let Some(lim) = limit {
157            row.set("limit", Kind::Number(Number::unitless(lim as f64)));
158        }
159        let cols = if limit.is_some() {
160            vec![HCol::new("filter"), HCol::new("limit")]
161        } else {
162            vec![HCol::new("filter")]
163        };
164        let grid = HGrid::from_parts(HDict::new(), cols, vec![row]);
165        self.call("read", &grid).await
166    }
167
168    /// Call the `read` op with a list of entity ref ids.
169    ///
170    /// Each entry in `ids` is a ref value string (e.g. `"@site-1"`, `"@equip-2"`).
171    /// Returns one row per id. Rows for unknown ids contain only a `Null` marker.
172    pub async fn read_by_ids(&self, ids: &[&str]) -> Result<HGrid, ClientError> {
173        let rows: Vec<HDict> = ids
174            .iter()
175            .map(|id| {
176                let mut d = HDict::new();
177                d.set("id", Kind::Ref(HRef::from_val(*id)));
178                d
179            })
180            .collect();
181        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("id")], rows);
182        self.call("read", &grid).await
183    }
184
185    /// Call the `nav` op to walk the site/equip/point navigation tree.
186    ///
187    /// Pass `None` to get root-level nodes, or `Some(nav_id)` to expand a
188    /// specific node. Returns a grid of child navigation entries.
189    pub async fn nav(&self, nav_id: Option<&str>) -> Result<HGrid, ClientError> {
190        let mut row = HDict::new();
191        if let Some(id) = nav_id {
192            row.set("navId", Kind::Str(id.to_string()));
193        }
194        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("navId")], vec![row]);
195        self.call("nav", &grid).await
196    }
197
198    /// Call the `defs` op to query the server's tag/def ontology.
199    ///
200    /// `filter` is an optional Haystack filter (e.g. `"point"`, `"equip"`) to
201    /// narrow results. Returns a grid of matching def dicts.
202    pub async fn defs(&self, filter: Option<&str>) -> Result<HGrid, ClientError> {
203        let mut row = HDict::new();
204        if let Some(f) = filter {
205            row.set("filter", Kind::Str(f.to_string()));
206        }
207        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row]);
208        self.call("defs", &grid).await
209    }
210
211    /// Call the `watchSub` op to subscribe to real-time changes for a set of entities.
212    ///
213    /// `ids` are ref value strings (e.g. `["@point-1", "@point-2"]`). `lease` is
214    /// an optional duration string (e.g. `"1min"`, `"30sec"`, `"1hr"`) controlling
215    /// how long the server keeps the watch alive without polls. Returns a grid
216    /// whose meta contains the assigned `watchId`.
217    pub async fn watch_sub(&self, ids: &[&str], lease: Option<&str>) -> Result<HGrid, ClientError> {
218        let rows: Vec<HDict> = ids
219            .iter()
220            .map(|id| {
221                let mut d = HDict::new();
222                d.set("id", Kind::Ref(HRef::from_val(*id)));
223                d
224            })
225            .collect();
226        let mut meta = HDict::new();
227        if let Some(l) = lease {
228            meta.set("lease", Kind::Str(l.to_string()));
229        }
230        let grid = HGrid::from_parts(meta, vec![HCol::new("id")], rows);
231        self.call("watchSub", &grid).await
232    }
233
234    /// Call the `watchPoll` op to poll a watch for changed entity values.
235    ///
236    /// `watch_id` is the identifier returned by [`watch_sub`](Self::watch_sub).
237    /// Returns a grid of entities whose `curVal` has changed since the last poll.
238    pub async fn watch_poll(&self, watch_id: &str) -> Result<HGrid, ClientError> {
239        let mut meta = HDict::new();
240        meta.set("watchId", Kind::Str(watch_id.to_string()));
241        let grid = HGrid::from_parts(meta, vec![], vec![]);
242        self.call("watchPoll", &grid).await
243    }
244
245    /// Call the `watchUnsub` op to remove entities from an active watch.
246    ///
247    /// `watch_id` identifies the watch. `ids` are the ref value strings to
248    /// unsubscribe (e.g. `["@point-1"]`). The watch is closed automatically
249    /// when all entities have been removed.
250    pub async fn watch_unsub(&self, watch_id: &str, ids: &[&str]) -> Result<HGrid, ClientError> {
251        let rows: Vec<HDict> = ids
252            .iter()
253            .map(|id| {
254                let mut d = HDict::new();
255                d.set("id", Kind::Ref(HRef::from_val(*id)));
256                d
257            })
258            .collect();
259        let mut meta = HDict::new();
260        meta.set("watchId", Kind::Str(watch_id.to_string()));
261        let grid = HGrid::from_parts(meta, vec![HCol::new("id")], rows);
262        self.call("watchUnsub", &grid).await
263    }
264
265    /// Call the `pointWrite` op to write a value to a writable point.
266    ///
267    /// `id` is the point ref (e.g. `"@point-1"`). `level` is the priority
268    /// array level 1–17 per the Haystack spec (1=emergency, 8=manual,
269    /// 16=default, 17=fallback). `val` is the value to write at that level.
270    pub async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, ClientError> {
271        let mut row = HDict::new();
272        row.set("id", Kind::Ref(HRef::from_val(id)));
273        row.set("level", Kind::Number(Number::unitless(level as f64)));
274        row.set("val", val);
275        let grid = HGrid::from_parts(
276            HDict::new(),
277            vec![HCol::new("id"), HCol::new("level"), HCol::new("val")],
278            vec![row],
279        );
280        self.call("pointWrite", &grid).await
281    }
282
283    /// Call the `hisRead` op to read time-series history for a point.
284    ///
285    /// `id` is the point ref (e.g. `"@sensor-1"`). `range` is a Haystack range
286    /// string: `"today"`, `"yesterday"`, a single date like `"2024-01-01"`, a
287    /// date span `"2024-01-01,2024-01-31"`, or a datetime with timezone like
288    /// `"2024-01-01T00:00:00-05:00 New_York"`. Returns a grid of `ts`/`val` rows.
289    pub async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, ClientError> {
290        let mut row = HDict::new();
291        row.set("id", Kind::Ref(HRef::from_val(id)));
292        row.set("range", Kind::Str(range.to_string()));
293        let grid = HGrid::from_parts(
294            HDict::new(),
295            vec![HCol::new("id"), HCol::new("range")],
296            vec![row],
297        );
298        self.call("hisRead", &grid).await
299    }
300
301    /// Call the `hisWrite` op to write time-series samples for a point.
302    ///
303    /// `id` is the point ref. `items` must be dicts each containing a `ts`
304    /// (DateTime) and `val` tag. Returns an empty grid on success or an
305    /// error grid if the write is rejected.
306    pub async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, ClientError> {
307        let mut meta = HDict::new();
308        meta.set("id", Kind::Ref(HRef::from_val(id)));
309        let grid = HGrid::from_parts(meta, vec![HCol::new("ts"), HCol::new("val")], items);
310        self.call("hisWrite", &grid).await
311    }
312
313    /// Call the `invokeAction` op to invoke a named action on an entity.
314    ///
315    /// `id` is the target entity ref. `action` is the action name. `args` is
316    /// an [`HDict`] of additional parameters for the action. Returns the
317    /// action's result grid.
318    pub async fn invoke_action(
319        &self,
320        id: &str,
321        action: &str,
322        args: HDict,
323    ) -> Result<HGrid, ClientError> {
324        let mut row = args;
325        row.set("id", Kind::Ref(HRef::from_val(id)));
326        row.set("action", Kind::Str(action.to_string()));
327        let grid = HGrid::from_parts(
328            HDict::new(),
329            vec![HCol::new("id"), HCol::new("action")],
330            vec![row],
331        );
332        self.call("invokeAction", &grid).await
333    }
334
335    /// Call the `close` op to close the current server session.
336    ///
337    /// This is distinct from [`close`](Self::close) which shuts down the
338    /// underlying transport connection.
339    pub async fn close_session(&self) -> Result<HGrid, ClientError> {
340        self.call("close", &HGrid::new()).await
341    }
342
343    // -----------------------------------------------------------------------
344    // Library & spec management ops
345    // -----------------------------------------------------------------------
346
347    /// List all Xeto specs on the server, optionally filtered by library name.
348    ///
349    /// Pass `None` to list specs across all libraries, or `Some("lib_name")`
350    /// to restrict to a single library. Returns a grid of spec dicts.
351    pub async fn specs(&self, lib: Option<&str>) -> Result<HGrid, ClientError> {
352        let mut row = HDict::new();
353        if let Some(lib_name) = lib {
354            row.set("lib", Kind::Str(lib_name.to_string()));
355        }
356        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("lib")], vec![row]);
357        self.call("specs", &grid).await
358    }
359
360    /// Get a single Xeto spec by its fully-qualified name.
361    ///
362    /// `qname` is the qualified spec name (e.g. `"ph::Site"`). Returns a
363    /// single-row grid with the spec definition.
364    pub async fn spec(&self, qname: &str) -> Result<HGrid, ClientError> {
365        let mut row = HDict::new();
366        row.set("qname", Kind::Str(qname.to_string()));
367        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("qname")], vec![row]);
368        self.call("spec", &grid).await
369    }
370
371    /// Load a Xeto library from source text into the server's ontology.
372    ///
373    /// `name` is the library name and `source` is the raw Xeto source code.
374    /// Returns an error grid if the source fails to parse or validate.
375    pub async fn load_lib(&self, name: &str, source: &str) -> Result<HGrid, ClientError> {
376        let mut row = HDict::new();
377        row.set("name", Kind::Str(name.to_string()));
378        row.set("source", Kind::Str(source.to_string()));
379        let grid = HGrid::from_parts(
380            HDict::new(),
381            vec![HCol::new("name"), HCol::new("source")],
382            vec![row],
383        );
384        self.call("loadLib", &grid).await
385    }
386
387    /// Unload a previously loaded Xeto library by name.
388    ///
389    /// Removes the library and its specs from the server's active ontology.
390    pub async fn unload_lib(&self, name: &str) -> Result<HGrid, ClientError> {
391        let mut row = HDict::new();
392        row.set("name", Kind::Str(name.to_string()));
393        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("name")], vec![row]);
394        self.call("unloadLib", &grid).await
395    }
396
397    /// Export a library as Xeto source text.
398    ///
399    /// Returns a grid whose first row contains the serialized Xeto source
400    /// for the named library.
401    pub async fn export_lib(&self, name: &str) -> Result<HGrid, ClientError> {
402        let mut row = HDict::new();
403        row.set("name", Kind::Str(name.to_string()));
404        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("name")], vec![row]);
405        self.call("exportLib", &grid).await
406    }
407
408    /// Validate entity dicts against the server's Xeto ontology.
409    ///
410    /// `entities` is a list of [`HDict`] records to check. Returns a grid
411    /// with validation results — each row reports errors for the corresponding
412    /// input entity. An empty result grid means all entities are valid.
413    pub async fn validate(&self, entities: Vec<HDict>) -> Result<HGrid, ClientError> {
414        // Build column set from all entities
415        let mut col_names: Vec<String> = Vec::new();
416        let mut seen = HashSet::new();
417        for entity in &entities {
418            for name in entity.tag_names() {
419                if seen.insert(name.to_string()) {
420                    col_names.push(name.to_string());
421                }
422            }
423        }
424        col_names.sort();
425        let cols: Vec<HCol> = col_names.iter().map(|n| HCol::new(n.as_str())).collect();
426        let grid = HGrid::from_parts(HDict::new(), cols, entities);
427        self.call("validate", &grid).await
428    }
429
430    /// Close the underlying transport connection (HTTP or WebSocket).
431    ///
432    /// Call [`close_session`](Self::close_session) first if you want to
433    /// explicitly end the server-side session before disconnecting.
434    pub async fn close(&self) -> Result<(), ClientError> {
435        self.transport.close().await
436    }
437}