axon-lang 2.11.0

AXON — the formal cognitive language: a deterministic, proof-carrying AI runtime. Native Rust lexer/parser/type-checker/IR generator (re-exported from axon-frontend) plus the runtime: typed channels (π-calculus mobility, capability extrusion), algebraic effects via Free Monad CPS handlers, lease kernel + reconcile loop, the Epistemic Security Kernel, Trust Types, Proof-Carrying Code (independently verifiable proof objects), and the closed-catalog extension mechanism. Crate publishes as `axon-lang`; library import is `use axon::*` so existing call sites keep working unchanged.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
//! §Fase 35.i (v1.30.0) — Pillar III: `retrieve` is a `Stream<Row>`.
//!
//! A `retrieve from S where φ` is the coinductive selection σ_φ(S) —
//! not an eager set. A pg-backed `axonstore` becomes a first-class
//! **stream producer**: rows flow lazily off a cursor, drained through
//! a bounded, cancel-aware loop. `retrieve from huge_table` never
//! materializes the whole result — it streams, exactly like an LLM
//! token stream, and stays inside a memory bound.
//!
//! # Joins the Fase 34 streaming surface
//!
//! The drain reuses the **closed [`BackpressurePolicy`] catalog** Fase
//! 34 ratified (`drop_oldest` / `degrade_quality` / `pause_upstream` /
//! `fail`) and the same `CancellationFlag` cancel discipline as the
//! `unified_stream_handler`. A DB row is not a `ToolChunk` — it has no
//! token text, no SHA-256 accumulator, no wire terminator — so the
//! row drain is row-shaped rather than literally the token handler;
//! it joins the streaming *model* (lazy source + closed policy +
//! cancel-aware drain), which is what makes it unified with the
//! algebraic-effect surface.
//!
//! # The four policies, on rows
//!
//! - `Fail` — error the moment the result exceeds `max_rows`. Forces
//!   the caller to treat an oversized result as an explicit failure.
//! - `DropOldest` — keep the most recent `max_rows`; older rows are
//!   counted in `dropped`. A bounded tail window.
//! - `PauseUpstream` — stop polling the cursor at `max_rows` (the
//!   cursor pauses, the connection is released); `truncated` flags
//!   that more rows existed. A bounded head window.
//! - `DegradeQuality` — the OSS identity degrader: drain every row,
//!   no bound, no degradation. The enterprise layer overrides with a
//!   real row degrader (reservoir sampling, column projection).
//!
//! Cancel-aware: the [`CancellationFlag`] is polled between every row;
//! a cancelled drain stops immediately and reports `cancelled`.
//!
//! # OSS (§6 — 35.i is fully OSS)
//!
//! The streaming surface — the lazy cursor + the closed policy catalog
//! + the cancel-aware drain — is entirely OSS.

use std::collections::VecDeque;

use futures::{Stream, StreamExt};
use serde_json::{json, Value as JsonValue};

use crate::cancel_token::CancellationFlag;
use crate::store::filter::SqlValue;
use crate::store::postgres_backend::{
    bind_value, build_select_sql, classify_sql_error, introspect_conn,
    map_pg_row, PostgresStoreBackend, StoreError, StoreRow,
};
use crate::stream_effect::BackpressurePolicy;

/// The default backpressure policy for a `retrieve` whose step carries
/// no explicit policy (`IRRetrieveStep` has no policy field in
/// v1.30.0). `PauseUpstream` is the safe default: the cursor streams
/// lazily (anti-OOM), the result is bounded, and an over-bound result
/// is *flagged* (`truncated`) rather than silently dropped or errored.
pub const DEFAULT_RETRIEVE_POLICY: BackpressurePolicy =
    BackpressurePolicy::PauseUpstream;

/// The default row bound for a streamed `retrieve`. Generous enough
/// for any realistic agent-store query; the point is that a pathological
/// `retrieve from billion_row_table` stays bounded.
pub const DEFAULT_MAX_ROWS: usize = 10_000;

// ════════════════════════════════════════════════════════════════════
//  Drain outcome
// ════════════════════════════════════════════════════════════════════

/// The result of draining a `retrieve` row stream under a policy.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct RowStreamOutcome {
    /// The rows that survived the policy, in cursor order.
    pub rows: Vec<StoreRow>,
    /// Total rows the cursor yielded before the drain stopped.
    pub total_seen: usize,
    /// Rows discarded by a `DropOldest` policy.
    pub dropped: usize,
    /// `true` iff a `PauseUpstream` policy stopped the drain at the
    /// bound while the cursor still had rows.
    pub truncated: bool,
    /// `true` iff the cancellation flag fired mid-drain.
    pub cancelled: bool,
}

// ════════════════════════════════════════════════════════════════════
//  The bounded, cancel-aware drain (pure over any row stream)
// ════════════════════════════════════════════════════════════════════

/// Drain a row stream under a [`BackpressurePolicy`], bounded by
/// `max_rows` and cancel-aware.
///
/// Generic over the source stream so the policy + cancel logic is
/// exhaustively unit-testable with a synthetic in-memory stream — the
/// live Postgres cursor is just one such source ([`stream_retrieve`]).
///
/// A row that fails to decode (`Err`) aborts the drain with that error
/// — never a silent skip.
pub async fn drain_with_policy<S>(
    mut stream: S,
    policy: BackpressurePolicy,
    max_rows: usize,
    cancel: &CancellationFlag,
) -> Result<RowStreamOutcome, StoreError>
where
    S: Stream<Item = Result<StoreRow, StoreError>> + Unpin,
{
    let mut kept: VecDeque<StoreRow> = VecDeque::new();
    let mut outcome = RowStreamOutcome::default();

    while let Some(item) = stream.next().await {
        // Cancel is polled BEFORE consuming the row — a cancelled
        // drain stops immediately, mirroring `unified_stream_handler`.
        if cancel.is_cancelled() {
            outcome.cancelled = true;
            break;
        }
        let row = item?;
        outcome.total_seen += 1;

        match policy {
            BackpressurePolicy::Fail => {
                if kept.len() >= max_rows {
                    return Err(StoreError::Query {
                        op: "retrieve",
                        source: format!(
                            "result set exceeds the {max_rows}-row stream \
                             bound (backpressure policy: fail)"
                        ),
                    });
                }
                kept.push_back(row);
            }
            BackpressurePolicy::DropOldest => {
                kept.push_back(row);
                if kept.len() > max_rows {
                    kept.pop_front();
                    outcome.dropped += 1;
                }
            }
            BackpressurePolicy::PauseUpstream => {
                if kept.len() >= max_rows {
                    // Stop polling — the cursor pauses + is dropped.
                    outcome.truncated = true;
                    break;
                }
                kept.push_back(row);
            }
            BackpressurePolicy::DegradeQuality => {
                // OSS identity degrader — every row, unbounded, no
                // degradation. Enterprise overrides this arm.
                kept.push_back(row);
            }
        }
    }

    outcome.rows = kept.into_iter().collect();
    Ok(outcome)
}

// ════════════════════════════════════════════════════════════════════
//  stream_retrieve — the live Postgres cursor drain
// ════════════════════════════════════════════════════════════════════

/// Run `retrieve` as a lazy cursor stream: open a server-side cursor
/// over `SELECT * FROM table WHERE φ`, decode rows one at a time, and
/// drain them through [`drain_with_policy`]. The full result set is
/// **never** materialized by `sqlx` — rows flow off the cursor as the
/// drain pulls them.
///
/// Cancel-aware via `cancel`; bounded by `policy` + `max_rows`.
pub async fn stream_retrieve(
    backend: &PostgresStoreBackend,
    // §Fase 37.x.j (D1) — the connection source. See the equivalent
    // parameter on `PostgresStoreBackend::query()` for the rationale:
    // `StoreConn::Pool(&backend.pool())` for legacy callers,
    // `StoreConn::Pinned(conn)` for the flow-pinned execution where
    // the caller acquired the conn at flow start. Either variant routes
    // the cursor + the cache-MISS transaction through the same
    // physical Postgres backend connection.
    conn: &mut crate::store::store_conn::StoreConn<'_>,
    table: &str,
    where_expr: &str,
    policy: BackpressurePolicy,
    max_rows: usize,
    cancel: &CancellationFlag,
    // §Fase 37.d (D3) — resolves `${name}` in `where_expr` to `$N`
    // bind parameters (the Request Binding Contract on the filter path).
    bindings: &std::collections::HashMap<String, String>,
) -> Result<RowStreamOutcome, StoreError> {
    // §Fase 37.x.d (D3) — a cache HIT: the cursor drains on the conn,
    // no transaction (the cached resolution is correct and the SELECT
    // is schema-qualified, so it resolves on any session).
    if let Some(resolved) = backend.cached_schema(table) {
        let (sql, params): (String, Vec<SqlValue>) = build_select_sql(
            table,
            Some(resolved.schema.as_str()),
            where_expr,
            bindings,
            &resolved.column_types,
        )?;
        // §Fase 38.x.a (D1) — see `postgres_backend::introspect_conn` for
        // the full rationale on `.persistent(false)`. The unnamed PARSE
        // protocol is structurally collision-free behind transaction-mode
        // poolers; the named protocol leaks `sqlx_s_N` across logical
        // sessions when the physical conn is reused.
        let mut query = sqlx::query(&sql).persistent(false);
        for value in &params {
            query = bind_value(query, value);
        }
        // §Fase 37.x.j (D1) — `.fetch()` is the lazy cursor; the
        // Pool/Pinned dispatch happens inline here because the
        // returned `BoxStream` borrows the executor for its lifetime
        // and we can't unify the two stream types through a single
        // wrapper method without erasing the lifetime + boxing every
        // call site. Inline dispatch keeps the cursor's borrow
        // checker-friendly while still routing through the StoreConn.
        let drain_result = match conn {
            crate::store::store_conn::StoreConn::Pool(p) => {
                let cursor = query.fetch(*p).map(|item| {
                    item.map_err(|e| classify_sql_error("retrieve", e))
                        .and_then(|pg_row| map_pg_row(&pg_row))
                });
                drain_with_policy(cursor, policy, max_rows, cancel).await
            }
            crate::store::store_conn::StoreConn::Pinned(c) => {
                let cursor = query.fetch(&mut ***c).map(|item| {
                    item.map_err(|e| classify_sql_error("retrieve", e))
                        .and_then(|pg_row| map_pg_row(&pg_row))
                });
                drain_with_policy(cursor, policy, max_rows, cancel).await
            }
        };
        match drain_result {
            Ok(outcome) => return Ok(outcome),
            Err(e) if e.is_schema_drift() => {
                // §37.x.f (D9) — the cached schema is STALE; evict and
                // fall through to the miss path: the single retry,
                // with fresh introspection.
                backend.evict_schema(table);
            }
            Err(e) => return Err(e),
        }
    }

    // §Fase 37.x.d (D3) — a cache MISS: the schema introspection AND
    // the cursor drain run inside ONE transaction, so a transaction-
    // mode pooler pins one physical backend for both. The transaction
    // is held for the cursor's lifetime — bounded by `max_rows` (the
    // `PauseUpstream` default caps the drain), so the held pooler
    // backend is time-bounded; no pool starvation.
    // §Fase 37.x.j (D1) — `conn.begin()` runs on the same physical
    // backend as the cache-HIT attempt above when on the Pinned
    // variant; on the Pool variant it acquires a fresh logical
    // connection (legacy behavior).
    let mut tx = conn
        .begin()
        .await
        .map_err(|e| StoreError::Connect { source: e.to_string() })?;
    // §Fase 37.x.j.12 — ROLLBACK + propagate introspect error directly.
    // Pre-v1.40.3 the fall-through path here re-used the same `tx` with
    // a bare-table SELECT, so an introspect failure (privilege /
    // search_path / SSL / pooler-mode) cascaded as `relation X does not
    // exist` inside the stream-cursor path — exactly the masking class
    // closed at the 4 CRUD sites of `postgres_backend.rs` in v1.40.2,
    // but THIS site was missed. row_stream is the Pillar III lazy
    // cursor path; `transport: sse` retrieves exercise it, so a
    // streaming endpoint hit the same misleading cascade. Same fix
    // shape: explicit ROLLBACK + return the primary `introspect_err`.
    let resolved = match introspect_conn(&mut tx, table).await {
        Ok(r) => r,
        Err(introspect_err) => {
            tracing::warn!(
                target: "axon::store",
                table = %table,
                op = "introspect_in_tx_stream",
                error = %introspect_err,
                d_letter = "37.x.j.12",
                "store introspection failed inside the stream-cursor \
                 transaction; rolling back and propagating the primary \
                 error to the caller (no bare-table cascade)."
            );
            let _ = tx.rollback().await;
            return Err(introspect_err);
        }
    };
    let (sql, params): (String, Vec<SqlValue>) =
        build_select_sql(
            table,
            Some(resolved.schema.as_str()),
            where_expr,
            bindings,
            &resolved.column_types,
        )?;
    // §Fase 38.x.a (D1) — mandatory inside the `pool.begin()` tx.
    let mut query = sqlx::query(&sql).persistent(false);
    for value in &params {
        query = bind_value(query, value);
    }
    // The cursor borrows the transaction for the drain; it is scoped so
    // it is dropped before the transaction is committed.
    let outcome = {
        let cursor = query.fetch(&mut *tx).map(|item| {
            item.map_err(|e| classify_sql_error("retrieve", e))
                .and_then(|pg_row| map_pg_row(&pg_row))
        });
        drain_with_policy(cursor, policy, max_rows, cancel).await
    };
    tx.commit()
        .await
        .map_err(|e| StoreError::Connect { source: e.to_string() })?;
    backend.cache_schema(table, resolved);
    outcome
}

// ════════════════════════════════════════════════════════════════════
//  Streaming metadata for the retrieve envelope
// ════════════════════════════════════════════════════════════════════

/// Build the `"stream"` sub-object describing how a streamed
/// `retrieve` was drained — merged into the Pillar I epistemic
/// envelope (35.g) so the adopter sees both the trust grade AND the
/// streaming disposition of the result.
pub fn stream_metadata(
    policy: BackpressurePolicy,
    outcome: &RowStreamOutcome,
) -> JsonValue {
    json!({
        "policy": policy.slug(),
        "total_seen": outcome.total_seen,
        "dropped": outcome.dropped,
        "truncated": outcome.truncated,
        "cancelled": outcome.cancelled,
    })
}

// ════════════════════════════════════════════════════════════════════
//  Unit tests — the drain (synthetic streams, no database)
// ════════════════════════════════════════════════════════════════════

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::Value;

    fn row(id: i64) -> StoreRow {
        StoreRow {
            columns: vec![("id".to_string(), Value::from(id))],
        }
    }

    /// A synthetic Ok-row stream.
    fn ok_stream(
        n: usize,
    ) -> impl Stream<Item = Result<StoreRow, StoreError>> + Unpin {
        futures::stream::iter(
            (0..n as i64).map(|i| Ok(row(i))).collect::<Vec<_>>(),
        )
    }

    // ── Fail policy ──────────────────────────────────────────────────

    #[tokio::test]
    async fn fail_policy_allows_a_result_within_the_bound() {
        let outcome = drain_with_policy(
            ok_stream(5),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 5);
        assert_eq!(outcome.total_seen, 5);
    }

    #[tokio::test]
    async fn fail_policy_errors_when_the_result_exceeds_the_bound() {
        let result = drain_with_policy(
            ok_stream(50),
            BackpressurePolicy::Fail,
            10,
            &CancellationFlag::new(),
        )
        .await;
        assert!(matches!(result, Err(StoreError::Query { .. })));
    }

    // ── DropOldest policy ────────────────────────────────────────────

    #[tokio::test]
    async fn drop_oldest_keeps_the_most_recent_window() {
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::DropOldest,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10, "bounded to the window");
        assert_eq!(outcome.dropped, 90);
        assert_eq!(outcome.total_seen, 100);
        // The window is the TAIL — rows 90..100.
        assert_eq!(outcome.rows.first().unwrap().get("id"), Some(&Value::from(90)));
        assert_eq!(outcome.rows.last().unwrap().get("id"), Some(&Value::from(99)));
    }

    // ── PauseUpstream policy ─────────────────────────────────────────

    #[tokio::test]
    async fn pause_upstream_truncates_at_the_bound() {
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::PauseUpstream,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 10);
        assert!(outcome.truncated, "more rows existed past the bound");
        // The window is the HEAD — rows 0..10.
        assert_eq!(outcome.rows.first().unwrap().get("id"), Some(&Value::from(0)));
        assert_eq!(outcome.rows.last().unwrap().get("id"), Some(&Value::from(9)));
    }

    #[tokio::test]
    async fn pause_upstream_within_the_bound_is_not_truncated() {
        let outcome = drain_with_policy(
            ok_stream(3),
            BackpressurePolicy::PauseUpstream,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert_eq!(outcome.rows.len(), 3);
        assert!(!outcome.truncated);
    }

    // ── DegradeQuality policy ────────────────────────────────────────

    #[tokio::test]
    async fn degrade_quality_is_the_oss_identity_drain() {
        let outcome = drain_with_policy(
            ok_stream(50),
            BackpressurePolicy::DegradeQuality,
            10,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        // OSS identity degrader — every row, the bound is not applied.
        assert_eq!(outcome.rows.len(), 50);
        assert_eq!(outcome.dropped, 0);
        assert!(!outcome.truncated);
    }

    // ── Cancellation ─────────────────────────────────────────────────

    #[tokio::test]
    async fn a_cancelled_flag_stops_the_drain_immediately() {
        let cancel = CancellationFlag::new();
        cancel.cancel();
        let outcome = drain_with_policy(
            ok_stream(100),
            BackpressurePolicy::DegradeQuality,
            1000,
            &cancel,
        )
        .await
        .unwrap();
        assert!(outcome.cancelled);
        assert!(outcome.rows.is_empty(), "no row consumed after cancel");
    }

    // ── Decode error aborts ──────────────────────────────────────────

    #[tokio::test]
    async fn a_row_decode_error_aborts_the_drain() {
        let items: Vec<Result<StoreRow, StoreError>> = vec![
            Ok(row(0)),
            Err(StoreError::Decode {
                column: "x".into(),
                pg_type: "INT4".into(),
                source: "boom".into(),
            }),
            Ok(row(2)),
        ];
        let result = drain_with_policy(
            futures::stream::iter(items),
            BackpressurePolicy::DegradeQuality,
            100,
            &CancellationFlag::new(),
        )
        .await;
        assert!(matches!(result, Err(StoreError::Decode { .. })));
    }

    // ── Empty result ─────────────────────────────────────────────────

    #[tokio::test]
    async fn an_empty_result_drains_cleanly() {
        let outcome = drain_with_policy(
            ok_stream(0),
            DEFAULT_RETRIEVE_POLICY,
            DEFAULT_MAX_ROWS,
            &CancellationFlag::new(),
        )
        .await
        .unwrap();
        assert!(outcome.rows.is_empty());
        assert_eq!(outcome.total_seen, 0);
        assert!(!outcome.truncated && !outcome.cancelled);
    }

    // ── stream_metadata ──────────────────────────────────────────────

    #[test]
    fn stream_metadata_carries_the_drain_disposition() {
        let outcome = RowStreamOutcome {
            rows: vec![row(1)],
            total_seen: 100,
            dropped: 99,
            truncated: false,
            cancelled: false,
        };
        let meta = stream_metadata(BackpressurePolicy::DropOldest, &outcome);
        assert_eq!(meta["policy"], "drop_oldest");
        assert_eq!(meta["total_seen"], 100);
        assert_eq!(meta["dropped"], 99);
        assert_eq!(meta["truncated"], false);
    }

    #[test]
    fn defaults_are_pause_upstream_and_a_sane_bound() {
        assert_eq!(DEFAULT_RETRIEVE_POLICY, BackpressurePolicy::PauseUpstream);
        assert!(DEFAULT_MAX_ROWS >= 1000);
    }
}