haystack_client/client.rs
1use std::collections::HashSet;
2
3use crate::error::ClientError;
4use crate::transport::Transport;
5use crate::transport::http::HttpTransport;
6use crate::transport::ws::WsTransport;
7use haystack_core::data::{HCol, HDict, HGrid};
8use haystack_core::kinds::{HRef, Kind, Number};
9
10/// A client for communicating with a Haystack HTTP API server.
11///
12/// Provides typed methods for all standard Haystack ops (about, read, hisRead,
13/// etc.) as well as a generic `call` method for custom ops.
14///
15/// Supports both HTTP and WebSocket transports.
16pub struct HaystackClient<T: Transport> {
17 transport: T,
18}
19
20impl HaystackClient<HttpTransport> {
21 /// Connect to a Haystack server via HTTP, performing SCRAM authentication.
22 ///
23 /// # Arguments
24 /// * `url` - The server API root (e.g. `http://localhost:8080/api`)
25 /// * `username` - The username to authenticate as
26 /// * `password` - The user's plaintext password
27 pub async fn connect(url: &str, username: &str, password: &str) -> Result<Self, ClientError> {
28 crate::ensure_crypto_provider();
29 let client = reqwest::Client::new();
30 let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
31 let transport = HttpTransport::new(url, auth_token);
32 Ok(Self { transport })
33 }
34
35 /// Connect to a Haystack server via HTTP with mutual TLS (mTLS) client
36 /// certificate authentication, then perform SCRAM authentication.
37 ///
38 /// Builds a custom `reqwest::Client` configured with the provided TLS
39 /// identity (client certificate + key) and optional CA certificate, then
40 /// runs the standard SCRAM handshake over that client.
41 ///
42 /// # Arguments
43 /// * `url` - The server API root (e.g. `https://localhost:8443/api`)
44 /// * `username` - The username to authenticate as
45 /// * `password` - The user's plaintext password
46 /// * `tls` - The mTLS configuration (cert, key, optional CA)
47 pub async fn connect_with_tls(
48 url: &str,
49 username: &str,
50 password: &str,
51 tls: &crate::tls::TlsConfig,
52 ) -> Result<Self, ClientError> {
53 crate::ensure_crypto_provider();
54 // Combine cert + key into a single PEM buffer for reqwest::Identity
55 let mut combined_pem = tls.client_cert_pem.clone();
56 combined_pem.extend_from_slice(&tls.client_key_pem);
57
58 let identity = reqwest::Identity::from_pem(&combined_pem)
59 .map_err(|e| ClientError::Connection(format!("invalid client certificate: {e}")))?;
60
61 let mut builder = reqwest::Client::builder().identity(identity);
62
63 if let Some(ref ca) = tls.ca_cert_pem {
64 let cert = reqwest::Certificate::from_pem(ca)
65 .map_err(|e| ClientError::Connection(format!("invalid CA certificate: {e}")))?;
66 builder = builder.add_root_certificate(cert);
67 }
68
69 let client = builder
70 .build()
71 .map_err(|e| ClientError::Connection(format!("TLS client build failed: {e}")))?;
72
73 let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
74 let transport = HttpTransport::new(url, auth_token);
75 Ok(Self { transport })
76 }
77}
78
79impl HaystackClient<WsTransport> {
80 /// Connect to a Haystack server via WebSocket.
81 ///
82 /// Performs SCRAM authentication over HTTP first to obtain an auth token,
83 /// then establishes a WebSocket connection using that token.
84 ///
85 /// # Arguments
86 /// * `url` - The server API root for HTTP auth (e.g. `http://localhost:8080/api`)
87 /// * `ws_url` - The WebSocket URL (e.g. `ws://localhost:8080/api/ws`)
88 /// * `username` - The username to authenticate as
89 /// * `password` - The user's plaintext password
90 pub async fn connect_ws(
91 url: &str,
92 ws_url: &str,
93 username: &str,
94 password: &str,
95 ) -> Result<Self, ClientError> {
96 crate::ensure_crypto_provider();
97 // Authenticate via HTTP first to get the token
98 let client = reqwest::Client::new();
99 let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
100
101 // Connect WebSocket with the token
102 let transport = WsTransport::connect(ws_url, &auth_token).await?;
103 Ok(Self { transport })
104 }
105}
106
107impl<T: Transport> HaystackClient<T> {
108 /// Create a client with an already-configured transport.
109 ///
110 /// Useful for testing or when you have a custom [`Transport`] implementation.
111 pub fn from_transport(transport: T) -> Self {
112 Self { transport }
113 }
114
115 /// Call a raw Haystack op with a request grid.
116 ///
117 /// `op` is the operation name (e.g. `"read"`, `"hisRead"`). The server
118 /// returns an error grid (as [`ClientError::Grid`]) if the op fails.
119 pub async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, ClientError> {
120 self.transport.call(op, req).await
121 }
122
123 // -----------------------------------------------------------------------
124 // Standard ops
125 // -----------------------------------------------------------------------
126
127 /// Call the `about` op. Returns a single-row grid with server metadata
128 /// including `vendorName`, `productName`, `productVersion`, and `tz`.
129 pub async fn about(&self) -> Result<HGrid, ClientError> {
130 self.call("about", &HGrid::new()).await
131 }
132
133 /// Call the `ops` op. Returns a grid listing every operation the server
134 /// supports, with `name`, `summary`, and `doc` columns.
135 pub async fn ops(&self) -> Result<HGrid, ClientError> {
136 self.call("ops", &HGrid::new()).await
137 }
138
139 /// Call the `formats` op. Returns a grid of MIME types the server can
140 /// read/write, with `mime`, `receive`, and `send` columns.
141 pub async fn formats(&self) -> Result<HGrid, ClientError> {
142 self.call("formats", &HGrid::new()).await
143 }
144
145 /// Call the `libs` op. Returns a grid of Xeto library modules installed
146 /// on the server, including `name` and `version` for each library.
147 pub async fn libs(&self) -> Result<HGrid, ClientError> {
148 self.call("libs", &HGrid::new()).await
149 }
150
151 /// Call the `read` op with a Haystack filter expression and optional limit.
152 ///
153 /// `filter` is a Haystack filter string (e.g. `"site"`, `"equip and siteRef==@s"`,
154 /// `"temp > 72"`). Returns a grid of matching entity dicts. Pass `limit` to
155 /// cap the number of results. Returns an empty grid if nothing matches.
156 pub async fn read(&self, filter: &str, limit: Option<usize>) -> Result<HGrid, ClientError> {
157 let mut row = HDict::new();
158 row.set("filter", Kind::Str(filter.to_string()));
159 if let Some(lim) = limit {
160 row.set("limit", Kind::Number(Number::unitless(lim as f64)));
161 }
162 let cols = if limit.is_some() {
163 vec![HCol::new("filter"), HCol::new("limit")]
164 } else {
165 vec![HCol::new("filter")]
166 };
167 let grid = HGrid::from_parts(HDict::new(), cols, vec![row]);
168 self.call("read", &grid).await
169 }
170
171 /// Call the `read` op with a list of entity ref ids.
172 ///
173 /// Each entry in `ids` is a ref value string (e.g. `"@site-1"`, `"@equip-2"`).
174 /// Returns one row per id. Rows for unknown ids contain only a `Null` marker.
175 pub async fn read_by_ids(&self, ids: &[&str]) -> Result<HGrid, ClientError> {
176 let rows: Vec<HDict> = ids
177 .iter()
178 .map(|id| {
179 let mut d = HDict::new();
180 d.set("id", Kind::Ref(HRef::from_val(*id)));
181 d
182 })
183 .collect();
184 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("id")], rows);
185 self.call("read", &grid).await
186 }
187
188 /// Call the `nav` op to walk the site/equip/point navigation tree.
189 ///
190 /// Pass `None` to get root-level nodes, or `Some(nav_id)` to expand a
191 /// specific node. Returns a grid of child navigation entries.
192 pub async fn nav(&self, nav_id: Option<&str>) -> Result<HGrid, ClientError> {
193 let mut row = HDict::new();
194 if let Some(id) = nav_id {
195 row.set("navId", Kind::Str(id.to_string()));
196 }
197 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("navId")], vec![row]);
198 self.call("nav", &grid).await
199 }
200
201 /// Call the `defs` op to query the server's tag/def ontology.
202 ///
203 /// `filter` is an optional Haystack filter (e.g. `"point"`, `"equip"`) to
204 /// narrow results. Returns a grid of matching def dicts.
205 pub async fn defs(&self, filter: Option<&str>) -> Result<HGrid, ClientError> {
206 let mut row = HDict::new();
207 if let Some(f) = filter {
208 row.set("filter", Kind::Str(f.to_string()));
209 }
210 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row]);
211 self.call("defs", &grid).await
212 }
213
214 /// Call the `watchSub` op to subscribe to real-time changes for a set of entities.
215 ///
216 /// `ids` are ref value strings (e.g. `["@point-1", "@point-2"]`). `lease` is
217 /// an optional duration string (e.g. `"1min"`, `"30sec"`, `"1hr"`) controlling
218 /// how long the server keeps the watch alive without polls. Returns a grid
219 /// whose meta contains the assigned `watchId`.
220 pub async fn watch_sub(&self, ids: &[&str], lease: Option<&str>) -> Result<HGrid, ClientError> {
221 let rows: Vec<HDict> = ids
222 .iter()
223 .map(|id| {
224 let mut d = HDict::new();
225 d.set("id", Kind::Ref(HRef::from_val(*id)));
226 d
227 })
228 .collect();
229 let mut meta = HDict::new();
230 if let Some(l) = lease {
231 meta.set("lease", Kind::Str(l.to_string()));
232 }
233 let grid = HGrid::from_parts(meta, vec![HCol::new("id")], rows);
234 self.call("watchSub", &grid).await
235 }
236
237 /// Call the `watchPoll` op to poll a watch for changed entity values.
238 ///
239 /// `watch_id` is the identifier returned by [`watch_sub`](Self::watch_sub).
240 /// Returns a grid of entities whose `curVal` has changed since the last poll.
241 pub async fn watch_poll(&self, watch_id: &str) -> Result<HGrid, ClientError> {
242 let mut meta = HDict::new();
243 meta.set("watchId", Kind::Str(watch_id.to_string()));
244 let grid = HGrid::from_parts(meta, vec![], vec![]);
245 self.call("watchPoll", &grid).await
246 }
247
248 /// Call the `watchUnsub` op to remove entities from an active watch.
249 ///
250 /// `watch_id` identifies the watch. `ids` are the ref value strings to
251 /// unsubscribe (e.g. `["@point-1"]`). The watch is closed automatically
252 /// when all entities have been removed.
253 pub async fn watch_unsub(&self, watch_id: &str, ids: &[&str]) -> Result<HGrid, ClientError> {
254 let rows: Vec<HDict> = ids
255 .iter()
256 .map(|id| {
257 let mut d = HDict::new();
258 d.set("id", Kind::Ref(HRef::from_val(*id)));
259 d
260 })
261 .collect();
262 let mut meta = HDict::new();
263 meta.set("watchId", Kind::Str(watch_id.to_string()));
264 let grid = HGrid::from_parts(meta, vec![HCol::new("id")], rows);
265 self.call("watchUnsub", &grid).await
266 }
267
268 /// Call the `pointWrite` op to write a value to a writable point.
269 ///
270 /// `id` is the point ref (e.g. `"@point-1"`). `level` is the priority
271 /// array level 1–17 per the Haystack spec (1=emergency, 8=manual,
272 /// 16=default, 17=fallback). `val` is the value to write at that level.
273 pub async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, ClientError> {
274 let mut row = HDict::new();
275 row.set("id", Kind::Ref(HRef::from_val(id)));
276 row.set("level", Kind::Number(Number::unitless(level as f64)));
277 row.set("val", val);
278 let grid = HGrid::from_parts(
279 HDict::new(),
280 vec![HCol::new("id"), HCol::new("level"), HCol::new("val")],
281 vec![row],
282 );
283 self.call("pointWrite", &grid).await
284 }
285
286 /// Call the `hisRead` op to read time-series history for a point.
287 ///
288 /// `id` is the point ref (e.g. `"@sensor-1"`). `range` is a Haystack range
289 /// string: `"today"`, `"yesterday"`, a single date like `"2024-01-01"`, a
290 /// date span `"2024-01-01,2024-01-31"`, or a datetime with timezone like
291 /// `"2024-01-01T00:00:00-05:00 New_York"`. Returns a grid of `ts`/`val` rows.
292 pub async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, ClientError> {
293 let mut row = HDict::new();
294 row.set("id", Kind::Ref(HRef::from_val(id)));
295 row.set("range", Kind::Str(range.to_string()));
296 let grid = HGrid::from_parts(
297 HDict::new(),
298 vec![HCol::new("id"), HCol::new("range")],
299 vec![row],
300 );
301 self.call("hisRead", &grid).await
302 }
303
304 /// Call the `hisWrite` op to write time-series samples for a point.
305 ///
306 /// `id` is the point ref. `items` must be dicts each containing a `ts`
307 /// (DateTime) and `val` tag. Returns an empty grid on success or an
308 /// error grid if the write is rejected.
309 pub async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, ClientError> {
310 let mut meta = HDict::new();
311 meta.set("id", Kind::Ref(HRef::from_val(id)));
312 let grid = HGrid::from_parts(meta, vec![HCol::new("ts"), HCol::new("val")], items);
313 self.call("hisWrite", &grid).await
314 }
315
316 /// Call the `invokeAction` op to invoke a named action on an entity.
317 ///
318 /// `id` is the target entity ref. `action` is the action name. `args` is
319 /// an [`HDict`] of additional parameters for the action. Returns the
320 /// action's result grid.
321 pub async fn invoke_action(
322 &self,
323 id: &str,
324 action: &str,
325 args: HDict,
326 ) -> Result<HGrid, ClientError> {
327 let mut row = args;
328 row.set("id", Kind::Ref(HRef::from_val(id)));
329 row.set("action", Kind::Str(action.to_string()));
330 let grid = HGrid::from_parts(
331 HDict::new(),
332 vec![HCol::new("id"), HCol::new("action")],
333 vec![row],
334 );
335 self.call("invokeAction", &grid).await
336 }
337
338 /// Call the `close` op to close the current server session.
339 ///
340 /// This is distinct from [`close`](Self::close) which shuts down the
341 /// underlying transport connection.
342 pub async fn close_session(&self) -> Result<HGrid, ClientError> {
343 self.call("close", &HGrid::new()).await
344 }
345
346 // -----------------------------------------------------------------------
347 // Library & spec management ops
348 // -----------------------------------------------------------------------
349
350 /// List all Xeto specs on the server, optionally filtered by library name.
351 ///
352 /// Pass `None` to list specs across all libraries, or `Some("lib_name")`
353 /// to restrict to a single library. Returns a grid of spec dicts.
354 pub async fn specs(&self, lib: Option<&str>) -> Result<HGrid, ClientError> {
355 let mut row = HDict::new();
356 if let Some(lib_name) = lib {
357 row.set("lib", Kind::Str(lib_name.to_string()));
358 }
359 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("lib")], vec![row]);
360 self.call("specs", &grid).await
361 }
362
363 /// Get a single Xeto spec by its fully-qualified name.
364 ///
365 /// `qname` is the qualified spec name (e.g. `"ph::Site"`). Returns a
366 /// single-row grid with the spec definition.
367 pub async fn spec(&self, qname: &str) -> Result<HGrid, ClientError> {
368 let mut row = HDict::new();
369 row.set("qname", Kind::Str(qname.to_string()));
370 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("qname")], vec![row]);
371 self.call("spec", &grid).await
372 }
373
374 /// Load a Xeto library from source text into the server's ontology.
375 ///
376 /// `name` is the library name and `source` is the raw Xeto source code.
377 /// Returns an error grid if the source fails to parse or validate.
378 pub async fn load_lib(&self, name: &str, source: &str) -> Result<HGrid, ClientError> {
379 let mut row = HDict::new();
380 row.set("name", Kind::Str(name.to_string()));
381 row.set("source", Kind::Str(source.to_string()));
382 let grid = HGrid::from_parts(
383 HDict::new(),
384 vec![HCol::new("name"), HCol::new("source")],
385 vec![row],
386 );
387 self.call("loadLib", &grid).await
388 }
389
390 /// Unload a previously loaded Xeto library by name.
391 ///
392 /// Removes the library and its specs from the server's active ontology.
393 pub async fn unload_lib(&self, name: &str) -> Result<HGrid, ClientError> {
394 let mut row = HDict::new();
395 row.set("name", Kind::Str(name.to_string()));
396 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("name")], vec![row]);
397 self.call("unloadLib", &grid).await
398 }
399
400 /// Export a library as Xeto source text.
401 ///
402 /// Returns a grid whose first row contains the serialized Xeto source
403 /// for the named library.
404 pub async fn export_lib(&self, name: &str) -> Result<HGrid, ClientError> {
405 let mut row = HDict::new();
406 row.set("name", Kind::Str(name.to_string()));
407 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("name")], vec![row]);
408 self.call("exportLib", &grid).await
409 }
410
411 /// Validate entity dicts against the server's Xeto ontology.
412 ///
413 /// `entities` is a list of [`HDict`] records to check. Returns a grid
414 /// with validation results — each row reports errors for the corresponding
415 /// input entity. An empty result grid means all entities are valid.
416 pub async fn validate(&self, entities: Vec<HDict>) -> Result<HGrid, ClientError> {
417 // Build column set from all entities
418 let mut col_names: Vec<String> = Vec::new();
419 let mut seen = HashSet::new();
420 for entity in &entities {
421 for name in entity.tag_names() {
422 if seen.insert(name.to_string()) {
423 col_names.push(name.to_string());
424 }
425 }
426 }
427 col_names.sort();
428 let cols: Vec<HCol> = col_names.iter().map(|n| HCol::new(n.as_str())).collect();
429 let grid = HGrid::from_parts(HDict::new(), cols, entities);
430 self.call("validate", &grid).await
431 }
432
433 /// Close the underlying transport connection (HTTP or WebSocket).
434 ///
435 /// Call [`close_session`](Self::close_session) first if you want to
436 /// explicitly end the server-side session before disconnecting.
437 pub async fn close(&self) -> Result<(), ClientError> {
438 self.transport.close().await
439 }
440}