Skip to main content

axon/store/
row_stream.rs

1//! §Fase 35.i (v1.30.0) — Pillar III: `retrieve` is a `Stream<Row>`.
2//!
3//! A `retrieve from S where φ` is the coinductive selection σ_φ(S) —
4//! not an eager set. A pg-backed `axonstore` becomes a first-class
5//! **stream producer**: rows flow lazily off a cursor, drained through
//! a bounded, cancel-aware loop. `retrieve from huge_table` never
//! materializes the whole result — it streams, exactly like an LLM
//! token stream, and stays inside a memory bound under every policy
//! except the OSS `DegradeQuality` identity drain, which keeps every row.
9//!
10//! # Joins the Fase 34 streaming surface
11//!
12//! The drain reuses the **closed [`BackpressurePolicy`] catalog** Fase
13//! 34 ratified (`drop_oldest` / `degrade_quality` / `pause_upstream` /
14//! `fail`) and the same `CancellationFlag` cancel discipline as the
15//! `unified_stream_handler`. A DB row is not a `ToolChunk` — it has no
16//! token text, no SHA-256 accumulator, no wire terminator — so the
17//! row drain is row-shaped rather than literally the token handler;
18//! it joins the streaming *model* (lazy source + closed policy +
19//! cancel-aware drain), which is what makes it unified with the
20//! algebraic-effect surface.
21//!
22//! # The four policies, on rows
23//!
24//! - `Fail` — error the moment the result exceeds `max_rows`. Forces
25//!   the caller to treat an oversized result as an explicit failure.
26//! - `DropOldest` — keep the most recent `max_rows`; older rows are
27//!   counted in `dropped`. A bounded tail window.
28//! - `PauseUpstream` — stop polling the cursor at `max_rows` (the
29//!   cursor pauses, the connection is released); `truncated` flags
30//!   that more rows existed. A bounded head window.
31//! - `DegradeQuality` — the OSS identity degrader: drain every row,
32//!   no bound, no degradation. The enterprise layer overrides with a
33//!   real row degrader (reservoir sampling, column projection).
34//!
35//! Cancel-aware: the [`CancellationFlag`] is polled between every row;
36//! a cancelled drain stops immediately and reports `cancelled`.
37//!
38//! # OSS (§6 — 35.i is fully OSS)
39//!
40//! The streaming surface — the lazy cursor + the closed policy catalog
41//! + the cancel-aware drain — is entirely OSS.
42
43use std::collections::VecDeque;
44
45use futures::{Stream, StreamExt};
46use serde_json::{json, Value as JsonValue};
47
48use crate::cancel_token::CancellationFlag;
49use crate::store::filter::SqlValue;
50use crate::store::postgres_backend::{
51    bind_value, build_select_sql, classify_sql_error, introspect_conn,
52    map_pg_row, PostgresStoreBackend, StoreError, StoreRow,
53};
54use crate::stream_effect::BackpressurePolicy;
55
56/// The default backpressure policy for a `retrieve` whose step carries
57/// no explicit policy (`IRRetrieveStep` has no policy field in
58/// v1.30.0). `PauseUpstream` is the safe default: the cursor streams
59/// lazily (anti-OOM), the result is bounded, and an over-bound result
60/// is *flagged* (`truncated`) rather than silently dropped or errored.
61pub const DEFAULT_RETRIEVE_POLICY: BackpressurePolicy =
62    BackpressurePolicy::PauseUpstream;
63
/// Row cap used for a streamed `retrieve` by default.
///
/// Roomy enough for any realistic agent-store query. Its purpose is to
/// keep a pathological `retrieve from billion_row_table` bounded. Note that
/// the OSS `DegradeQuality` drain does not apply any row cap.
pub const DEFAULT_MAX_ROWS: usize = 10_000;
68
69// ════════════════════════════════════════════════════════════════════
70//  Drain outcome
71// ════════════════════════════════════════════════════════════════════
72
73/// The result of draining a `retrieve` row stream under a policy.
74#[derive(Debug, Clone, PartialEq, Default)]
75pub struct RowStreamOutcome {
76    /// The rows that survived the policy, in cursor order.
77    pub rows: Vec<StoreRow>,
78    /// Total rows the cursor yielded before the drain stopped.
79    pub total_seen: usize,
80    /// Rows discarded by a `DropOldest` policy.
81    pub dropped: usize,
82    /// `true` iff a `PauseUpstream` policy stopped the drain at the
83    /// bound while the cursor still had rows.
84    pub truncated: bool,
85    /// `true` iff the cancellation flag fired mid-drain.
86    pub cancelled: bool,
87}
88
89// ════════════════════════════════════════════════════════════════════
90//  The bounded, cancel-aware drain (pure over any row stream)
91// ════════════════════════════════════════════════════════════════════
92
93/// Drain a row stream under a [`BackpressurePolicy`], bounded by
94/// `max_rows` and cancel-aware.
95///
96/// Generic over the source stream so the policy + cancel logic is
97/// exhaustively unit-testable with a synthetic in-memory stream — the
98/// live Postgres cursor is just one such source ([`stream_retrieve`]).
99///
100/// A row that fails to decode (`Err`) aborts the drain with that error
101/// — never a silent skip.
102pub async fn drain_with_policy<S>(
103    mut stream: S,
104    policy: BackpressurePolicy,
105    max_rows: usize,
106    cancel: &CancellationFlag,
107) -> Result<RowStreamOutcome, StoreError>
108where
109    S: Stream<Item = Result<StoreRow, StoreError>> + Unpin,
110{
111    let mut kept: VecDeque<StoreRow> = VecDeque::new();
112    let mut outcome = RowStreamOutcome::default();
113
114    while let Some(item) = stream.next().await {
115        // Cancel is polled BEFORE consuming the row — a cancelled
116        // drain stops immediately, mirroring `unified_stream_handler`.
117        if cancel.is_cancelled() {
118            outcome.cancelled = true;
119            break;
120        }
121        let row = item?;
122        outcome.total_seen += 1;
123
124        match policy {
125            BackpressurePolicy::Fail => {
126                if kept.len() >= max_rows {
127                    return Err(StoreError::Query {
128                        op: "retrieve",
129                        source: format!(
130                            "result set exceeds the {max_rows}-row stream \
131                             bound (backpressure policy: fail)"
132                        ),
133                    });
134                }
135                kept.push_back(row);
136            }
137            BackpressurePolicy::DropOldest => {
138                kept.push_back(row);
139                if kept.len() > max_rows {
140                    kept.pop_front();
141                    outcome.dropped += 1;
142                }
143            }
144            BackpressurePolicy::PauseUpstream => {
145                if kept.len() >= max_rows {
146                    // Stop polling — the cursor pauses + is dropped.
147                    outcome.truncated = true;
148                    break;
149                }
150                kept.push_back(row);
151            }
152            BackpressurePolicy::DegradeQuality => {
153                // OSS identity degrader — every row, unbounded, no
154                // degradation. Enterprise overrides this arm.
155                kept.push_back(row);
156            }
157        }
158    }
159
160    outcome.rows = kept.into_iter().collect();
161    Ok(outcome)
162}
163
164// ════════════════════════════════════════════════════════════════════
165//  stream_retrieve — the live Postgres cursor drain
166// ════════════════════════════════════════════════════════════════════
167
168/// Run `retrieve` as a lazy cursor stream: open a server-side cursor
169/// over `SELECT * FROM table WHERE φ`, decode rows one at a time, and
170/// drain them through [`drain_with_policy`]. The full result set is
171/// **never** materialized by `sqlx` — rows flow off the cursor as the
172/// drain pulls them.
173///
174/// Cancel-aware via `cancel`; bounded by `policy` + `max_rows`.
175pub async fn stream_retrieve(
176    backend: &PostgresStoreBackend,
177    // §Fase 37.x.j (D1) — the connection source. See the equivalent
178    // parameter on `PostgresStoreBackend::query()` for the rationale:
179    // `StoreConn::Pool(&backend.pool())` for legacy callers,
180    // `StoreConn::Pinned(conn)` for the flow-pinned execution where
181    // the caller acquired the conn at flow start. Either variant routes
182    // the cursor + the cache-MISS transaction through the same
183    // physical Postgres backend connection.
184    conn: &mut crate::store::store_conn::StoreConn<'_>,
185    table: &str,
186    where_expr: &str,
187    policy: BackpressurePolicy,
188    max_rows: usize,
189    cancel: &CancellationFlag,
190    // §Fase 37.d (D3) — resolves `${name}` in `where_expr` to `$N`
191    // bind parameters (the Request Binding Contract on the filter path).
192    bindings: &std::collections::HashMap<String, String>,
193) -> Result<RowStreamOutcome, StoreError> {
194    // §Fase 37.x.d (D3) — a cache HIT: the cursor drains on the conn,
195    // no transaction (the cached resolution is correct and the SELECT
196    // is schema-qualified, so it resolves on any session).
197    if let Some(resolved) = backend.cached_schema(table) {
198        let (sql, params): (String, Vec<SqlValue>) = build_select_sql(
199            table,
200            Some(resolved.schema.as_str()),
201            where_expr,
202            bindings,
203            &resolved.column_types,
204        )?;
205        // §Fase 38.x.a (D1) — see `postgres_backend::introspect_conn` for
206        // the full rationale on `.persistent(false)`. The unnamed PARSE
207        // protocol is structurally collision-free behind transaction-mode
208        // poolers; the named protocol leaks `sqlx_s_N` across logical
209        // sessions when the physical conn is reused.
210        let mut query = sqlx::query(&sql).persistent(false);
211        for value in &params {
212            query = bind_value(query, value);
213        }
214        // §Fase 37.x.j (D1) — `.fetch()` is the lazy cursor; the
215        // Pool/Pinned dispatch happens inline here because the
216        // returned `BoxStream` borrows the executor for its lifetime
217        // and we can't unify the two stream types through a single
218        // wrapper method without erasing the lifetime + boxing every
219        // call site. Inline dispatch keeps the cursor's borrow
220        // checker-friendly while still routing through the StoreConn.
221        let drain_result = match conn {
222            crate::store::store_conn::StoreConn::Pool(p) => {
223                let cursor = query.fetch(*p).map(|item| {
224                    item.map_err(|e| classify_sql_error("retrieve", e))
225                        .and_then(|pg_row| map_pg_row(&pg_row))
226                });
227                drain_with_policy(cursor, policy, max_rows, cancel).await
228            }
229            crate::store::store_conn::StoreConn::Pinned(c) => {
230                let cursor = query.fetch(&mut ***c).map(|item| {
231                    item.map_err(|e| classify_sql_error("retrieve", e))
232                        .and_then(|pg_row| map_pg_row(&pg_row))
233                });
234                drain_with_policy(cursor, policy, max_rows, cancel).await
235            }
236        };
237        match drain_result {
238            Ok(outcome) => return Ok(outcome),
239            Err(e) if e.is_schema_drift() => {
240                // §37.x.f (D9) — the cached schema is STALE; evict and
241                // fall through to the miss path: the single retry,
242                // with fresh introspection.
243                backend.evict_schema(table);
244            }
245            Err(e) => return Err(e),
246        }
247    }
248
249    // §Fase 37.x.d (D3) — a cache MISS: the schema introspection AND
250    // the cursor drain run inside ONE transaction, so a transaction-
251    // mode pooler pins one physical backend for both. The transaction
252    // is held for the cursor's lifetime — bounded by `max_rows` (the
253    // `PauseUpstream` default caps the drain), so the held pooler
254    // backend is time-bounded; no pool starvation.
255    // §Fase 37.x.j (D1) — `conn.begin()` runs on the same physical
256    // backend as the cache-HIT attempt above when on the Pinned
257    // variant; on the Pool variant it acquires a fresh logical
258    // connection (legacy behavior).
259    let mut tx = conn
260        .begin()
261        .await
262        .map_err(|e| StoreError::Connect { source: e.to_string() })?;
263    // §Fase 37.x.j.12 — ROLLBACK + propagate introspect error directly.
264    // Pre-v1.40.3 the fall-through path here re-used the same `tx` with
265    // a bare-table SELECT, so an introspect failure (privilege /
266    // search_path / SSL / pooler-mode) cascaded as `relation X does not
267    // exist` inside the stream-cursor path — exactly the masking class
268    // closed at the 4 CRUD sites of `postgres_backend.rs` in v1.40.2,
269    // but THIS site was missed. row_stream is the Pillar III lazy
270    // cursor path; `transport: sse` retrieves exercise it, so a
271    // streaming endpoint hit the same misleading cascade. Same fix
272    // shape: explicit ROLLBACK + return the primary `introspect_err`.
273    let resolved = match introspect_conn(&mut tx, table).await {
274        Ok(r) => r,
275        Err(introspect_err) => {
276            tracing::warn!(
277                target: "axon::store",
278                table = %table,
279                op = "introspect_in_tx_stream",
280                error = %introspect_err,
281                d_letter = "37.x.j.12",
282                "store introspection failed inside the stream-cursor \
283                 transaction; rolling back and propagating the primary \
284                 error to the caller (no bare-table cascade)."
285            );
286            let _ = tx.rollback().await;
287            return Err(introspect_err);
288        }
289    };
290    let (sql, params): (String, Vec<SqlValue>) =
291        build_select_sql(
292            table,
293            Some(resolved.schema.as_str()),
294            where_expr,
295            bindings,
296            &resolved.column_types,
297        )?;
298    // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
299    let mut query = sqlx::query(&sql).persistent(false);
300    for value in &params {
301        query = bind_value(query, value);
302    }
303    // The cursor borrows the transaction for the drain; it is scoped so
304    // it is dropped before the transaction is committed.
305    let outcome = {
306        let cursor = query.fetch(&mut *tx).map(|item| {
307            item.map_err(|e| classify_sql_error("retrieve", e))
308                .and_then(|pg_row| map_pg_row(&pg_row))
309        });
310        drain_with_policy(cursor, policy, max_rows, cancel).await
311    };
312    tx.commit()
313        .await
314        .map_err(|e| StoreError::Connect { source: e.to_string() })?;
315    backend.cache_schema(table, resolved);
316    outcome
317}
318
319// ════════════════════════════════════════════════════════════════════
320//  Streaming metadata for the retrieve envelope
321// ════════════════════════════════════════════════════════════════════
322
323/// Build the `"stream"` sub-object describing how a streamed
324/// `retrieve` was drained — merged into the Pillar I epistemic
325/// envelope (35.g) so the adopter sees both the trust grade AND the
326/// streaming disposition of the result.
327pub fn stream_metadata(
328    policy: BackpressurePolicy,
329    outcome: &RowStreamOutcome,
330) -> JsonValue {
331    json!({
332        "policy": policy.slug(),
333        "total_seen": outcome.total_seen,
334        "dropped": outcome.dropped,
335        "truncated": outcome.truncated,
336        "cancelled": outcome.cancelled,
337    })
338}
339
340// ════════════════════════════════════════════════════════════════════
341//  Unit tests — the drain (synthetic streams, no database)
342// ════════════════════════════════════════════════════════════════════
343
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    fn row(id: i64) -> StoreRow {
        StoreRow {
            columns: vec![("id".to_string(), Value::from(id))],
        }
    }

    /// A synthetic Ok-row stream yielding rows with ids `0..n`.
    fn ok_stream(
        n: usize,
    ) -> impl Stream<Item = Result<StoreRow, StoreError>> + Unpin {
        futures::stream::iter(
            (0..n as i64).map(|i| Ok(row(i))).collect::<Vec<_>>(),
        )
    }

    // ── Fail policy ──────────────────────────────────────────────────

    #[tokio::test]
    async fn fail_policy_allows_a_result_within_the_bound() {
        let outcome = drain_with_policy(
            ok_stream(5),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 5);
        assert_eq!(outcome.total_seen, 5);
    }

    #[tokio::test]
    async fn fail_policy_allows_a_result_exactly_at_the_bound() {
        // Boundary: `max_rows` rows is within the bound, not over it.
        let outcome = drain_with_policy(
            ok_stream(10),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10);
        assert_eq!(outcome.total_seen, 10);
    }

    #[tokio::test]
    async fn fail_policy_errors_when_the_result_exceeds_the_bound() {
        let result = drain_with_policy(
            ok_stream(50),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await;
        assert!(matches!(result, Err(StoreError::Query { .. })));
    }

    // ── DropOldest policy ────────────────────────────────────────────

    #[tokio::test]
    async fn drop_oldest_keeps_the_most_recent_window() {
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::DropOldest,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10, "bounded to the window");
        assert_eq!(outcome.dropped, 90);
        assert_eq!(outcome.total_seen, 100);
        // The window is the TAIL — rows 90..100.
        assert_eq!(outcome.rows.first().unwrap().get("id"), Some(&Value::from(90)));
        assert_eq!(outcome.rows.last().unwrap().get("id"), Some(&Value::from(99)));
    }

    #[tokio::test]
    async fn drop_oldest_within_the_bound_drops_nothing() {
        let outcome = drain_with_policy(
            ok_stream(4),
            BackpressurePolicy::DropOldest,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 4);
        assert_eq!(outcome.dropped, 0);
        assert_eq!(outcome.rows.first().unwrap().get("id"), Some(&Value::from(0)));
    }

    // ── PauseUpstream policy ─────────────────────────────────────────

    #[tokio::test]
    async fn pause_upstream_truncates_at_the_bound() {
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::PauseUpstream,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10);
        assert!(outcome.truncated, "more rows existed past the bound");
        // The window is the HEAD — rows 0..10.
        assert_eq!(outcome.rows.first().unwrap().get("id"), Some(&Value::from(0)));
        assert_eq!(outcome.rows.last().unwrap().get("id"), Some(&Value::from(9)));
    }

    #[tokio::test]
    async fn pause_upstream_within_the_bound_is_not_truncated() {
        let outcome = drain_with_policy(
            ok_stream(3),
            BackpressurePolicy::PauseUpstream,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 3);
        assert!(!outcome.truncated);
    }

    #[tokio::test]
    async fn pause_upstream_exactly_at_the_bound_is_not_truncated() {
        // Boundary: the cursor ends exactly at `max_rows`, so nothing
        // was left behind and the result must not be flagged.
        let outcome = drain_with_policy(
            ok_stream(10),
            BackpressurePolicy::PauseUpstream,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10);
        assert!(!outcome.truncated);
    }

    // ── DegradeQuality policy ────────────────────────────────────────

    #[tokio::test]
    async fn degrade_quality_is_the_oss_identity_drain() {
        let outcome = drain_with_policy(
            ok_stream(50),
            BackpressurePolicy::DegradeQuality,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        // OSS identity degrader — every row, the bound is not applied.
        assert_eq!(outcome.rows.len(), 50);
        assert_eq!(outcome.dropped, 0);
        assert!(!outcome.truncated);
    }

    // ── Cancellation ─────────────────────────────────────────────────

    #[tokio::test]
    async fn a_cancelled_flag_stops_the_drain_immediately() {
        let cancel = CancellationFlag::new();
        cancel.cancel();
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::DegradeQuality,
            1000,
            &cancel,
        )
        .await
        .unwrap();
        assert!(outcome.cancelled);
        assert!(outcome.rows.is_empty(), "no row consumed after cancel");
        assert_eq!(outcome.total_seen, 0, "no row counted after cancel");
    }

    // ── Decode error aborts ──────────────────────────────────────────

    #[tokio::test]
    async fn a_row_decode_error_aborts_the_drain() {
        let items: Vec<Result<StoreRow, StoreError>> = vec![
            Ok(row(0)),
            Err(StoreError::Decode {
                column: "x".into(),
                pg_type: "INT4".into(),
                source: "boom".into(),
            }),
            Ok(row(2)),
        ];
        let result = drain_with_policy(
            futures::stream::iter(items),
            BackpressurePolicy::DegradeQuality,
            100,
            &CancellationFlag::new(),
        )
        .await;
        assert!(matches!(result, Err(StoreError::Decode { .. })));
    }

    // ── Empty result ─────────────────────────────────────────────────

    #[tokio::test]
    async fn an_empty_result_drains_cleanly() {
        let outcome = drain_with_policy(
            ok_stream(0),
            DEFAULT_RETRIEVE_POLICY,
            DEFAULT_MAX_ROWS,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert!(outcome.rows.is_empty());
        assert_eq!(outcome.total_seen, 0);
        assert!(!outcome.truncated && !outcome.cancelled);
    }

    // ── stream_metadata ──────────────────────────────────────────────

    #[test]
    fn stream_metadata_carries_the_drain_disposition() {
        let outcome = RowStreamOutcome {
            rows: vec![row(1)],
            total_seen: 100,
            dropped: 99,
            truncated: false,
            cancelled: false,
        };
        let meta = stream_metadata(BackpressurePolicy::DropOldest, &outcome);
        assert_eq!(meta["policy"], "drop_oldest");
        assert_eq!(meta["total_seen"], 100);
        assert_eq!(meta["dropped"], 99);
        assert_eq!(meta["truncated"], false);
        assert_eq!(meta["cancelled"], false);
    }

    #[test]
    fn defaults_are_pause_upstream_and_a_sane_bound() {
        assert_eq!(DEFAULT_RETRIEVE_POLICY, BackpressurePolicy::PauseUpstream);
        assert!(DEFAULT_MAX_ROWS >= 1000);
    }
}