Skip to main content

pulse_client/
iq.rs

//! `client.iq()` — B-106 Interactive Queries.
//!
//! Query the live state of streaming agents like a database from any async
//! Rust service. The killer use case is a synchronous decision microservice
//! (fraud, rate-limit, pricing) calling [`IQResource::get`] on every
//! request and reading agent state from RAM with zero ingest-to-decision
//! lag:
//!
//! ```no_run
//! use pulse_client::PulseClient;
//!
//! # async fn run() -> Result<(), pulse_client::PulseError> {
//! let client = PulseClient::builder()
//!     .base_url("http://localhost:9090")
//!     .token("ey...")
//!     .build()?;
//!
//! let state = client.iq().get("fraud-detector", "customer-42").await?;
//! let tx_count = state["value"]["tx_count_60s"].as_u64().unwrap_or(0);
//! if tx_count > 5 {
//!     // deny payment
//! }
//! # Ok(())
//! # }
//! ```
//!
//! All methods require the `AGENT_READ` permission (Owner, Platform Admin,
//! Developer, Auditor personas by default — see B-105).
//!
//! Responses are returned as [`serde_json::Value`] so callers can paginate,
//! inspect `truncated` / `limitApplied` / `totalScanned` metadata, and read
//! fields without going through a wrapper layer. Strongly-typed structs can
//! be layered on top in user code if desired; the SDK stays close to the
//! wire.
35
36use reqwest::Method;
37use serde::Serialize;
38use serde_json::{json, Map, Value};
39
40use crate::client::PulseClient;
41use crate::error::PulseError;
42
43/// `client.iq()` — accessor for Interactive Queries.
44pub struct IQResource<'c> {
45    pub(crate) client: &'c PulseClient,
46}
47
/// Optional range bounds + page size for [`IQResource::scan`] and
/// [`IQResource::list_keys`].
///
/// Default = no range, limit 100. Limit > 1000 is clamped server-side
/// (response carries `X-Pulse-Pagination-Clamped: true` header when
/// clamped — not surfaced in the parsed body).
///
/// Values are built fluently: start from [`IQScanOptions::new`] and chain
/// the setters, each of which consumes and returns the options.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IQScanOptions {
    /// Inclusive lower bound on the key range. `None` = beginning.
    pub start: Option<String>,
    /// Exclusive upper bound on the key range. `None` = end.
    pub end: Option<String>,
    /// Page size. `None` defaults to 100.
    pub limit: Option<u32>,
}

impl IQScanOptions {
    /// Returns default options (no range, limit 100).
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the inclusive lower bound and returns the updated options.
    #[must_use]
    pub fn start(mut self, start: impl Into<String>) -> Self {
        self.start = Some(start.into());
        self
    }

    /// Sets the exclusive upper bound and returns the updated options.
    #[must_use]
    pub fn end(mut self, end: impl Into<String>) -> Self {
        self.end = Some(end.into());
        self
    }

    /// Sets the page size and returns the updated options.
    #[must_use]
    pub fn limit(mut self, limit: u32) -> Self {
        self.limit = Some(limit);
        self
    }
}
88
89/// Optional inputs for [`IQResource::query`].
90///
91/// The `filter` is a recursive [`Value`] shaped per the IQFilterExpression
92/// schema: each node MUST carry exactly ONE of `field` (leaf), `and`
93/// (array of sub-expressions, all must match), `or` (array, any must
94/// match), or `not` (single sub-expression). Mixing in a single node
95/// returns HTTP 400.
96///
97/// Use the [`iq_leaf`], [`iq_and`], [`iq_or`], [`iq_not`] free functions
98/// to construct filter trees ergonomically.
99#[derive(Debug, Clone, Default)]
100pub struct IQQueryOptions {
101    pub start: Option<String>,
102    pub end: Option<String>,
103    pub limit: Option<u32>,
104    pub filter: Option<Value>,
105    pub projection: Option<Vec<String>>,
106    /// Field name to group on. `Some` switches the response shape from
107    /// flat `{entries, ...}` to grouped `{groups: [{groupKey, count}], ...}`.
108    /// Use `"$value"` for scalar states.
109    pub group_by: Option<String>,
110}
111
112impl IQQueryOptions {
113    pub fn new() -> Self {
114        Self::default()
115    }
116
117    pub fn start(mut self, s: impl Into<String>) -> Self {
118        self.start = Some(s.into());
119        self
120    }
121
122    pub fn end(mut self, s: impl Into<String>) -> Self {
123        self.end = Some(s.into());
124        self
125    }
126
127    pub fn limit(mut self, n: u32) -> Self {
128        self.limit = Some(n);
129        self
130    }
131
132    pub fn filter(mut self, f: Value) -> Self {
133        self.filter = Some(f);
134        self
135    }
136
137    pub fn projection(mut self, fields: Vec<String>) -> Self {
138        self.projection = Some(fields);
139        self
140    }
141
142    pub fn group_by(mut self, field: impl Into<String>) -> Self {
143        self.group_by = Some(field.into());
144        self
145    }
146}
147
148/// Builds a leaf filter node: `{"field": ..., "op": ..., "value": ...}`.
149/// Pass an empty `op` to omit it (e.g. for an `exists`-style test where
150/// the field's mere presence suffices).
151pub fn iq_leaf(field: &str, op: &str, value: impl Serialize) -> Value {
152    let mut m = Map::new();
153    m.insert("field".into(), Value::String(field.into()));
154    if !op.is_empty() {
155        m.insert("op".into(), Value::String(op.into()));
156    }
157    m.insert(
158        "value".into(),
159        serde_json::to_value(value).unwrap_or(Value::Null),
160    );
161    Value::Object(m)
162}
163
164/// Builds an AND filter combining all children.
165pub fn iq_and(children: Vec<Value>) -> Value {
166    json!({ "and": children })
167}
168
169/// Builds an OR filter combining all children.
170pub fn iq_or(children: Vec<Value>) -> Value {
171    json!({ "or": children })
172}
173
174/// Builds a NOT filter negating its child.
175pub fn iq_not(child: Value) -> Value {
176    json!({ "not": child })
177}
178
179impl<'c> IQResource<'c> {
180    /// `GET /api/pulse/iq/agents/{id}/state` — headline state summary.
181    ///
182    /// Returns the IQSummary [`Value`] — always carries `agentId`,
183    /// `queryable`, `backend`, `hotSize`, `hotBytes`, `coldSize`,
184    /// `coldBytes`, `lastCheckpointId`, `totalSize`. When the agent has
185    /// no live streaming backend: `queryable=false`, `backend="none"`,
186    /// numerics 0, `lastCheckpointId=-1`.
187    pub async fn summary(self, agent_id: &str) -> Result<Value, PulseError> {
188        let path = format!("/api/pulse/iq/agents/{}/state", encode_segment(agent_id));
189        self.client
190            .request(Method::GET, &path, None::<&()>, true)
191            .await
192    }
193
194    /// `GET /api/pulse/iq/agents/{id}/state/value/{key}` — point lookup.
195    ///
196    /// Returns the IQValue [`Value`] (`agentId`, `key`, `value` — `value`
197    /// can be any JSON type including `null`).
198    ///
199    /// # Errors
200    /// Returns [`PulseError::NotFound`] when the key is absent OR the
201    /// agent is not queryable. Inspect the variant's `body` field:
202    /// `error == "Key not found"` vs `error == "Agent has no queryable
203    /// state"` (with `reason` field) — to distinguish.
204    pub async fn get(self, agent_id: &str, key: &str) -> Result<Value, PulseError> {
205        let path = format!(
206            "/api/pulse/iq/agents/{}/state/value/{}",
207            encode_segment(agent_id),
208            encode_segment(key),
209        );
210        self.client
211            .request(Method::GET, &path, None::<&()>, true)
212            .await
213    }
214
215    /// `GET /api/pulse/iq/agents/{id}/state/scan` — paginated range scan.
216    ///
217    /// Inspect `truncated` to decide if more data exists; paginate by
218    /// setting `opts.start` on the next call to the last returned key
219    /// plus a sentinel suffix.
220    pub async fn scan(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
221        let path = format!(
222            "/api/pulse/iq/agents/{}/state/scan{}",
223            encode_segment(agent_id),
224            scan_query(&opts),
225        );
226        self.client
227            .request(Method::GET, &path, None::<&()>, true)
228            .await
229    }
230
231    /// `GET /api/pulse/iq/agents/{id}/state/keys` — keys-only range scan.
232    ///
233    /// Same shape as [`scan`](Self::scan) minus the values; `keys` field
234    /// is a JSON array of strings.
235    pub async fn list_keys(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
236        let path = format!(
237            "/api/pulse/iq/agents/{}/state/keys{}",
238            encode_segment(agent_id),
239            scan_query(&opts),
240        );
241        self.client
242            .request(Method::GET, &path, None::<&()>, true)
243            .await
244    }
245
246    /// `POST /api/pulse/iq/agents/{id}/state/query` — filtered / projected
247    /// / grouped query.
248    ///
249    /// When `opts.group_by` is set, the response shape is
250    /// `{groups: [{groupKey, count}], groupCount, ...}` instead of
251    /// `{entries: [...], count, ...}`.
252    ///
253    /// # Errors
254    /// - [`PulseError::Validation`] on invalid filter syntax (HTTP 400)
255    /// - [`PulseError::NotFound`] when the agent is not queryable
256    pub async fn query(self, agent_id: &str, opts: IQQueryOptions) -> Result<Value, PulseError> {
257        let path = format!(
258            "/api/pulse/iq/agents/{}/state/query",
259            encode_segment(agent_id),
260        );
261        let body = build_query_body(opts);
262        // Empty body → send None so the server defaults to a full scan
263        // (matches the in-tree handler's behaviour on missing body).
264        if body.is_object() && body.as_object().is_some_and(|m| m.is_empty()) {
265            self.client
266                .request::<()>(Method::POST, &path, None, true)
267                .await
268        } else {
269            self.client
270                .request(Method::POST, &path, Some(&body), true)
271                .await
272        }
273    }
274}
275
276/// Builds the `?limit=N&start=...&end=...` query suffix.
277///
278/// `limit` is always sent (defaulting to 100 when `opts.limit` is `None`)
279/// so the server gets a deterministic value. Missing `start`/`end` are
280/// omitted so the URL stays clean.
281fn scan_query(opts: &IQScanOptions) -> String {
282    let limit = opts.limit.unwrap_or(100);
283    let mut q = format!("?limit={limit}");
284    if let Some(start) = &opts.start {
285        q.push_str("&start=");
286        q.push_str(&encode_segment(start));
287    }
288    if let Some(end) = &opts.end {
289        q.push_str("&end=");
290        q.push_str(&encode_segment(end));
291    }
292    q
293}
294
295/// Flattens [`IQQueryOptions`] into the JSON body the server expects.
296/// Only includes fields the caller actually set so the wire payload is
297/// stable + diff-friendly.
298fn build_query_body(opts: IQQueryOptions) -> Value {
299    let mut m = Map::new();
300    if let Some(s) = opts.start {
301        m.insert("start".into(), Value::String(s));
302    }
303    if let Some(e) = opts.end {
304        m.insert("end".into(), Value::String(e));
305    }
306    if let Some(l) = opts.limit {
307        m.insert("limit".into(), Value::Number(l.into()));
308    }
309    if let Some(f) = opts.filter {
310        m.insert("filter".into(), f);
311    }
312    if let Some(p) = opts.projection {
313        m.insert(
314            "projection".into(),
315            Value::Array(p.into_iter().map(Value::String).collect()),
316        );
317    }
318    if let Some(g) = opts.group_by {
319        m.insert("groupBy".into(), Value::String(g));
320    }
321    Value::Object(m)
322}
323
/// Upper-case hexadecimal digits indexed by nibble value; used by
/// [`encode_segment`] to write `%XX` escapes.
const HEX: &[u8; 16] = b"0123456789ABCDEF";

/// Percent-encodes `s` aggressively for use as one URL path segment or
/// query value.
///
/// Bytes in the RFC 3986 "unreserved" set (`A-Z`, `a-z`, `0-9`, `-`, `_`,
/// `.`, `~`) pass through untouched. Every other byte of the UTF-8 input —
/// `/`, `:`, space and `+` included — is written as `%XX` with upper-case
/// hex digits.
///
/// The goal is a wire format that is identical across all 5 Pulse SDKs, so
/// a key like `"user:123/orders"` produces the same URL bytes regardless of
/// caller language.
///
/// NOTE(review): the counterparts this was modelled on (Python's
/// `quote(safe='')`, Java's `URLEncoder.encode` followed by `'+'`→`'%20'`,
/// JS's `encodeURIComponent`, Go's `QueryEscape` + `'+'`→`'%20'`) do not all
/// share exactly this safe set — e.g. `*`, `!`, `'`, `(`, `)` and `~` are
/// treated differently by some of them. Confirm byte-for-byte parity for
/// keys containing those characters.
fn encode_segment(s: &str) -> String {
    s.bytes()
        .fold(String::with_capacity(s.len()), |mut encoded, byte| {
            if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') {
                encoded.push(char::from(byte));
            } else {
                let high = HEX[usize::from(byte >> 4)];
                let low = HEX[usize::from(byte & 0x0F)];
                encoded.extend(['%', char::from(high), char::from(low)]);
            }
            encoded
        })
}
348
349impl std::fmt::Debug for IQResource<'_> {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        f.debug_struct("IQResource").finish()
352    }
353}