pulse-client 2.6.1

Official Rust client for StreamFlow Pulse — AI Agent Platform
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
//! `client.iq()` — B-106 Interactive Queries.
//!
//! Query the live state of streaming agents like a database from any async
//! Rust service. The killer use case is a synchronous decision microservice
//! (fraud, rate-limit, pricing) calling [`IQResource::get`] on every
//! request and reading agent state from RAM with zero ingest-to-decision
//! lag:
//!
//! ```no_run
//! use pulse_client::PulseClient;
//!
//! # async fn run() -> Result<(), pulse_client::PulseError> {
//! let client = PulseClient::builder()
//!     .base_url("http://localhost:9090")
//!     .token("ey...")
//!     .build()?;
//!
//! let state = client.iq().get("fraud-detector", "customer-42").await?;
//! let tx_count = state["value"]["tx_count_60s"].as_u64().unwrap_or(0);
//! if tx_count > 5 {
//!     // deny payment
//! }
//! # Ok(())
//! # }
//! ```
//!
//! All methods require the `AGENT_READ` permission (Owner, Platform Admin,
//! Developer, Auditor personas by default — see B-105).
//!
//! Responses are returned as [`serde_json::Value`] so callers can paginate,
//! inspect `truncated` / `limitApplied` / `totalScanned` metadata, and read
//! fields without going through a wrapper layer. Strongly-typed structs can
//! be layered on top in user code if desired; the SDK stays close to the
//! wire.

use reqwest::Method;
use serde::Serialize;
use serde_json::{json, Map, Value};

use crate::client::PulseClient;
use crate::error::PulseError;

/// `client.iq()` — accessor for Interactive Queries.
pub struct IQResource<'c> {
    pub(crate) client: &'c PulseClient,
}

/// Optional range bounds + page size for [`IQResource::scan`] and
/// [`IQResource::list_keys`].
///
/// Default = no range, limit 100. Limit > 1000 is clamped server-side
/// (response carries `X-Pulse-Pagination-Clamped: true` header when
/// clamped — not surfaced in the parsed body).
#[derive(Debug, Clone, Default)]
pub struct IQScanOptions {
    /// Inclusive lower bound on the key range. `None` = beginning.
    pub start: Option<String>,
    /// Exclusive upper bound on the key range. `None` = end.
    pub end: Option<String>,
    /// Page size. `None` defaults to 100.
    pub limit: Option<u32>,
}

impl IQScanOptions {
    /// Returns default options (no range, limit 100).
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the inclusive lower bound.
    pub fn start(mut self, start: impl Into<String>) -> Self {
        self.start = Some(start.into());
        self
    }

    /// Sets the exclusive upper bound.
    pub fn end(mut self, end: impl Into<String>) -> Self {
        self.end = Some(end.into());
        self
    }

    /// Sets the page size.
    pub fn limit(mut self, limit: u32) -> Self {
        self.limit = Some(limit);
        self
    }
}

/// Optional inputs for [`IQResource::query`].
///
/// The `filter` is a recursive [`Value`] shaped per the IQFilterExpression
/// schema: each node MUST carry exactly ONE of `field` (leaf), `and`
/// (array of sub-expressions, all must match), `or` (array, any must
/// match), or `not` (single sub-expression). Mixing in a single node
/// returns HTTP 400.
///
/// Use the [`iq_leaf`], [`iq_and`], [`iq_or`], [`iq_not`] free functions
/// to construct filter trees ergonomically.
#[derive(Debug, Clone, Default)]
pub struct IQQueryOptions {
    pub start: Option<String>,
    pub end: Option<String>,
    pub limit: Option<u32>,
    pub filter: Option<Value>,
    pub projection: Option<Vec<String>>,
    /// Field name to group on. `Some` switches the response shape from
    /// flat `{entries, ...}` to grouped `{groups: [{groupKey, count}], ...}`.
    /// Use `"$value"` for scalar states.
    pub group_by: Option<String>,
}

impl IQQueryOptions {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn start(mut self, s: impl Into<String>) -> Self {
        self.start = Some(s.into());
        self
    }

    pub fn end(mut self, s: impl Into<String>) -> Self {
        self.end = Some(s.into());
        self
    }

    pub fn limit(mut self, n: u32) -> Self {
        self.limit = Some(n);
        self
    }

    pub fn filter(mut self, f: Value) -> Self {
        self.filter = Some(f);
        self
    }

    pub fn projection(mut self, fields: Vec<String>) -> Self {
        self.projection = Some(fields);
        self
    }

    pub fn group_by(mut self, field: impl Into<String>) -> Self {
        self.group_by = Some(field.into());
        self
    }
}

/// Builds a leaf filter node: `{"field": ..., "op": ..., "value": ...}`.
/// Pass an empty `op` to omit it (e.g. for an `exists`-style test where
/// the field's mere presence suffices).
pub fn iq_leaf(field: &str, op: &str, value: impl Serialize) -> Value {
    let mut m = Map::new();
    m.insert("field".into(), Value::String(field.into()));
    if !op.is_empty() {
        m.insert("op".into(), Value::String(op.into()));
    }
    m.insert(
        "value".into(),
        serde_json::to_value(value).unwrap_or(Value::Null),
    );
    Value::Object(m)
}

/// Builds an AND filter combining all children.
pub fn iq_and(children: Vec<Value>) -> Value {
    json!({ "and": children })
}

/// Builds an OR filter combining all children.
pub fn iq_or(children: Vec<Value>) -> Value {
    json!({ "or": children })
}

/// Builds a NOT filter negating its child.
pub fn iq_not(child: Value) -> Value {
    json!({ "not": child })
}

impl<'c> IQResource<'c> {
    /// `GET /api/pulse/iq/agents/{id}/state` — headline state summary.
    ///
    /// Returns the IQSummary [`Value`] — always carries `agentId`,
    /// `queryable`, `backend`, `hotSize`, `hotBytes`, `coldSize`,
    /// `coldBytes`, `lastCheckpointId`, `totalSize`. When the agent has
    /// no live streaming backend: `queryable=false`, `backend="none"`,
    /// numerics 0, `lastCheckpointId=-1`.
    pub async fn summary(self, agent_id: &str) -> Result<Value, PulseError> {
        let path = format!("/api/pulse/iq/agents/{}/state", encode_segment(agent_id));
        self.client
            .request(Method::GET, &path, None::<&()>, true)
            .await
    }

    /// `GET /api/pulse/iq/agents/{id}/state/value/{key}` — point lookup.
    ///
    /// Returns the IQValue [`Value`] (`agentId`, `key`, `value` — `value`
    /// can be any JSON type including `null`).
    ///
    /// # Errors
    /// Returns [`PulseError::NotFound`] when the key is absent OR the
    /// agent is not queryable. Inspect the variant's `body` field:
    /// `error == "Key not found"` vs `error == "Agent has no queryable
    /// state"` (with `reason` field) — to distinguish.
    pub async fn get(self, agent_id: &str, key: &str) -> Result<Value, PulseError> {
        self.get_as_of_inner(agent_id, key, None).await
    }

    /// `GET /api/pulse/iq/agents/{id}/state/value/{key}?as_of=<spec>` —
    /// B-113 time-travel point lookup.
    ///
    /// Reads the value as it was at a past instant instead of the live value.
    /// `as_of` accepts `now`, a relative offset (`-1h`, `-30m`, `-7d`), an
    /// ISO-8601 instant, or epoch millis — passed through to the server
    /// verbatim. The response then also carries `asOf` (resolved epoch ms)
    /// alongside the usual `agentId`, `key`, `value`:
    ///
    /// ```no_run
    /// # use pulse_client::PulseClient;
    /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
    /// let state = client.iq().get_as_of("user-sessions", "u42", "-1h").await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    /// Same as [`get`](Self::get) — [`PulseError::NotFound`] when the key is
    /// absent (at that instant) OR the agent is not queryable.
    pub async fn get_as_of(
        self,
        agent_id: &str,
        key: &str,
        as_of: &str,
    ) -> Result<Value, PulseError> {
        self.get_as_of_inner(agent_id, key, Some(as_of)).await
    }

    async fn get_as_of_inner(
        self,
        agent_id: &str,
        key: &str,
        as_of: Option<&str>,
    ) -> Result<Value, PulseError> {
        let mut path = format!(
            "/api/pulse/iq/agents/{}/state/value/{}",
            encode_segment(agent_id),
            encode_segment(key),
        );
        if let Some(spec) = as_of {
            path.push_str("?as_of=");
            path.push_str(&encode_segment(spec));
        }
        self.client
            .request(Method::GET, &path, None::<&()>, true)
            .await
    }

    /// `GET /api/pulse/iq/agents/{id}/state/diff/{key}?from=&to=` — B-113
    /// field-level state diff.
    ///
    /// Returns the delta of `key`'s state between two instants. `from` and
    /// `to` accept the same specs as [`get_as_of`](Self::get_as_of); they
    /// default server-side to `-1h` / `now` when blank, but this method
    /// always sends them explicitly. The response carries
    /// `{agentId, key, fromTs, toTs, changes}` where `changes` maps each
    /// changed field to `{delta?, from, to}` (`delta` present for numeric
    /// fields), or `{added}` / `{removed}`:
    ///
    /// ```no_run
    /// # use pulse_client::PulseClient;
    /// # async fn run(client: &PulseClient) -> Result<(), pulse_client::PulseError> {
    /// let d = client.iq().diff("user-sessions", "u42", "-1h", "now").await?;
    /// let delta = d["changes"]["cart_value"]["delta"].as_f64();
    /// # Ok(())
    /// # }
    /// ```
    pub async fn diff(
        self,
        agent_id: &str,
        key: &str,
        from: &str,
        to: &str,
    ) -> Result<Value, PulseError> {
        let path = format!(
            "/api/pulse/iq/agents/{}/state/diff/{}?from={}&to={}",
            encode_segment(agent_id),
            encode_segment(key),
            encode_segment(from),
            encode_segment(to),
        );
        self.client
            .request(Method::GET, &path, None::<&()>, true)
            .await
    }

    /// `GET /api/pulse/iq/agents/{id}/state/scan` — paginated range scan.
    ///
    /// Inspect `truncated` to decide if more data exists; paginate by
    /// setting `opts.start` on the next call to the last returned key
    /// plus a sentinel suffix.
    pub async fn scan(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
        let path = format!(
            "/api/pulse/iq/agents/{}/state/scan{}",
            encode_segment(agent_id),
            scan_query(&opts),
        );
        self.client
            .request(Method::GET, &path, None::<&()>, true)
            .await
    }

    /// `GET /api/pulse/iq/agents/{id}/state/keys` — keys-only range scan.
    ///
    /// Same shape as [`scan`](Self::scan) minus the values; `keys` field
    /// is a JSON array of strings.
    pub async fn list_keys(self, agent_id: &str, opts: IQScanOptions) -> Result<Value, PulseError> {
        let path = format!(
            "/api/pulse/iq/agents/{}/state/keys{}",
            encode_segment(agent_id),
            scan_query(&opts),
        );
        self.client
            .request(Method::GET, &path, None::<&()>, true)
            .await
    }

    /// `POST /api/pulse/iq/agents/{id}/state/query` — filtered / projected
    /// / grouped query.
    ///
    /// When `opts.group_by` is set, the response shape is
    /// `{groups: [{groupKey, count}], groupCount, ...}` instead of
    /// `{entries: [...], count, ...}`.
    ///
    /// # Errors
    /// - [`PulseError::Validation`] on invalid filter syntax (HTTP 400)
    /// - [`PulseError::NotFound`] when the agent is not queryable
    pub async fn query(self, agent_id: &str, opts: IQQueryOptions) -> Result<Value, PulseError> {
        let path = format!(
            "/api/pulse/iq/agents/{}/state/query",
            encode_segment(agent_id),
        );
        let body = build_query_body(opts);
        // Empty body → send None so the server defaults to a full scan
        // (matches the in-tree handler's behaviour on missing body).
        if body.is_object() && body.as_object().is_some_and(|m| m.is_empty()) {
            self.client
                .request::<()>(Method::POST, &path, None, true)
                .await
        } else {
            self.client
                .request(Method::POST, &path, Some(&body), true)
                .await
        }
    }
}

/// Builds the `?limit=N&start=...&end=...` query suffix.
///
/// `limit` is always sent (defaulting to 100 when `opts.limit` is `None`)
/// so the server gets a deterministic value. Missing `start`/`end` are
/// omitted so the URL stays clean.
fn scan_query(opts: &IQScanOptions) -> String {
    let limit = opts.limit.unwrap_or(100);
    let mut q = format!("?limit={limit}");
    if let Some(start) = &opts.start {
        q.push_str("&start=");
        q.push_str(&encode_segment(start));
    }
    if let Some(end) = &opts.end {
        q.push_str("&end=");
        q.push_str(&encode_segment(end));
    }
    q
}

/// Flattens [`IQQueryOptions`] into the JSON body the server expects.
/// Only includes fields the caller actually set so the wire payload is
/// stable + diff-friendly.
fn build_query_body(opts: IQQueryOptions) -> Value {
    let mut m = Map::new();
    if let Some(s) = opts.start {
        m.insert("start".into(), Value::String(s));
    }
    if let Some(e) = opts.end {
        m.insert("end".into(), Value::String(e));
    }
    if let Some(l) = opts.limit {
        m.insert("limit".into(), Value::Number(l.into()));
    }
    if let Some(f) = opts.filter {
        m.insert("filter".into(), f);
    }
    if let Some(p) = opts.projection {
        m.insert(
            "projection".into(),
            Value::Array(p.into_iter().map(Value::String).collect()),
        );
    }
    if let Some(g) = opts.group_by {
        m.insert("groupBy".into(), Value::String(g));
    }
    Value::Object(m)
}

/// Percent-encodes a path segment aggressively — same semantics as
/// Python's `urllib.quote(safe='')`, Java's `URLEncoder.encode` followed
/// by `'+'`→`'%20'`, JS's `encodeURIComponent`, and Go's QueryEscape +
/// `'+'`→`'%20'`. Keeps the wire format identical across all 5 Pulse
/// SDKs so a key like `"user:123/orders"` produces the same URL bytes
/// regardless of caller language.
fn encode_segment(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for &b in s.as_bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(b as char);
            }
            _ => {
                out.push('%');
                out.push(HEX[(b >> 4) as usize] as char);
                out.push(HEX[(b & 0xF) as usize] as char);
            }
        }
    }
    out
}

const HEX: &[u8; 16] = b"0123456789ABCDEF";

impl std::fmt::Debug for IQResource<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IQResource").finish()
    }
}