axon/store/row_stream.rs
1//! §Fase 35.i (v1.30.0) — Pillar III: `retrieve` is a `Stream<Row>`.
2//!
3//! A `retrieve from S where φ` is the coinductive selection σ_φ(S) —
4//! not an eager set. A pg-backed `axonstore` becomes a first-class
5//! **stream producer**: rows flow lazily off a cursor, drained through
//! a bounded, cancel-aware loop. Under every bounded policy (all but
//! the OSS `DegradeQuality` identity drain — see below),
//! `retrieve from huge_table` never materializes the whole result — it
//! streams, exactly like an LLM token stream, and stays inside a memory
//! bound.
9//!
10//! # Joins the Fase 34 streaming surface
11//!
12//! The drain reuses the **closed [`BackpressurePolicy`] catalog** Fase
13//! 34 ratified (`drop_oldest` / `degrade_quality` / `pause_upstream` /
14//! `fail`) and the same `CancellationFlag` cancel discipline as the
15//! `unified_stream_handler`. A DB row is not a `ToolChunk` — it has no
16//! token text, no SHA-256 accumulator, no wire terminator — so the
17//! row drain is row-shaped rather than literally the token handler;
18//! it joins the streaming *model* (lazy source + closed policy +
19//! cancel-aware drain), which is what makes it unified with the
20//! algebraic-effect surface.
21//!
22//! # The four policies, on rows
23//!
24//! - `Fail` — error the moment the result exceeds `max_rows`. Forces
25//! the caller to treat an oversized result as an explicit failure.
26//! - `DropOldest` — keep the most recent `max_rows`; older rows are
27//! counted in `dropped`. A bounded tail window.
28//! - `PauseUpstream` — stop polling the cursor at `max_rows` (the
29//! cursor pauses, the connection is released); `truncated` flags
30//! that more rows existed. A bounded head window.
31//! - `DegradeQuality` — the OSS identity degrader: drain every row,
32//! no bound, no degradation. The enterprise layer overrides with a
33//! real row degrader (reservoir sampling, column projection).
34//!
35//! Cancel-aware: the [`CancellationFlag`] is polled between every row;
36//! a cancelled drain stops immediately and reports `cancelled`.
37//!
38//! # OSS (§6 — 35.i is fully OSS)
39//!
//! The streaming surface — the lazy cursor, the closed policy catalog,
//! and the cancel-aware drain — is entirely OSS.
42
43use std::collections::VecDeque;
44
45use futures::{Stream, StreamExt};
46use serde_json::{json, Value as JsonValue};
47
48use crate::cancel_token::CancellationFlag;
49use crate::store::filter::SqlValue;
50use crate::store::postgres_backend::{
51 bind_value, build_select_sql, classify_sql_error, introspect_conn,
52 map_pg_row, PostgresStoreBackend, StoreError, StoreRow,
53};
54use crate::stream_effect::BackpressurePolicy;
55
56/// The default backpressure policy for a `retrieve` whose step carries
57/// no explicit policy (`IRRetrieveStep` has no policy field in
58/// v1.30.0). `PauseUpstream` is the safe default: the cursor streams
59/// lazily (anti-OOM), the result is bounded, and an over-bound result
60/// is *flagged* (`truncated`) rather than silently dropped or errored.
61pub const DEFAULT_RETRIEVE_POLICY: BackpressurePolicy =
62 BackpressurePolicy::PauseUpstream;
63
/// Default row cap for a streamed `retrieve`.
///
/// Comfortably above what a realistic agent-store query returns; its job
/// is to keep a pathological `retrieve from billion_row_table` bounded.
pub const DEFAULT_MAX_ROWS: usize = 10 * 1_000;
68
69// ════════════════════════════════════════════════════════════════════
70// Drain outcome
71// ════════════════════════════════════════════════════════════════════
72
73/// The result of draining a `retrieve` row stream under a policy.
74#[derive(Debug, Clone, PartialEq, Default)]
75pub struct RowStreamOutcome {
76 /// The rows that survived the policy, in cursor order.
77 pub rows: Vec<StoreRow>,
78 /// Total rows the cursor yielded before the drain stopped.
79 pub total_seen: usize,
80 /// Rows discarded by a `DropOldest` policy.
81 pub dropped: usize,
82 /// `true` iff a `PauseUpstream` policy stopped the drain at the
83 /// bound while the cursor still had rows.
84 pub truncated: bool,
85 /// `true` iff the cancellation flag fired mid-drain.
86 pub cancelled: bool,
87}
88
89// ════════════════════════════════════════════════════════════════════
90// The bounded, cancel-aware drain (pure over any row stream)
91// ════════════════════════════════════════════════════════════════════
92
93/// Drain a row stream under a [`BackpressurePolicy`], bounded by
94/// `max_rows` and cancel-aware.
95///
96/// Generic over the source stream so the policy + cancel logic is
97/// exhaustively unit-testable with a synthetic in-memory stream — the
98/// live Postgres cursor is just one such source ([`stream_retrieve`]).
99///
100/// A row that fails to decode (`Err`) aborts the drain with that error
101/// — never a silent skip.
102pub async fn drain_with_policy<S>(
103 mut stream: S,
104 policy: BackpressurePolicy,
105 max_rows: usize,
106 cancel: &CancellationFlag,
107) -> Result<RowStreamOutcome, StoreError>
108where
109 S: Stream<Item = Result<StoreRow, StoreError>> + Unpin,
110{
111 let mut kept: VecDeque<StoreRow> = VecDeque::new();
112 let mut outcome = RowStreamOutcome::default();
113
114 while let Some(item) = stream.next().await {
115 // Cancel is polled BEFORE consuming the row — a cancelled
116 // drain stops immediately, mirroring `unified_stream_handler`.
117 if cancel.is_cancelled() {
118 outcome.cancelled = true;
119 break;
120 }
121 let row = item?;
122 outcome.total_seen += 1;
123
124 match policy {
125 BackpressurePolicy::Fail => {
126 if kept.len() >= max_rows {
127 return Err(StoreError::Query {
128 op: "retrieve",
129 source: format!(
130 "result set exceeds the {max_rows}-row stream \
131 bound (backpressure policy: fail)"
132 ),
133 });
134 }
135 kept.push_back(row);
136 }
137 BackpressurePolicy::DropOldest => {
138 kept.push_back(row);
139 if kept.len() > max_rows {
140 kept.pop_front();
141 outcome.dropped += 1;
142 }
143 }
144 BackpressurePolicy::PauseUpstream => {
145 if kept.len() >= max_rows {
146 // Stop polling — the cursor pauses + is dropped.
147 outcome.truncated = true;
148 break;
149 }
150 kept.push_back(row);
151 }
152 BackpressurePolicy::DegradeQuality => {
153 // OSS identity degrader — every row, unbounded, no
154 // degradation. Enterprise overrides this arm.
155 kept.push_back(row);
156 }
157 }
158 }
159
160 outcome.rows = kept.into_iter().collect();
161 Ok(outcome)
162}
163
164// ════════════════════════════════════════════════════════════════════
165// stream_retrieve — the live Postgres cursor drain
166// ════════════════════════════════════════════════════════════════════
167
168/// Run `retrieve` as a lazy cursor stream: open a server-side cursor
169/// over `SELECT * FROM table WHERE φ`, decode rows one at a time, and
170/// drain them through [`drain_with_policy`]. The full result set is
171/// **never** materialized by `sqlx` — rows flow off the cursor as the
172/// drain pulls them.
173///
174/// Cancel-aware via `cancel`; bounded by `policy` + `max_rows`.
175pub async fn stream_retrieve(
176 backend: &PostgresStoreBackend,
177 // §Fase 37.x.j (D1) — the connection source. See the equivalent
178 // parameter on `PostgresStoreBackend::query()` for the rationale:
179 // `StoreConn::Pool(&backend.pool())` for legacy callers,
180 // `StoreConn::Pinned(conn)` for the flow-pinned execution where
181 // the caller acquired the conn at flow start. Either variant routes
182 // the cursor + the cache-MISS transaction through the same
183 // physical Postgres backend connection.
184 conn: &mut crate::store::store_conn::StoreConn<'_>,
185 table: &str,
186 where_expr: &str,
187 policy: BackpressurePolicy,
188 max_rows: usize,
189 cancel: &CancellationFlag,
190 // §Fase 37.d (D3) — resolves `${name}` in `where_expr` to `$N`
191 // bind parameters (the Request Binding Contract on the filter path).
192 bindings: &std::collections::HashMap<String, String>,
193) -> Result<RowStreamOutcome, StoreError> {
194 // §Fase 37.x.d (D3) — a cache HIT: the cursor drains on the conn,
195 // no transaction (the cached resolution is correct and the SELECT
196 // is schema-qualified, so it resolves on any session).
197 if let Some(resolved) = backend.cached_schema(table) {
198 let (sql, params): (String, Vec<SqlValue>) = build_select_sql(
199 table,
200 Some(resolved.schema.as_str()),
201 where_expr,
202 bindings,
203 &resolved.column_types,
204 )?;
205 // §Fase 38.x.a (D1) — see `postgres_backend::introspect_conn` for
206 // the full rationale on `.persistent(false)`. The unnamed PARSE
207 // protocol is structurally collision-free behind transaction-mode
208 // poolers; the named protocol leaks `sqlx_s_N` across logical
209 // sessions when the physical conn is reused.
210 let mut query = sqlx::query(&sql).persistent(false);
211 for value in ¶ms {
212 query = bind_value(query, value);
213 }
214 // §Fase 37.x.j (D1) — `.fetch()` is the lazy cursor; the
215 // Pool/Pinned dispatch happens inline here because the
216 // returned `BoxStream` borrows the executor for its lifetime
217 // and we can't unify the two stream types through a single
218 // wrapper method without erasing the lifetime + boxing every
219 // call site. Inline dispatch keeps the cursor's borrow
220 // checker-friendly while still routing through the StoreConn.
221 let drain_result = match conn {
222 crate::store::store_conn::StoreConn::Pool(p) => {
223 let cursor = query.fetch(*p).map(|item| {
224 item.map_err(|e| classify_sql_error("retrieve", e))
225 .and_then(|pg_row| map_pg_row(&pg_row))
226 });
227 drain_with_policy(cursor, policy, max_rows, cancel).await
228 }
229 crate::store::store_conn::StoreConn::Pinned(c) => {
230 let cursor = query.fetch(&mut ***c).map(|item| {
231 item.map_err(|e| classify_sql_error("retrieve", e))
232 .and_then(|pg_row| map_pg_row(&pg_row))
233 });
234 drain_with_policy(cursor, policy, max_rows, cancel).await
235 }
236 };
237 match drain_result {
238 Ok(outcome) => return Ok(outcome),
239 Err(e) if e.is_schema_drift() => {
240 // §37.x.f (D9) — the cached schema is STALE; evict and
241 // fall through to the miss path: the single retry,
242 // with fresh introspection.
243 backend.evict_schema(table);
244 }
245 Err(e) => return Err(e),
246 }
247 }
248
249 // §Fase 37.x.d (D3) — a cache MISS: the schema introspection AND
250 // the cursor drain run inside ONE transaction, so a transaction-
251 // mode pooler pins one physical backend for both. The transaction
252 // is held for the cursor's lifetime — bounded by `max_rows` (the
253 // `PauseUpstream` default caps the drain), so the held pooler
254 // backend is time-bounded; no pool starvation.
255 // §Fase 37.x.j (D1) — `conn.begin()` runs on the same physical
256 // backend as the cache-HIT attempt above when on the Pinned
257 // variant; on the Pool variant it acquires a fresh logical
258 // connection (legacy behavior).
259 let mut tx = conn
260 .begin()
261 .await
262 .map_err(|e| StoreError::Connect { source: e.to_string() })?;
263 // §Fase 37.x.j.12 — ROLLBACK + propagate introspect error directly.
264 // Pre-v1.40.3 the fall-through path here re-used the same `tx` with
265 // a bare-table SELECT, so an introspect failure (privilege /
266 // search_path / SSL / pooler-mode) cascaded as `relation X does not
267 // exist` inside the stream-cursor path — exactly the masking class
268 // closed at the 4 CRUD sites of `postgres_backend.rs` in v1.40.2,
269 // but THIS site was missed. row_stream is the Pillar III lazy
270 // cursor path; `transport: sse` retrieves exercise it, so a
271 // streaming endpoint hit the same misleading cascade. Same fix
272 // shape: explicit ROLLBACK + return the primary `introspect_err`.
273 let resolved = match introspect_conn(&mut tx, table).await {
274 Ok(r) => r,
275 Err(introspect_err) => {
276 tracing::warn!(
277 target: "axon::store",
278 table = %table,
279 op = "introspect_in_tx_stream",
280 error = %introspect_err,
281 d_letter = "37.x.j.12",
282 "store introspection failed inside the stream-cursor \
283 transaction; rolling back and propagating the primary \
284 error to the caller (no bare-table cascade)."
285 );
286 let _ = tx.rollback().await;
287 return Err(introspect_err);
288 }
289 };
290 let (sql, params): (String, Vec<SqlValue>) =
291 build_select_sql(
292 table,
293 Some(resolved.schema.as_str()),
294 where_expr,
295 bindings,
296 &resolved.column_types,
297 )?;
298 // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
299 let mut query = sqlx::query(&sql).persistent(false);
300 for value in ¶ms {
301 query = bind_value(query, value);
302 }
303 // The cursor borrows the transaction for the drain; it is scoped so
304 // it is dropped before the transaction is committed.
305 let outcome = {
306 let cursor = query.fetch(&mut *tx).map(|item| {
307 item.map_err(|e| classify_sql_error("retrieve", e))
308 .and_then(|pg_row| map_pg_row(&pg_row))
309 });
310 drain_with_policy(cursor, policy, max_rows, cancel).await
311 };
312 tx.commit()
313 .await
314 .map_err(|e| StoreError::Connect { source: e.to_string() })?;
315 backend.cache_schema(table, resolved);
316 outcome
317}
318
319// ════════════════════════════════════════════════════════════════════
320// Streaming metadata for the retrieve envelope
321// ════════════════════════════════════════════════════════════════════
322
323/// Build the `"stream"` sub-object describing how a streamed
324/// `retrieve` was drained — merged into the Pillar I epistemic
325/// envelope (35.g) so the adopter sees both the trust grade AND the
326/// streaming disposition of the result.
327pub fn stream_metadata(
328 policy: BackpressurePolicy,
329 outcome: &RowStreamOutcome,
330) -> JsonValue {
331 json!({
332 "policy": policy.slug(),
333 "total_seen": outcome.total_seen,
334 "dropped": outcome.dropped,
335 "truncated": outcome.truncated,
336 "cancelled": outcome.cancelled,
337 })
338}
339
340// ════════════════════════════════════════════════════════════════════
341// Unit tests — the drain (synthetic streams, no database)
342// ════════════════════════════════════════════════════════════════════
343
#[cfg(test)]
mod tests {
    //! Unit tests for the drain — synthetic in-memory streams, no database.

    use super::*;
    use serde_json::Value;

    /// A one-column row `{ id }`.
    fn row(id: i64) -> StoreRow {
        StoreRow {
            columns: vec![("id".to_string(), Value::from(id))],
        }
    }

    /// A synthetic Ok-row stream yielding rows with ids `0..n`.
    fn ok_stream(n: usize) -> impl Stream<Item = Result<StoreRow, StoreError>> + Unpin {
        futures::stream::iter((0..n as i64).map(|i| Ok::<_, StoreError>(row(i))))
    }

    // ── Fail policy ──────────────────────────────────────────────────

    #[tokio::test]
    async fn fail_policy_allows_a_result_within_the_bound() {
        let outcome = drain_with_policy(
            ok_stream(5),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 5);
        assert_eq!(outcome.total_seen, 5);
    }

    #[tokio::test]
    async fn fail_policy_allows_a_result_exactly_at_the_bound() {
        let outcome = drain_with_policy(
            ok_stream(10),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10);
        assert_eq!(outcome.total_seen, 10);
    }

    #[tokio::test]
    async fn fail_policy_errors_one_row_past_the_bound() {
        let result = drain_with_policy(
            ok_stream(11),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await;
        assert!(matches!(result, Err(StoreError::Query { .. })));
    }

    #[tokio::test]
    async fn fail_policy_errors_when_the_result_exceeds_the_bound() {
        let result = drain_with_policy(
            ok_stream(50),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await;
        assert!(matches!(result, Err(StoreError::Query { .. })));
    }

    // ── DropOldest policy ────────────────────────────────────────────

    #[tokio::test]
    async fn drop_oldest_keeps_the_most_recent_window() {
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::DropOldest,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10, "bounded to the window");
        assert_eq!(outcome.dropped, 90);
        assert_eq!(outcome.total_seen, 100);
        // The window is the TAIL — rows 90..100.
        assert_eq!(outcome.rows.first().unwrap().get("id"), Some(&Value::from(90)));
        assert_eq!(outcome.rows.last().unwrap().get("id"), Some(&Value::from(99)));
    }

    #[tokio::test]
    async fn drop_oldest_within_the_bound_drops_nothing() {
        let outcome = drain_with_policy(
            ok_stream(5),
            BackpressurePolicy::DropOldest,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 5);
        assert_eq!(outcome.dropped, 0);
        assert_eq!(outcome.rows.first().unwrap().get("id"), Some(&Value::from(0)));
    }

    // ── PauseUpstream policy ─────────────────────────────────────────

    #[tokio::test]
    async fn pause_upstream_truncates_at_the_bound() {
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::PauseUpstream,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10);
        assert!(outcome.truncated, "more rows existed past the bound");
        // The window is the HEAD — rows 0..10.
        assert_eq!(outcome.rows.first().unwrap().get("id"), Some(&Value::from(0)));
        assert_eq!(outcome.rows.last().unwrap().get("id"), Some(&Value::from(9)));
    }

    #[tokio::test]
    async fn pause_upstream_within_the_bound_is_not_truncated() {
        let outcome = drain_with_policy(
            ok_stream(3),
            BackpressurePolicy::PauseUpstream,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 3);
        assert!(!outcome.truncated);
    }

    #[tokio::test]
    async fn pause_upstream_exactly_at_the_bound_is_not_truncated() {
        let outcome = drain_with_policy(
            ok_stream(10),
            BackpressurePolicy::PauseUpstream,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10);
        assert_eq!(outcome.total_seen, 10);
        assert!(!outcome.truncated, "no row existed past the bound");
    }

    // ── DegradeQuality policy ────────────────────────────────────────

    #[tokio::test]
    async fn degrade_quality_is_the_oss_identity_drain() {
        let outcome = drain_with_policy(
            ok_stream(50),
            BackpressurePolicy::DegradeQuality,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        // OSS identity degrader — every row, the bound is not applied.
        assert_eq!(outcome.rows.len(), 50);
        assert_eq!(outcome.dropped, 0);
        assert!(!outcome.truncated);
    }

    // ── Cancellation ─────────────────────────────────────────────────

    #[tokio::test]
    async fn a_cancelled_flag_stops_the_drain_immediately() {
        let cancel = CancellationFlag::new();
        cancel.cancel();
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::DegradeQuality,
            1000,
            &cancel,
        )
        .await
        .unwrap();
        assert!(outcome.cancelled);
        assert!(outcome.rows.is_empty(), "no row consumed after cancel");
    }

    // ── Decode error aborts ──────────────────────────────────────────

    #[tokio::test]
    async fn a_row_decode_error_aborts_the_drain() {
        let items: Vec<Result<StoreRow, StoreError>> = vec![
            Ok(row(0)),
            Err(StoreError::Decode {
                column: "x".into(),
                pg_type: "INT4".into(),
                source: "boom".into(),
            }),
            Ok(row(2)),
        ];
        let result = drain_with_policy(
            futures::stream::iter(items),
            BackpressurePolicy::DegradeQuality,
            100,
            &CancellationFlag::new(),
        )
        .await;
        assert!(matches!(result, Err(StoreError::Decode { .. })));
    }

    // ── Empty result ─────────────────────────────────────────────────

    #[tokio::test]
    async fn an_empty_result_drains_cleanly() {
        let outcome = drain_with_policy(
            ok_stream(0),
            DEFAULT_RETRIEVE_POLICY,
            DEFAULT_MAX_ROWS,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert!(outcome.rows.is_empty());
        assert_eq!(outcome.total_seen, 0);
        assert!(!outcome.truncated && !outcome.cancelled);
    }

    // ── stream_metadata ──────────────────────────────────────────────

    #[test]
    fn stream_metadata_carries_the_drain_disposition() {
        let outcome = RowStreamOutcome {
            rows: vec![row(1)],
            total_seen: 100,
            dropped: 99,
            truncated: false,
            cancelled: false,
        };
        let meta = stream_metadata(BackpressurePolicy::DropOldest, &outcome);
        assert_eq!(meta["policy"], "drop_oldest");
        assert_eq!(meta["total_seen"], 100);
        assert_eq!(meta["dropped"], 99);
        assert_eq!(meta["truncated"], false);
        assert_eq!(meta["cancelled"], false);
    }

    #[test]
    fn defaults_are_pause_upstream_and_a_sane_bound() {
        assert_eq!(DEFAULT_RETRIEVE_POLICY, BackpressurePolicy::PauseUpstream);
        assert!(DEFAULT_MAX_ROWS >= 1000);
    }
}