pulse_client/iq.rs
1//! `client.iq()` — B-106 Interactive Queries.
2//!
3//! Query the live state of streaming agents like a database from any async
4//! Rust service. The killer use case is a synchronous decision microservice
5//! (fraud, rate-limit, pricing) calling [`IQResource::get`] on every
6//! request and reading agent state from RAM with zero ingest-to-decision
7//! lag:
8//!
9//! ```no_run
10//! use pulse_client::PulseClient;
11//!
12//! # async fn run() -> Result<(), pulse_client::PulseError> {
13//! let client = PulseClient::builder()
14//! .base_url("http://localhost:9090")
15//! .token("ey...")
16//! .build()?;
17//!
18//! let state = client.iq().get("fraud-detector", "customer-42").await?;
19//! let tx_count = state["value"]["tx_count_60s"].as_u64().unwrap_or(0);
20//! if tx_count > 5 {
21//! // deny payment
22//! }
23//! # Ok(())
24//! # }
25//! ```
26//!
27//! All methods require the `AGENT_READ` permission (Owner, Platform Admin,
28//! Developer, Auditor personas by default — see B-105).
29//!
30//! Responses are returned as [`serde_json::Value`] so callers can paginate,
31//! inspect `truncated` / `limitApplied` / `totalScanned` metadata, and read
32//! fields without going through a wrapper layer. Strongly-typed structs can
33//! be layered on top in user code if desired; the SDK stays close to the
34//! wire.
35
36use reqwest::Method;
37use serde::Serialize;
38use serde_json::{json, Map, Value};
39
40use crate::client::PulseClient;
41use crate::error::PulseError;
42
43/// `client.iq()` — accessor for Interactive Queries.
44pub struct IQResource<'c> {
45 pub(crate) client: &'c PulseClient,
46}
47
/// Optional range bounds + page size for [`IQResource::scan`] and
/// [`IQResource::list_keys`].
///
/// Default = no range, limit 100 (the 100 is filled in when the request
/// URL is built; the field itself stays `None`). Limit > 1000 is clamped
/// server-side (response carries `X-Pulse-Pagination-Clamped: true`
/// header when clamped — not surfaced in the parsed body).
///
/// The setters consume and return the options so they can be chained:
/// `IQScanOptions::new().start("a").limit(50)`.
#[derive(Debug, Clone, Default)]
pub struct IQScanOptions {
    /// Inclusive lower bound on the key range. `None` = beginning.
    pub start: Option<String>,
    /// Exclusive upper bound on the key range. `None` = end.
    pub end: Option<String>,
    /// Page size. `None` defaults to 100.
    pub limit: Option<u32>,
}

impl IQScanOptions {
    /// Returns default options (no range, limit 100).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the inclusive lower bound.
    ///
    /// Returns the updated options; the result must be used, since the
    /// receiver is consumed.
    #[must_use]
    pub fn start(mut self, start: impl Into<String>) -> Self {
        self.start = Some(start.into());
        self
    }

    /// Sets the exclusive upper bound.
    ///
    /// Returns the updated options; the result must be used, since the
    /// receiver is consumed.
    #[must_use]
    pub fn end(mut self, end: impl Into<String>) -> Self {
        self.end = Some(end.into());
        self
    }

    /// Sets the page size.
    ///
    /// Returns the updated options; the result must be used, since the
    /// receiver is consumed.
    #[must_use]
    pub fn limit(mut self, limit: u32) -> Self {
        self.limit = Some(limit);
        self
    }
}
88
89/// Optional inputs for [`IQResource::query`].
90///
91/// The `filter` is a recursive [`Value`] shaped per the IQFilterExpression
92/// schema: each node MUST carry exactly ONE of `field` (leaf), `and`
93/// (array of sub-expressions, all must match), `or` (array, any must
94/// match), or `not` (single sub-expression). Mixing in a single node
95/// returns HTTP 400.
96///
97/// Use the [`iq_leaf`], [`iq_and`], [`iq_or`], [`iq_not`] free functions
98/// to construct filter trees ergonomically.
99#[derive(Debug, Clone, Default)]
100pub struct IQQueryOptions {
101 pub start: Option<String>,
102 pub end: Option<String>,
103 pub limit: Option<u32>,
104 pub filter: Option<Value>,
105 pub projection: Option<Vec<String>>,
106 /// Field name to group on. `Some` switches the response shape from
107 /// flat `{entries, ...}` to grouped `{groups: [{groupKey, count}], ...}`.
108 /// Use `"$value"` for scalar states.
109 pub group_by: Option<String>,
110}
111
112impl IQQueryOptions {
113 pub fn new() -> Self {
114 Self::default()
115 }
116
117 pub fn start(mut self, s: impl Into<String>) -> Self {
118 self.start = Some(s.into());
119 self
120 }
121
122 pub fn end(mut self, s: impl Into<String>) -> Self {
123 self.end = Some(s.into());
124 self
125 }
126
127 pub fn limit(mut self, n: u32) -> Self {
128 self.limit = Some(n);
129 self
130 }
131
132 pub fn filter(mut self, f: Value) -> Self {
133 self.filter = Some(f);
134 self
135 }
136
137 pub fn projection(mut self, fields: Vec<String>) -> Self {
138 self.projection = Some(fields);
139 self
140 }
141
142 pub fn group_by(mut self, field: impl Into<String>) -> Self {
143 self.group_by = Some(field.into());
144 self
145 }
146}
147
148/// Builds a leaf filter node: `{"field": ..., "op": ..., "value": ...}`.
149/// Pass an empty `op` to omit it (e.g. for an `exists`-style test where
150/// the field's mere presence suffices).
151pub fn iq_leaf(field: &str, op: &str, value: impl Serialize) -> Value {
152 let mut m = Map::new();
153 m.insert("field".into(), Value::String(field.into()));
154 if !op.is_empty() {
155 m.insert("op".into(), Value::String(op.into()));
156 }
157 m.insert(
158 "value".into(),
159 serde_json::to_value(value).unwrap_or(Value::Null),
160 );
161 Value::Object(m)
162}
163
164/// Builds an AND filter combining all children.
165pub fn iq_and(children: Vec<Value>) -> Value {
166 json!({ "and": children })
167}
168
169/// Builds an OR filter combining all children.
170pub fn iq_or(children: Vec<Value>) -> Value {
171 json!({ "or": children })
172}
173
174/// Builds a NOT filter negating its child.
175pub fn iq_not(child: Value) -> Value {
176 json!({ "not": child })
177}
178
179impl<'c> IQResource<'c> {
180 /// `GET /api/pulse/iq/agents/{id}/state` — headline state summary.
181 ///
182 /// Returns the IQSummary [`Value`] — always carries `agentId`,
183 /// `queryable`, `backend`, `hotSize`, `hotBytes`, `coldSize`,
184 /// `coldBytes`, `lastCheckpointId`, `totalSize`. When the agent has
185 /// no live streaming backend: `queryable=false`, `backend="none"`,
186 /// numerics 0, `lastCheckpointId=-1`.
187 pub async fn summary(self, agent_id: &str) -> Result<Value, PulseError> {
188 let path = format!("/api/pulse/iq/agents/{}/state", encode_segment(agent_id));
189 self.client
190 .request(Method::GET, &path, None::<&()>, true)
191 .await
192 }
193
194 /// `GET /api/pulse/iq/agents/{id}/state/value/{key}` — point lookup.
195 ///
196 /// Returns the IQValue [`Value`] (`agentId`, `key`, `value` — `value`
197 /// can be any JSON type including `null`).
198 ///
199 /// # Errors
200 /// Returns [`PulseError::NotFound`] when the key is absent OR the
201 /// agent is not queryable. Inspect the variant's `body` field:
202 /// `error == "Key not found"` vs `error == "Agent has no queryable
203 /// state"` (with `reason` field) — to distinguish.
204 pub async fn get(self, agent_id: &str, key: &str) -> Result<Value, PulseError> {
205 self.get_as_of_inner(agent_id, key, None).await
206 }
207
208 /// `GET /api/pulse/iq/agents/{id}/state/value/{key}?as_of=<spec>` —
209 /// B-113 time-travel point lookup.
210 ///
211 /// Reads the value as it was at a past instant instead of the live value.
212 /// `as_of` accepts `now`, a relative offset (`-1h`, `-30m`, `-7d`), an
213 /// ISO-8601 instant, or epoch millis — passed through to the server
214 /// verbatim. The response then also carries `asOf` (resolved epoch ms)
215 /// alongside the usual `agentId`, `key`, `value`:
216 ///
217 /// ```no_run
218 /// # use pulse_client::PulseClient;
219 /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
220 /// let state = client.iq().get_as_of("user-sessions", "u42", "-1h").await?;
221 /// # Ok(())
222 /// # }
223 /// ```
224 ///
225 /// # Errors
226 /// Same as [`get`](Self::get) — [`PulseError::NotFound`] when the key is
227 /// absent (at that instant) OR the agent is not queryable.
228 pub async fn get_as_of(
229 self,
230 agent_id: &str,
231 key: &str,
232 as_of: &str,
233 ) -> Result<Value, PulseError> {
234 self.get_as_of_inner(agent_id, key, Some(as_of)).await
235 }
236
237 async fn get_as_of_inner(
238 self,
239 agent_id: &str,
240 key: &str,
241 as_of: Option<&str>,
242 ) -> Result<Value, PulseError> {
243 let mut path = format!(
244 "/api/pulse/iq/agents/{}/state/value/{}",
245 encode_segment(agent_id),
246 encode_segment(key),
247 );
248 if let Some(spec) = as_of {
249 path.push_str("?as_of=");
250 path.push_str(&encode_segment(spec));
251 }
252 self.client
253 .request(Method::GET, &path, None::<&()>, true)
254 .await
255 }
256
257 /// `GET /api/pulse/iq/agents/{id}/state/diff/{key}?from=&to=` — B-113
258 /// field-level state diff.
259 ///
260 /// Returns the delta of `key`'s state between two instants. `from` and
261 /// `to` accept the same specs as [`get_as_of`](Self::get_as_of); they
262 /// default server-side to `-1h` / `now` when blank, but this method
263 /// always sends them explicitly. The response carries
264 /// `{agentId, key, fromTs, toTs, changes}` where `changes` maps each
265 /// changed field to `{delta?, from, to}` (`delta` present for numeric
266 /// fields), or `{added}` / `{removed}`:
267 ///
268 /// ```no_run
269 /// # use pulse_client::PulseClient;
270 /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
271 /// let d = client.iq().diff("user-sessions", "u42", "-1h", "now").await?;
272 /// let delta = d["changes"]["cart_value"]["delta"].as_f64();
273 /// # Ok(())
274 /// # }
275 /// ```
276 pub async fn diff(
277 self,
278 agent_id: &str,
279 key: &str,
280 from: &str,
281 to: &str,
282 ) -> Result<Value, PulseError> {
283 let path = format!(
284 "/api/pulse/iq/agents/{}/state/diff/{}?from={}&to={}",
285 encode_segment(agent_id),
286 encode_segment(key),
287 encode_segment(from),
288 encode_segment(to),
289 );
290 self.client
291 .request(Method::GET, &path, None::<&()>, true)
292 .await
293 }
294
295 /// `GET /api/pulse/iq/agents/{id}/state/scan` — paginated range scan.
296 ///
297 /// Inspect `truncated` to decide if more data exists; paginate by
298 /// setting `opts.start` on the next call to the last returned key
299 /// plus a sentinel suffix.
300 pub async fn scan(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
301 let path = format!(
302 "/api/pulse/iq/agents/{}/state/scan{}",
303 encode_segment(agent_id),
304 scan_query(&opts),
305 );
306 self.client
307 .request(Method::GET, &path, None::<&()>, true)
308 .await
309 }
310
311 /// `GET /api/pulse/iq/agents/{id}/state/keys` — keys-only range scan.
312 ///
313 /// Same shape as [`scan`](Self::scan) minus the values; `keys` field
314 /// is a JSON array of strings.
315 pub async fn list_keys(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
316 let path = format!(
317 "/api/pulse/iq/agents/{}/state/keys{}",
318 encode_segment(agent_id),
319 scan_query(&opts),
320 );
321 self.client
322 .request(Method::GET, &path, None::<&()>, true)
323 .await
324 }
325
326 /// `POST /api/pulse/iq/agents/{id}/state/query` — filtered / projected
327 /// / grouped query.
328 ///
329 /// When `opts.group_by` is set, the response shape is
330 /// `{groups: [{groupKey, count}], groupCount, ...}` instead of
331 /// `{entries: [...], count, ...}`.
332 ///
333 /// # Errors
334 /// - [`PulseError::Validation`] on invalid filter syntax (HTTP 400)
335 /// - [`PulseError::NotFound`] when the agent is not queryable
336 pub async fn query(self, agent_id: &str, opts: IQQueryOptions) -> Result<Value, PulseError> {
337 let path = format!(
338 "/api/pulse/iq/agents/{}/state/query",
339 encode_segment(agent_id),
340 );
341 let body = build_query_body(opts);
342 // Empty body → send None so the server defaults to a full scan
343 // (matches the in-tree handler's behaviour on missing body).
344 if body.is_object() && body.as_object().is_some_and(|m| m.is_empty()) {
345 self.client
346 .request::<()>(Method::POST, &path, None, true)
347 .await
348 } else {
349 self.client
350 .request(Method::POST, &path, Some(&body), true)
351 .await
352 }
353 }
354}
355
356/// Builds the `?limit=N&start=...&end=...` query suffix.
357///
358/// `limit` is always sent (defaulting to 100 when `opts.limit` is `None`)
359/// so the server gets a deterministic value. Missing `start`/`end` are
360/// omitted so the URL stays clean.
361fn scan_query(opts: &IQScanOptions) -> String {
362 let limit = opts.limit.unwrap_or(100);
363 let mut q = format!("?limit={limit}");
364 if let Some(start) = &opts.start {
365 q.push_str("&start=");
366 q.push_str(&encode_segment(start));
367 }
368 if let Some(end) = &opts.end {
369 q.push_str("&end=");
370 q.push_str(&encode_segment(end));
371 }
372 q
373}
374
375/// Flattens [`IQQueryOptions`] into the JSON body the server expects.
376/// Only includes fields the caller actually set so the wire payload is
377/// stable + diff-friendly.
378fn build_query_body(opts: IQQueryOptions) -> Value {
379 let mut m = Map::new();
380 if let Some(s) = opts.start {
381 m.insert("start".into(), Value::String(s));
382 }
383 if let Some(e) = opts.end {
384 m.insert("end".into(), Value::String(e));
385 }
386 if let Some(l) = opts.limit {
387 m.insert("limit".into(), Value::Number(l.into()));
388 }
389 if let Some(f) = opts.filter {
390 m.insert("filter".into(), f);
391 }
392 if let Some(p) = opts.projection {
393 m.insert(
394 "projection".into(),
395 Value::Array(p.into_iter().map(Value::String).collect()),
396 );
397 }
398 if let Some(g) = opts.group_by {
399 m.insert("groupBy".into(), Value::String(g));
400 }
401 Value::Object(m)
402}
403
/// Aggressively percent-encodes `s` for use as a URL path segment or query
/// value.
///
/// Only ASCII letters, digits and `-`, `_`, `.`, `~` are copied through
/// unchanged. Every other byte — including each byte of a multi-byte
/// UTF-8 sequence — becomes `%XX` with uppercase hex digits, so `/`, `:`,
/// `+`, space, etc. can never be mistaken for URL structure.
///
/// The encoding is meant to keep the wire format identical across the
/// Pulse SDKs, so that a key like `"user:123/orders"` yields the same URL
/// bytes regardless of caller language (per the original author's note;
/// the other SDKs are not visible from this file).
fn encode_segment(s: &str) -> String {
    s.bytes()
        .fold(String::with_capacity(s.len()), |mut encoded, byte| {
            let unreserved =
                byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
            if unreserved {
                encoded.push(char::from(byte));
            } else {
                let high = usize::from(byte >> 4);
                let low = usize::from(byte & 0x0F);
                encoded.push('%');
                encoded.push(char::from(HEX[high]));
                encoded.push(char::from(HEX[low]));
            }
            encoded
        })
}

/// Uppercase hex digits, indexed by nibble value, for `%XX` escapes.
const HEX: &[u8; 16] = b"0123456789ABCDEF";
428
429impl std::fmt::Debug for IQResource<'_> {
430 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431 f.debug_struct("IQResource").finish()
432 }
433}