Skip to main content

pulse_client/
iq.rs

1//! `client.iq()` — B-106 Interactive Queries.
2//!
3//! Query the live state of streaming agents like a database from any async
4//! Rust service. The killer use case is a synchronous decision microservice
5//! (fraud, rate-limit, pricing) calling [`IQResource::get`] on every
6//! request and reading agent state from RAM with zero ingest-to-decision
7//! lag:
8//!
9//! ```no_run
10//! use pulse_client::PulseClient;
11//!
12//! # async fn run() -> Result<(), pulse_client::PulseError> {
13//! let client = PulseClient::builder()
14//!     .base_url("http://localhost:9090")
15//!     .token("ey...")
16//!     .build()?;
17//!
18//! let state = client.iq().get("fraud-detector", "customer-42").await?;
19//! let tx_count = state["value"]["tx_count_60s"].as_u64().unwrap_or(0);
20//! if tx_count > 5 {
21//!     // deny payment
22//! }
23//! # Ok(())
24//! # }
25//! ```
26//!
27//! All methods require the `AGENT_READ` permission (Owner, Platform Admin,
28//! Developer, Auditor personas by default — see B-105).
29//!
30//! Responses are returned as [`serde_json::Value`] so callers can paginate,
31//! inspect `truncated` / `limitApplied` / `totalScanned` metadata, and read
32//! fields without going through a wrapper layer. Strongly-typed structs can
33//! be layered on top in user code if desired; the SDK stays close to the
34//! wire.
35
36use reqwest::Method;
37use serde::Serialize;
38use serde_json::{json, Map, Value};
39
40use crate::client::PulseClient;
41use crate::error::PulseError;
42
43/// `client.iq()` — accessor for Interactive Queries.
44pub struct IQResource<'c> {
45    pub(crate) client: &'c PulseClient,
46}
47
/// Optional range bounds + page size for [`IQResource::scan`] and
/// [`IQResource::list_keys`].
///
/// Default = no range, limit 100. Limit > 1000 is clamped server-side
/// (response carries `X-Pulse-Pagination-Clamped: true` header when
/// clamped — not surfaced in the parsed body).
///
/// The type is comparable (`PartialEq` / `Eq`) so option values can be
/// asserted on directly in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IQScanOptions {
    /// Inclusive lower bound on the key range. `None` = beginning.
    pub start: Option<String>,
    /// Exclusive upper bound on the key range. `None` = end.
    pub end: Option<String>,
    /// Page size. `None` defaults to 100.
    pub limit: Option<u32>,
}

impl IQScanOptions {
    /// Returns default options (no range, limit 100).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the inclusive lower bound.
    ///
    /// Consumes and returns the options, so the result must be used.
    #[must_use]
    pub fn start(self, start: impl Into<String>) -> Self {
        Self {
            start: Some(start.into()),
            ..self
        }
    }

    /// Sets the exclusive upper bound.
    ///
    /// Consumes and returns the options, so the result must be used.
    #[must_use]
    pub fn end(self, end: impl Into<String>) -> Self {
        Self {
            end: Some(end.into()),
            ..self
        }
    }

    /// Sets the page size.
    ///
    /// Consumes and returns the options, so the result must be used.
    #[must_use]
    pub fn limit(self, limit: u32) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }
}
88
89/// Optional inputs for [`IQResource::query`].
90///
91/// The `filter` is a recursive [`Value`] shaped per the IQFilterExpression
92/// schema: each node MUST carry exactly ONE of `field` (leaf), `and`
93/// (array of sub-expressions, all must match), `or` (array, any must
94/// match), or `not` (single sub-expression). Mixing in a single node
95/// returns HTTP 400.
96///
97/// Use the [`iq_leaf`], [`iq_and`], [`iq_or`], [`iq_not`] free functions
98/// to construct filter trees ergonomically.
99#[derive(Debug, Clone, Default)]
100pub struct IQQueryOptions {
101    pub start: Option<String>,
102    pub end: Option<String>,
103    pub limit: Option<u32>,
104    pub filter: Option<Value>,
105    pub projection: Option<Vec<String>>,
106    /// Field name to group on. `Some` switches the response shape from
107    /// flat `{entries, ...}` to grouped `{groups: [{groupKey, count}], ...}`.
108    /// Use `"$value"` for scalar states.
109    pub group_by: Option<String>,
110}
111
112impl IQQueryOptions {
113    pub fn new() -> Self {
114        Self::default()
115    }
116
117    pub fn start(mut self, s: impl Into<String>) -> Self {
118        self.start = Some(s.into());
119        self
120    }
121
122    pub fn end(mut self, s: impl Into<String>) -> Self {
123        self.end = Some(s.into());
124        self
125    }
126
127    pub fn limit(mut self, n: u32) -> Self {
128        self.limit = Some(n);
129        self
130    }
131
132    pub fn filter(mut self, f: Value) -> Self {
133        self.filter = Some(f);
134        self
135    }
136
137    pub fn projection(mut self, fields: Vec<String>) -> Self {
138        self.projection = Some(fields);
139        self
140    }
141
142    pub fn group_by(mut self, field: impl Into<String>) -> Self {
143        self.group_by = Some(field.into());
144        self
145    }
146}
147
148/// Builds a leaf filter node: `{"field": ..., "op": ..., "value": ...}`.
149/// Pass an empty `op` to omit it (e.g. for an `exists`-style test where
150/// the field's mere presence suffices).
151pub fn iq_leaf(field: &str, op: &str, value: impl Serialize) -> Value {
152    let mut m = Map::new();
153    m.insert("field".into(), Value::String(field.into()));
154    if !op.is_empty() {
155        m.insert("op".into(), Value::String(op.into()));
156    }
157    m.insert(
158        "value".into(),
159        serde_json::to_value(value).unwrap_or(Value::Null),
160    );
161    Value::Object(m)
162}
163
164/// Builds an AND filter combining all children.
165pub fn iq_and(children: Vec<Value>) -> Value {
166    json!({ "and": children })
167}
168
169/// Builds an OR filter combining all children.
170pub fn iq_or(children: Vec<Value>) -> Value {
171    json!({ "or": children })
172}
173
174/// Builds a NOT filter negating its child.
175pub fn iq_not(child: Value) -> Value {
176    json!({ "not": child })
177}
178
179impl<'c> IQResource<'c> {
180    /// `GET /api/pulse/iq/agents/{id}/state` — headline state summary.
181    ///
182    /// Returns the IQSummary [`Value`] — always carries `agentId`,
183    /// `queryable`, `backend`, `hotSize`, `hotBytes`, `coldSize`,
184    /// `coldBytes`, `lastCheckpointId`, `totalSize`. When the agent has
185    /// no live streaming backend: `queryable=false`, `backend="none"`,
186    /// numerics 0, `lastCheckpointId=-1`.
187    pub async fn summary(self, agent_id: &str) -> Result<Value, PulseError> {
188        let path = format!("/api/pulse/iq/agents/{}/state", encode_segment(agent_id));
189        self.client
190            .request(Method::GET, &path, None::<&()>, true)
191            .await
192    }
193
194    /// `GET /api/pulse/iq/agents/{id}/state/value/{key}` — point lookup.
195    ///
196    /// Returns the IQValue [`Value`] (`agentId`, `key`, `value` — `value`
197    /// can be any JSON type including `null`).
198    ///
199    /// # Errors
200    /// Returns [`PulseError::NotFound`] when the key is absent OR the
201    /// agent is not queryable. Inspect the variant's `body` field:
202    /// `error == "Key not found"` vs `error == "Agent has no queryable
203    /// state"` (with `reason` field) — to distinguish.
204    pub async fn get(self, agent_id: &str, key: &str) -> Result<Value, PulseError> {
205        self.get_as_of_inner(agent_id, key, None).await
206    }
207
208    /// `GET /api/pulse/iq/agents/{id}/state/value/{key}?as_of=<spec>` —
209    /// B-113 time-travel point lookup.
210    ///
211    /// Reads the value as it was at a past instant instead of the live value.
212    /// `as_of` accepts `now`, a relative offset (`-1h`, `-30m`, `-7d`), an
213    /// ISO-8601 instant, or epoch millis — passed through to the server
214    /// verbatim. The response then also carries `asOf` (resolved epoch ms)
215    /// alongside the usual `agentId`, `key`, `value`:
216    ///
217    /// ```no_run
218    /// # use pulse_client::PulseClient;
219    /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
220    /// let state = client.iq().get_as_of("user-sessions", "u42", "-1h").await?;
221    /// # Ok(())
222    /// # }
223    /// ```
224    ///
225    /// # Errors
226    /// Same as [`get`](Self::get) — [`PulseError::NotFound`] when the key is
227    /// absent (at that instant) OR the agent is not queryable.
228    pub async fn get_as_of(
229        self,
230        agent_id: &str,
231        key: &str,
232        as_of: &str,
233    ) -> Result<Value, PulseError> {
234        self.get_as_of_inner(agent_id, key, Some(as_of)).await
235    }
236
237    async fn get_as_of_inner(
238        self,
239        agent_id: &str,
240        key: &str,
241        as_of: Option<&str>,
242    ) -> Result<Value, PulseError> {
243        let mut path = format!(
244            "/api/pulse/iq/agents/{}/state/value/{}",
245            encode_segment(agent_id),
246            encode_segment(key),
247        );
248        if let Some(spec) = as_of {
249            path.push_str("?as_of=");
250            path.push_str(&encode_segment(spec));
251        }
252        self.client
253            .request(Method::GET, &path, None::<&()>, true)
254            .await
255    }
256
257    /// `GET /api/pulse/iq/agents/{id}/state/diff/{key}?from=&to=` — B-113
258    /// field-level state diff.
259    ///
260    /// Returns the delta of `key`'s state between two instants. `from` and
261    /// `to` accept the same specs as [`get_as_of`](Self::get_as_of); they
262    /// default server-side to `-1h` / `now` when blank, but this method
263    /// always sends them explicitly. The response carries
264    /// `{agentId, key, fromTs, toTs, changes}` where `changes` maps each
265    /// changed field to `{delta?, from, to}` (`delta` present for numeric
266    /// fields), or `{added}` / `{removed}`:
267    ///
268    /// ```no_run
269    /// # use pulse_client::PulseClient;
270    /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
271    /// let d = client.iq().diff("user-sessions", "u42", "-1h", "now").await?;
272    /// let delta = d["changes"]["cart_value"]["delta"].as_f64();
273    /// # Ok(())
274    /// # }
275    /// ```
276    pub async fn diff(
277        self,
278        agent_id: &str,
279        key: &str,
280        from: &str,
281        to: &str,
282    ) -> Result<Value, PulseError> {
283        let path = format!(
284            "/api/pulse/iq/agents/{}/state/diff/{}?from={}&to={}",
285            encode_segment(agent_id),
286            encode_segment(key),
287            encode_segment(from),
288            encode_segment(to),
289        );
290        self.client
291            .request(Method::GET, &path, None::<&()>, true)
292            .await
293    }
294
295    /// `GET /api/pulse/iq/agents/{id}/state/scan` — paginated range scan.
296    ///
297    /// Inspect `truncated` to decide if more data exists; paginate by
298    /// setting `opts.start` on the next call to the last returned key
299    /// plus a sentinel suffix.
300    pub async fn scan(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
301        let path = format!(
302            "/api/pulse/iq/agents/{}/state/scan{}",
303            encode_segment(agent_id),
304            scan_query(&opts),
305        );
306        self.client
307            .request(Method::GET, &path, None::<&()>, true)
308            .await
309    }
310
311    /// `GET /api/pulse/iq/agents/{id}/state/keys` — keys-only range scan.
312    ///
313    /// Same shape as [`scan`](Self::scan) minus the values; `keys` field
314    /// is a JSON array of strings.
315    pub async fn list_keys(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
316        let path = format!(
317            "/api/pulse/iq/agents/{}/state/keys{}",
318            encode_segment(agent_id),
319            scan_query(&opts),
320        );
321        self.client
322            .request(Method::GET, &path, None::<&()>, true)
323            .await
324    }
325
326    /// `POST /api/pulse/iq/agents/{id}/state/query` — filtered / projected
327    /// / grouped query.
328    ///
329    /// When `opts.group_by` is set, the response shape is
330    /// `{groups: [{groupKey, count}], groupCount, ...}` instead of
331    /// `{entries: [...], count, ...}`.
332    ///
333    /// # Errors
334    /// - [`PulseError::Validation`] on invalid filter syntax (HTTP 400)
335    /// - [`PulseError::NotFound`] when the agent is not queryable
336    pub async fn query(self, agent_id: &str, opts: IQQueryOptions) -> Result<Value, PulseError> {
337        let path = format!(
338            "/api/pulse/iq/agents/{}/state/query",
339            encode_segment(agent_id),
340        );
341        let body = build_query_body(opts);
342        // Empty body → send None so the server defaults to a full scan
343        // (matches the in-tree handler's behaviour on missing body).
344        if body.is_object() && body.as_object().is_some_and(|m| m.is_empty()) {
345            self.client
346                .request::<()>(Method::POST, &path, None, true)
347                .await
348        } else {
349            self.client
350                .request(Method::POST, &path, Some(&body), true)
351                .await
352        }
353    }
354}
355
356/// Builds the `?limit=N&start=...&end=...` query suffix.
357///
358/// `limit` is always sent (defaulting to 100 when `opts.limit` is `None`)
359/// so the server gets a deterministic value. Missing `start`/`end` are
360/// omitted so the URL stays clean.
361fn scan_query(opts: &IQScanOptions) -> String {
362    let limit = opts.limit.unwrap_or(100);
363    let mut q = format!("?limit={limit}");
364    if let Some(start) = &opts.start {
365        q.push_str("&start=");
366        q.push_str(&encode_segment(start));
367    }
368    if let Some(end) = &opts.end {
369        q.push_str("&end=");
370        q.push_str(&encode_segment(end));
371    }
372    q
373}
374
375/// Flattens [`IQQueryOptions`] into the JSON body the server expects.
376/// Only includes fields the caller actually set so the wire payload is
377/// stable + diff-friendly.
378fn build_query_body(opts: IQQueryOptions) -> Value {
379    let mut m = Map::new();
380    if let Some(s) = opts.start {
381        m.insert("start".into(), Value::String(s));
382    }
383    if let Some(e) = opts.end {
384        m.insert("end".into(), Value::String(e));
385    }
386    if let Some(l) = opts.limit {
387        m.insert("limit".into(), Value::Number(l.into()));
388    }
389    if let Some(f) = opts.filter {
390        m.insert("filter".into(), f);
391    }
392    if let Some(p) = opts.projection {
393        m.insert(
394            "projection".into(),
395            Value::Array(p.into_iter().map(Value::String).collect()),
396        );
397    }
398    if let Some(g) = opts.group_by {
399        m.insert("groupBy".into(), Value::String(g));
400    }
401    Value::Object(m)
402}
403
/// Percent-encodes `s` aggressively for use as a path segment or query
/// value.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` pass through unchanged;
/// every other byte of the UTF-8 input becomes `%XX` with uppercase hex
/// digits. A space is therefore `%20` (never `+`) and `/` is `%2F`.
///
/// This matches Python's `urllib.parse.quote(safe='')` and is intended to
/// keep the wire format identical across all 5 Pulse SDKs (Java's
/// `URLEncoder.encode` followed by `'+'`→`'%20'`, JS's
/// `encodeURIComponent`, Go's QueryEscape + `'+'`→`'%20'`), so a key like
/// `"user:123/orders"` produces the same URL bytes regardless of caller
/// language.
///
/// NOTE(review): `encodeURIComponent` leaves `!*'()` unescaped, and
/// `URLEncoder` leaves `*` unescaped while escaping `~`; presumably those
/// SDKs post-process these characters — confirm.
fn encode_segment(s: &str) -> String {
    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let passthrough =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if passthrough {
            encoded.push(char::from(byte));
            continue;
        }
        // Escape as '%' followed by the high nibble, then the low nibble.
        let high = HEX[usize::from(byte >> 4)];
        let low = HEX[usize::from(byte & 0x0F)];
        encoded.push('%');
        encoded.push(char::from(high));
        encoded.push(char::from(low));
    }
    encoded
}

/// Uppercase hex digits, indexed by nibble value.
const HEX: &[u8; 16] = b"0123456789ABCDEF";
428
429impl std::fmt::Debug for IQResource<'_> {
430    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431        f.debug_struct("IQResource").finish()
432    }
433}