haystack_client/client.rs
1use std::collections::HashSet;
2
3use crate::error::ClientError;
4use crate::transport::Transport;
5use crate::transport::http::HttpTransport;
6use crate::transport::ws::WsTransport;
7use haystack_core::data::{HCol, HDict, HGrid};
8use haystack_core::kinds::{HRef, Kind, Number};
9
10/// A client for communicating with a Haystack HTTP API server.
11///
12/// Provides typed methods for all standard Haystack ops (about, read, hisRead,
13/// etc.) as well as a generic `call` method for custom ops.
14///
15/// Supports both HTTP and WebSocket transports.
16pub struct HaystackClient<T: Transport> {
17 transport: T,
18}
19
20impl HaystackClient<HttpTransport> {
21 /// Connect to a Haystack server via HTTP, performing SCRAM authentication.
22 ///
23 /// # Arguments
24 /// * `url` - The server API root (e.g. `http://localhost:8080/api`)
25 /// * `username` - The username to authenticate as
26 /// * `password` - The user's plaintext password
27 pub async fn connect(url: &str, username: &str, password: &str) -> Result<Self, ClientError> {
28 let client = reqwest::Client::new();
29 let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
30 let transport = HttpTransport::new(url, auth_token);
31 Ok(Self { transport })
32 }
33
34 /// Connect to a Haystack server via HTTP with mutual TLS (mTLS) client
35 /// certificate authentication, then perform SCRAM authentication.
36 ///
37 /// Builds a custom `reqwest::Client` configured with the provided TLS
38 /// identity (client certificate + key) and optional CA certificate, then
39 /// runs the standard SCRAM handshake over that client.
40 ///
41 /// # Arguments
42 /// * `url` - The server API root (e.g. `https://localhost:8443/api`)
43 /// * `username` - The username to authenticate as
44 /// * `password` - The user's plaintext password
45 /// * `tls` - The mTLS configuration (cert, key, optional CA)
46 pub async fn connect_with_tls(
47 url: &str,
48 username: &str,
49 password: &str,
50 tls: &crate::tls::TlsConfig,
51 ) -> Result<Self, ClientError> {
52 // Combine cert + key into a single PEM buffer for reqwest::Identity
53 let mut combined_pem = tls.client_cert_pem.clone();
54 combined_pem.extend_from_slice(&tls.client_key_pem);
55
56 let identity = reqwest::Identity::from_pem(&combined_pem)
57 .map_err(|e| ClientError::Connection(format!("invalid client certificate: {e}")))?;
58
59 let mut builder = reqwest::Client::builder().identity(identity);
60
61 if let Some(ref ca) = tls.ca_cert_pem {
62 let cert = reqwest::Certificate::from_pem(ca)
63 .map_err(|e| ClientError::Connection(format!("invalid CA certificate: {e}")))?;
64 builder = builder.add_root_certificate(cert);
65 }
66
67 let client = builder
68 .build()
69 .map_err(|e| ClientError::Connection(format!("TLS client build failed: {e}")))?;
70
71 let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
72 let transport = HttpTransport::new(url, auth_token);
73 Ok(Self { transport })
74 }
75}
76
77impl HaystackClient<WsTransport> {
78 /// Connect to a Haystack server via WebSocket.
79 ///
80 /// Performs SCRAM authentication over HTTP first to obtain an auth token,
81 /// then establishes a WebSocket connection using that token.
82 ///
83 /// # Arguments
84 /// * `url` - The server API root for HTTP auth (e.g. `http://localhost:8080/api`)
85 /// * `ws_url` - The WebSocket URL (e.g. `ws://localhost:8080/api/ws`)
86 /// * `username` - The username to authenticate as
87 /// * `password` - The user's plaintext password
88 pub async fn connect_ws(
89 url: &str,
90 ws_url: &str,
91 username: &str,
92 password: &str,
93 ) -> Result<Self, ClientError> {
94 // Authenticate via HTTP first to get the token
95 let client = reqwest::Client::new();
96 let auth_token = crate::auth::authenticate(&client, url, username, password).await?;
97
98 // Connect WebSocket with the token
99 let transport = WsTransport::connect(ws_url, &auth_token).await?;
100 Ok(Self { transport })
101 }
102}
103
104impl<T: Transport> HaystackClient<T> {
105 /// Create a client with an already-configured transport.
106 ///
107 /// Useful for testing or when you have a custom [`Transport`] implementation.
108 pub fn from_transport(transport: T) -> Self {
109 Self { transport }
110 }
111
112 /// Call a raw Haystack op with a request grid.
113 ///
114 /// `op` is the operation name (e.g. `"read"`, `"hisRead"`). The server
115 /// returns an error grid (as [`ClientError::Grid`]) if the op fails.
116 pub async fn call(&self, op: &str, req: &HGrid) -> Result<HGrid, ClientError> {
117 self.transport.call(op, req).await
118 }
119
120 // -----------------------------------------------------------------------
121 // Standard ops
122 // -----------------------------------------------------------------------
123
124 /// Call the `about` op. Returns a single-row grid with server metadata
125 /// including `vendorName`, `productName`, `productVersion`, and `tz`.
126 pub async fn about(&self) -> Result<HGrid, ClientError> {
127 self.call("about", &HGrid::new()).await
128 }
129
130 /// Call the `ops` op. Returns a grid listing every operation the server
131 /// supports, with `name`, `summary`, and `doc` columns.
132 pub async fn ops(&self) -> Result<HGrid, ClientError> {
133 self.call("ops", &HGrid::new()).await
134 }
135
136 /// Call the `formats` op. Returns a grid of MIME types the server can
137 /// read/write, with `mime`, `receive`, and `send` columns.
138 pub async fn formats(&self) -> Result<HGrid, ClientError> {
139 self.call("formats", &HGrid::new()).await
140 }
141
142 /// Call the `libs` op. Returns a grid of Xeto library modules installed
143 /// on the server, including `name` and `version` for each library.
144 pub async fn libs(&self) -> Result<HGrid, ClientError> {
145 self.call("libs", &HGrid::new()).await
146 }
147
148 /// Call the `read` op with a Haystack filter expression and optional limit.
149 ///
150 /// `filter` is a Haystack filter string (e.g. `"site"`, `"equip and siteRef==@s"`,
151 /// `"temp > 72"`). Returns a grid of matching entity dicts. Pass `limit` to
152 /// cap the number of results. Returns an empty grid if nothing matches.
153 pub async fn read(&self, filter: &str, limit: Option<usize>) -> Result<HGrid, ClientError> {
154 let mut row = HDict::new();
155 row.set("filter", Kind::Str(filter.to_string()));
156 if let Some(lim) = limit {
157 row.set("limit", Kind::Number(Number::unitless(lim as f64)));
158 }
159 let cols = if limit.is_some() {
160 vec![HCol::new("filter"), HCol::new("limit")]
161 } else {
162 vec![HCol::new("filter")]
163 };
164 let grid = HGrid::from_parts(HDict::new(), cols, vec![row]);
165 self.call("read", &grid).await
166 }
167
168 /// Call the `read` op with a list of entity ref ids.
169 ///
170 /// Each entry in `ids` is a ref value string (e.g. `"@site-1"`, `"@equip-2"`).
171 /// Returns one row per id. Rows for unknown ids contain only a `Null` marker.
172 pub async fn read_by_ids(&self, ids: &[&str]) -> Result<HGrid, ClientError> {
173 let rows: Vec<HDict> = ids
174 .iter()
175 .map(|id| {
176 let mut d = HDict::new();
177 d.set("id", Kind::Ref(HRef::from_val(*id)));
178 d
179 })
180 .collect();
181 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("id")], rows);
182 self.call("read", &grid).await
183 }
184
185 /// Call the `nav` op to walk the site/equip/point navigation tree.
186 ///
187 /// Pass `None` to get root-level nodes, or `Some(nav_id)` to expand a
188 /// specific node. Returns a grid of child navigation entries.
189 pub async fn nav(&self, nav_id: Option<&str>) -> Result<HGrid, ClientError> {
190 let mut row = HDict::new();
191 if let Some(id) = nav_id {
192 row.set("navId", Kind::Str(id.to_string()));
193 }
194 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("navId")], vec![row]);
195 self.call("nav", &grid).await
196 }
197
198 /// Call the `defs` op to query the server's tag/def ontology.
199 ///
200 /// `filter` is an optional Haystack filter (e.g. `"point"`, `"equip"`) to
201 /// narrow results. Returns a grid of matching def dicts.
202 pub async fn defs(&self, filter: Option<&str>) -> Result<HGrid, ClientError> {
203 let mut row = HDict::new();
204 if let Some(f) = filter {
205 row.set("filter", Kind::Str(f.to_string()));
206 }
207 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("filter")], vec![row]);
208 self.call("defs", &grid).await
209 }
210
211 /// Call the `watchSub` op to subscribe to real-time changes for a set of entities.
212 ///
213 /// `ids` are ref value strings (e.g. `["@point-1", "@point-2"]`). `lease` is
214 /// an optional duration string (e.g. `"1min"`, `"30sec"`, `"1hr"`) controlling
215 /// how long the server keeps the watch alive without polls. Returns a grid
216 /// whose meta contains the assigned `watchId`.
217 pub async fn watch_sub(&self, ids: &[&str], lease: Option<&str>) -> Result<HGrid, ClientError> {
218 let rows: Vec<HDict> = ids
219 .iter()
220 .map(|id| {
221 let mut d = HDict::new();
222 d.set("id", Kind::Ref(HRef::from_val(*id)));
223 d
224 })
225 .collect();
226 let mut meta = HDict::new();
227 if let Some(l) = lease {
228 meta.set("lease", Kind::Str(l.to_string()));
229 }
230 let grid = HGrid::from_parts(meta, vec![HCol::new("id")], rows);
231 self.call("watchSub", &grid).await
232 }
233
234 /// Call the `watchPoll` op to poll a watch for changed entity values.
235 ///
236 /// `watch_id` is the identifier returned by [`watch_sub`](Self::watch_sub).
237 /// Returns a grid of entities whose `curVal` has changed since the last poll.
238 pub async fn watch_poll(&self, watch_id: &str) -> Result<HGrid, ClientError> {
239 let mut meta = HDict::new();
240 meta.set("watchId", Kind::Str(watch_id.to_string()));
241 let grid = HGrid::from_parts(meta, vec![], vec![]);
242 self.call("watchPoll", &grid).await
243 }
244
245 /// Call the `watchUnsub` op to remove entities from an active watch.
246 ///
247 /// `watch_id` identifies the watch. `ids` are the ref value strings to
248 /// unsubscribe (e.g. `["@point-1"]`). The watch is closed automatically
249 /// when all entities have been removed.
250 pub async fn watch_unsub(&self, watch_id: &str, ids: &[&str]) -> Result<HGrid, ClientError> {
251 let rows: Vec<HDict> = ids
252 .iter()
253 .map(|id| {
254 let mut d = HDict::new();
255 d.set("id", Kind::Ref(HRef::from_val(*id)));
256 d
257 })
258 .collect();
259 let mut meta = HDict::new();
260 meta.set("watchId", Kind::Str(watch_id.to_string()));
261 let grid = HGrid::from_parts(meta, vec![HCol::new("id")], rows);
262 self.call("watchUnsub", &grid).await
263 }
264
265 /// Call the `pointWrite` op to write a value to a writable point.
266 ///
267 /// `id` is the point ref (e.g. `"@point-1"`). `level` is the priority
268 /// array level 1–17 per the Haystack spec (1=emergency, 8=manual,
269 /// 16=default, 17=fallback). `val` is the value to write at that level.
270 pub async fn point_write(&self, id: &str, level: u8, val: Kind) -> Result<HGrid, ClientError> {
271 let mut row = HDict::new();
272 row.set("id", Kind::Ref(HRef::from_val(id)));
273 row.set("level", Kind::Number(Number::unitless(level as f64)));
274 row.set("val", val);
275 let grid = HGrid::from_parts(
276 HDict::new(),
277 vec![HCol::new("id"), HCol::new("level"), HCol::new("val")],
278 vec![row],
279 );
280 self.call("pointWrite", &grid).await
281 }
282
283 /// Call the `hisRead` op to read time-series history for a point.
284 ///
285 /// `id` is the point ref (e.g. `"@sensor-1"`). `range` is a Haystack range
286 /// string: `"today"`, `"yesterday"`, a single date like `"2024-01-01"`, a
287 /// date span `"2024-01-01,2024-01-31"`, or a datetime with timezone like
288 /// `"2024-01-01T00:00:00-05:00 New_York"`. Returns a grid of `ts`/`val` rows.
289 pub async fn his_read(&self, id: &str, range: &str) -> Result<HGrid, ClientError> {
290 let mut row = HDict::new();
291 row.set("id", Kind::Ref(HRef::from_val(id)));
292 row.set("range", Kind::Str(range.to_string()));
293 let grid = HGrid::from_parts(
294 HDict::new(),
295 vec![HCol::new("id"), HCol::new("range")],
296 vec![row],
297 );
298 self.call("hisRead", &grid).await
299 }
300
301 /// Call the `hisWrite` op to write time-series samples for a point.
302 ///
303 /// `id` is the point ref. `items` must be dicts each containing a `ts`
304 /// (DateTime) and `val` tag. Returns an empty grid on success or an
305 /// error grid if the write is rejected.
306 pub async fn his_write(&self, id: &str, items: Vec<HDict>) -> Result<HGrid, ClientError> {
307 let mut meta = HDict::new();
308 meta.set("id", Kind::Ref(HRef::from_val(id)));
309 let grid = HGrid::from_parts(meta, vec![HCol::new("ts"), HCol::new("val")], items);
310 self.call("hisWrite", &grid).await
311 }
312
313 /// Call the `invokeAction` op to invoke a named action on an entity.
314 ///
315 /// `id` is the target entity ref. `action` is the action name. `args` is
316 /// an [`HDict`] of additional parameters for the action. Returns the
317 /// action's result grid.
318 pub async fn invoke_action(
319 &self,
320 id: &str,
321 action: &str,
322 args: HDict,
323 ) -> Result<HGrid, ClientError> {
324 let mut row = args;
325 row.set("id", Kind::Ref(HRef::from_val(id)));
326 row.set("action", Kind::Str(action.to_string()));
327 let grid = HGrid::from_parts(
328 HDict::new(),
329 vec![HCol::new("id"), HCol::new("action")],
330 vec![row],
331 );
332 self.call("invokeAction", &grid).await
333 }
334
335 /// Call the `close` op to close the current server session.
336 ///
337 /// This is distinct from [`close`](Self::close) which shuts down the
338 /// underlying transport connection.
339 pub async fn close_session(&self) -> Result<HGrid, ClientError> {
340 self.call("close", &HGrid::new()).await
341 }
342
343 // -----------------------------------------------------------------------
344 // Library & spec management ops
345 // -----------------------------------------------------------------------
346
347 /// List all Xeto specs on the server, optionally filtered by library name.
348 ///
349 /// Pass `None` to list specs across all libraries, or `Some("lib_name")`
350 /// to restrict to a single library. Returns a grid of spec dicts.
351 pub async fn specs(&self, lib: Option<&str>) -> Result<HGrid, ClientError> {
352 let mut row = HDict::new();
353 if let Some(lib_name) = lib {
354 row.set("lib", Kind::Str(lib_name.to_string()));
355 }
356 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("lib")], vec![row]);
357 self.call("specs", &grid).await
358 }
359
360 /// Get a single Xeto spec by its fully-qualified name.
361 ///
362 /// `qname` is the qualified spec name (e.g. `"ph::Site"`). Returns a
363 /// single-row grid with the spec definition.
364 pub async fn spec(&self, qname: &str) -> Result<HGrid, ClientError> {
365 let mut row = HDict::new();
366 row.set("qname", Kind::Str(qname.to_string()));
367 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("qname")], vec![row]);
368 self.call("spec", &grid).await
369 }
370
371 /// Load a Xeto library from source text into the server's ontology.
372 ///
373 /// `name` is the library name and `source` is the raw Xeto source code.
374 /// Returns an error grid if the source fails to parse or validate.
375 pub async fn load_lib(&self, name: &str, source: &str) -> Result<HGrid, ClientError> {
376 let mut row = HDict::new();
377 row.set("name", Kind::Str(name.to_string()));
378 row.set("source", Kind::Str(source.to_string()));
379 let grid = HGrid::from_parts(
380 HDict::new(),
381 vec![HCol::new("name"), HCol::new("source")],
382 vec![row],
383 );
384 self.call("loadLib", &grid).await
385 }
386
387 /// Unload a previously loaded Xeto library by name.
388 ///
389 /// Removes the library and its specs from the server's active ontology.
390 pub async fn unload_lib(&self, name: &str) -> Result<HGrid, ClientError> {
391 let mut row = HDict::new();
392 row.set("name", Kind::Str(name.to_string()));
393 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("name")], vec![row]);
394 self.call("unloadLib", &grid).await
395 }
396
397 /// Export a library as Xeto source text.
398 ///
399 /// Returns a grid whose first row contains the serialized Xeto source
400 /// for the named library.
401 pub async fn export_lib(&self, name: &str) -> Result<HGrid, ClientError> {
402 let mut row = HDict::new();
403 row.set("name", Kind::Str(name.to_string()));
404 let grid = HGrid::from_parts(HDict::new(), vec![HCol::new("name")], vec![row]);
405 self.call("exportLib", &grid).await
406 }
407
408 /// Validate entity dicts against the server's Xeto ontology.
409 ///
410 /// `entities` is a list of [`HDict`] records to check. Returns a grid
411 /// with validation results — each row reports errors for the corresponding
412 /// input entity. An empty result grid means all entities are valid.
413 pub async fn validate(&self, entities: Vec<HDict>) -> Result<HGrid, ClientError> {
414 // Build column set from all entities
415 let mut col_names: Vec<String> = Vec::new();
416 let mut seen = HashSet::new();
417 for entity in &entities {
418 for name in entity.tag_names() {
419 if seen.insert(name.to_string()) {
420 col_names.push(name.to_string());
421 }
422 }
423 }
424 col_names.sort();
425 let cols: Vec<HCol> = col_names.iter().map(|n| HCol::new(n.as_str())).collect();
426 let grid = HGrid::from_parts(HDict::new(), cols, entities);
427 self.call("validate", &grid).await
428 }
429
430 /// Close the underlying transport connection (HTTP or WebSocket).
431 ///
432 /// Call [`close_session`](Self::close_session) first if you want to
433 /// explicitly end the server-side session before disconnecting.
434 pub async fn close(&self) -> Result<(), ClientError> {
435 self.transport.close().await
436 }
437}