Skip to main content

reddb_server/server/
output_stream.rs

1//! Output streaming primitives for issue #760 (PRD #759 / ADR 0029) — the
2//! "shim slice" of bidirectional streaming.
3//!
4//! Scope of this module:
5//!   - [`StreamConfig`]: capture the `stream.*` namespace from `red_config`
6//!     at lease open and freeze it for the lease's lifetime (acceptance
7//!     criterion: KV mutations mid-stream do not retroactively change
8//!     behaviour).
9//!   - [`StreamLease`]: an internal, unforwarded handle bound to a
10//!     snapshot LSN and a frozen config. No external surface yet (S2 will
11//!     add quotas + per-principal accounting).
12//!   - [`open_stream`]: refuses with `stream_in_transaction_unsupported`
13//!     when the caller already has an active `BEGIN` on the session.
14//!   - [`ChunkProducer`]: page-aligned (N × 16 KiB) production buffer
15//!     that flushes on the first of byte / row / latency cap.
16//!   - [`Clock`]: trait-injected time source so TTL expiry is testable.
17//!
18//! The HTTP NDJSON wire framing built on top of these primitives lives in
19//! `handlers_query::handle_query_ndjson_stream` and is dispatched from
20//! `routing::try_route_streaming`.
21
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::sync::{Arc, Mutex};
25use std::time::{SystemTime, UNIX_EPOCH};
26
27use crate::runtime::RedDBRuntime;
28use crate::storage::query::engine::cancel::CancelToken;
29use crate::storage::schema::types::Value;
30
31const RED_CONFIG_COLLECTION: &str = "red_config";
32
33/// Engine page size — the production buffer is always a multiple of this.
34/// Matches `storage::engine::PAGE_SIZE`.
35pub const PAGE_SIZE: usize = 16 * 1024;
36
37/// Injectable time source. Production code uses [`SystemClock`]; tests
38/// drive TTL expiry with [`FakeClock`] so they don't depend on wall time.
39pub trait Clock: Send + Sync {
40    fn now_ms(&self) -> u64;
41}
42
43#[derive(Debug, Default)]
44pub struct SystemClock;
45
46impl Clock for SystemClock {
47    fn now_ms(&self) -> u64 {
48        SystemTime::now()
49            .duration_since(UNIX_EPOCH)
50            .map(|d| d.as_millis() as u64)
51            .unwrap_or(0)
52    }
53}
54
55#[derive(Debug)]
56pub struct FakeClock {
57    now_ms: AtomicU64,
58}
59
60impl FakeClock {
61    pub fn new(now_ms: u64) -> Self {
62        Self {
63            now_ms: AtomicU64::new(now_ms),
64        }
65    }
66
67    pub fn advance(&self, ms: u64) {
68        self.now_ms.fetch_add(ms, Ordering::SeqCst);
69    }
70}
71
72impl Clock for FakeClock {
73    fn now_ms(&self) -> u64 {
74        self.now_ms.load(Ordering::SeqCst)
75    }
76}
77
78/// Frozen snapshot of the `stream.*` namespace at lease open. Acceptance
79/// criterion: a `red_config` mutation while a stream is running does not
80/// retroactively change the running stream's behaviour — that is, the
81/// per-lease config is value-typed and not a back-reference.
82#[derive(Debug, Clone, Copy)]
83pub struct StreamConfig {
84    pub snapshot_ttl_ms: u64,
85    pub chunk_default_pages: usize,
86    pub chunk_min_pages: usize,
87    pub chunk_max_pages: usize,
88    pub chunk_max_rows: usize,
89    pub chunk_max_latency_ms: u64,
90    /// Process-wide concurrent stream cap (issue #761 / S2).
91    /// Acquiring the (N+1)th slot is refused with
92    /// `server_stream_capacity_exhausted`.
93    pub max_global_streams: usize,
94    /// Per-principal concurrent stream cap (issue #761 / S2).
95    /// Acquiring the (M+1)th slot for a single principal is refused
96    /// with `principal_stream_quota_exhausted` even when the global
97    /// counter still has room.
98    pub max_per_principal_streams: usize,
99    /// Issue #765 / S6 — default end-to-end verification mode for input
100    /// streams when the open frame does not request one
101    /// (`stream.integrity.default_verify`, `"none"` per ADR 0029).
102    pub default_verify: crate::runtime::integrity_tombstone::VerifyMode,
103}
104
105impl Default for StreamConfig {
106    fn default() -> Self {
107        Self::DEFAULT
108    }
109}
110
111impl StreamConfig {
112    /// Defaults match ADR 0029. `snapshot_ttl_ms` is the only key
113    /// observable mid-stream; the chunk caps only affect framing.
114    pub const DEFAULT: StreamConfig = StreamConfig {
115        snapshot_ttl_ms: 60_000,
116        chunk_default_pages: 4,
117        chunk_min_pages: 1,
118        chunk_max_pages: 64,
119        chunk_max_rows: 1000,
120        chunk_max_latency_ms: 50,
121        max_global_streams: 256,
122        max_per_principal_streams: 32,
123        default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
124    };
125
126    /// Read every `stream.*` key from `red_config` over the runtime's KV
127    /// store. Missing keys fall back to [`Self::DEFAULT`]. Unparseable or
128    /// out-of-range values fall back silently — bad config never gets to
129    /// terminate a request that would otherwise succeed.
130    pub fn load(runtime: &RedDBRuntime) -> Self {
131        let db = runtime.db();
132        let read_u64 = |key: &str| -> Option<u64> {
133            match db.get_kv(RED_CONFIG_COLLECTION, key) {
134                Some((Value::Integer(v), _)) if v >= 0 => Some(v as u64),
135                Some((Value::UnsignedInteger(v), _)) => Some(v),
136                Some((Value::Text(text), _)) => text.parse().ok(),
137                _ => None,
138            }
139        };
140
141        let mut cfg = Self::DEFAULT;
142        if let Some(v) = read_u64("stream.snapshot.ttl_ms") {
143            cfg.snapshot_ttl_ms = v;
144        }
145        if let Some(v) = read_u64("stream.chunk.default_pages") {
146            cfg.chunk_default_pages = v as usize;
147        }
148        if let Some(v) = read_u64("stream.chunk.min_pages") {
149            cfg.chunk_min_pages = v as usize;
150        }
151        if let Some(v) = read_u64("stream.chunk.max_pages") {
152            cfg.chunk_max_pages = v as usize;
153        }
154        if let Some(v) = read_u64("stream.chunk.max_rows") {
155            cfg.chunk_max_rows = v as usize;
156        }
157        if let Some(v) = read_u64("stream.chunk.max_latency_ms") {
158            cfg.chunk_max_latency_ms = v;
159        }
160        if let Some(v) = read_u64("stream.max_global") {
161            cfg.max_global_streams = v as usize;
162        }
163        if let Some(v) = read_u64("stream.max_per_principal") {
164            cfg.max_per_principal_streams = v as usize;
165        }
166        // Issue #765 / S6 — opt-in integrity default. Stored as a Text key
167        // (`"none"` / `"sha256"`); anything else falls back to `None`.
168        if let Some((Value::Text(text), _)) =
169            db.get_kv(RED_CONFIG_COLLECTION, "stream.integrity.default_verify")
170        {
171            cfg.default_verify = crate::runtime::integrity_tombstone::VerifyMode::parse(&text);
172        }
173        cfg.normalize();
174        cfg
175    }
176
177    /// Clamp interrelated fields to a self-consistent state. Floors come
178    /// from ADR 0029 ("hard floor 1 page"); ceilings prevent zero-row /
179    /// zero-byte caps that would prevent the producer from ever flushing.
180    fn normalize(&mut self) {
181        if self.chunk_min_pages == 0 {
182            self.chunk_min_pages = 1;
183        }
184        if self.chunk_max_pages < self.chunk_min_pages {
185            self.chunk_max_pages = self.chunk_min_pages;
186        }
187        if self.chunk_default_pages < self.chunk_min_pages {
188            self.chunk_default_pages = self.chunk_min_pages;
189        }
190        if self.chunk_default_pages > self.chunk_max_pages {
191            self.chunk_default_pages = self.chunk_max_pages;
192        }
193        if self.chunk_max_rows == 0 {
194            self.chunk_max_rows = 1;
195        }
196        if self.max_global_streams == 0 {
197            self.max_global_streams = 1;
198        }
199        if self.max_per_principal_streams == 0 {
200            self.max_per_principal_streams = 1;
201        }
202    }
203
204    /// Page-aligned production buffer size in bytes. Acceptance criterion:
205    /// "production buffer is always N × 16 KiB".
206    pub fn production_buffer_bytes(&self) -> usize {
207        self.chunk_default_pages.saturating_mul(PAGE_SIZE)
208    }
209}
210
211/// Monotonic, process-local lease id. The id is internal — the bearer
212/// token still authenticates the open, the lease only identifies the
213/// stream for audit and termination (ADR 0029 "Authorization").
214///
215/// Issue #767 / S8 — the internal id is **never** forwarded on the
216/// wire. The client receives [`StreamLease::lease_handle`], a 128-bit
217/// opaque random value, in `OpenAck` / the `{"end": …}` envelope.
218static NEXT_LEASE_ID: AtomicU64 = AtomicU64::new(1);
219
220/// Issue #767 / S8 — wire-visible lease handle, 128 bits drawn from
221/// the OS CSPRNG. Hex-encoded so it survives JSON / header transport.
222/// Opacity property: a client cannot derive the internal lease id
223/// from the handle (the two are independent), and the handle is not
224/// sequential across opens.
225pub const LEASE_HANDLE_BYTES: usize = 16;
226
227/// Generate an opaque 128-bit lease handle (32-char hex). Per ADR 0029,
228/// the handle is the only identifier the wire sees; the internal
229/// `StreamLease::id` stays server-side.
230pub fn generate_lease_handle() -> String {
231    let mut bytes = [0u8; LEASE_HANDLE_BYTES];
232    if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
233        // CSPRNG failure is exceedingly rare on supported platforms;
234        // fall back to a high-entropy mix so the open path never
235        // terminates a stream that would otherwise succeed. The
236        // fallback is *not* secure unguessable — production paths
237        // will hit /dev/urandom successfully.
238        let lo = NEXT_LEASE_ID.fetch_add(0, Ordering::SeqCst).to_le_bytes();
239        let now = crate::utils::now_unix_nanos().to_le_bytes();
240        bytes[..8].copy_from_slice(&lo);
241        bytes[8..].copy_from_slice(&now);
242    }
243    crate::utils::to_hex(&bytes)
244}
245
246/// Issue #767 / S8 — non-validating JWT `exp` claim extractor. Returns
247/// the expiry time in milliseconds since the epoch when `token` is a
248/// JWT carrying an integer `exp` claim, otherwise `None`.
249///
250/// The extractor does **not** validate the JWT signature, issuer, or
251/// audience — those gates run during `is_authorized` at OpenStream.
252/// This helper is consulted *after* the bearer has been accepted, to
253/// decide whether the lease will outlive the bearer credential and an
254/// audit event should be emitted accordingly.
255pub fn parse_jwt_exp_ms(token: &str) -> Option<u64> {
256    let parts: Vec<&str> = token.split('.').collect();
257    if parts.len() != 3 {
258        return None;
259    }
260    let payload = base64url_decode_padded(parts[1])?;
261    let v: crate::json::Value = crate::json::from_slice(&payload).ok()?;
262    let exp_secs = v.get("exp").and_then(|n| n.as_f64())? as i64;
263    if exp_secs <= 0 {
264        return None;
265    }
266    Some((exp_secs as u64).saturating_mul(1000))
267}
268
269fn base64url_decode_padded(input: &str) -> Option<Vec<u8>> {
270    let mut s = String::with_capacity(input.len() + 4);
271    for ch in input.chars() {
272        match ch {
273            '-' => s.push('+'),
274            '_' => s.push('/'),
275            _ => s.push(ch),
276        }
277    }
278    while !s.len().is_multiple_of(4) {
279        s.push('=');
280    }
281    crate::wire::redwire::auth::base64_std_decode(&s)
282}
283
284/// Issue #767 / S8 — wire-visible reason for `stream.closed`. The
285/// audit emit site decides the variant once the stream's exit shape
286/// has been observed.
287#[derive(Debug, Clone, Copy, PartialEq, Eq)]
288pub enum CloseReason {
289    Ok,
290    Cancelled,
291    Error,
292    SnapshotExpired,
293    CapacityRefused,
294    IntegrityFailed,
295}
296
297impl CloseReason {
298    pub fn as_str(&self) -> &'static str {
299        match self {
300            CloseReason::Ok => "ok",
301            CloseReason::Cancelled => "cancelled",
302            CloseReason::Error => "error",
303            CloseReason::SnapshotExpired => "snapshot_expired",
304            CloseReason::CapacityRefused => "capacity_refused",
305            CloseReason::IntegrityFailed => "integrity_failed",
306        }
307    }
308}
309
310#[derive(Debug)]
311pub struct StreamLease {
312    pub id: u64,
313    /// Issue #767 / S8 — opaque handle handed to the client. 32-char
314    /// hex (128 bits of CSPRNG output). The internal `id` is not
315    /// derivable from this value.
316    pub lease_handle: String,
317    pub snapshot_lsn: u64,
318    pub opened_at_ms: u64,
319    pub config: StreamConfig,
320}
321
322impl StreamLease {
323    /// `true` once `snapshot_ttl_ms` has elapsed since the lease was opened.
324    /// The shim slice materialises the result first, so expiry in practice
325    /// only fires for streams that take longer than `ttl_ms` to drain to
326    /// the client. The check is wired in so the wire envelope can carry
327    /// `snapshot_expired` exactly the way later slices (pull-based
328    /// executors) will need it.
329    pub fn snapshot_expired(&self, now_ms: u64) -> bool {
330        now_ms.saturating_sub(self.opened_at_ms) >= self.config.snapshot_ttl_ms
331    }
332}
333
334#[derive(Debug, PartialEq, Eq)]
335pub enum OpenStreamError {
336    /// Acceptance criterion #4: `OpenStream` against a session with an
337    /// active `BEGIN` is refused with this code. The wire shape carries
338    /// it back as `{"error": {"code": "stream_in_transaction_unsupported"}}`.
339    TransactionActive,
340}
341
342impl OpenStreamError {
343    pub fn code(&self) -> &'static str {
344        match self {
345            OpenStreamError::TransactionActive => "stream_in_transaction_unsupported",
346        }
347    }
348
349    pub fn message(&self) -> &'static str {
350        match self {
351            OpenStreamError::TransactionActive => {
352                "cannot open output stream while a transaction is active on this session"
353            }
354        }
355    }
356}
357
358/// Issue a lease. The caller is responsible for binding `snapshot_lsn`
359/// to the same MVCC view the underlying executor will read from; in the
360/// shim slice this is `runtime.cdc_current_lsn()` captured before
361/// `execute_query`.
362pub fn open_stream(
363    config: StreamConfig,
364    snapshot_lsn: u64,
365    in_transaction: bool,
366    clock: &dyn Clock,
367) -> Result<StreamLease, OpenStreamError> {
368    if in_transaction {
369        return Err(OpenStreamError::TransactionActive);
370    }
371    Ok(StreamLease {
372        id: NEXT_LEASE_ID.fetch_add(1, Ordering::SeqCst),
373        lease_handle: generate_lease_handle(),
374        snapshot_lsn,
375        opened_at_ms: clock.now_ms(),
376        config,
377    })
378}
379
380/// Page-aligned chunk producer. The producer accumulates byte-encoded
381/// rows in an N × 16 KiB buffer; on the first of byte / row / latency
382/// cap it forwards the buffer to the supplied flush closure, which the
383/// transport layer turns into a chunked-encoding frame.
384///
385/// The struct does not know about HTTP, NDJSON, or chunked transfer —
386/// it is wire-agnostic so the gRPC and RedWire paths can reuse it.
387pub struct ChunkProducer<'a> {
388    buf: Vec<u8>,
389    rows: usize,
390    window_started_ms: u64,
391    cap_bytes: usize,
392    cap_rows: usize,
393    cap_latency_ms: u64,
394    clock: &'a dyn Clock,
395    total_flushes: u64,
396    total_bytes: u64,
397    total_rows: u64,
398    last_flush_reason: Option<FlushReason>,
399}
400
401#[derive(Debug, Clone, Copy, PartialEq, Eq)]
402pub enum FlushReason {
403    Byte,
404    Row,
405    Latency,
406    Terminal,
407}
408
409impl<'a> ChunkProducer<'a> {
410    pub fn new(config: &StreamConfig, clock: &'a dyn Clock) -> Self {
411        let cap_bytes = config.production_buffer_bytes();
412        Self {
413            buf: Vec::with_capacity(cap_bytes),
414            rows: 0,
415            window_started_ms: clock.now_ms(),
416            cap_bytes,
417            cap_rows: config.chunk_max_rows,
418            cap_latency_ms: config.chunk_max_latency_ms,
419            clock,
420            total_flushes: 0,
421            total_bytes: 0,
422            total_rows: 0,
423            last_flush_reason: None,
424        }
425    }
426
427    /// Append one already-encoded line (NDJSON: bytes + `\n`). Returns
428    /// `true` if the append triggered a flush.
429    pub fn push_line<F>(&mut self, line: &[u8], flush: &mut F) -> std::io::Result<bool>
430    where
431        F: FnMut(&[u8]) -> std::io::Result<()>,
432    {
433        self.buf.extend_from_slice(line);
434        self.rows += 1;
435        self.total_rows += 1;
436
437        if self.buf.len() >= self.cap_bytes {
438            self.flush(flush, FlushReason::Byte)?;
439            return Ok(true);
440        }
441        if self.rows >= self.cap_rows {
442            self.flush(flush, FlushReason::Row)?;
443            return Ok(true);
444        }
445        let elapsed = self.clock.now_ms().saturating_sub(self.window_started_ms);
446        if elapsed >= self.cap_latency_ms {
447            self.flush(flush, FlushReason::Latency)?;
448            return Ok(true);
449        }
450        Ok(false)
451    }
452
453    /// Issue #768 / S9 — drive a pull-based line source into the
454    /// production buffer. The producer *pulls* one encoded line at a
455    /// time from `source` and routes it through [`push_line`], so the
456    /// resident working set is the page-aligned buffer plus the single
457    /// line currently in hand — never the full result set. Pair this
458    /// with the pull-based scan iterators
459    /// (`parallel_scan::parallel_scan_rows`,
460    /// `bitmap_scan::execute_bitmap_scan_stream`) whose records are
461    /// encoded lazily by `encode`.
462    ///
463    /// Returns the number of lines consumed. Flush caps (byte / row /
464    /// latency) fire mid-drain exactly as they would for hand-driven
465    /// [`push_line`] calls, so first-line latency stays bounded by
466    /// `chunk.max_latency_ms` regardless of how many lines the source
467    /// will ultimately yield.
468    ///
469    /// [`push_line`]: ChunkProducer::push_line
470    pub fn drive_lines<S, R, Enc, F>(
471        &mut self,
472        source: S,
473        mut encode: Enc,
474        flush: &mut F,
475    ) -> std::io::Result<u64>
476    where
477        S: IntoIterator<Item = R>,
478        Enc: FnMut(&R) -> Vec<u8>,
479        F: FnMut(&[u8]) -> std::io::Result<()>,
480    {
481        let mut count = 0u64;
482        for record in source {
483            let line = encode(&record);
484            self.push_line(&line, flush)?;
485            count += 1;
486        }
487        Ok(count)
488    }
489
490    /// Force-flush any buffered bytes — used after the final NDJSON line
491    /// (`{"end": …}`) to push the tail of the buffer before closing the
492    /// connection.
493    pub fn finish<F>(&mut self, flush: &mut F) -> std::io::Result<()>
494    where
495        F: FnMut(&[u8]) -> std::io::Result<()>,
496    {
497        if !self.buf.is_empty() {
498            self.flush(flush, FlushReason::Terminal)?;
499        }
500        Ok(())
501    }
502
503    fn flush<F>(&mut self, flush: &mut F, reason: FlushReason) -> std::io::Result<()>
504    where
505        F: FnMut(&[u8]) -> std::io::Result<()>,
506    {
507        flush(&self.buf)?;
508        self.total_bytes += self.buf.len() as u64;
509        self.total_flushes += 1;
510        self.last_flush_reason = Some(reason);
511        self.buf.clear();
512        self.rows = 0;
513        self.window_started_ms = self.clock.now_ms();
514        Ok(())
515    }
516
517    pub fn total_flushes(&self) -> u64 {
518        self.total_flushes
519    }
520    pub fn total_bytes(&self) -> u64 {
521        self.total_bytes
522    }
523    pub fn total_rows(&self) -> u64 {
524        self.total_rows
525    }
526    pub fn last_flush_reason(&self) -> Option<FlushReason> {
527        self.last_flush_reason
528    }
529}
530
531/// HTTP chunked transfer encoding helpers. We do not pull in `hyper` for
532/// the streaming path; the existing SSE handler also hand-rolls its own
533/// HTTP framing, and matching that style keeps the diff narrow.
534pub fn write_chunked_response_header<W: std::io::Write>(
535    writer: &mut W,
536    status: u16,
537    content_type: &str,
538) -> std::io::Result<()> {
539    let header = format!(
540        "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nTransfer-Encoding: chunked\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n",
541        status,
542        crate::server::transport::status_text(status),
543        content_type,
544    );
545    writer.write_all(header.as_bytes())?;
546    writer.flush()
547}
548
549/// Emit one HTTP chunk (`<hex-size>\r\n<bytes>\r\n`). A zero-length
550/// payload is silently dropped — the terminator chunk lives in
551/// [`write_chunked_terminator`].
552pub fn write_chunk<W: std::io::Write>(writer: &mut W, bytes: &[u8]) -> std::io::Result<()> {
553    if bytes.is_empty() {
554        return Ok(());
555    }
556    let size = format!("{:x}\r\n", bytes.len());
557    writer.write_all(size.as_bytes())?;
558    writer.write_all(bytes)?;
559    writer.write_all(b"\r\n")?;
560    writer.flush()
561}
562
563/// Final `0\r\n\r\n` chunk that terminates a chunked body.
564pub fn write_chunked_terminator<W: std::io::Write>(writer: &mut W) -> std::io::Result<()> {
565    writer.write_all(b"0\r\n\r\n")?;
566    writer.flush()
567}
568
569/// Issue #761 / S2 — process-wide stream capacity registry. Holds two
570/// counters: a global concurrent-stream count and a per-principal map.
571/// Both are decremented when the [`StreamCapacityGuard`] handed back
572/// from a successful `try_acquire` is dropped, so the release path
573/// covers every normal exit (success, mid-stream error, snapshot
574/// expiry, client disconnect that drops the writer chain, panic
575/// unwind through the stack frame holding the guard).
576#[derive(Debug, Default)]
577pub struct StreamCapacityRegistry {
578    inner: Mutex<CapacityInner>,
579}
580
581#[derive(Debug, Default)]
582struct CapacityInner {
583    global_count: usize,
584    per_principal: HashMap<String, usize>,
585}
586
587/// Failure modes of [`StreamCapacityRegistry::try_acquire`]. Each
588/// variant carries the cap that fired and the live counter value at
589/// refusal time so clients can back off intelligently (the HTTP layer
590/// surfaces these inside the structured 429 body).
591#[derive(Debug, Clone, PartialEq, Eq)]
592pub enum AcquireError {
593    /// `stream.max_global` exceeded. Per acceptance criterion #1.
594    GlobalExhausted { limit: usize, current: usize },
595    /// `stream.max_per_principal` exceeded for `principal`. Per
596    /// acceptance criterion #2. The principal is surfaced verbatim;
597    /// callers must escape it on the wire.
598    PrincipalExhausted {
599        principal: String,
600        limit: usize,
601        current: usize,
602    },
603}
604
605impl AcquireError {
606    pub fn code(&self) -> &'static str {
607        match self {
608            AcquireError::GlobalExhausted { .. } => "server_stream_capacity_exhausted",
609            AcquireError::PrincipalExhausted { .. } => "principal_stream_quota_exhausted",
610        }
611    }
612
613    pub fn message(&self) -> String {
614        match self {
615            AcquireError::GlobalExhausted { limit, current } => {
616                format!("server stream capacity exhausted (limit {limit}, current {current})")
617            }
618            AcquireError::PrincipalExhausted {
619                principal,
620                limit,
621                current,
622            } => format!(
623                "principal {principal} stream quota exhausted (limit {limit}, current {current})"
624            ),
625        }
626    }
627}
628
629impl StreamCapacityRegistry {
630    pub fn new() -> Arc<Self> {
631        Arc::new(Self::default())
632    }
633
634    /// Attempt to acquire one slot. Both caps are checked under the
635    /// same lock, so a concurrent acquire+release pair cannot over-
636    /// issue beyond either ceiling (acceptance criterion #5).
637    pub fn try_acquire(
638        self: &Arc<Self>,
639        principal: &str,
640        max_global: usize,
641        max_per_principal: usize,
642    ) -> Result<StreamCapacityGuard, AcquireError> {
643        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
644        if inner.global_count >= max_global {
645            return Err(AcquireError::GlobalExhausted {
646                limit: max_global,
647                current: inner.global_count,
648            });
649        }
650        let current = inner.per_principal.get(principal).copied().unwrap_or(0);
651        if current >= max_per_principal {
652            return Err(AcquireError::PrincipalExhausted {
653                principal: principal.to_string(),
654                limit: max_per_principal,
655                current,
656            });
657        }
658        inner.global_count += 1;
659        inner
660            .per_principal
661            .insert(principal.to_string(), current + 1);
662        Ok(StreamCapacityGuard {
663            registry: Arc::clone(self),
664            principal: principal.to_string(),
665            released: false,
666        })
667    }
668
669    fn release(&self, principal: &str) {
670        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
671        if inner.global_count > 0 {
672            inner.global_count -= 1;
673        }
674        if let Some(count) = inner.per_principal.get_mut(principal) {
675            if *count > 0 {
676                *count -= 1;
677            }
678            if *count == 0 {
679                inner.per_principal.remove(principal);
680            }
681        }
682    }
683
684    /// Visible for tests and audit handlers.
685    pub fn snapshot(&self) -> (usize, HashMap<String, usize>) {
686        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
687        (inner.global_count, inner.per_principal.clone())
688    }
689}
690
691/// RAII slot returned by [`StreamCapacityRegistry::try_acquire`].
692/// Decrements both counters on drop — acceptance criterion #4
693/// ("releasing a slot on stream end (any reason) decrements both
694/// counters atomically").
695#[must_use = "dropping the guard immediately releases the stream slot"]
696#[derive(Debug)]
697pub struct StreamCapacityGuard {
698    registry: Arc<StreamCapacityRegistry>,
699    principal: String,
700    released: bool,
701}
702
703impl StreamCapacityGuard {
704    pub fn principal(&self) -> &str {
705        &self.principal
706    }
707}
708
709impl Drop for StreamCapacityGuard {
710    fn drop(&mut self) {
711        if !self.released {
712            self.registry.release(&self.principal);
713            self.released = true;
714        }
715    }
716}
717
718// ──────── Issue #766 / S7 — resume coordinator ────────
719
720/// Resumability assessment of a query plan. The shim slice runs a
721/// textual classifier over the SQL string: a query is resumable iff
722/// it has a stable total order over a unique key. By default we
723/// promise RID ASC; an explicit `ORDER BY rid` (or `ORDER BY rid ASC`)
724/// is also resumable. Anything that aggregates / groups / windows or
725/// orders on a non-unique column is not.
726pub fn assess_resumability(query: &str) -> bool {
727    let upper = query.to_uppercase();
728    let trimmed = upper.trim_start();
729    if !trimmed.starts_with("SELECT ") && !trimmed.starts_with("SELECT\n") {
730        return false;
731    }
732    const FORBIDDEN: &[&str] = &[
733        " GROUP BY ",
734        " HAVING ",
735        " DISTINCT ",
736        "DISTINCT ",
737        "COUNT(",
738        "SUM(",
739        "AVG(",
740        "MIN(",
741        "MAX(",
742        "ARRAY_AGG(",
743        "JSON_AGG(",
744        "OVER(",
745        " OVER (",
746        " JOIN ",
747    ];
748    for kw in FORBIDDEN {
749        if upper.contains(kw) {
750            return false;
751        }
752    }
753    if let Some(idx) = upper.find("ORDER BY") {
754        let tail = &upper[idx + "ORDER BY".len()..];
755        // Strip trailing LIMIT and statement terminator.
756        let mut clause = tail.to_string();
757        if let Some(lim) = clause.find(" LIMIT ") {
758            clause.truncate(lim);
759        }
760        if let Some(semi) = clause.find(';') {
761            clause.truncate(semi);
762        }
763        let clause = clause.trim();
764        if !matches!(clause, "RID" | "RID ASC") {
765            return false;
766        }
767    }
768    true
769}
770
771/// Resume-eligibility ledger. Holds `(snapshot_lsn → opened_at_ms,
772/// ttl_ms)` so a resume request can be checked against TTL without
773/// trusting the wall clock on the client. The shim slice does not
774/// implement true MVCC pinning — the registry's role is to make
775/// `snapshot_expired` deterministic and testable.
776#[derive(Debug, Default)]
777pub struct LeaseRegistry {
778    inner: Mutex<HashMap<u64, LeaseEntry>>,
779}
780
781#[derive(Debug, Clone, Copy)]
782struct LeaseEntry {
783    opened_at_ms: u64,
784    ttl_ms: u64,
785}
786
787#[derive(Debug, Clone, Copy, PartialEq, Eq)]
788pub enum LeaseLookup {
789    /// No lease ever recorded for this snapshot LSN.
790    Unknown,
791    /// Lease exists but its TTL has elapsed.
792    Expired,
793    /// Lease exists and is still within TTL.
794    Live,
795}
796
797impl LeaseRegistry {
798    pub fn new() -> Arc<Self> {
799        Arc::new(Self::default())
800    }
801
802    /// Record a freshly-opened lease. Idempotent — re-inserting the
803    /// same snapshot_lsn refreshes the timestamp (the client cannot
804    /// observe lease identity through the snapshot LSN alone, so this
805    /// matches "the latest open wins" semantics).
806    pub fn record(&self, snapshot_lsn: u64, opened_at_ms: u64, ttl_ms: u64) {
807        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
808        inner.insert(
809            snapshot_lsn,
810            LeaseEntry {
811                opened_at_ms,
812                ttl_ms,
813            },
814        );
815    }
816
817    /// Resume-time lookup. Returns whether the lease is unknown,
818    /// expired, or still live as of `now_ms`.
819    pub fn lookup(&self, snapshot_lsn: u64, now_ms: u64) -> LeaseLookup {
820        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
821        match inner.get(&snapshot_lsn) {
822            None => LeaseLookup::Unknown,
823            Some(entry) => {
824                if now_ms.saturating_sub(entry.opened_at_ms) >= entry.ttl_ms {
825                    LeaseLookup::Expired
826                } else {
827                    LeaseLookup::Live
828                }
829            }
830        }
831    }
832
833    /// Visible for tests / audit.
834    #[doc(hidden)]
835    pub fn len(&self) -> usize {
836        self.inner.lock().unwrap_or_else(|e| e.into_inner()).len()
837    }
838}
839
840/// Issue #807 / PRD #750 — opaque cursor token. 192 bits drawn from the
841/// OS CSPRNG, hex-encoded so it survives JSON transport. The token is the
842/// only cursor identifier the wire sees; the pinned snapshot, scope, and
843/// query stay server-side in the [`CursorRegistry`]. Opacity property: a
844/// client cannot derive the pinned snapshot LSN or another principal's
845/// token from the value, and tokens are not sequential across opens.
846pub const CURSOR_TOKEN_BYTES: usize = 24;
847
848static NEXT_CURSOR_SALT: AtomicU64 = AtomicU64::new(1);
849
850/// Generate an opaque 192-bit cursor token (48-char hex). Falls back to a
851/// high-entropy time/counter mix if the CSPRNG is unavailable so a fresh
852/// open never fails to mint a token (the fallback is not unguessable, but
853/// production paths reach `/dev/urandom` successfully).
854pub fn generate_cursor_token() -> String {
855    let mut bytes = [0u8; CURSOR_TOKEN_BYTES];
856    if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
857        let salt = NEXT_CURSOR_SALT
858            .fetch_add(1, Ordering::SeqCst)
859            .to_le_bytes();
860        let now = crate::utils::now_unix_nanos().to_le_bytes();
861        bytes[..8].copy_from_slice(&salt);
862        bytes[8..16].copy_from_slice(&now);
863    }
864    crate::utils::to_hex(&bytes)
865}
866
867/// Server-side cursor record. Pins the read snapshot and the query so a
868/// resume replays the same point-in-time view, and binds the entry to the
869/// `(tenant, principal)` that opened it for authorization scoping.
870#[derive(Debug, Clone)]
871struct CursorEntry {
872    snapshot_lsn: u64,
873    tenant: String,
874    principal: String,
875    query: String,
876    entity_types: Option<Vec<String>>,
877    capabilities: Option<Vec<String>>,
878    opened_at_ms: u64,
879    ttl_ms: u64,
880    /// Issue #808 / 750d — shared cancel token observed by the executor for
881    /// this stream. The cancel endpoint and client-disconnect detection
882    /// raise it; the engine iterators poll it. Cloned out to the handler at
883    /// open so the live stream and a later cancel request share one flag.
884    cancel_token: CancelToken,
885    /// Issue #808 / 750d — tombstone marker. Set once the stream is
886    /// cancelled (explicitly or by disconnect); a tombstoned cursor can
887    /// never be resumed, only reported as cleanly cancelled to its owner.
888    cancelled: bool,
889}
890
891/// Pinned read state returned to the handler when a resume token resolves
892/// cleanly. The handler re-executes `query` against `snapshot_lsn` to
893/// re-stream the same view the original cursor referenced.
894#[derive(Debug, Clone, PartialEq, Eq)]
895pub struct CursorResume {
896    pub snapshot_lsn: u64,
897    pub query: String,
898    pub entity_types: Option<Vec<String>>,
899    pub capabilities: Option<Vec<String>>,
900    pub expires_at_ms: u64,
901}
902
903/// Why a resume token was refused. The handler maps every variant to a
904/// structured wire error. [`CursorReject::NotFound`] deliberately covers
905/// both "never issued" and "owned by a different tenant/principal" so an
906/// unauthorized caller cannot tell a foreign cursor from a nonexistent one.
907#[derive(Debug, Clone, Copy, PartialEq, Eq)]
908pub enum CursorReject {
909    /// Token unknown, or known to a different tenant/principal (masked so
910    /// existence never leaks to an unauthorized caller).
911    NotFound,
912    /// Token owned by this caller but its TTL has elapsed.
913    Expired,
914    /// Issue #808 / 750d — token owned by this caller but the stream was
915    /// cancelled (explicitly or by client disconnect) and tombstoned. A
916    /// resume is refused with a clean, structured error rather than
917    /// replaying a stream the client already abandoned.
918    Cancelled,
919}
920
921/// Issue #807 / PRD #750 — process-wide cursor registry for `/query/stream`.
922///
923/// On a fresh stream the server mints an opaque token, pins the read
924/// snapshot LSN, and records it here scoped to the requesting
925/// `(tenant, principal)` with a TTL. A later resume request presents the
926/// token; [`resolve`](CursorRegistry::resolve) returns the pinned query +
927/// snapshot only when the token is live AND the caller's scope matches.
928///
929/// In-memory is acceptable for this first cut (per the issue): the registry
930/// is shared via `Arc` so cloned server handles enforce against one map.
931#[derive(Debug, Default)]
932pub struct CursorRegistry {
933    inner: Mutex<HashMap<String, CursorEntry>>,
934}
935
936impl CursorRegistry {
937    pub fn new() -> Arc<Self> {
938        Arc::new(Self::default())
939    }
940
941    /// Mint + record a cursor for a freshly-opened stream, returning the
942    /// opaque token handed to the client. The entry expires at
943    /// `opened_at_ms + ttl_ms` (saturating).
944    #[allow(clippy::too_many_arguments)]
945    pub fn register(
946        &self,
947        snapshot_lsn: u64,
948        tenant: &str,
949        principal: &str,
950        query: &str,
951        entity_types: Option<Vec<String>>,
952        capabilities: Option<Vec<String>>,
953        opened_at_ms: u64,
954        ttl_ms: u64,
955    ) -> String {
956        let token = generate_cursor_token();
957        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
958        inner.insert(
959            token.clone(),
960            CursorEntry {
961                snapshot_lsn,
962                tenant: tenant.to_string(),
963                principal: principal.to_string(),
964                query: query.to_string(),
965                entity_types,
966                capabilities,
967                opened_at_ms,
968                ttl_ms,
969                cancel_token: CancelToken::new(),
970                cancelled: false,
971            },
972        );
973        token
974    }
975
976    /// Issue #808 / 750d — clone out the shared cancel token for a cursor
977    /// the caller just minted, so the live streaming handler can poll the
978    /// same flag a later cancel request raises. No scope check: the only
979    /// caller is the handler that owns the freshly-registered token.
980    pub fn cancel_token_for(&self, token: &str) -> Option<CancelToken> {
981        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
982        inner.get(token).map(|e| e.cancel_token.clone())
983    }
984
985    /// Issue #808 / 750d — cancel + tombstone a cursor on behalf of
986    /// `(tenant, principal)`. Raises the shared cancel token (so an
987    /// in-flight stream observing it stops) and marks the entry cancelled
988    /// so a later resume is refused with [`CursorReject::Cancelled`].
989    ///
990    /// Scope is checked exactly as in [`resolve`](CursorRegistry::resolve):
991    /// a token owned by a different tenant/principal — or one that never
992    /// existed — returns [`CursorReject::NotFound`] so an unauthorized
993    /// caller cannot cancel, or even confirm the existence of, a foreign
994    /// stream. Cancelling an already-cancelled cursor is idempotent.
995    pub fn cancel(
996        &self,
997        token: &str,
998        tenant: &str,
999        principal: &str,
1000    ) -> Result<CancelToken, CursorReject> {
1001        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1002        let entry = inner.get_mut(token).ok_or(CursorReject::NotFound)?;
1003        if entry.tenant != tenant || entry.principal != principal {
1004            return Err(CursorReject::NotFound);
1005        }
1006        entry.cancelled = true;
1007        entry.cancel_token.cancel();
1008        Ok(entry.cancel_token.clone())
1009    }
1010
1011    /// Resolve a resume token for `(tenant, principal)` as of `now_ms`.
1012    ///
1013    /// Scope is checked **before** expiry: a token owned by a different
1014    /// tenant or principal resolves to [`CursorReject::NotFound`] —
1015    /// identical to a token that was never issued — so an unauthorized
1016    /// caller learns nothing about its existence. Only the rightful owner
1017    /// can observe [`CursorReject::Expired`].
1018    pub fn resolve(
1019        &self,
1020        token: &str,
1021        tenant: &str,
1022        principal: &str,
1023        now_ms: u64,
1024    ) -> Result<CursorResume, CursorReject> {
1025        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1026        let entry = inner.get(token).ok_or(CursorReject::NotFound)?;
1027        // Scope gate first — a mismatch must be indistinguishable from
1028        // absence so a cross-tenant/cross-principal probe cannot confirm
1029        // the token exists.
1030        if entry.tenant != tenant || entry.principal != principal {
1031            return Err(CursorReject::NotFound);
1032        }
1033        // Issue #808 / 750d — a tombstoned (cancelled) cursor is refused
1034        // with a dedicated reason before the TTL check, so the owner learns
1035        // the stream was cancelled rather than seeing a generic expiry.
1036        if entry.cancelled {
1037            return Err(CursorReject::Cancelled);
1038        }
1039        if now_ms.saturating_sub(entry.opened_at_ms) >= entry.ttl_ms {
1040            return Err(CursorReject::Expired);
1041        }
1042        Ok(CursorResume {
1043            snapshot_lsn: entry.snapshot_lsn,
1044            query: entry.query.clone(),
1045            entity_types: entry.entity_types.clone(),
1046            capabilities: entry.capabilities.clone(),
1047            expires_at_ms: entry.opened_at_ms.saturating_add(entry.ttl_ms),
1048        })
1049    }
1050
1051    /// Visible for tests / audit.
1052    #[doc(hidden)]
1053    pub fn len(&self) -> usize {
1054        self.inner.lock().unwrap_or_else(|e| e.into_inner()).len()
1055    }
1056
1057    #[doc(hidden)]
1058    pub fn is_empty(&self) -> bool {
1059        self.len() == 0
1060    }
1061}
1062
1063/// Incremental SHA-256 hasher over emitted row lines, hex-encoded on
1064/// finalize. The wire contract is that the server hashes the exact
1065/// byte sequence of each row line (without trailing newline) in the
1066/// order the rows are emitted; the client stores the resulting digest
1067/// from the `end` envelope and replays it on resume.
1068#[derive(Debug, Default)]
1069pub struct PrefixHasher {
1070    inner: Option<sha2::Sha256>,
1071    rows: u64,
1072}
1073
1074impl PrefixHasher {
1075    pub fn new() -> Self {
1076        use sha2::Digest;
1077        Self {
1078            inner: Some(sha2::Sha256::new()),
1079            rows: 0,
1080        }
1081    }
1082
1083    pub fn update(&mut self, line: &[u8]) {
1084        use sha2::Digest;
1085        if let Some(h) = self.inner.as_mut() {
1086            h.update(line);
1087        }
1088        self.rows += 1;
1089    }
1090
1091    pub fn rows(&self) -> u64 {
1092        self.rows
1093    }
1094
1095    /// Hex-encoded digest of everything fed so far. Consumes the
1096    /// hasher (a `PrefixHasher` is single-use).
1097    pub fn finalize_hex(mut self) -> String {
1098        use sha2::Digest;
1099        let hasher = self
1100            .inner
1101            .take()
1102            .expect("PrefixHasher::finalize_hex called twice");
1103        let digest = hasher.finalize();
1104        let mut out = String::with_capacity(64);
1105        for b in digest.iter() {
1106            out.push_str(&format!("{b:02x}"));
1107        }
1108        out
1109    }
1110}
1111
1112/// Issue #767 / S8 — audit emission helpers. Each helper builds an
1113/// `AuditEvent` shaped to the brief and forwards it to the runtime's
1114/// audit log. The helpers are intentionally side-effect-only (no
1115/// return); audit emission must never terminate a stream that would
1116/// otherwise succeed.
1117pub fn audit_stream_opened(
1118    runtime: &RedDBRuntime,
1119    lease_handle: &str,
1120    principal: &str,
1121    snapshot_lsn: u64,
1122    query_hash: &str,
1123) {
1124    use crate::json::{Map, Value as JsonValue};
1125    let mut detail = Map::new();
1126    detail.insert(
1127        "lease_handle".to_string(),
1128        JsonValue::String(lease_handle.to_string()),
1129    );
1130    detail.insert(
1131        "snapshot_lsn".to_string(),
1132        JsonValue::Number(snapshot_lsn as f64),
1133    );
1134    detail.insert(
1135        "query_hash".to_string(),
1136        JsonValue::String(query_hash.to_string()),
1137    );
1138    let event = crate::runtime::audit_log::AuditEvent::builder("stream.opened")
1139        .principal(principal)
1140        .resource(lease_handle.to_string())
1141        .outcome(crate::runtime::audit_log::Outcome::Success)
1142        .detail(JsonValue::Object(detail))
1143        .build();
1144    runtime.audit_log().record_event(event);
1145}
1146
1147pub fn audit_stream_closed(
1148    runtime: &RedDBRuntime,
1149    lease_handle: &str,
1150    principal: &str,
1151    reason: CloseReason,
1152    row_count: u64,
1153    bytes_written: u64,
1154) {
1155    use crate::json::{Map, Value as JsonValue};
1156    let mut stats = Map::new();
1157    stats.insert("row_count".to_string(), JsonValue::Number(row_count as f64));
1158    stats.insert(
1159        "bytes_written".to_string(),
1160        JsonValue::Number(bytes_written as f64),
1161    );
1162    let mut detail = Map::new();
1163    detail.insert(
1164        "lease_handle".to_string(),
1165        JsonValue::String(lease_handle.to_string()),
1166    );
1167    detail.insert(
1168        "reason".to_string(),
1169        JsonValue::String(reason.as_str().to_string()),
1170    );
1171    detail.insert("stats".to_string(), JsonValue::Object(stats));
1172    let outcome = match reason {
1173        CloseReason::Ok => crate::runtime::audit_log::Outcome::Success,
1174        CloseReason::CapacityRefused => crate::runtime::audit_log::Outcome::Denied,
1175        _ => crate::runtime::audit_log::Outcome::Error,
1176    };
1177    let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1178        .principal(principal)
1179        .resource(lease_handle.to_string())
1180        .outcome(outcome)
1181        .detail(JsonValue::Object(detail))
1182        .build();
1183    runtime.audit_log().record_event(event);
1184}
1185
1186pub fn audit_token_expired_during_lease(
1187    runtime: &RedDBRuntime,
1188    lease_handle: &str,
1189    principal: &str,
1190    token_expiry_ms: u64,
1191) {
1192    use crate::json::{Map, Value as JsonValue};
1193    let mut detail = Map::new();
1194    detail.insert(
1195        "lease_handle".to_string(),
1196        JsonValue::String(lease_handle.to_string()),
1197    );
1198    detail.insert(
1199        "token_expiry".to_string(),
1200        JsonValue::Number(token_expiry_ms as f64),
1201    );
1202    detail.insert("lease_continued".to_string(), JsonValue::Bool(true));
1203    let event = crate::runtime::audit_log::AuditEvent::builder("stream.token_expired_during_lease")
1204        .principal(principal)
1205        .resource(lease_handle.to_string())
1206        .outcome(crate::runtime::audit_log::Outcome::Success)
1207        .detail(JsonValue::Object(detail))
1208        .build();
1209    runtime.audit_log().record_event(event);
1210}
1211
1212/// Capacity refusal emits a `stream.closed` event with no lease handle
1213/// (the open never produced one) and `reason: capacity_refused`. The
1214/// brief lists capacity_refused alongside the other close reasons; we
1215/// keep the wire shape identical so downstream audit-log consumers
1216/// don't have to special-case it.
1217pub fn audit_stream_capacity_refused(
1218    runtime: &RedDBRuntime,
1219    principal: &str,
1220    code: &str,
1221    limit: usize,
1222    current: usize,
1223) {
1224    use crate::json::{Map, Value as JsonValue};
1225    let mut detail = Map::new();
1226    detail.insert(
1227        "reason".to_string(),
1228        JsonValue::String(CloseReason::CapacityRefused.as_str().to_string()),
1229    );
1230    detail.insert("code".to_string(), JsonValue::String(code.to_string()));
1231    detail.insert("limit".to_string(), JsonValue::Number(limit as f64));
1232    detail.insert("current".to_string(), JsonValue::Number(current as f64));
1233    let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1234        .principal(principal)
1235        .outcome(crate::runtime::audit_log::Outcome::Denied)
1236        .detail(JsonValue::Object(detail))
1237        .build();
1238    runtime.audit_log().record_event(event);
1239}
1240
1241/// Borrow the lease registry's clock by default — the routing handler
1242/// uses this, tests inject their own.
1243pub fn system_clock() -> Arc<dyn Clock> {
1244    static INSTANCE: std::sync::OnceLock<Arc<dyn Clock>> = std::sync::OnceLock::new();
1245    Arc::clone(INSTANCE.get_or_init(|| Arc::new(SystemClock)))
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250    use super::*;
1251
1252    #[test]
1253    fn open_stream_refuses_when_session_has_active_transaction() {
1254        let clock = FakeClock::new(0);
1255        let err = open_stream(StreamConfig::DEFAULT, 42, true, &clock).unwrap_err();
1256        assert_eq!(err, OpenStreamError::TransactionActive);
1257        assert_eq!(err.code(), "stream_in_transaction_unsupported");
1258    }
1259
1260    #[test]
1261    fn open_stream_succeeds_when_session_is_autocommit() {
1262        let clock = FakeClock::new(1_700_000_000_000);
1263        let lease = open_stream(StreamConfig::DEFAULT, 123, false, &clock).unwrap();
1264        assert_eq!(lease.snapshot_lsn, 123);
1265        assert_eq!(lease.opened_at_ms, 1_700_000_000_000);
1266        assert!(lease.id >= 1);
1267    }
1268
1269    #[test]
1270    fn lease_ids_are_unique_and_monotonic() {
1271        let clock = FakeClock::new(0);
1272        let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1273        let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1274        assert!(b.id > a.id);
1275    }
1276
1277    #[test]
1278    fn snapshot_expired_uses_injected_clock_and_ttl() {
1279        // TTL fake-clock test: advance past ttl_ms and snapshot_expired
1280        // flips. Acceptance criterion #5.
1281        let clock = FakeClock::new(0);
1282        let mut config = StreamConfig::DEFAULT;
1283        config.snapshot_ttl_ms = 5_000;
1284        let lease = open_stream(config, 0, false, &clock).unwrap();
1285
1286        assert!(!lease.snapshot_expired(clock.now_ms()));
1287        clock.advance(4_999);
1288        assert!(!lease.snapshot_expired(clock.now_ms()));
1289        clock.advance(1);
1290        assert!(lease.snapshot_expired(clock.now_ms()));
1291    }
1292
1293    #[test]
1294    fn stream_config_loads_defaults_when_kv_is_empty() {
1295        // Without runtime, just sanity-check the defaults match ADR 0029.
1296        let cfg = StreamConfig::DEFAULT;
1297        assert_eq!(cfg.snapshot_ttl_ms, 60_000);
1298        assert_eq!(cfg.chunk_default_pages, 4);
1299        assert_eq!(cfg.chunk_min_pages, 1);
1300        assert_eq!(cfg.chunk_max_pages, 64);
1301        assert_eq!(cfg.chunk_max_rows, 1000);
1302        assert_eq!(cfg.chunk_max_latency_ms, 50);
1303        assert_eq!(cfg.production_buffer_bytes(), 64 * 1024);
1304    }
1305
1306    #[test]
1307    fn stream_config_normalize_clamps_inconsistent_inputs() {
1308        let mut cfg = StreamConfig {
1309            snapshot_ttl_ms: 1,
1310            chunk_default_pages: 100,
1311            chunk_min_pages: 0,
1312            chunk_max_pages: 8,
1313            chunk_max_rows: 0,
1314            chunk_max_latency_ms: 1,
1315            max_global_streams: 0,
1316            max_per_principal_streams: 0,
1317            default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
1318        };
1319        cfg.normalize();
1320        assert_eq!(cfg.chunk_min_pages, 1);
1321        assert_eq!(cfg.chunk_max_pages, 8);
1322        assert_eq!(cfg.chunk_default_pages, 8); // clamped down to max
1323        assert!(cfg.chunk_max_rows >= 1);
1324        assert!(cfg.max_global_streams >= 1);
1325        assert!(cfg.max_per_principal_streams >= 1);
1326    }
1327
1328    /// Test sink that accumulates flushed chunks into an interior-mutable
1329    /// `Vec`. Avoids the closure-capture-mutable-then-borrow-immutable
1330    /// dance that the borrow checker rejects when assertions are
1331    /// interleaved with `push_line` calls.
1332    struct CapturingSink {
1333        chunks: std::cell::RefCell<Vec<Vec<u8>>>,
1334    }
1335    impl CapturingSink {
1336        fn new() -> Self {
1337            Self {
1338                chunks: std::cell::RefCell::new(Vec::new()),
1339            }
1340        }
1341        fn len(&self) -> usize {
1342            self.chunks.borrow().len()
1343        }
1344        fn last_len(&self) -> Option<usize> {
1345            self.chunks.borrow().last().map(|c| c.len())
1346        }
1347    }
1348
1349    fn capture<'a>(sink: &'a CapturingSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1350        move |bytes: &[u8]| {
1351            sink.chunks.borrow_mut().push(bytes.to_vec());
1352            Ok(())
1353        }
1354    }
1355
1356    #[test]
1357    fn chunk_producer_flushes_on_byte_cap() {
1358        let clock = FakeClock::new(0);
1359        let cfg = StreamConfig {
1360            chunk_default_pages: 1, // 16 KiB
1361            chunk_min_pages: 1,
1362            chunk_max_pages: 1,
1363            chunk_max_rows: 1_000_000,
1364            chunk_max_latency_ms: 1_000_000,
1365            ..StreamConfig::DEFAULT
1366        };
1367        let sink = CapturingSink::new();
1368        let mut producer = ChunkProducer::new(&cfg, &clock);
1369        let mut flush = capture(&sink);
1370
1371        producer
1372            .push_line(&vec![b'x'; 8 * 1024], &mut flush)
1373            .unwrap();
1374        assert_eq!(sink.len(), 0);
1375
1376        let triggered = producer
1377            .push_line(&vec![b'y'; 8 * 1024], &mut flush)
1378            .unwrap();
1379        assert!(triggered);
1380        assert_eq!(sink.len(), 1);
1381        assert_eq!(sink.last_len(), Some(16 * 1024));
1382        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Byte));
1383    }
1384
1385    #[test]
1386    fn chunk_producer_flushes_on_row_cap() {
1387        let clock = FakeClock::new(0);
1388        let cfg = StreamConfig {
1389            chunk_default_pages: 4, // 64 KiB — well above any test row size
1390            chunk_min_pages: 1,
1391            chunk_max_pages: 64,
1392            chunk_max_rows: 3,
1393            chunk_max_latency_ms: 1_000_000,
1394            ..StreamConfig::DEFAULT
1395        };
1396        let sink = CapturingSink::new();
1397        let mut producer = ChunkProducer::new(&cfg, &clock);
1398        let mut flush = capture(&sink);
1399        let row = b"{\"row\":{}}\n";
1400        producer.push_line(row, &mut flush).unwrap();
1401        producer.push_line(row, &mut flush).unwrap();
1402        assert_eq!(sink.len(), 0);
1403        let triggered = producer.push_line(row, &mut flush).unwrap();
1404        assert!(triggered);
1405        assert_eq!(sink.len(), 1);
1406        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Row));
1407    }
1408
1409    #[test]
1410    fn chunk_producer_flushes_on_latency_cap() {
1411        let clock = FakeClock::new(0);
1412        let cfg = StreamConfig {
1413            chunk_default_pages: 4,
1414            chunk_min_pages: 1,
1415            chunk_max_pages: 64,
1416            chunk_max_rows: 1_000_000,
1417            chunk_max_latency_ms: 50,
1418            ..StreamConfig::DEFAULT
1419        };
1420        let sink = CapturingSink::new();
1421        let mut producer = ChunkProducer::new(&cfg, &clock);
1422        let mut flush = capture(&sink);
1423        producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1424        assert_eq!(sink.len(), 0);
1425        clock.advance(60);
1426        let triggered = producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1427        assert!(triggered);
1428        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
1429    }
1430
1431    #[test]
1432    fn chunk_producer_finish_emits_terminal_flush() {
1433        let clock = FakeClock::new(0);
1434        let cfg = StreamConfig::DEFAULT;
1435        let sink = CapturingSink::new();
1436        let mut producer = ChunkProducer::new(&cfg, &clock);
1437        let mut flush = capture(&sink);
1438        producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1439        producer.finish(&mut flush).unwrap();
1440        assert_eq!(sink.len(), 1);
1441        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Terminal));
1442    }
1443
1444    #[test]
1445    fn write_chunked_helpers_produce_well_formed_chunks() {
1446        let mut buf: Vec<u8> = Vec::new();
1447        write_chunked_response_header(&mut buf, 200, "application/x-ndjson").unwrap();
1448        write_chunk(&mut buf, b"{\"row\":{}}\n").unwrap();
1449        write_chunked_terminator(&mut buf).unwrap();
1450        let text = String::from_utf8(buf).unwrap();
1451        assert!(text.starts_with("HTTP/1.1 200 OK\r\n"));
1452        assert!(text.contains("Transfer-Encoding: chunked\r\n"));
1453        // 11 bytes in `{"row":{}}\n` → hex 'b'.
1454        assert!(text.contains("\r\nb\r\n{\"row\":{}}\n\r\n"));
1455        assert!(text.ends_with("0\r\n\r\n"));
1456    }
1457
1458    // ──────── Issue #761 / S2 — capacity registry ────────
1459
1460    #[test]
1461    fn capacity_registry_global_exhausted_returns_structured_error() {
1462        let reg = StreamCapacityRegistry::new();
1463        let _g1 = reg.try_acquire("alice", 2, 32).unwrap();
1464        let _g2 = reg.try_acquire("alice", 2, 32).unwrap();
1465        let err = reg.try_acquire("alice", 2, 32).unwrap_err();
1466        assert_eq!(
1467            err,
1468            AcquireError::GlobalExhausted {
1469                limit: 2,
1470                current: 2,
1471            }
1472        );
1473        assert_eq!(err.code(), "server_stream_capacity_exhausted");
1474    }
1475
1476    #[test]
1477    fn capacity_registry_per_principal_exhausted_independent_of_global() {
1478        // Acceptance criterion #2: per-principal cap fires even when
1479        // global has room. Acceptance criterion #3: counters are
1480        // independent across principals.
1481        let reg = StreamCapacityRegistry::new();
1482        let _a1 = reg.try_acquire("alice", 100, 2).unwrap();
1483        let _a2 = reg.try_acquire("alice", 100, 2).unwrap();
1484        let err = reg.try_acquire("alice", 100, 2).unwrap_err();
1485        assert_eq!(
1486            err,
1487            AcquireError::PrincipalExhausted {
1488                principal: "alice".to_string(),
1489                limit: 2,
1490                current: 2,
1491            }
1492        );
1493        assert_eq!(err.code(), "principal_stream_quota_exhausted");
1494
1495        // Bob is unaffected by Alice's quota.
1496        let _b1 = reg.try_acquire("bob", 100, 2).unwrap();
1497        let _b2 = reg.try_acquire("bob", 100, 2).unwrap();
1498    }
1499
1500    #[test]
1501    fn capacity_registry_release_frees_both_counters() {
1502        // Acceptance criterion #4: drop releases both counters.
1503        let reg = StreamCapacityRegistry::new();
1504        let g1 = reg.try_acquire("alice", 1, 1).unwrap();
1505        assert!(reg.try_acquire("alice", 1, 1).is_err());
1506        drop(g1);
1507        let (global, per_principal) = reg.snapshot();
1508        assert_eq!(global, 0);
1509        assert!(per_principal.is_empty());
1510        // Slot is now reclaimable.
1511        let _g2 = reg.try_acquire("alice", 1, 1).unwrap();
1512    }
1513
1514    #[test]
1515    fn capacity_registry_concurrent_acquire_release_does_not_over_issue() {
1516        // Acceptance criterion #5: stress coverage. Spawn `THREADS`
1517        // threads each running `ITERS` acquire+release cycles against
1518        // a registry sized to fit only `CAP` slots; the live count
1519        // must never exceed `CAP`, and the registry must return to
1520        // zero once every thread has joined.
1521        use std::sync::atomic::{AtomicUsize, Ordering};
1522
1523        const THREADS: usize = 16;
1524        const ITERS: usize = 200;
1525        const CAP_GLOBAL: usize = 4;
1526        const CAP_PER_PRINCIPAL: usize = 4;
1527
1528        let reg = StreamCapacityRegistry::new();
1529        let observed_max = Arc::new(AtomicUsize::new(0));
1530        let mut handles = Vec::new();
1531        for tid in 0..THREADS {
1532            let reg = Arc::clone(&reg);
1533            let observed_max = Arc::clone(&observed_max);
1534            // Two principals share the global cap, each capped at
1535            // `CAP_PER_PRINCIPAL` themselves.
1536            let principal = format!("p{}", tid % 2);
1537            handles.push(std::thread::spawn(move || {
1538                for _ in 0..ITERS {
1539                    if let Ok(guard) = reg.try_acquire(&principal, CAP_GLOBAL, CAP_PER_PRINCIPAL) {
1540                        let (live, _) = reg.snapshot();
1541                        observed_max.fetch_max(live, Ordering::SeqCst);
1542                        // Hold the slot just long enough to let other
1543                        // threads race against the cap.
1544                        std::thread::yield_now();
1545                        drop(guard);
1546                    }
1547                }
1548            }));
1549        }
1550        for h in handles {
1551            h.join().unwrap();
1552        }
1553        let (global_after, per_principal_after) = reg.snapshot();
1554        assert_eq!(global_after, 0, "global counter leaked");
1555        assert!(
1556            per_principal_after.is_empty(),
1557            "per-principal map leaked: {per_principal_after:?}"
1558        );
1559        assert!(
1560            observed_max.load(Ordering::SeqCst) <= CAP_GLOBAL,
1561            "global cap was breached: observed {} > {}",
1562            observed_max.load(Ordering::SeqCst),
1563            CAP_GLOBAL
1564        );
1565    }
1566
1567    // ──────── Issue #766 / S7 — resume coordinator ────────
1568
1569    #[test]
1570    fn assess_resumability_accepts_plain_select() {
1571        assert!(assess_resumability("SELECT id, name FROM t"));
1572        assert!(assess_resumability("select * from t where id > 5"));
1573        assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid"));
1574        assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid ASC"));
1575        assert!(assess_resumability(
1576            "SELECT a FROM t ORDER BY rid ASC LIMIT 10"
1577        ));
1578    }
1579
1580    #[test]
1581    fn assess_resumability_rejects_aggregates_and_unordered() {
1582        assert!(!assess_resumability("SELECT COUNT(*) FROM t"));
1583        assert!(!assess_resumability("SELECT SUM(x) FROM t"));
1584        assert!(!assess_resumability("SELECT a, COUNT(b) FROM t GROUP BY a"));
1585        assert!(!assess_resumability("SELECT DISTINCT a FROM t"));
1586        assert!(!assess_resumability("SELECT a FROM t ORDER BY name"));
1587        assert!(!assess_resumability("SELECT a FROM t ORDER BY rid DESC"));
1588        assert!(!assess_resumability("SELECT a FROM t ORDER BY a, b"));
1589        assert!(!assess_resumability("INSERT INTO t (a) VALUES (1)"));
1590        assert!(!assess_resumability(
1591            "SELECT a FROM t JOIN u ON t.id = u.id"
1592        ));
1593    }
1594
1595    #[test]
1596    fn lease_registry_records_and_expires_against_ttl() {
1597        let reg = LeaseRegistry::new();
1598        reg.record(42, 1_000, 5_000);
1599        assert_eq!(reg.lookup(42, 1_000), LeaseLookup::Live);
1600        assert_eq!(reg.lookup(42, 5_999), LeaseLookup::Live);
1601        assert_eq!(reg.lookup(42, 6_000), LeaseLookup::Expired);
1602        assert_eq!(reg.lookup(99, 1_000), LeaseLookup::Unknown);
1603    }
1604
1605    #[test]
1606    fn prefix_hasher_is_order_sensitive_and_deterministic() {
1607        let mut a = PrefixHasher::new();
1608        a.update(b"{\"row\":{\"id\":1}}");
1609        a.update(b"{\"row\":{\"id\":2}}");
1610        let hash_a = a.finalize_hex();
1611
1612        let mut b = PrefixHasher::new();
1613        b.update(b"{\"row\":{\"id\":1}}");
1614        b.update(b"{\"row\":{\"id\":2}}");
1615        let hash_b = b.finalize_hex();
1616        assert_eq!(hash_a, hash_b);
1617
1618        let mut c = PrefixHasher::new();
1619        c.update(b"{\"row\":{\"id\":2}}");
1620        c.update(b"{\"row\":{\"id\":1}}");
1621        assert_ne!(hash_a, c.finalize_hex());
1622        assert_eq!(hash_a.len(), 64);
1623    }
1624
1625    // ──────── Issue #767 / S8 — opaque lease handle + audit ────────
1626
1627    #[test]
1628    fn lease_handle_is_128_bit_hex_and_unique_per_open() {
1629        // Acceptance criterion #3 — wire-visible handle is opaque,
1630        // 128-bit, non-sequential. The internal monotonic `id` is
1631        // independent of the handle.
1632        let clock = FakeClock::new(0);
1633        let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1634        let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1635        assert_eq!(
1636            a.lease_handle.len(),
1637            LEASE_HANDLE_BYTES * 2,
1638            "handle must be 128 bits hex-encoded: {}",
1639            a.lease_handle
1640        );
1641        assert!(
1642            a.lease_handle.chars().all(|c| c.is_ascii_hexdigit()),
1643            "handle must be hex: {}",
1644            a.lease_handle
1645        );
1646        assert_ne!(a.lease_handle, b.lease_handle, "handles must differ");
1647        // Sanity — internal id remains monotonic, handle is not
1648        // derivable from it (handle bytes don't encode the id).
1649        assert!(b.id > a.id);
1650    }
1651
1652    #[test]
1653    fn generate_lease_handle_produces_high_entropy_distinct_values() {
1654        // Defensive — large sample to surface a hypothetical CSPRNG
1655        // mis-wiring. 1024 draws with zero collisions on 128 bits is
1656        // statistically certain when the source is sound.
1657        let mut seen = std::collections::HashSet::new();
1658        for _ in 0..1024 {
1659            assert!(
1660                seen.insert(generate_lease_handle()),
1661                "duplicate handle in CSPRNG sequence"
1662            );
1663        }
1664    }
1665
1666    #[test]
1667    fn parse_jwt_exp_ms_extracts_seconds_to_ms() {
1668        // Synthetic JWT — header.payload.signature. We only consult
1669        // the payload (no signature verification).
1670        // Payload: {"exp":1700000000}
1671        // Base64url of that JSON: eyJleHAiOjE3MDAwMDAwMDB9
1672        let token = "eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MDAwMDAwMDB9.sig";
1673        assert_eq!(parse_jwt_exp_ms(token), Some(1_700_000_000_000));
1674    }
1675
1676    #[test]
1677    fn parse_jwt_exp_ms_returns_none_for_opaque_tokens() {
1678        assert_eq!(parse_jwt_exp_ms("not-a-jwt"), None);
1679        assert_eq!(parse_jwt_exp_ms("only.two"), None);
1680        assert_eq!(parse_jwt_exp_ms("a.b.c"), None);
1681    }
1682
1683    // ──────── Issue #807 / 750c — cursor registry ────────
1684
1685    fn register_default_cursor(reg: &CursorRegistry, now_ms: u64) -> String {
1686        reg.register(
1687            42,
1688            "acme",
1689            "bearer:abc",
1690            "SELECT id FROM users ORDER BY rid",
1691            None,
1692            None,
1693            now_ms,
1694            1_000,
1695        )
1696    }
1697
1698    #[test]
1699    fn cursor_token_is_192_bit_hex_and_unique_per_register() {
1700        let reg = CursorRegistry::default();
1701        let a = register_default_cursor(&reg, 0);
1702        let b = register_default_cursor(&reg, 0);
1703        assert_eq!(
1704            a.len(),
1705            CURSOR_TOKEN_BYTES * 2,
1706            "token must be 192 bits hex-encoded: {a}"
1707        );
1708        assert!(
1709            a.chars().all(|c| c.is_ascii_hexdigit()),
1710            "token must be hex: {a}"
1711        );
1712        assert_ne!(a, b, "tokens must differ across registrations");
1713        assert_eq!(reg.len(), 2);
1714    }
1715
1716    #[test]
1717    fn cursor_resolves_for_owner_within_ttl() {
1718        // Happy-path resume — same tenant + principal, before TTL.
1719        let reg = CursorRegistry::default();
1720        let token = register_default_cursor(&reg, 1_000);
1721        let resume = reg
1722            .resolve(&token, "acme", "bearer:abc", 1_500)
1723            .expect("live cursor resolves for its owner");
1724        assert_eq!(resume.snapshot_lsn, 42, "resume re-pins the same snapshot");
1725        assert_eq!(resume.query, "SELECT id FROM users ORDER BY rid");
1726        assert_eq!(resume.expires_at_ms, 2_000);
1727    }
1728
1729    #[test]
1730    fn cursor_rejects_after_ttl_for_owner() {
1731        // TTL expiry — owner sees a distinct `Expired` so they can
1732        // re-issue, but the cursor no longer resolves.
1733        let reg = CursorRegistry::default();
1734        let token = register_default_cursor(&reg, 0);
1735        assert_eq!(
1736            reg.resolve(&token, "acme", "bearer:abc", 1_000),
1737            Err(CursorReject::Expired),
1738            "TTL boundary is inclusive — cursor is dead at opened_at + ttl"
1739        );
1740        assert_eq!(
1741            reg.resolve(&token, "acme", "bearer:abc", 5_000),
1742            Err(CursorReject::Expired)
1743        );
1744    }
1745
1746    #[test]
1747    fn cursor_cross_tenant_is_masked_as_not_found() {
1748        // Tenant isolation — a different tenant cannot tell the token
1749        // from one that never existed (no existence leak), even while the
1750        // cursor is still live for its real owner.
1751        let reg = CursorRegistry::default();
1752        let token = register_default_cursor(&reg, 0);
1753        assert_eq!(
1754            reg.resolve(&token, "evil-corp", "bearer:abc", 100),
1755            Err(CursorReject::NotFound),
1756            "cross-tenant resume must mask existence as NotFound"
1757        );
1758        // Still live for the rightful tenant.
1759        assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1760    }
1761
1762    #[test]
1763    fn cursor_cross_principal_is_masked_as_not_found() {
1764        // Principal isolation — same tenant, different principal is also
1765        // masked as NotFound rather than leaking a permission error.
1766        let reg = CursorRegistry::default();
1767        let token = register_default_cursor(&reg, 0);
1768        assert_eq!(
1769            reg.resolve(&token, "acme", "bearer:other", 100),
1770            Err(CursorReject::NotFound),
1771            "cross-principal resume must mask existence as NotFound"
1772        );
1773        assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1774    }
1775
1776    #[test]
1777    fn cursor_unknown_token_is_not_found() {
1778        let reg = CursorRegistry::default();
1779        assert!(reg.is_empty());
1780        assert_eq!(
1781            reg.resolve("deadbeef", "acme", "bearer:abc", 0),
1782            Err(CursorReject::NotFound)
1783        );
1784    }
1785
1786    #[test]
1787    fn cursor_scope_is_checked_before_expiry() {
1788        // An unauthorized caller hitting an *expired* token must still see
1789        // NotFound, never Expired — Expired would confirm the token once
1790        // existed. Scope precedes expiry in `resolve`.
1791        let reg = CursorRegistry::default();
1792        let token = register_default_cursor(&reg, 0);
1793        assert_eq!(
1794            reg.resolve(&token, "evil-corp", "bearer:abc", 10_000),
1795            Err(CursorReject::NotFound),
1796            "expired + wrong scope must mask as NotFound, not Expired"
1797        );
1798    }
1799
1800    // ──────── Issue #808 / 750d — cancel + tombstone ────────
1801
1802    #[test]
1803    fn cancel_tombstones_cursor_and_raises_token() {
1804        // Cancelling a live cursor raises its shared token (so an in-flight
1805        // executor observing it stops) and tombstones the entry.
1806        let reg = CursorRegistry::default();
1807        let token = register_default_cursor(&reg, 0);
1808        let live = reg
1809            .cancel_token_for(&token)
1810            .expect("freshly-minted cursor exposes its token");
1811        assert!(!live.is_cancelled(), "token starts un-cancelled");
1812
1813        let returned = reg
1814            .cancel(&token, "acme", "bearer:abc")
1815            .expect("owner cancels its own cursor");
1816        assert!(returned.is_cancelled(), "cancel raises the returned token");
1817        assert!(
1818            live.is_cancelled(),
1819            "the handler-held clone observes the cancel"
1820        );
1821    }
1822
1823    #[test]
1824    fn cancelled_cursor_rejects_resume_with_cancelled_reason() {
1825        // A tombstoned cursor refuses resume with a dedicated `Cancelled`
1826        // reason for its owner — distinct from Expired so the client learns
1827        // the stream was cancelled, not aged out.
1828        let reg = CursorRegistry::default();
1829        let token = register_default_cursor(&reg, 0);
1830        reg.cancel(&token, "acme", "bearer:abc")
1831            .expect("owner cancels");
1832        assert_eq!(
1833            reg.resolve(&token, "acme", "bearer:abc", 100),
1834            Err(CursorReject::Cancelled),
1835            "owner resuming a cancelled cursor sees Cancelled"
1836        );
1837    }
1838
1839    #[test]
1840    fn cancel_is_idempotent() {
1841        let reg = CursorRegistry::default();
1842        let token = register_default_cursor(&reg, 0);
1843        assert!(reg.cancel(&token, "acme", "bearer:abc").is_ok());
1844        // Second cancel still succeeds (already tombstoned) and stays cancelled.
1845        let second = reg
1846            .cancel(&token, "acme", "bearer:abc")
1847            .expect("re-cancel is a no-op success");
1848        assert!(second.is_cancelled());
1849    }
1850
1851    #[test]
1852    fn cancel_cross_tenant_is_masked_as_not_found() {
1853        // An unauthorized caller cannot cancel — or confirm the existence
1854        // of — a foreign cursor.
1855        let reg = CursorRegistry::default();
1856        let token = register_default_cursor(&reg, 0);
1857        assert!(
1858            matches!(
1859                reg.cancel(&token, "evil-corp", "bearer:abc"),
1860                Err(CursorReject::NotFound)
1861            ),
1862            "cross-tenant cancel must mask existence as NotFound"
1863        );
1864        // The rightful owner's cursor is untouched — still resumable.
1865        assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1866    }
1867
1868    #[test]
1869    fn cancel_cross_principal_is_masked_as_not_found() {
1870        let reg = CursorRegistry::default();
1871        let token = register_default_cursor(&reg, 0);
1872        assert!(
1873            matches!(
1874                reg.cancel(&token, "acme", "bearer:other"),
1875                Err(CursorReject::NotFound)
1876            ),
1877            "cross-principal cancel must mask existence as NotFound"
1878        );
1879        assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1880    }
1881
1882    #[test]
1883    fn cancel_unknown_token_is_not_found() {
1884        let reg = CursorRegistry::default();
1885        assert!(matches!(
1886            reg.cancel("deadbeef", "acme", "bearer:abc"),
1887            Err(CursorReject::NotFound)
1888        ));
1889    }
1890
1891    #[test]
1892    fn generate_cursor_token_produces_high_entropy_distinct_values() {
1893        let mut seen = std::collections::HashSet::new();
1894        for _ in 0..1024 {
1895            assert!(
1896                seen.insert(generate_cursor_token()),
1897                "duplicate cursor token in CSPRNG sequence"
1898            );
1899        }
1900    }
1901
1902    #[test]
1903    fn close_reason_as_str_covers_every_state_transition() {
1904        // Acceptance criterion #5 — every state transition in the
1905        // brief is representable in the audit close-reason taxonomy.
1906        for (variant, expected) in [
1907            (CloseReason::Ok, "ok"),
1908            (CloseReason::Cancelled, "cancelled"),
1909            (CloseReason::Error, "error"),
1910            (CloseReason::SnapshotExpired, "snapshot_expired"),
1911            (CloseReason::CapacityRefused, "capacity_refused"),
1912            (CloseReason::IntegrityFailed, "integrity_failed"),
1913        ] {
1914            assert_eq!(variant.as_str(), expected);
1915        }
1916    }
1917
1918    #[test]
1919    fn stream_config_defaults_carry_s2_caps() {
1920        assert_eq!(StreamConfig::DEFAULT.max_global_streams, 256);
1921        assert_eq!(StreamConfig::DEFAULT.max_per_principal_streams, 32);
1922    }
1923
1924    // ──────── Issue #768 / S9 — pull-based driver ────────
1925
1926    /// Sink that records every flushed chunk's length so a test can
1927    /// assert the resident working set stayed bounded.
1928    struct SizeSink {
1929        sizes: std::cell::RefCell<Vec<usize>>,
1930    }
1931    impl SizeSink {
1932        fn new() -> Self {
1933            Self {
1934                sizes: std::cell::RefCell::new(Vec::new()),
1935            }
1936        }
1937        fn flushes(&self) -> usize {
1938            self.sizes.borrow().len()
1939        }
1940        fn max_chunk(&self) -> usize {
1941            self.sizes.borrow().iter().copied().max().unwrap_or(0)
1942        }
1943    }
1944    fn size_capture<'a>(sink: &'a SizeSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1945        move |bytes: &[u8]| {
1946            sink.sizes.borrow_mut().push(bytes.len());
1947            Ok(())
1948        }
1949    }
1950
1951    #[test]
1952    fn drive_lines_streams_large_source_with_bounded_working_set() {
1953        // Acceptance #1: a huge scan flows through the chunk buffer
1954        // without ever materialising the full result set. The source
1955        // is a *lazy* range mapped to records — collecting it would
1956        // allocate N rows, but `drive_lines` pulls one at a time. We
1957        // assert every flushed chunk stays within a small multiple of
1958        // the page buffer, i.e. memory tracks the buffer, not N.
1959        let clock = FakeClock::new(0);
1960        let cfg = StreamConfig {
1961            chunk_default_pages: 1, // 16 KiB buffer
1962            chunk_min_pages: 1,
1963            chunk_max_pages: 1,
1964            chunk_max_rows: 1_000_000, // don't let the row cap dominate
1965            chunk_max_latency_ms: 1_000_000,
1966            ..StreamConfig::DEFAULT
1967        };
1968        let sink = SizeSink::new();
1969        let mut producer = ChunkProducer::new(&cfg, &clock);
1970        let mut flush = size_capture(&sink);
1971
1972        const N: u64 = 1_000_000;
1973        // Lazy source: never collected into a Vec.
1974        let source = 0..N;
1975        let consumed = producer
1976            .drive_lines(
1977                source,
1978                |i: &u64| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes(),
1979                &mut flush,
1980            )
1981            .unwrap();
1982        producer.finish(&mut flush).unwrap();
1983
1984        assert_eq!(consumed, N);
1985        assert_eq!(producer.total_rows(), N);
1986        // Many flushes occurred (streamed), not one giant buffer.
1987        assert!(
1988            sink.flushes() > 1000,
1989            "expected the source to stream across many chunks, saw {}",
1990            sink.flushes()
1991        );
1992        // No chunk exceeded the byte cap plus one trailing line — the
1993        // resident buffer is bounded independent of N.
1994        let max_line = format!("{{\"row\":{{\"id\":{}}}}}\n", N - 1).len();
1995        assert!(
1996            sink.max_chunk() <= cfg.production_buffer_bytes() + max_line,
1997            "chunk {} exceeded bounded working set {}",
1998            sink.max_chunk(),
1999            cfg.production_buffer_bytes() + max_line
2000        );
2001    }
2002
2003    #[test]
2004    fn drive_lines_first_chunk_flushes_on_latency_before_source_drains() {
2005        // Acceptance #2: first-row latency is bounded by the latency
2006        // cap, not by full materialisation. The source yields rows
2007        // whose pull advances the fake clock; the first chunk must
2008        // flush as soon as the latency window elapses, long before the
2009        // (large) source is exhausted.
2010        let clock = FakeClock::new(0);
2011        let cfg = StreamConfig {
2012            chunk_default_pages: 64, // large byte cap — won't trip first
2013            chunk_min_pages: 1,
2014            chunk_max_pages: 64,
2015            chunk_max_rows: 1_000_000, // large row cap — won't trip first
2016            chunk_max_latency_ms: 50,
2017            ..StreamConfig::DEFAULT
2018        };
2019        let sink = SizeSink::new();
2020        let mut producer = ChunkProducer::new(&cfg, &clock);
2021
2022        // Drive manually so we can advance the clock between pulls and
2023        // observe the first flush. Each pull advances 20 ms; the 50 ms
2024        // latency cap trips on the third row.
2025        let mut first_flush_after: Option<u64> = None;
2026        let mut row = 0u64;
2027        while row < 1_000_000 {
2028            let line = format!("{{\"row\":{{\"id\":{row}}}}}\n");
2029            clock.advance(20);
2030            let mut flush = size_capture(&sink);
2031            let flushed = producer.push_line(line.as_bytes(), &mut flush).unwrap();
2032            row += 1;
2033            if flushed {
2034                first_flush_after = Some(row);
2035                break;
2036            }
2037        }
2038
2039        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
2040        let rows_before_flush = first_flush_after.expect("a latency flush must occur");
2041        assert!(
2042            rows_before_flush <= 4,
2043            "first chunk flushed only after {rows_before_flush} rows; latency bound not honoured"
2044        );
2045        // Crucially, the source was nowhere near drained (1e6 rows).
2046        assert!(rows_before_flush < 1_000_000);
2047    }
2048
2049    #[test]
2050    fn drive_lines_parity_with_manual_push_line() {
2051        // The driver must produce byte-identical output to hand-rolled
2052        // push_line calls — the chunk producer's framing is unchanged.
2053        let clock = FakeClock::new(0);
2054        let cfg = StreamConfig::DEFAULT;
2055
2056        let lines: Vec<Vec<u8>> = (0..50)
2057            .map(|i| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes())
2058            .collect();
2059
2060        let driven = CapturingSink::new();
2061        {
2062            let mut p = ChunkProducer::new(&cfg, &clock);
2063            let mut flush = capture(&driven);
2064            p.drive_lines(lines.iter().cloned(), |l: &Vec<u8>| l.clone(), &mut flush)
2065                .unwrap();
2066            p.finish(&mut flush).unwrap();
2067        }
2068
2069        let manual = CapturingSink::new();
2070        {
2071            let mut p = ChunkProducer::new(&cfg, &clock);
2072            let mut flush = capture(&manual);
2073            for l in &lines {
2074                p.push_line(l, &mut flush).unwrap();
2075            }
2076            p.finish(&mut flush).unwrap();
2077        }
2078
2079        assert_eq!(*driven.chunks.borrow(), *manual.chunks.borrow());
2080    }
2081}