Skip to main content

haystack_client/
client.rs

1use std::collections::HashSet;
2
3use crate::error::ClientError;
4use crate::transport::Transport;
5use crate::transport::http::HttpTransport;
6use crate::transport::ws::WsTransport;
7use haystack_core::data::{HCol, HDict, HGrid};
8use haystack_core::kinds::{HRef, Kind, Number};
9
10/// A client for communicating with a Haystack HTTP API server.
11///
12/// Provides typed methods for all standard Haystack ops (about, read, hisRead,
13/// etc.) as well as a generic `call` method for custom ops.
14///
15/// Supports both HTTP and WebSocket transports.
16pub struct HaystackClient<T: Transport> {
17    transport: T,
18}
19
20impl HaystackClient<HttpTransport> {
21    /// Connect to a Haystack server via HTTP, performing SCRAM authentication.
22    ///
23    /// # Arguments
24    /// * `url` - The server API root (e.g. `http://localhost:8080/api`)
25    /// * `username` - The username to authenticate as
26    /// * `password` - The user's plaintext password
27    pub async fn connect(url: &str, username: &str, password: &str) -> Result<Self, ClientError> {
28        crate::ensure_crypto_provider();
29        let client = reqwest::Client::new();
30        let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
31        let transport = HttpTransport::new(url, auth_token);
32        Ok(Self { transport })
33    }
34
35    /// Connect to a Haystack server via HTTP with mutual TLS (mTLS) client
36    /// certificate authentication, then perform SCRAM authentication.
37    ///
38    /// Builds a custom `reqwest::Client` configured with the provided TLS
39    /// identity (client certificate + key) and optional CA certificate, then
40    /// runs the standard SCRAM handshake over that client.
41    ///
42    /// # Arguments
43    /// * `url` - The server API root (e.g. `https://localhost:8443/api`)
44    /// * `username` - The username to authenticate as
45    /// * `password` - The user's plaintext password
46    /// * `tls` - The mTLS configuration (cert, key, optional CA)
47    pub async fn connect_with_tls(
48        url: &str,
49        username: &str,
50        password: &str,
51        tls: &crate::tls::TlsConfig,
52    ) -> Result<Self, ClientError> {
53        crate::ensure_crypto_provider();
54        // Combine cert + key into a single PEM buffer for reqwest::Identity
55        let mut combined_pem = tls.client_cert_pem.clone();
56        combined_pem.extend_from_slice(&tls.client_key_pem);
57
58        let identity = reqwest::Identity::from_pem(&combined_pem)
59            .map_err(|e| ClientError::Connection(format!("invalid client certificate: {e}")))?;
60
61        let mut builder = reqwest::Client::builder().identity(identity);
62
63        if let Some(ref ca) = tls.ca_cert_pem {
64            let cert = reqwest::Certificate::from_pem(ca)
65                .map_err(|e| ClientError::Connection(format!("invalid CA certificate: {e}")))?;
66            builder = builder.add_root_certificate(cert);
67        }
68
69        let client = builder
70            .build()
71            .map_err(|e| ClientError::Connection(format!("TLS client build failed: {e}")))?;
72
73        let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
74        let transport = HttpTransport::new(url, auth_token);
75        Ok(Self { transport })
76    }
77}
78
79impl HaystackClient<WsTransport> {
80    /// Connect to a Haystack server via WebSocket.
81    ///
82    /// Performs SCRAM authentication over HTTP first to obtain an auth token,
83    /// then establishes a WebSocket connection using that token.
84    ///
85    /// # Arguments
86    /// * `url` - The server API root for HTTP auth (e.g. `http://localhost:8080/api`)
87    /// * `ws_url` - The WebSocket URL (e.g. `ws://localhost:8080/api/ws`)
88    /// * `username` - The username to authenticate as
89    /// * `password` - The user's plaintext password
90    pub async fn connect_ws(
91        url: &str,
92        ws_url: &str,
93        username: &str,
94        password: &str,
95    ) -> Result<Self, ClientError> {
96        crate::ensure_crypto_provider();
97        // Authenticate via HTTP first to get the token
98        let client = reqwest::Client::new();
99        let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
100
101        // Connect WebSocket with the token
102        let transport = WsTransport::connect(ws_url, &auth_token).await?;
103        Ok(Self { transport })
104    }
105}
106
107impl<T: Transport> HaystackClient<T> {
108    /// Create a client with an already-configured transport.
109    ///
110    /// Useful for testing or when you have a custom [`Transport`] implementation.
111    pub fn from_transport(transport: T) -> Self {
112        Self { transport }
113    }
114
115    /// Call a raw Haystack op with a request grid.
116    ///
117    /// `op` is the operation name (e.g. `"read"`, `"hisRead"`). The server
118    /// returns an error grid (as [`ClientError::Grid`]) if the op fails.
119    pub async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, ClientError> {
120        self.transport.call(op, req).await
121    }
122
123    // -----------------------------------------------------------------------
124    // Standard ops
125    // -----------------------------------------------------------------------
126
127    /// Call the `about` op. Returns a single-row grid with server metadata
128    /// including `vendorName`, `productName`, `productVersion`, and `tz`.
129    pub async fn about(&self) -> Result<HGrid, ClientError> {
130        self.call("about", &HGrid::new()).await
131    }
132
133    /// Call the `ops` op. Returns a grid listing every operation the server
134    /// supports, with `name`, `summary`, and `doc` columns.
135    pub async fn ops(&self) -> Result<HGrid, ClientError> {
136        self.call("ops", &HGrid::new()).await
137    }
138
139    /// Call the `formats` op. Returns a grid of MIME types the server can
140    /// read/write, with `mime`, `receive`, and `send` columns.
141    pub async fn formats(&self) -> Result<HGrid, ClientError> {
142        self.call("formats", &HGrid::new()).await
143    }
144
145    /// Call the `libs` op. Returns a grid of Xeto library modules installed
146    /// on the server, including `name` and `version` for each library.
147    pub async fn libs(&self) -> Result<HGrid, ClientError> {
148        self.call("libs", &HGrid::new()).await
149    }
150
151    /// Call the `read` op with a Haystack filter expression and optional limit.
152    ///
153    /// `filter` is a Haystack filter string (e.g. `"site"`, `"equip and siteRef==@s"`,
154    /// `"temp > 72"`). Returns a grid of matching entity dicts. Pass `limit` to
155    /// cap the number of results. Returns an empty grid if nothing matches.
156    pub async fn read(&self, filter: &str, limit: Option<usize>) -> Result<HGrid, ClientError> {
157        let mut row = HDict::new();
158        row.set("filter", Kind::Str(filter.to_string()));
159        if let Some(lim) = limit {
160            row.set("limit", Kind::Number(Number::unitless(lim as f64)));
161        }
162        let cols = if limit.is_some() {
163            vec![HCol::new("filter"), HCol::new("limit")]
164        } else {
165            vec![HCol::new("filter")]
166        };
167        let grid = HGrid::from_parts(HDict::new(), cols, vec![row]);
168        self.call("read", &grid).await
169    }
170
171    /// Call the `read` op with a list of entity ref ids.
172    ///
173    /// Each entry in `ids` is a ref value string (e.g. `"@site-1"`, `"@equip-2"`).
174    /// Returns one row per id. Rows for unknown ids contain only a `Null` marker.
175    pub async fn read_by_ids(&self, ids: &[&str]) -> Result<HGrid, ClientError> {
176        let rows: Vec<HDict> = ids
177            .iter()
178            .map(|id| {
179                let mut d = HDict::new();
180                d.set("id", Kind::Ref(HRef::from_val(*id)));
181                d
182            })
183            .collect();
184        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("id")], rows);
185        self.call("read", &grid).await
186    }
187
188    /// Call the `nav` op to walk the site/equip/point navigation tree.
189    ///
190    /// Pass `None` to get root-level nodes, or `Some(nav_id)` to expand a
191    /// specific node. Returns a grid of child navigation entries.
192    pub async fn nav(&self, nav_id: Option<&str>) -> Result<HGrid, ClientError> {
193        let mut row = HDict::new();
194        if let Some(id) = nav_id {
195            row.set("navId", Kind::Str(id.to_string()));
196        }
197        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("navId")], vec![row]);
198        self.call("nav", &grid).await
199    }
200
201    /// Call the `defs` op to query the server's tag/def ontology.
202    ///
203    /// `filter` is an optional Haystack filter (e.g. `"point"`, `"equip"`) to
204    /// narrow results. Returns a grid of matching def dicts.
205    pub async fn defs(&self, filter: Option<&str>) -> Result<HGrid, ClientError> {
206        let mut row = HDict::new();
207        if let Some(f) = filter {
208            row.set("filter", Kind::Str(f.to_string()));
209        }
210        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row]);
211        self.call("defs", &grid).await
212    }
213
214    /// Call the `watchSub` op to subscribe to real-time changes for a set of entities.
215    ///
216    /// `ids` are ref value strings (e.g. `["@point-1", "@point-2"]`). `lease` is
217    /// an optional duration string (e.g. `"1min"`, `"30sec"`, `"1hr"`) controlling
218    /// how long the server keeps the watch alive without polls. Returns a grid
219    /// whose meta contains the assigned `watchId`.
220    pub async fn watch_sub(&self, ids: &[&str], lease: Option<&str>) -> Result<HGrid, ClientError> {
221        let rows: Vec<HDict> = ids
222            .iter()
223            .map(|id| {
224                let mut d = HDict::new();
225                d.set("id", Kind::Ref(HRef::from_val(*id)));
226                d
227            })
228            .collect();
229        let mut meta = HDict::new();
230        if let Some(l) = lease {
231            meta.set("lease", Kind::Str(l.to_string()));
232        }
233        let grid = HGrid::from_parts(meta, vec![HCol::new("id")], rows);
234        self.call("watchSub", &grid).await
235    }
236
237    /// Call the `watchPoll` op to poll a watch for changed entity values.
238    ///
239    /// `watch_id` is the identifier returned by [`watch_sub`](Self::watch_sub).
240    /// Returns a grid of entities whose `curVal` has changed since the last poll.
241    pub async fn watch_poll(&self, watch_id: &str) -> Result<HGrid, ClientError> {
242        let mut meta = HDict::new();
243        meta.set("watchId", Kind::Str(watch_id.to_string()));
244        let grid = HGrid::from_parts(meta, vec![], vec![]);
245        self.call("watchPoll", &grid).await
246    }
247
248    /// Call the `watchUnsub` op to remove entities from an active watch.
249    ///
250    /// `watch_id` identifies the watch. `ids` are the ref value strings to
251    /// unsubscribe (e.g. `["@point-1"]`). The watch is closed automatically
252    /// when all entities have been removed.
253    pub async fn watch_unsub(&self, watch_id: &str, ids: &[&str]) -> Result<HGrid, ClientError> {
254        let rows: Vec<HDict> = ids
255            .iter()
256            .map(|id| {
257                let mut d = HDict::new();
258                d.set("id", Kind::Ref(HRef::from_val(*id)));
259                d
260            })
261            .collect();
262        let mut meta = HDict::new();
263        meta.set("watchId", Kind::Str(watch_id.to_string()));
264        let grid = HGrid::from_parts(meta, vec![HCol::new("id")], rows);
265        self.call("watchUnsub", &grid).await
266    }
267
268    /// Call the `pointWrite` op to write a value to a writable point.
269    ///
270    /// `id` is the point ref (e.g. `"@point-1"`). `level` is the priority
271    /// array level 1–17 per the Haystack spec (1=emergency, 8=manual,
272    /// 16=default, 17=fallback). `val` is the value to write at that level.
273    pub async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, ClientError> {
274        let mut row = HDict::new();
275        row.set("id", Kind::Ref(HRef::from_val(id)));
276        row.set("level", Kind::Number(Number::unitless(level as f64)));
277        row.set("val", val);
278        let grid = HGrid::from_parts(
279            HDict::new(),
280            vec![HCol::new("id"), HCol::new("level"), HCol::new("val")],
281            vec![row],
282        );
283        self.call("pointWrite", &grid).await
284    }
285
286    /// Call the `hisRead` op to read time-series history for a point.
287    ///
288    /// `id` is the point ref (e.g. `"@sensor-1"`). `range` is a Haystack range
289    /// string: `"today"`, `"yesterday"`, a single date like `"2024-01-01"`, a
290    /// date span `"2024-01-01,2024-01-31"`, or a datetime with timezone like
291    /// `"2024-01-01T00:00:00-05:00 New_York"`. Returns a grid of `ts`/`val` rows.
292    pub async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, ClientError> {
293        let mut row = HDict::new();
294        row.set("id", Kind::Ref(HRef::from_val(id)));
295        row.set("range", Kind::Str(range.to_string()));
296        let grid = HGrid::from_parts(
297            HDict::new(),
298            vec![HCol::new("id"), HCol::new("range")],
299            vec![row],
300        );
301        self.call("hisRead", &grid).await
302    }
303
304    /// Call the `hisWrite` op to write time-series samples for a point.
305    ///
306    /// `id` is the point ref. `items` must be dicts each containing a `ts`
307    /// (DateTime) and `val` tag. Returns an empty grid on success or an
308    /// error grid if the write is rejected.
309    pub async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, ClientError> {
310        let mut meta = HDict::new();
311        meta.set("id", Kind::Ref(HRef::from_val(id)));
312        let grid = HGrid::from_parts(meta, vec![HCol::new("ts"), HCol::new("val")], items);
313        self.call("hisWrite", &grid).await
314    }
315
316    /// Call the `invokeAction` op to invoke a named action on an entity.
317    ///
318    /// `id` is the target entity ref. `action` is the action name. `args` is
319    /// an [`HDict`] of additional parameters for the action. Returns the
320    /// action's result grid.
321    pub async fn invoke_action(
322        &self,
323        id: &str,
324        action: &str,
325        args: HDict,
326    ) -> Result<HGrid, ClientError> {
327        let mut row = args;
328        row.set("id", Kind::Ref(HRef::from_val(id)));
329        row.set("action", Kind::Str(action.to_string()));
330        let grid = HGrid::from_parts(
331            HDict::new(),
332            vec![HCol::new("id"), HCol::new("action")],
333            vec![row],
334        );
335        self.call("invokeAction", &grid).await
336    }
337
338    /// Call the `close` op to close the current server session.
339    ///
340    /// This is distinct from [`close`](Self::close) which shuts down the
341    /// underlying transport connection.
342    pub async fn close_session(&self) -> Result<HGrid, ClientError> {
343        self.call("close", &HGrid::new()).await
344    }
345
346    // -----------------------------------------------------------------------
347    // Library & spec management ops
348    // -----------------------------------------------------------------------
349
350    /// List all Xeto specs on the server, optionally filtered by library name.
351    ///
352    /// Pass `None` to list specs across all libraries, or `Some("lib_name")`
353    /// to restrict to a single library. Returns a grid of spec dicts.
354    pub async fn specs(&self, lib: Option<&str>) -> Result<HGrid, ClientError> {
355        let mut row = HDict::new();
356        if let Some(lib_name) = lib {
357            row.set("lib", Kind::Str(lib_name.to_string()));
358        }
359        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("lib")], vec![row]);
360        self.call("specs", &grid).await
361    }
362
363    /// Get a single Xeto spec by its fully-qualified name.
364    ///
365    /// `qname` is the qualified spec name (e.g. `"ph::Site"`). Returns a
366    /// single-row grid with the spec definition.
367    pub async fn spec(&self, qname: &str) -> Result<HGrid, ClientError> {
368        let mut row = HDict::new();
369        row.set("qname", Kind::Str(qname.to_string()));
370        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("qname")], vec![row]);
371        self.call("spec", &grid).await
372    }
373
374    /// Load a Xeto library from source text into the server's ontology.
375    ///
376    /// `name` is the library name and `source` is the raw Xeto source code.
377    /// Returns an error grid if the source fails to parse or validate.
378    pub async fn load_lib(&self, name: &str, source: &str) -> Result<HGrid, ClientError> {
379        let mut row = HDict::new();
380        row.set("name", Kind::Str(name.to_string()));
381        row.set("source", Kind::Str(source.to_string()));
382        let grid = HGrid::from_parts(
383            HDict::new(),
384            vec![HCol::new("name"), HCol::new("source")],
385            vec![row],
386        );
387        self.call("loadLib", &grid).await
388    }
389
390    /// Unload a previously loaded Xeto library by name.
391    ///
392    /// Removes the library and its specs from the server's active ontology.
393    pub async fn unload_lib(&self, name: &str) -> Result<HGrid, ClientError> {
394        let mut row = HDict::new();
395        row.set("name", Kind::Str(name.to_string()));
396        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("name")], vec![row]);
397        self.call("unloadLib", &grid).await
398    }
399
400    /// Export a library as Xeto source text.
401    ///
402    /// Returns a grid whose first row contains the serialized Xeto source
403    /// for the named library.
404    pub async fn export_lib(&self, name: &str) -> Result<HGrid, ClientError> {
405        let mut row = HDict::new();
406        row.set("name", Kind::Str(name.to_string()));
407        let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("name")], vec![row]);
408        self.call("exportLib", &grid).await
409    }
410
411    /// Validate entity dicts against the server's Xeto ontology.
412    ///
413    /// `entities` is a list of [`HDict`] records to check. Returns a grid
414    /// with validation results — each row reports errors for the corresponding
415    /// input entity. An empty result grid means all entities are valid.
416    pub async fn validate(&self, entities: Vec<HDict>) -> Result<HGrid, ClientError> {
417        // Build column set from all entities
418        let mut col_names: Vec<String> = Vec::new();
419        let mut seen = HashSet::new();
420        for entity in &entities {
421            for name in entity.tag_names() {
422                if seen.insert(name.to_string()) {
423                    col_names.push(name.to_string());
424                }
425            }
426        }
427        col_names.sort();
428        let cols: Vec<HCol> = col_names.iter().map(|n| HCol::new(n.as_str())).collect();
429        let grid = HGrid::from_parts(HDict::new(), cols, entities);
430        self.call("validate", &grid).await
431    }
432
433    /// Close the underlying transport connection (HTTP or WebSocket).
434    ///
435    /// Call [`close_session`](Self::close_session) first if you want to
436    /// explicitly end the server-side session before disconnecting.
437    pub async fn close(&self) -> Result<(), ClientError> {
438        self.transport.close().await
439    }
440}