pulse_client/iq.rs
1//! `client.iq()` — B-106 Interactive Queries.
2//!
3//! Query the live state of streaming agents like a database from any async
4//! Rust service. The killer use case is a synchronous decision microservice
5//! (fraud, rate-limit, pricing) calling [`IQResource::get`] on every
6//! request and reading agent state from RAM with zero ingest-to-decision
7//! lag:
8//!
9//! ```no_run
10//! use pulse_client::PulseClient;
11//!
12//! # async fn run() -> Result<(), pulse_client::PulseError> {
13//! let client = PulseClient::builder()
14//! .base_url("http://localhost:9090")
15//! .token("ey...")
16//! .build()?;
17//!
18//! let state = client.iq().get("fraud-detector", "customer-42").await?;
19//! let tx_count = state["value"]["tx_count_60s"].as_u64().unwrap_or(0);
20//! if tx_count > 5 {
21//! // deny payment
22//! }
23//! # Ok(())
24//! # }
25//! ```
26//!
27//! All methods require the `AGENT_READ` permission (Owner, Platform Admin,
28//! Developer, Auditor personas by default — see B-105).
29//!
30//! Responses are returned as [`serde_json::Value`] so callers can paginate,
31//! inspect `truncated` / `limitApplied` / `totalScanned` metadata, and read
32//! fields without going through a wrapper layer. Strongly-typed structs can
33//! be layered on top in user code if desired; the SDK stays close to the
34//! wire.
35
36use reqwest::Method;
37use serde::Serialize;
38use serde_json::{json, Map, Value};
39
40use crate::client::PulseClient;
41use crate::error::PulseError;
42
43/// `client.iq()` — accessor for Interactive Queries.
44pub struct IQResource<'c> {
45 pub(crate) client: &'c PulseClient,
46}
47
/// Optional range bounds + page size for [`IQResource::scan`] and
/// [`IQResource::list_keys`].
///
/// `Default` leaves every field unset: no range, and a `limit` of `None`,
/// which is requested from the server as 100. Limit > 1000 is clamped
/// server-side (response carries `X-Pulse-Pagination-Clamped: true` header
/// when clamped — not surfaced in the parsed body).
///
/// Implements `PartialEq`/`Eq` so option values can be compared directly,
/// e.g. in tests that assert on the options a caller built.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IQScanOptions {
    /// Inclusive lower bound on the key range. `None` = beginning.
    pub start: Option<String>,
    /// Exclusive upper bound on the key range. `None` = end.
    pub end: Option<String>,
    /// Page size. `None` defaults to 100.
    pub limit: Option<u32>,
}
63
64impl IQScanOptions {
65 /// Returns default options (no range, limit 100).
66 pub fn new() -> Self {
67 Self::default()
68 }
69
70 /// Sets the inclusive lower bound.
71 pub fn start(mut self, start: impl Into<String>) -> Self {
72 self.start = Some(start.into());
73 self
74 }
75
76 /// Sets the exclusive upper bound.
77 pub fn end(mut self, end: impl Into<String>) -> Self {
78 self.end = Some(end.into());
79 self
80 }
81
82 /// Sets the page size.
83 pub fn limit(mut self, limit: u32) -> Self {
84 self.limit = Some(limit);
85 self
86 }
87}
88
89/// Optional inputs for [`IQResource::query`].
90///
91/// The `filter` is a recursive [`Value`] shaped per the IQFilterExpression
92/// schema: each node MUST carry exactly ONE of `field` (leaf), `and`
93/// (array of sub-expressions, all must match), `or` (array, any must
94/// match), or `not` (single sub-expression). Mixing in a single node
95/// returns HTTP 400.
96///
97/// Use the [`iq_leaf`], [`iq_and`], [`iq_or`], [`iq_not`] free functions
98/// to construct filter trees ergonomically.
99#[derive(Debug, Clone, Default)]
100pub struct IQQueryOptions {
101 pub start: Option<String>,
102 pub end: Option<String>,
103 pub limit: Option<u32>,
104 pub filter: Option<Value>,
105 pub projection: Option<Vec<String>>,
106 /// Field name to group on. `Some` switches the response shape from
107 /// flat `{entries, ...}` to grouped `{groups: [{groupKey, count}], ...}`.
108 /// Use `"$value"` for scalar states.
109 pub group_by: Option<String>,
110}
111
112impl IQQueryOptions {
113 pub fn new() -> Self {
114 Self::default()
115 }
116
117 pub fn start(mut self, s: impl Into<String>) -> Self {
118 self.start = Some(s.into());
119 self
120 }
121
122 pub fn end(mut self, s: impl Into<String>) -> Self {
123 self.end = Some(s.into());
124 self
125 }
126
127 pub fn limit(mut self, n: u32) -> Self {
128 self.limit = Some(n);
129 self
130 }
131
132 pub fn filter(mut self, f: Value) -> Self {
133 self.filter = Some(f);
134 self
135 }
136
137 pub fn projection(mut self, fields: Vec<String>) -> Self {
138 self.projection = Some(fields);
139 self
140 }
141
142 pub fn group_by(mut self, field: impl Into<String>) -> Self {
143 self.group_by = Some(field.into());
144 self
145 }
146}
147
148/// Builds a leaf filter node: `{"field": ..., "op": ..., "value": ...}`.
149/// Pass an empty `op` to omit it (e.g. for an `exists`-style test where
150/// the field's mere presence suffices).
151pub fn iq_leaf(field: &str, op: &str, value: impl Serialize) -> Value {
152 let mut m = Map::new();
153 m.insert("field".into(), Value::String(field.into()));
154 if !op.is_empty() {
155 m.insert("op".into(), Value::String(op.into()));
156 }
157 m.insert(
158 "value".into(),
159 serde_json::to_value(value).unwrap_or(Value::Null),
160 );
161 Value::Object(m)
162}
163
164/// Builds an AND filter combining all children.
165pub fn iq_and(children: Vec<Value>) -> Value {
166 json!({ "and": children })
167}
168
169/// Builds an OR filter combining all children.
170pub fn iq_or(children: Vec<Value>) -> Value {
171 json!({ "or": children })
172}
173
/// Builds a NOT filter node, `{"not": <child>}`, negating its single
/// child expression.
///
/// The child is embedded as-is; no validation of its shape happens here.
pub fn iq_not(child: Value) -> Value {
    json!({ "not": child })
}
178
179impl<'c> IQResource<'c> {
180 /// `GET /api/pulse/iq/agents/{id}/state` — headline state summary.
181 ///
182 /// Returns the IQSummary [`Value`] — always carries `agentId`,
183 /// `queryable`, `backend`, `hotSize`, `hotBytes`, `coldSize`,
184 /// `coldBytes`, `lastCheckpointId`, `totalSize`. When the agent has
185 /// no live streaming backend: `queryable=false`, `backend="none"`,
186 /// numerics 0, `lastCheckpointId=-1`.
187 pub async fn summary(self, agent_id: &str) -> Result<Value, PulseError> {
188 let path = format!("/api/pulse/iq/agents/{}/state", encode_segment(agent_id));
189 self.client
190 .request(Method::GET, &path, None::<&()>, true)
191 .await
192 }
193
194 /// `GET /api/pulse/iq/agents/{id}/state/value/{key}` — point lookup.
195 ///
196 /// Returns the IQValue [`Value`] (`agentId`, `key`, `value` — `value`
197 /// can be any JSON type including `null`).
198 ///
199 /// # Errors
200 /// Returns [`PulseError::NotFound`] when the key is absent OR the
201 /// agent is not queryable. Inspect the variant's `body` field:
202 /// `error == "Key not found"` vs `error == "Agent has no queryable
203 /// state"` (with `reason` field) — to distinguish.
204 pub async fn get(self, agent_id: &str, key: &str) -> Result<Value, PulseError> {
205 let path = format!(
206 "/api/pulse/iq/agents/{}/state/value/{}",
207 encode_segment(agent_id),
208 encode_segment(key),
209 );
210 self.client
211 .request(Method::GET, &path, None::<&()>, true)
212 .await
213 }
214
215 /// `GET /api/pulse/iq/agents/{id}/state/scan` — paginated range scan.
216 ///
217 /// Inspect `truncated` to decide if more data exists; paginate by
218 /// setting `opts.start` on the next call to the last returned key
219 /// plus a sentinel suffix.
220 pub async fn scan(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
221 let path = format!(
222 "/api/pulse/iq/agents/{}/state/scan{}",
223 encode_segment(agent_id),
224 scan_query(&opts),
225 );
226 self.client
227 .request(Method::GET, &path, None::<&()>, true)
228 .await
229 }
230
231 /// `GET /api/pulse/iq/agents/{id}/state/keys` — keys-only range scan.
232 ///
233 /// Same shape as [`scan`](Self::scan) minus the values; `keys` field
234 /// is a JSON array of strings.
235 pub async fn list_keys(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
236 let path = format!(
237 "/api/pulse/iq/agents/{}/state/keys{}",
238 encode_segment(agent_id),
239 scan_query(&opts),
240 );
241 self.client
242 .request(Method::GET, &path, None::<&()>, true)
243 .await
244 }
245
246 /// `POST /api/pulse/iq/agents/{id}/state/query` — filtered / projected
247 /// / grouped query.
248 ///
249 /// When `opts.group_by` is set, the response shape is
250 /// `{groups: [{groupKey, count}], groupCount, ...}` instead of
251 /// `{entries: [...], count, ...}`.
252 ///
253 /// # Errors
254 /// - [`PulseError::Validation`] on invalid filter syntax (HTTP 400)
255 /// - [`PulseError::NotFound`] when the agent is not queryable
256 pub async fn query(self, agent_id: &str, opts: IQQueryOptions) -> Result<Value, PulseError> {
257 let path = format!(
258 "/api/pulse/iq/agents/{}/state/query",
259 encode_segment(agent_id),
260 );
261 let body = build_query_body(opts);
262 // Empty body → send None so the server defaults to a full scan
263 // (matches the in-tree handler's behaviour on missing body).
264 if body.is_object() && body.as_object().is_some_and(|m| m.is_empty()) {
265 self.client
266 .request::<()>(Method::POST, &path, None, true)
267 .await
268 } else {
269 self.client
270 .request(Method::POST, &path, Some(&body), true)
271 .await
272 }
273 }
274}
275
276/// Builds the `?limit=N&start=...&end=...` query suffix.
277///
278/// `limit` is always sent (defaulting to 100 when `opts.limit` is `None`)
279/// so the server gets a deterministic value. Missing `start`/`end` are
280/// omitted so the URL stays clean.
281fn scan_query(opts: &IQScanOptions) -> String {
282 let limit = opts.limit.unwrap_or(100);
283 let mut q = format!("?limit={limit}");
284 if let Some(start) = &opts.start {
285 q.push_str("&start=");
286 q.push_str(&encode_segment(start));
287 }
288 if let Some(end) = &opts.end {
289 q.push_str("&end=");
290 q.push_str(&encode_segment(end));
291 }
292 q
293}
294
295/// Flattens [`IQQueryOptions`] into the JSON body the server expects.
296/// Only includes fields the caller actually set so the wire payload is
297/// stable + diff-friendly.
298fn build_query_body(opts: IQQueryOptions) -> Value {
299 let mut m = Map::new();
300 if let Some(s) = opts.start {
301 m.insert("start".into(), Value::String(s));
302 }
303 if let Some(e) = opts.end {
304 m.insert("end".into(), Value::String(e));
305 }
306 if let Some(l) = opts.limit {
307 m.insert("limit".into(), Value::Number(l.into()));
308 }
309 if let Some(f) = opts.filter {
310 m.insert("filter".into(), f);
311 }
312 if let Some(p) = opts.projection {
313 m.insert(
314 "projection".into(),
315 Value::Array(p.into_iter().map(Value::String).collect()),
316 );
317 }
318 if let Some(g) = opts.group_by {
319 m.insert("groupBy".into(), Value::String(g));
320 }
321 Value::Object(m)
322}
323
/// Upper-case hexadecimal digits, indexed by nibble value.
const HEX: &[u8; 16] = b"0123456789ABCDEF";

/// Percent-encodes a path segment aggressively.
///
/// ASCII letters, digits and `-`, `_`, `.`, `~` pass through unchanged;
/// every other byte of the UTF-8 input — including `/`, `:`, space and
/// `+` — becomes `%XX` with upper-case hex digits. This is the behaviour
/// of Python's `urllib.parse.quote(s, safe='')`.
///
/// The goal is an identical wire format across all 5 Pulse SDKs, so a key
/// like `"user:123/orders"` produces the same URL bytes regardless of
/// caller language.
/// NOTE(review): JS `encodeURIComponent` leaves `!*'()` unescaped and
/// Java `URLEncoder.encode` leaves `*` unescaped while escaping `~`, so
/// confirm the other SDKs post-process those characters to match.
fn encode_segment(s: &str) -> String {
    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let passes_through =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~');
        if passes_through {
            encoded.push(char::from(byte));
        } else {
            let high = HEX[usize::from(byte >> 4)];
            let low = HEX[usize::from(byte & 0x0F)];
            encoded.extend(['%', char::from(high), char::from(low)]);
        }
    }
    encoded
}
348
349impl std::fmt::Debug for IQResource<'_> {
350 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351 f.debug_struct("IQResource").finish()
352 }
353}