Skip to main content

reddb_server/server/
output_stream.rs

1//! Output streaming primitives for issue #760 (PRD #759 / ADR 0029) — the
2//! "shim slice" of bidirectional streaming.
3//!
4//! Scope of this module:
5//!   - [`StreamConfig`]: capture the `stream.*` namespace from `red_config`
6//!     at lease open and freeze it for the lease's lifetime (acceptance
7//!     criterion: KV mutations mid-stream do not retroactively change
8//!     behaviour).
9//!   - [`StreamLease`]: an internal, unforwarded handle bound to a
10//!     snapshot LSN and a frozen config. No external surface yet (S2 will
11//!     add quotas + per-principal accounting).
12//!   - [`open_stream`]: refuses with `stream_in_transaction_unsupported`
13//!     when the caller already has an active `BEGIN` on the session.
14//!   - [`ChunkProducer`]: page-aligned (N × 16 KiB) production buffer
15//!     that flushes on the first of byte / row / latency cap.
16//!   - [`Clock`]: trait-injected time source so TTL expiry is testable.
17//!
18//! The HTTP NDJSON wire framing built on top of these primitives lives in
19//! `handlers_query::handle_query_ndjson_stream` and is dispatched from
20//! `routing::try_route_streaming`.
21
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::sync::{Arc, Mutex};
25use std::time::{SystemTime, UNIX_EPOCH};
26
27use crate::runtime::RedDBRuntime;
28use crate::storage::query::engine::cancel::CancelToken;
29use crate::storage::schema::types::Value;
30
/// KV collection that holds runtime configuration; the `stream.*` keys are
/// looked up in this collection.
const RED_CONFIG_COLLECTION: &str = "red_config";

/// Size of one engine page in bytes (16 KiB). The production buffer is always
/// a whole multiple of this. Matches `storage::engine::PAGE_SIZE`.
pub const PAGE_SIZE: usize = 16 * 1024;
36
/// Pluggable time source, so time-dependent logic does not read the wall
/// clock directly.
///
/// [`SystemClock`] is the production implementation; tests use
/// [`FakeClock`] to drive TTL expiry without depending on wall time.
pub trait Clock: Send + Sync {
    /// Current reading of this clock, in milliseconds.
    fn now_ms(&self) -> u64;
}
42
43#[derive(Debug, Default)]
44pub struct SystemClock;
45
46impl Clock for SystemClock {
47    fn now_ms(&self) -> u64 {
48        SystemTime::now()
49            .duration_since(UNIX_EPOCH)
50            .map(|d| d.as_millis() as u64)
51            .unwrap_or(0)
52    }
53}
54
55#[derive(Debug)]
56pub struct FakeClock {
57    now_ms: AtomicU64,
58}
59
60impl FakeClock {
61    pub fn new(now_ms: u64) -> Self {
62        Self {
63            now_ms: AtomicU64::new(now_ms),
64        }
65    }
66
67    pub fn advance(&self, ms: u64) {
68        self.now_ms.fetch_add(ms, Ordering::SeqCst);
69    }
70}
71
72impl Clock for FakeClock {
73    fn now_ms(&self) -> u64 {
74        self.now_ms.load(Ordering::SeqCst)
75    }
76}
77
78/// Frozen snapshot of the `stream.*` namespace at lease open. Acceptance
79/// criterion: a `red_config` mutation while a stream is running does not
80/// retroactively change the running stream's behaviour — that is, the
81/// per-lease config is value-typed and not a back-reference.
82#[derive(Debug, Clone, Copy)]
83pub struct StreamConfig {
84    pub snapshot_ttl_ms: u64,
85    pub chunk_default_pages: usize,
86    pub chunk_min_pages: usize,
87    pub chunk_max_pages: usize,
88    pub chunk_max_rows: usize,
89    pub chunk_max_latency_ms: u64,
90    /// Process-wide concurrent stream cap (issue #761 / S2).
91    /// Acquiring the (N+1)th slot is refused with
92    /// `server_stream_capacity_exhausted`.
93    pub max_global_streams: usize,
94    /// Per-principal concurrent stream cap (issue #761 / S2).
95    /// Acquiring the (M+1)th slot for a single principal is refused
96    /// with `principal_stream_quota_exhausted` even when the global
97    /// counter still has room.
98    pub max_per_principal_streams: usize,
99    /// Issue #765 / S6 — default end-to-end verification mode for input
100    /// streams when the open frame does not request one
101    /// (`stream.integrity.default_verify`, `"none"` per ADR 0029).
102    pub default_verify: crate::runtime::integrity_tombstone::VerifyMode,
103}
104
105impl Default for StreamConfig {
106    fn default() -> Self {
107        Self::DEFAULT
108    }
109}
110
111impl StreamConfig {
112    /// Defaults match ADR 0029. `snapshot_ttl_ms` is the only key
113    /// observable mid-stream; the chunk caps only affect framing.
114    pub const DEFAULT: StreamConfig = StreamConfig {
115        snapshot_ttl_ms: 60_000,
116        chunk_default_pages: 4,
117        chunk_min_pages: 1,
118        chunk_max_pages: 64,
119        chunk_max_rows: 1000,
120        chunk_max_latency_ms: 50,
121        max_global_streams: 256,
122        max_per_principal_streams: 32,
123        default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
124    };
125
126    /// Read every `stream.*` key from `red_config` over the runtime's KV
127    /// store. Missing keys fall back to [`Self::DEFAULT`]. Unparseable or
128    /// out-of-range values fall back silently — bad config never gets to
129    /// terminate a request that would otherwise succeed.
130    pub fn load(runtime: &RedDBRuntime) -> Self {
131        let db = runtime.db();
132        let read_u64 = |key: &str| -> Option<u64> {
133            match db.get_kv(RED_CONFIG_COLLECTION, key) {
134                Some((Value::Integer(v), _)) if v >= 0 => Some(v as u64),
135                Some((Value::UnsignedInteger(v), _)) => Some(v),
136                Some((Value::Text(text), _)) => text.parse().ok(),
137                _ => None,
138            }
139        };
140
141        let mut cfg = Self::DEFAULT;
142        if let Some(v) = read_u64("stream.snapshot.ttl_ms") {
143            cfg.snapshot_ttl_ms = v;
144        }
145        if let Some(v) = read_u64("stream.chunk.default_pages") {
146            cfg.chunk_default_pages = v as usize;
147        }
148        if let Some(v) = read_u64("stream.chunk.min_pages") {
149            cfg.chunk_min_pages = v as usize;
150        }
151        if let Some(v) = read_u64("stream.chunk.max_pages") {
152            cfg.chunk_max_pages = v as usize;
153        }
154        if let Some(v) = read_u64("stream.chunk.max_rows") {
155            cfg.chunk_max_rows = v as usize;
156        }
157        if let Some(v) = read_u64("stream.chunk.max_latency_ms") {
158            cfg.chunk_max_latency_ms = v;
159        }
160        if let Some(v) = read_u64("stream.max_global") {
161            cfg.max_global_streams = v as usize;
162        }
163        if let Some(v) = read_u64("stream.max_per_principal") {
164            cfg.max_per_principal_streams = v as usize;
165        }
166        // Issue #765 / S6 — opt-in integrity default. Stored as a Text key
167        // (`"none"` / `"sha256"`); anything else falls back to `None`.
168        if let Some((Value::Text(text), _)) =
169            db.get_kv(RED_CONFIG_COLLECTION, "stream.integrity.default_verify")
170        {
171            cfg.default_verify = crate::runtime::integrity_tombstone::VerifyMode::parse(&text);
172        }
173        cfg.normalize();
174        cfg
175    }
176
177    /// Clamp interrelated fields to a self-consistent state. Floors come
178    /// from ADR 0029 ("hard floor 1 page"); ceilings prevent zero-row /
179    /// zero-byte caps that would prevent the producer from ever flushing.
180    fn normalize(&mut self) {
181        if self.chunk_min_pages == 0 {
182            self.chunk_min_pages = 1;
183        }
184        if self.chunk_max_pages < self.chunk_min_pages {
185            self.chunk_max_pages = self.chunk_min_pages;
186        }
187        if self.chunk_default_pages < self.chunk_min_pages {
188            self.chunk_default_pages = self.chunk_min_pages;
189        }
190        if self.chunk_default_pages > self.chunk_max_pages {
191            self.chunk_default_pages = self.chunk_max_pages;
192        }
193        if self.chunk_max_rows == 0 {
194            self.chunk_max_rows = 1;
195        }
196        if self.max_global_streams == 0 {
197            self.max_global_streams = 1;
198        }
199        if self.max_per_principal_streams == 0 {
200            self.max_per_principal_streams = 1;
201        }
202    }
203
204    /// Page-aligned production buffer size in bytes. Acceptance criterion:
205    /// "production buffer is always N × 16 KiB".
206    pub fn production_buffer_bytes(&self) -> usize {
207        self.chunk_default_pages.saturating_mul(PAGE_SIZE)
208    }
209}
210
211/// Monotonic, process-local lease id. The id is internal — the bearer
212/// token still authenticates the open, the lease only identifies the
213/// stream for audit and termination (ADR 0029 "Authorization").
214///
215/// Issue #767 / S8 — the internal id is **never** forwarded on the
216/// wire. The client receives [`StreamLease::lease_handle`], a 128-bit
217/// opaque random value, in `OpenAck` / the `{"end": …}` envelope.
218static NEXT_LEASE_ID: AtomicU64 = AtomicU64::new(1);
219
220/// Issue #767 / S8 — wire-visible lease handle, 128 bits drawn from
221/// the OS CSPRNG. Hex-encoded so it survives JSON / header transport.
222/// Opacity property: a client cannot derive the internal lease id
223/// from the handle (the two are independent), and the handle is not
224/// sequential across opens.
225pub const LEASE_HANDLE_BYTES: usize = 16;
226
227/// Generate an opaque 128-bit lease handle (32-char hex). Per ADR 0029,
228/// the handle is the only identifier the wire sees; the internal
229/// `StreamLease::id` stays server-side.
230pub fn generate_lease_handle() -> String {
231    let mut bytes = [0u8; LEASE_HANDLE_BYTES];
232    if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
233        // CSPRNG failure is exceedingly rare on supported platforms;
234        // fall back to a high-entropy mix so the open path never
235        // terminates a stream that would otherwise succeed. The
236        // fallback is *not* secure unguessable — production paths
237        // will hit /dev/urandom successfully.
238        let lo = NEXT_LEASE_ID.fetch_add(0, Ordering::SeqCst).to_le_bytes();
239        let now = crate::utils::now_unix_nanos().to_le_bytes();
240        bytes[..8].copy_from_slice(&lo);
241        bytes[8..].copy_from_slice(&now);
242    }
243    crate::utils::to_hex(&bytes)
244}
245
246/// Issue #767 / S8 — non-validating JWT `exp` claim extractor. Returns
247/// the expiry time in milliseconds since the epoch when `token` is a
248/// JWT carrying an integer `exp` claim, otherwise `None`.
249///
250/// The extractor does **not** validate the JWT signature, issuer, or
251/// audience — those gates run during `is_authorized` at OpenStream.
252/// This helper is consulted *after* the bearer has been accepted, to
253/// decide whether the lease will outlive the bearer credential and an
254/// audit event should be emitted accordingly.
255pub fn parse_jwt_exp_ms(token: &str) -> Option<u64> {
256    let parts: Vec<&str> = token.split('.').collect();
257    if parts.len() != 3 {
258        return None;
259    }
260    let payload = base64url_decode_padded(parts[1])?;
261    let v: crate::json::Value = crate::json::from_slice(&payload).ok()?;
262    let exp_secs = v.get("exp").and_then(|n| n.as_f64())? as i64;
263    if exp_secs <= 0 {
264        return None;
265    }
266    Some((exp_secs as u64).saturating_mul(1000))
267}
268
269fn base64url_decode_padded(input: &str) -> Option<Vec<u8>> {
270    let mut s = String::with_capacity(input.len() + 4);
271    for ch in input.chars() {
272        match ch {
273            '-' => s.push('+'),
274            '_' => s.push('/'),
275            _ => s.push(ch),
276        }
277    }
278    while !s.len().is_multiple_of(4) {
279        s.push('=');
280    }
281    crate::wire::redwire::auth::base64_std_decode(&s)
282}
283
/// Issue #767 / S8 — reason reported on the wire with `stream.closed`. The
/// audit emit site picks the variant once it has observed how the stream
/// exited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    Ok,
    Cancelled,
    Error,
    SnapshotExpired,
    CapacityRefused,
    IntegrityFailed,
}

impl CloseReason {
    /// Stable snake_case label for this reason.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ok => "ok",
            Self::Cancelled => "cancelled",
            Self::Error => "error",
            Self::SnapshotExpired => "snapshot_expired",
            Self::CapacityRefused => "capacity_refused",
            Self::IntegrityFailed => "integrity_failed",
        }
    }
}
309
310#[derive(Debug)]
311pub struct StreamLease {
312    pub id: u64,
313    /// Issue #767 / S8 — opaque handle handed to the client. 32-char
314    /// hex (128 bits of CSPRNG output). The internal `id` is not
315    /// derivable from this value.
316    pub lease_handle: String,
317    pub snapshot_lsn: u64,
318    pub opened_at_ms: u64,
319    pub config: StreamConfig,
320}
321
322impl StreamLease {
323    /// `true` once `snapshot_ttl_ms` has elapsed since the lease was opened.
324    /// The shim slice materialises the result first, so expiry in practice
325    /// only fires for streams that take longer than `ttl_ms` to drain to
326    /// the client. The check is wired in so the wire envelope can carry
327    /// `snapshot_expired` exactly the way later slices (pull-based
328    /// executors) will need it.
329    pub fn snapshot_expired(&self, now_ms: u64) -> bool {
330        now_ms.saturating_sub(self.opened_at_ms) >= self.config.snapshot_ttl_ms
331    }
332}
333
/// Reasons [`open_stream`] can refuse to issue a lease.
#[derive(Debug, PartialEq, Eq)]
pub enum OpenStreamError {
    /// Acceptance criterion #4: `OpenStream` against a session with an
    /// active `BEGIN` is refused with this code. The wire shape carries it
    /// back as `{"error": {"code": "stream_in_transaction_unsupported"}}`.
    TransactionActive,
}

impl OpenStreamError {
    /// Machine-readable error code for the wire envelope.
    pub fn code(&self) -> &'static str {
        match *self {
            Self::TransactionActive => "stream_in_transaction_unsupported",
        }
    }

    /// Human-readable description of the refusal.
    pub fn message(&self) -> &'static str {
        match *self {
            Self::TransactionActive => {
                "cannot open output stream while a transaction is active on this session"
            }
        }
    }
}
357
358/// Issue a lease. The caller is responsible for binding `snapshot_lsn`
359/// to the same MVCC view the underlying executor will read from; in the
360/// shim slice this is `runtime.cdc_current_lsn()` captured before
361/// `execute_query`.
362pub fn open_stream(
363    config: StreamConfig,
364    snapshot_lsn: u64,
365    in_transaction: bool,
366    clock: &dyn Clock,
367) -> Result<StreamLease, OpenStreamError> {
368    if in_transaction {
369        return Err(OpenStreamError::TransactionActive);
370    }
371    Ok(StreamLease {
372        id: NEXT_LEASE_ID.fetch_add(1, Ordering::SeqCst),
373        lease_handle: generate_lease_handle(),
374        snapshot_lsn,
375        opened_at_ms: clock.now_ms(),
376        config,
377    })
378}
379
380/// Page-aligned chunk producer. The producer accumulates byte-encoded
381/// rows in an N × 16 KiB buffer; on the first of byte / row / latency
382/// cap it forwards the buffer to the supplied flush closure, which the
383/// transport layer turns into a chunked-encoding frame.
384///
385/// The struct does not know about HTTP, NDJSON, or chunked transfer —
386/// it is wire-agnostic so the gRPC and RedWire paths can reuse it.
387pub struct ChunkProducer<'a> {
388    buf: Vec<u8>,
389    rows: usize,
390    window_started_ms: u64,
391    cap_bytes: usize,
392    cap_rows: usize,
393    cap_latency_ms: u64,
394    clock: &'a dyn Clock,
395    total_flushes: u64,
396    total_bytes: u64,
397    total_rows: u64,
398    last_flush_reason: Option<FlushReason>,
399}
400
/// Which condition caused a [`ChunkProducer`] flush.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// The buffer reached the byte cap.
    Byte,
    /// The buffered line count reached the row cap.
    Row,
    /// The latency cap elapsed since the current window started.
    Latency,
    /// The tail of the buffer was force-flushed by `finish`.
    Terminal,
}
408
409impl<'a> ChunkProducer<'a> {
410    pub fn new(config: &StreamConfig, clock: &'a dyn Clock) -> Self {
411        let cap_bytes = config.production_buffer_bytes();
412        Self {
413            buf: Vec::with_capacity(cap_bytes),
414            rows: 0,
415            window_started_ms: clock.now_ms(),
416            cap_bytes,
417            cap_rows: config.chunk_max_rows,
418            cap_latency_ms: config.chunk_max_latency_ms,
419            clock,
420            total_flushes: 0,
421            total_bytes: 0,
422            total_rows: 0,
423            last_flush_reason: None,
424        }
425    }
426
427    /// Append one already-encoded line (NDJSON: bytes + `\n`). Returns
428    /// `true` if the append triggered a flush.
429    pub fn push_line<F>(&mut self, line: &[u8], flush: &mut F) -> std::io::Result<bool>
430    where
431        F: FnMut(&[u8]) -> std::io::Result<()>,
432    {
433        self.buf.extend_from_slice(line);
434        self.rows += 1;
435        self.total_rows += 1;
436
437        if self.buf.len() >= self.cap_bytes {
438            self.flush(flush, FlushReason::Byte)?;
439            return Ok(true);
440        }
441        if self.rows >= self.cap_rows {
442            self.flush(flush, FlushReason::Row)?;
443            return Ok(true);
444        }
445        let elapsed = self.clock.now_ms().saturating_sub(self.window_started_ms);
446        if elapsed >= self.cap_latency_ms {
447            self.flush(flush, FlushReason::Latency)?;
448            return Ok(true);
449        }
450        Ok(false)
451    }
452
453    /// Issue #768 / S9 — drive a pull-based line source into the
454    /// production buffer. The producer *pulls* one encoded line at a
455    /// time from `source` and routes it through [`push_line`], so the
456    /// resident working set is the page-aligned buffer plus the single
457    /// line currently in hand — never the full result set. Pair this
458    /// with the pull-based scan iterators
459    /// (`parallel_scan::parallel_scan_rows`,
460    /// `bitmap_scan::execute_bitmap_scan_stream`) whose records are
461    /// encoded lazily by `encode`.
462    ///
463    /// Returns the number of lines consumed. Flush caps (byte / row /
464    /// latency) fire mid-drain exactly as they would for hand-driven
465    /// [`push_line`] calls, so first-line latency stays bounded by
466    /// `chunk.max_latency_ms` regardless of how many lines the source
467    /// will ultimately yield.
468    ///
469    /// [`push_line`]: ChunkProducer::push_line
470    pub fn drive_lines<S, R, Enc, F>(
471        &mut self,
472        source: S,
473        mut encode: Enc,
474        flush: &mut F,
475    ) -> std::io::Result<u64>
476    where
477        S: IntoIterator<Item = R>,
478        Enc: FnMut(&R) -> Vec<u8>,
479        F: FnMut(&[u8]) -> std::io::Result<()>,
480    {
481        let mut count = 0u64;
482        for record in source {
483            let line = encode(&record);
484            self.push_line(&line, flush)?;
485            count += 1;
486        }
487        Ok(count)
488    }
489
490    /// Force-flush any buffered bytes — used after the final NDJSON line
491    /// (`{"end": …}`) to push the tail of the buffer before closing the
492    /// connection.
493    pub fn finish<F>(&mut self, flush: &mut F) -> std::io::Result<()>
494    where
495        F: FnMut(&[u8]) -> std::io::Result<()>,
496    {
497        if !self.buf.is_empty() {
498            self.flush(flush, FlushReason::Terminal)?;
499        }
500        Ok(())
501    }
502
503    fn flush<F>(&mut self, flush: &mut F, reason: FlushReason) -> std::io::Result<()>
504    where
505        F: FnMut(&[u8]) -> std::io::Result<()>,
506    {
507        flush(&self.buf)?;
508        self.total_bytes += self.buf.len() as u64;
509        self.total_flushes += 1;
510        self.last_flush_reason = Some(reason);
511        self.buf.clear();
512        self.rows = 0;
513        self.window_started_ms = self.clock.now_ms();
514        Ok(())
515    }
516
517    pub fn total_flushes(&self) -> u64 {
518        self.total_flushes
519    }
520    pub fn total_bytes(&self) -> u64 {
521        self.total_bytes
522    }
523    pub fn total_rows(&self) -> u64 {
524        self.total_rows
525    }
526    pub fn last_flush_reason(&self) -> Option<FlushReason> {
527        self.last_flush_reason
528    }
529}
530
531/// HTTP chunked transfer encoding helpers. We do not pull in `hyper` for
532/// the streaming path; the existing SSE handler also hand-rolls its own
533/// HTTP framing, and matching that style keeps the diff narrow.
534pub fn write_chunked_response_header<W: std::io::Write>(
535    writer: &mut W,
536    status: u16,
537    content_type: &str,
538) -> std::io::Result<()> {
539    // Streaming responses write their own head here instead of going
540    // through `HttpResponse::to_http_bytes`, so they must repeat the
541    // same permissive CORS posture — otherwise a browser that can call
542    // the JSON endpoints cross-origin would still be blocked on the
543    // NDJSON/SSE streaming routes (graph/vector results). API auth is
544    // by header/API key, never cookies, so wildcard origin is safe and
545    // Allow-Credentials is deliberately never sent.
546    let mut header = format!(
547        "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nTransfer-Encoding: chunked\r\nCache-Control: no-cache\r\nConnection: close\r\n",
548        status,
549        crate::server::transport::status_text(status),
550        content_type,
551    );
552    // Same permissive CORS posture as the buffered path — sourced from
553    // the single `CORS_HEADER_PAIRS` choke point so streaming routes
554    // (graph/vector NDJSON results) are reachable cross-origin too.
555    for (name, value) in crate::server::transport::CORS_HEADER_PAIRS {
556        header.push_str(name);
557        header.push_str(": ");
558        header.push_str(value);
559        header.push_str("\r\n");
560    }
561    header.push_str("\r\n");
562    writer.write_all(header.as_bytes())?;
563    writer.flush()
564}
565
/// Write one HTTP chunk — `<hex-size>\r\n<bytes>\r\n` — and flush. An empty
/// payload writes nothing at all, because a zero-length chunk would end the
/// body; the terminator chunk lives in [`write_chunked_terminator`].
pub fn write_chunk<W: std::io::Write>(writer: &mut W, bytes: &[u8]) -> std::io::Result<()> {
    if !bytes.is_empty() {
        let size_line = format!("{:x}\r\n", bytes.len());
        writer.write_all(size_line.as_bytes())?;
        writer.write_all(bytes)?;
        writer.write_all(b"\r\n")?;
        writer.flush()?;
    }
    Ok(())
}
579
/// Write the final `0\r\n\r\n` chunk that terminates a chunked body, then
/// flush the writer.
pub fn write_chunked_terminator<W: std::io::Write>(writer: &mut W) -> std::io::Result<()> {
    const LAST_CHUNK: &[u8] = b"0\r\n\r\n";
    writer.write_all(LAST_CHUNK).and_then(|()| writer.flush())
}
585
/// Issue #761 / S2 — process-wide stream capacity registry. Tracks a global
/// concurrent-stream count and a per-principal count, both behind a single
/// mutex.
///
/// Both counters are decremented when the [`StreamCapacityGuard`] returned
/// by a successful `try_acquire` is dropped, so the release path covers
/// every normal exit (success, mid-stream error, snapshot expiry, client
/// disconnect that drops the writer chain, panic unwind through the stack
/// frame holding the guard).
#[derive(Debug, Default)]
pub struct StreamCapacityRegistry {
    inner: Mutex<CapacityInner>,
}

/// Counter state guarded by the registry mutex.
#[derive(Debug, Default)]
struct CapacityInner {
    /// Slots currently held across all principals.
    global_count: usize,
    /// Slots currently held, keyed by principal.
    per_principal: HashMap<String, usize>,
}
603
/// Why [`StreamCapacityRegistry::try_acquire`] refused a slot. Each variant
/// records the cap that fired and the live counter value at refusal time so
/// clients can back off intelligently (the HTTP layer surfaces these inside
/// the structured 429 body).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireError {
    /// `stream.max_global` exceeded. Per acceptance criterion #1.
    GlobalExhausted { limit: usize, current: usize },
    /// `stream.max_per_principal` exceeded for `principal`. Per acceptance
    /// criterion #2. The principal is surfaced verbatim; callers must
    /// escape it on the wire.
    PrincipalExhausted {
        principal: String,
        limit: usize,
        current: usize,
    },
}

impl AcquireError {
    /// Machine-readable error code for the wire envelope.
    pub fn code(&self) -> &'static str {
        match self {
            Self::GlobalExhausted { .. } => "server_stream_capacity_exhausted",
            Self::PrincipalExhausted { .. } => "principal_stream_quota_exhausted",
        }
    }

    /// Human-readable description including the limit and the live count.
    /// The principal, when present, is inserted unescaped.
    pub fn message(&self) -> String {
        match self {
            Self::PrincipalExhausted {
                principal,
                limit,
                current,
            } => format!(
                "principal {principal} stream quota exhausted (limit {limit}, current {current})"
            ),
            Self::GlobalExhausted { limit, current } => {
                format!("server stream capacity exhausted (limit {limit}, current {current})")
            }
        }
    }
}
645
646impl StreamCapacityRegistry {
647    pub fn new() -> Arc<Self> {
648        Arc::new(Self::default())
649    }
650
651    /// Attempt to acquire one slot. Both caps are checked under the
652    /// same lock, so a concurrent acquire+release pair cannot over-
653    /// issue beyond either ceiling (acceptance criterion #5).
654    pub fn try_acquire(
655        self: &Arc<Self>,
656        principal: &str,
657        max_global: usize,
658        max_per_principal: usize,
659    ) -> Result<StreamCapacityGuard, AcquireError> {
660        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
661        if inner.global_count >= max_global {
662            return Err(AcquireError::GlobalExhausted {
663                limit: max_global,
664                current: inner.global_count,
665            });
666        }
667        let current = inner.per_principal.get(principal).copied().unwrap_or(0);
668        if current >= max_per_principal {
669            return Err(AcquireError::PrincipalExhausted {
670                principal: principal.to_string(),
671                limit: max_per_principal,
672                current,
673            });
674        }
675        inner.global_count += 1;
676        inner
677            .per_principal
678            .insert(principal.to_string(), current + 1);
679        Ok(StreamCapacityGuard {
680            registry: Arc::clone(self),
681            principal: principal.to_string(),
682            released: false,
683        })
684    }
685
686    fn release(&self, principal: &str) {
687        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
688        if inner.global_count > 0 {
689            inner.global_count -= 1;
690        }
691        if let Some(count) = inner.per_principal.get_mut(principal) {
692            if *count > 0 {
693                *count -= 1;
694            }
695            if *count == 0 {
696                inner.per_principal.remove(principal);
697            }
698        }
699    }
700
701    /// Visible for tests and audit handlers.
702    pub fn snapshot(&self) -> (usize, HashMap<String, usize>) {
703        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
704        (inner.global_count, inner.per_principal.clone())
705    }
706}
707
708/// RAII slot returned by [`StreamCapacityRegistry::try_acquire`].
709/// Decrements both counters on drop — acceptance criterion #4
710/// ("releasing a slot on stream end (any reason) decrements both
711/// counters atomically").
712#[must_use = "dropping the guard immediately releases the stream slot"]
713#[derive(Debug)]
714pub struct StreamCapacityGuard {
715    registry: Arc<StreamCapacityRegistry>,
716    principal: String,
717    released: bool,
718}
719
720impl StreamCapacityGuard {
721    pub fn principal(&self) -> &str {
722        &self.principal
723    }
724}
725
726impl Drop for StreamCapacityGuard {
727    fn drop(&mut self) {
728        if !self.released {
729            self.registry.release(&self.principal);
730            self.released = true;
731        }
732    }
733}
734
735// ──────── Issue #766 / S7 — resume coordinator ────────
736
/// Resumability assessment of a query plan. The shim slice runs a textual
/// classifier over the SQL string: a query is resumable iff it has a stable
/// total order over a unique key. By default we promise RID ASC; an
/// explicit `ORDER BY rid` (or `ORDER BY rid ASC`) is also resumable.
/// Anything that aggregates / groups / windows / joins or orders on another
/// column is not.
///
/// The query is upper-cased and every run of whitespace (spaces, tabs,
/// newlines) is collapsed to a single space before matching. Keyword
/// patterns such as `" GROUP BY "` or `" JOIN "` therefore match however
/// the statement is laid out; without this step a multi-line aggregate
/// query would slip through as "resumable".
///
/// Returns `true` when the query is classified as resumable. The check is
/// purely textual and conservative: keywords appearing inside string
/// literals or identifiers also cause a `false`.
pub fn assess_resumability(query: &str) -> bool {
    let normalized = query
        .to_uppercase()
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ");

    if !normalized.starts_with("SELECT ") {
        return false;
    }

    // Any of these makes row identity or ordering unstable across a resume.
    const FORBIDDEN: &[&str] = &[
        " GROUP BY ",
        " HAVING ",
        "DISTINCT ",
        "COUNT(",
        "SUM(",
        "AVG(",
        "MIN(",
        "MAX(",
        "ARRAY_AGG(",
        "JSON_AGG(",
        "OVER(",
        " OVER (",
        " JOIN ",
    ];
    if FORBIDDEN.iter().any(|kw| normalized.contains(kw)) {
        return false;
    }

    // No explicit ordering: the default RID ASC promise applies.
    let Some(idx) = normalized.find("ORDER BY") else {
        return true;
    };

    // Isolate the ordering expression: drop a trailing LIMIT and the
    // statement terminator.
    let mut clause = &normalized[idx + "ORDER BY".len()..];
    if let Some(lim) = clause.find(" LIMIT ") {
        clause = &clause[..lim];
    }
    if let Some(semi) = clause.find(';') {
        clause = &clause[..semi];
    }
    matches!(clause.trim(), "RID" | "RID ASC")
}
787
/// Resume-eligibility ledger. Maps `snapshot_lsn → (opened_at_ms, ttl_ms)`
/// so a resume request can be checked against the TTL without trusting the
/// wall clock on the client. The shim slice does not implement true MVCC
/// pinning — the registry's role is to make `snapshot_expired`
/// deterministic and testable.
#[derive(Debug, Default)]
pub struct LeaseRegistry {
    inner: Mutex<HashMap<u64, LeaseEntry>>,
}

/// What the ledger remembers about one snapshot LSN.
#[derive(Debug, Clone, Copy)]
struct LeaseEntry {
    /// Time the lease was recorded as opened, in milliseconds.
    opened_at_ms: u64,
    /// Time-to-live granted at open, in milliseconds.
    ttl_ms: u64,
}

/// Outcome of [`LeaseRegistry::lookup`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseLookup {
    /// No lease ever recorded for this snapshot LSN.
    Unknown,
    /// Lease exists but its TTL has elapsed.
    Expired,
    /// Lease exists and is still within TTL.
    Live,
}

impl LeaseRegistry {
    /// Create an empty registry wrapped in an `Arc`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Record a freshly-opened lease. Recording the same `snapshot_lsn`
    /// again overwrites the stored open time and TTL (the client cannot
    /// observe lease identity through the snapshot LSN alone, so this
    /// matches "the latest open wins" semantics).
    pub fn record(&self, snapshot_lsn: u64, opened_at_ms: u64, ttl_ms: u64) {
        let entry = LeaseEntry {
            opened_at_ms,
            ttl_ms,
        };
        self.inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(snapshot_lsn, entry);
    }

    /// Resume-time lookup: reports whether the lease for `snapshot_lsn` is
    /// unknown, expired, or still live as of `now_ms`. A `now_ms` earlier
    /// than the recorded open time counts as zero elapsed time.
    pub fn lookup(&self, snapshot_lsn: u64, now_ms: u64) -> LeaseLookup {
        let leases = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let Some(entry) = leases.get(&snapshot_lsn) else {
            return LeaseLookup::Unknown;
        };
        let age_ms = now_ms.saturating_sub(entry.opened_at_ms);
        if age_ms < entry.ttl_ms {
            LeaseLookup::Live
        } else {
            LeaseLookup::Expired
        }
    }

    /// Number of snapshot LSNs currently recorded. Visible for tests /
    /// audit.
    #[doc(hidden)]
    pub fn len(&self) -> usize {
        let leases = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        leases.len()
    }
}
856
/// Issue #807 / PRD #750 — raw byte length of an opaque cursor token:
/// 192 bits, hex-encoded for JSON transport. The token is the only cursor
/// identifier the wire sees; the pinned snapshot, scope, and query stay
/// server-side in the [`CursorRegistry`]. Opacity property: a client
/// cannot derive the pinned snapshot LSN or another principal's token
/// from the value, and tokens are not sequential across opens.
pub const CURSOR_TOKEN_BYTES: usize = 192 / 8;
864
865static NEXT_CURSOR_SALT: AtomicU64 = AtomicU64::new(1);
866
867/// Generate an opaque 192-bit cursor token (48-char hex). Falls back to a
868/// high-entropy time/counter mix if the CSPRNG is unavailable so a fresh
869/// open never fails to mint a token (the fallback is not unguessable, but
870/// production paths reach `/dev/urandom` successfully).
871pub fn generate_cursor_token() -> String {
872    let mut bytes = [0u8; CURSOR_TOKEN_BYTES];
873    if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
874        let salt = NEXT_CURSOR_SALT
875            .fetch_add(1, Ordering::SeqCst)
876            .to_le_bytes();
877        let now = crate::utils::now_unix_nanos().to_le_bytes();
878        bytes[..8].copy_from_slice(&salt);
879        bytes[8..16].copy_from_slice(&now);
880    }
881    crate::utils::to_hex(&bytes)
882}
883
884/// Server-side cursor record. Pins the read snapshot and the query so a
885/// resume replays the same point-in-time view, and binds the entry to the
886/// `(tenant, principal)` that opened it for authorization scoping.
887#[derive(Debug, Clone)]
888struct CursorEntry {
889    snapshot_lsn: u64,
890    tenant: String,
891    principal: String,
892    query: String,
893    entity_types: Option<Vec<String>>,
894    capabilities: Option<Vec<String>>,
895    opened_at_ms: u64,
896    ttl_ms: u64,
897    /// Issue #808 / 750d — shared cancel token observed by the executor for
898    /// this stream. The cancel endpoint and client-disconnect detection
899    /// raise it; the engine iterators poll it. Cloned out to the handler at
900    /// open so the live stream and a later cancel request share one flag.
901    cancel_token: CancelToken,
902    /// Issue #808 / 750d — tombstone marker. Set once the stream is
903    /// cancelled (explicitly or by disconnect); a tombstoned cursor can
904    /// never be resumed, only reported as cleanly cancelled to its owner.
905    cancelled: bool,
906}
907
/// What the handler receives when a resume token resolves cleanly: the
/// pinned read state of the original cursor. The handler re-executes
/// `query` against `snapshot_lsn` to re-stream the same view.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorResume {
    /// Snapshot LSN pinned when the cursor was registered.
    pub snapshot_lsn: u64,
    /// Query text recorded at registration.
    pub query: String,
    /// Copied verbatim from the registered cursor.
    pub entity_types: Option<Vec<String>>,
    /// Copied verbatim from the registered cursor.
    pub capabilities: Option<Vec<String>>,
    /// `opened_at_ms + ttl_ms` of the cursor, saturating at `u64::MAX`.
    pub expires_at_ms: u64,
}
919
/// Reason a cursor token was refused. The handler maps every variant to
/// a structured wire error. [`CursorReject::NotFound`] deliberately
/// covers both "never issued" and "owned by a different
/// tenant/principal", so an unauthorized caller cannot tell a foreign
/// cursor from a nonexistent one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorReject {
    /// Token unknown, or known to a different tenant/principal (masked so
    /// existence never leaks to an unauthorized caller).
    NotFound,
    /// Token owned by this caller but its TTL has elapsed.
    Expired,
    /// Issue #808 / 750d — token owned by this caller, but the stream was
    /// cancelled (explicitly or by client disconnect) and tombstoned. The
    /// resume is refused with a clean, structured error instead of
    /// replaying a stream the client already abandoned.
    Cancelled,
}
937
938/// Issue #807 / PRD #750 — process-wide cursor registry for `/query/stream`.
939///
940/// On a fresh stream the server mints an opaque token, pins the read
941/// snapshot LSN, and records it here scoped to the requesting
942/// `(tenant, principal)` with a TTL. A later resume request presents the
943/// token; [`resolve`](CursorRegistry::resolve) returns the pinned query +
944/// snapshot only when the token is live AND the caller's scope matches.
945///
946/// In-memory is acceptable for this first cut (per the issue): the registry
947/// is shared via `Arc` so cloned server handles enforce against one map.
948#[derive(Debug, Default)]
949pub struct CursorRegistry {
950    inner: Mutex<HashMap<String, CursorEntry>>,
951}
952
953impl CursorRegistry {
954    pub fn new() -> Arc<Self> {
955        Arc::new(Self::default())
956    }
957
958    /// Mint + record a cursor for a freshly-opened stream, returning the
959    /// opaque token handed to the client. The entry expires at
960    /// `opened_at_ms + ttl_ms` (saturating).
961    #[allow(clippy::too_many_arguments)]
962    pub fn register(
963        &self,
964        snapshot_lsn: u64,
965        tenant: &str,
966        principal: &str,
967        query: &str,
968        entity_types: Option<Vec<String>>,
969        capabilities: Option<Vec<String>>,
970        opened_at_ms: u64,
971        ttl_ms: u64,
972    ) -> String {
973        let token = generate_cursor_token();
974        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
975        inner.insert(
976            token.clone(),
977            CursorEntry {
978                snapshot_lsn,
979                tenant: tenant.to_string(),
980                principal: principal.to_string(),
981                query: query.to_string(),
982                entity_types,
983                capabilities,
984                opened_at_ms,
985                ttl_ms,
986                cancel_token: CancelToken::new(),
987                cancelled: false,
988            },
989        );
990        token
991    }
992
993    /// Issue #808 / 750d — clone out the shared cancel token for a cursor
994    /// the caller just minted, so the live streaming handler can poll the
995    /// same flag a later cancel request raises. No scope check: the only
996    /// caller is the handler that owns the freshly-registered token.
997    pub fn cancel_token_for(&self, token: &str) -> Option<CancelToken> {
998        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
999        inner.get(token).map(|e| e.cancel_token.clone())
1000    }
1001
1002    /// Issue #808 / 750d — cancel + tombstone a cursor on behalf of
1003    /// `(tenant, principal)`. Raises the shared cancel token (so an
1004    /// in-flight stream observing it stops) and marks the entry cancelled
1005    /// so a later resume is refused with [`CursorReject::Cancelled`].
1006    ///
1007    /// Scope is checked exactly as in [`resolve`](CursorRegistry::resolve):
1008    /// a token owned by a different tenant/principal — or one that never
1009    /// existed — returns [`CursorReject::NotFound`] so an unauthorized
1010    /// caller cannot cancel, or even confirm the existence of, a foreign
1011    /// stream. Cancelling an already-cancelled cursor is idempotent.
1012    pub fn cancel(
1013        &self,
1014        token: &str,
1015        tenant: &str,
1016        principal: &str,
1017    ) -> Result<CancelToken, CursorReject> {
1018        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1019        let entry = inner.get_mut(token).ok_or(CursorReject::NotFound)?;
1020        if entry.tenant != tenant || entry.principal != principal {
1021            return Err(CursorReject::NotFound);
1022        }
1023        entry.cancelled = true;
1024        entry.cancel_token.cancel();
1025        Ok(entry.cancel_token.clone())
1026    }
1027
1028    /// Resolve a resume token for `(tenant, principal)` as of `now_ms`.
1029    ///
1030    /// Scope is checked **before** expiry: a token owned by a different
1031    /// tenant or principal resolves to [`CursorReject::NotFound`] —
1032    /// identical to a token that was never issued — so an unauthorized
1033    /// caller learns nothing about its existence. Only the rightful owner
1034    /// can observe [`CursorReject::Expired`].
1035    pub fn resolve(
1036        &self,
1037        token: &str,
1038        tenant: &str,
1039        principal: &str,
1040        now_ms: u64,
1041    ) -> Result<CursorResume, CursorReject> {
1042        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1043        let entry = inner.get(token).ok_or(CursorReject::NotFound)?;
1044        // Scope gate first — a mismatch must be indistinguishable from
1045        // absence so a cross-tenant/cross-principal probe cannot confirm
1046        // the token exists.
1047        if entry.tenant != tenant || entry.principal != principal {
1048            return Err(CursorReject::NotFound);
1049        }
1050        // Issue #808 / 750d — a tombstoned (cancelled) cursor is refused
1051        // with a dedicated reason before the TTL check, so the owner learns
1052        // the stream was cancelled rather than seeing a generic expiry.
1053        if entry.cancelled {
1054            return Err(CursorReject::Cancelled);
1055        }
1056        if now_ms.saturating_sub(entry.opened_at_ms) >= entry.ttl_ms {
1057            return Err(CursorReject::Expired);
1058        }
1059        Ok(CursorResume {
1060            snapshot_lsn: entry.snapshot_lsn,
1061            query: entry.query.clone(),
1062            entity_types: entry.entity_types.clone(),
1063            capabilities: entry.capabilities.clone(),
1064            expires_at_ms: entry.opened_at_ms.saturating_add(entry.ttl_ms),
1065        })
1066    }
1067
1068    /// Visible for tests / audit.
1069    #[doc(hidden)]
1070    pub fn len(&self) -> usize {
1071        self.inner.lock().unwrap_or_else(|e| e.into_inner()).len()
1072    }
1073
1074    #[doc(hidden)]
1075    pub fn is_empty(&self) -> bool {
1076        self.len() == 0
1077    }
1078}
1079
1080/// Incremental SHA-256 hasher over emitted row lines, hex-encoded on
1081/// finalize. The wire contract is that the server hashes the exact
1082/// byte sequence of each row line (without trailing newline) in the
1083/// order the rows are emitted; the client stores the resulting digest
1084/// from the `end` envelope and replays it on resume.
1085#[derive(Debug, Default)]
1086pub struct PrefixHasher {
1087    inner: Option<sha2::Sha256>,
1088    rows: u64,
1089}
1090
1091impl PrefixHasher {
1092    pub fn new() -> Self {
1093        use sha2::Digest;
1094        Self {
1095            inner: Some(sha2::Sha256::new()),
1096            rows: 0,
1097        }
1098    }
1099
1100    pub fn update(&mut self, line: &[u8]) {
1101        use sha2::Digest;
1102        if let Some(h) = self.inner.as_mut() {
1103            h.update(line);
1104        }
1105        self.rows += 1;
1106    }
1107
1108    pub fn rows(&self) -> u64 {
1109        self.rows
1110    }
1111
1112    /// Hex-encoded digest of everything fed so far. Consumes the
1113    /// hasher (a `PrefixHasher` is single-use).
1114    pub fn finalize_hex(mut self) -> String {
1115        use sha2::Digest;
1116        let hasher = self
1117            .inner
1118            .take()
1119            .expect("PrefixHasher::finalize_hex called twice");
1120        let digest = hasher.finalize();
1121        let mut out = String::with_capacity(64);
1122        for b in digest.iter() {
1123            out.push_str(&format!("{b:02x}"));
1124        }
1125        out
1126    }
1127}
1128
1129/// Issue #767 / S8 — audit emission helpers. Each helper builds an
1130/// `AuditEvent` shaped to the brief and forwards it to the runtime's
1131/// audit log. The helpers are intentionally side-effect-only (no
1132/// return); audit emission must never terminate a stream that would
1133/// otherwise succeed.
1134pub fn audit_stream_opened(
1135    runtime: &RedDBRuntime,
1136    lease_handle: &str,
1137    principal: &str,
1138    snapshot_lsn: u64,
1139    query_hash: &str,
1140) {
1141    use crate::json::{Map, Value as JsonValue};
1142    let mut detail = Map::new();
1143    detail.insert(
1144        "lease_handle".to_string(),
1145        JsonValue::String(lease_handle.to_string()),
1146    );
1147    detail.insert(
1148        "snapshot_lsn".to_string(),
1149        JsonValue::Number(snapshot_lsn as f64),
1150    );
1151    detail.insert(
1152        "query_hash".to_string(),
1153        JsonValue::String(query_hash.to_string()),
1154    );
1155    let event = crate::runtime::audit_log::AuditEvent::builder("stream.opened")
1156        .principal(principal)
1157        .resource(lease_handle.to_string())
1158        .outcome(crate::runtime::audit_log::Outcome::Success)
1159        .detail(JsonValue::Object(detail))
1160        .build();
1161    runtime.audit_log().record_event(event);
1162}
1163
1164pub fn audit_stream_closed(
1165    runtime: &RedDBRuntime,
1166    lease_handle: &str,
1167    principal: &str,
1168    reason: CloseReason,
1169    row_count: u64,
1170    bytes_written: u64,
1171) {
1172    use crate::json::{Map, Value as JsonValue};
1173    let mut stats = Map::new();
1174    stats.insert("row_count".to_string(), JsonValue::Number(row_count as f64));
1175    stats.insert(
1176        "bytes_written".to_string(),
1177        JsonValue::Number(bytes_written as f64),
1178    );
1179    let mut detail = Map::new();
1180    detail.insert(
1181        "lease_handle".to_string(),
1182        JsonValue::String(lease_handle.to_string()),
1183    );
1184    detail.insert(
1185        "reason".to_string(),
1186        JsonValue::String(reason.as_str().to_string()),
1187    );
1188    detail.insert("stats".to_string(), JsonValue::Object(stats));
1189    let outcome = match reason {
1190        CloseReason::Ok => crate::runtime::audit_log::Outcome::Success,
1191        CloseReason::CapacityRefused => crate::runtime::audit_log::Outcome::Denied,
1192        _ => crate::runtime::audit_log::Outcome::Error,
1193    };
1194    let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1195        .principal(principal)
1196        .resource(lease_handle.to_string())
1197        .outcome(outcome)
1198        .detail(JsonValue::Object(detail))
1199        .build();
1200    runtime.audit_log().record_event(event);
1201}
1202
1203pub fn audit_token_expired_during_lease(
1204    runtime: &RedDBRuntime,
1205    lease_handle: &str,
1206    principal: &str,
1207    token_expiry_ms: u64,
1208) {
1209    use crate::json::{Map, Value as JsonValue};
1210    let mut detail = Map::new();
1211    detail.insert(
1212        "lease_handle".to_string(),
1213        JsonValue::String(lease_handle.to_string()),
1214    );
1215    detail.insert(
1216        "token_expiry".to_string(),
1217        JsonValue::Number(token_expiry_ms as f64),
1218    );
1219    detail.insert("lease_continued".to_string(), JsonValue::Bool(true));
1220    let event = crate::runtime::audit_log::AuditEvent::builder("stream.token_expired_during_lease")
1221        .principal(principal)
1222        .resource(lease_handle.to_string())
1223        .outcome(crate::runtime::audit_log::Outcome::Success)
1224        .detail(JsonValue::Object(detail))
1225        .build();
1226    runtime.audit_log().record_event(event);
1227}
1228
1229/// Capacity refusal emits a `stream.closed` event with no lease handle
1230/// (the open never produced one) and `reason: capacity_refused`. The
1231/// brief lists capacity_refused alongside the other close reasons; we
1232/// keep the wire shape identical so downstream audit-log consumers
1233/// don't have to special-case it.
1234pub fn audit_stream_capacity_refused(
1235    runtime: &RedDBRuntime,
1236    principal: &str,
1237    code: &str,
1238    limit: usize,
1239    current: usize,
1240) {
1241    use crate::json::{Map, Value as JsonValue};
1242    let mut detail = Map::new();
1243    detail.insert(
1244        "reason".to_string(),
1245        JsonValue::String(CloseReason::CapacityRefused.as_str().to_string()),
1246    );
1247    detail.insert("code".to_string(), JsonValue::String(code.to_string()));
1248    detail.insert("limit".to_string(), JsonValue::Number(limit as f64));
1249    detail.insert("current".to_string(), JsonValue::Number(current as f64));
1250    let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1251        .principal(principal)
1252        .outcome(crate::runtime::audit_log::Outcome::Denied)
1253        .detail(JsonValue::Object(detail))
1254        .build();
1255    runtime.audit_log().record_event(event);
1256}
1257
1258/// Borrow the lease registry's clock by default — the routing handler
1259/// uses this, tests inject their own.
1260pub fn system_clock() -> Arc<dyn Clock> {
1261    static INSTANCE: std::sync::OnceLock<Arc<dyn Clock>> = std::sync::OnceLock::new();
1262    Arc::clone(INSTANCE.get_or_init(|| Arc::new(SystemClock)))
1263}
1264
1265#[cfg(test)]
1266mod tests {
1267    use super::*;
1268
1269    #[test]
1270    fn open_stream_refuses_when_session_has_active_transaction() {
1271        let clock = FakeClock::new(0);
1272        let err = open_stream(StreamConfig::DEFAULT, 42, true, &clock).unwrap_err();
1273        assert_eq!(err, OpenStreamError::TransactionActive);
1274        assert_eq!(err.code(), "stream_in_transaction_unsupported");
1275    }
1276
1277    #[test]
1278    fn open_stream_succeeds_when_session_is_autocommit() {
1279        let clock = FakeClock::new(1_700_000_000_000);
1280        let lease = open_stream(StreamConfig::DEFAULT, 123, false, &clock).unwrap();
1281        assert_eq!(lease.snapshot_lsn, 123);
1282        assert_eq!(lease.opened_at_ms, 1_700_000_000_000);
1283        assert!(lease.id >= 1);
1284    }
1285
1286    #[test]
1287    fn lease_ids_are_unique_and_monotonic() {
1288        let clock = FakeClock::new(0);
1289        let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1290        let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1291        assert!(b.id > a.id);
1292    }
1293
1294    #[test]
1295    fn snapshot_expired_uses_injected_clock_and_ttl() {
1296        // TTL fake-clock test: advance past ttl_ms and snapshot_expired
1297        // flips. Acceptance criterion #5.
1298        let clock = FakeClock::new(0);
1299        let mut config = StreamConfig::DEFAULT;
1300        config.snapshot_ttl_ms = 5_000;
1301        let lease = open_stream(config, 0, false, &clock).unwrap();
1302
1303        assert!(!lease.snapshot_expired(clock.now_ms()));
1304        clock.advance(4_999);
1305        assert!(!lease.snapshot_expired(clock.now_ms()));
1306        clock.advance(1);
1307        assert!(lease.snapshot_expired(clock.now_ms()));
1308    }
1309
1310    #[test]
1311    fn stream_config_loads_defaults_when_kv_is_empty() {
1312        // Without runtime, just sanity-check the defaults match ADR 0029.
1313        let cfg = StreamConfig::DEFAULT;
1314        assert_eq!(cfg.snapshot_ttl_ms, 60_000);
1315        assert_eq!(cfg.chunk_default_pages, 4);
1316        assert_eq!(cfg.chunk_min_pages, 1);
1317        assert_eq!(cfg.chunk_max_pages, 64);
1318        assert_eq!(cfg.chunk_max_rows, 1000);
1319        assert_eq!(cfg.chunk_max_latency_ms, 50);
1320        assert_eq!(cfg.production_buffer_bytes(), 64 * 1024);
1321    }
1322
1323    #[test]
1324    fn stream_config_normalize_clamps_inconsistent_inputs() {
1325        let mut cfg = StreamConfig {
1326            snapshot_ttl_ms: 1,
1327            chunk_default_pages: 100,
1328            chunk_min_pages: 0,
1329            chunk_max_pages: 8,
1330            chunk_max_rows: 0,
1331            chunk_max_latency_ms: 1,
1332            max_global_streams: 0,
1333            max_per_principal_streams: 0,
1334            default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
1335        };
1336        cfg.normalize();
1337        assert_eq!(cfg.chunk_min_pages, 1);
1338        assert_eq!(cfg.chunk_max_pages, 8);
1339        assert_eq!(cfg.chunk_default_pages, 8); // clamped down to max
1340        assert!(cfg.chunk_max_rows >= 1);
1341        assert!(cfg.max_global_streams >= 1);
1342        assert!(cfg.max_per_principal_streams >= 1);
1343    }
1344
1345    /// Test sink that accumulates flushed chunks into an interior-mutable
1346    /// `Vec`. Avoids the closure-capture-mutable-then-borrow-immutable
1347    /// dance that the borrow checker rejects when assertions are
1348    /// interleaved with `push_line` calls.
1349    struct CapturingSink {
1350        chunks: std::cell::RefCell<Vec<Vec<u8>>>,
1351    }
1352    impl CapturingSink {
1353        fn new() -> Self {
1354            Self {
1355                chunks: std::cell::RefCell::new(Vec::new()),
1356            }
1357        }
1358        fn len(&self) -> usize {
1359            self.chunks.borrow().len()
1360        }
1361        fn last_len(&self) -> Option<usize> {
1362            self.chunks.borrow().last().map(|c| c.len())
1363        }
1364    }
1365
1366    fn capture<'a>(sink: &'a CapturingSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1367        move |bytes: &[u8]| {
1368            sink.chunks.borrow_mut().push(bytes.to_vec());
1369            Ok(())
1370        }
1371    }
1372
1373    #[test]
1374    fn chunk_producer_flushes_on_byte_cap() {
1375        let clock = FakeClock::new(0);
1376        let cfg = StreamConfig {
1377            chunk_default_pages: 1, // 16 KiB
1378            chunk_min_pages: 1,
1379            chunk_max_pages: 1,
1380            chunk_max_rows: 1_000_000,
1381            chunk_max_latency_ms: 1_000_000,
1382            ..StreamConfig::DEFAULT
1383        };
1384        let sink = CapturingSink::new();
1385        let mut producer = ChunkProducer::new(&cfg, &clock);
1386        let mut flush = capture(&sink);
1387
1388        producer
1389            .push_line(&vec![b'x'; 8 * 1024], &mut flush)
1390            .unwrap();
1391        assert_eq!(sink.len(), 0);
1392
1393        let triggered = producer
1394            .push_line(&vec![b'y'; 8 * 1024], &mut flush)
1395            .unwrap();
1396        assert!(triggered);
1397        assert_eq!(sink.len(), 1);
1398        assert_eq!(sink.last_len(), Some(16 * 1024));
1399        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Byte));
1400    }
1401
1402    #[test]
1403    fn chunk_producer_flushes_on_row_cap() {
1404        let clock = FakeClock::new(0);
1405        let cfg = StreamConfig {
1406            chunk_default_pages: 4, // 64 KiB — well above any test row size
1407            chunk_min_pages: 1,
1408            chunk_max_pages: 64,
1409            chunk_max_rows: 3,
1410            chunk_max_latency_ms: 1_000_000,
1411            ..StreamConfig::DEFAULT
1412        };
1413        let sink = CapturingSink::new();
1414        let mut producer = ChunkProducer::new(&cfg, &clock);
1415        let mut flush = capture(&sink);
1416        let row = b"{\"row\":{}}\n";
1417        producer.push_line(row, &mut flush).unwrap();
1418        producer.push_line(row, &mut flush).unwrap();
1419        assert_eq!(sink.len(), 0);
1420        let triggered = producer.push_line(row, &mut flush).unwrap();
1421        assert!(triggered);
1422        assert_eq!(sink.len(), 1);
1423        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Row));
1424    }
1425
1426    #[test]
1427    fn chunk_producer_flushes_on_latency_cap() {
1428        let clock = FakeClock::new(0);
1429        let cfg = StreamConfig {
1430            chunk_default_pages: 4,
1431            chunk_min_pages: 1,
1432            chunk_max_pages: 64,
1433            chunk_max_rows: 1_000_000,
1434            chunk_max_latency_ms: 50,
1435            ..StreamConfig::DEFAULT
1436        };
1437        let sink = CapturingSink::new();
1438        let mut producer = ChunkProducer::new(&cfg, &clock);
1439        let mut flush = capture(&sink);
1440        producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1441        assert_eq!(sink.len(), 0);
1442        clock.advance(60);
1443        let triggered = producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1444        assert!(triggered);
1445        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
1446    }
1447
1448    #[test]
1449    fn chunk_producer_finish_emits_terminal_flush() {
1450        let clock = FakeClock::new(0);
1451        let cfg = StreamConfig::DEFAULT;
1452        let sink = CapturingSink::new();
1453        let mut producer = ChunkProducer::new(&cfg, &clock);
1454        let mut flush = capture(&sink);
1455        producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1456        producer.finish(&mut flush).unwrap();
1457        assert_eq!(sink.len(), 1);
1458        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Terminal));
1459    }
1460
1461    #[test]
1462    fn write_chunked_helpers_produce_well_formed_chunks() {
1463        let mut buf: Vec<u8> = Vec::new();
1464        write_chunked_response_header(&mut buf, 200, "application/x-ndjson").unwrap();
1465        write_chunk(&mut buf, b"{\"row\":{}}\n").unwrap();
1466        write_chunked_terminator(&mut buf).unwrap();
1467        let text = String::from_utf8(buf).unwrap();
1468        assert!(text.starts_with("HTTP/1.1 200 OK\r\n"));
1469        assert!(text.contains("Transfer-Encoding: chunked\r\n"));
1470        // 11 bytes in `{"row":{}}\n` → hex 'b'.
1471        assert!(text.contains("\r\nb\r\n{\"row\":{}}\n\r\n"));
1472        assert!(text.ends_with("0\r\n\r\n"));
1473    }
1474
1475    // ──────── Issue #761 / S2 — capacity registry ────────
1476
1477    #[test]
1478    fn capacity_registry_global_exhausted_returns_structured_error() {
1479        let reg = StreamCapacityRegistry::new();
1480        let _g1 = reg.try_acquire("alice", 2, 32).unwrap();
1481        let _g2 = reg.try_acquire("alice", 2, 32).unwrap();
1482        let err = reg.try_acquire("alice", 2, 32).unwrap_err();
1483        assert_eq!(
1484            err,
1485            AcquireError::GlobalExhausted {
1486                limit: 2,
1487                current: 2,
1488            }
1489        );
1490        assert_eq!(err.code(), "server_stream_capacity_exhausted");
1491    }
1492
1493    #[test]
1494    fn capacity_registry_per_principal_exhausted_independent_of_global() {
1495        // Acceptance criterion #2: per-principal cap fires even when
1496        // global has room. Acceptance criterion #3: counters are
1497        // independent across principals.
1498        let reg = StreamCapacityRegistry::new();
1499        let _a1 = reg.try_acquire("alice", 100, 2).unwrap();
1500        let _a2 = reg.try_acquire("alice", 100, 2).unwrap();
1501        let err = reg.try_acquire("alice", 100, 2).unwrap_err();
1502        assert_eq!(
1503            err,
1504            AcquireError::PrincipalExhausted {
1505                principal: "alice".to_string(),
1506                limit: 2,
1507                current: 2,
1508            }
1509        );
1510        assert_eq!(err.code(), "principal_stream_quota_exhausted");
1511
1512        // Bob is unaffected by Alice's quota.
1513        let _b1 = reg.try_acquire("bob", 100, 2).unwrap();
1514        let _b2 = reg.try_acquire("bob", 100, 2).unwrap();
1515    }
1516
1517    #[test]
1518    fn capacity_registry_release_frees_both_counters() {
1519        // Acceptance criterion #4: drop releases both counters.
1520        let reg = StreamCapacityRegistry::new();
1521        let g1 = reg.try_acquire("alice", 1, 1).unwrap();
1522        assert!(reg.try_acquire("alice", 1, 1).is_err());
1523        drop(g1);
1524        let (global, per_principal) = reg.snapshot();
1525        assert_eq!(global, 0);
1526        assert!(per_principal.is_empty());
1527        // Slot is now reclaimable.
1528        let _g2 = reg.try_acquire("alice", 1, 1).unwrap();
1529    }
1530
1531    #[test]
1532    fn capacity_registry_concurrent_acquire_release_does_not_over_issue() {
1533        // Acceptance criterion #5: stress coverage. Spawn `THREADS`
1534        // threads each running `ITERS` acquire+release cycles against
1535        // a registry sized to fit only `CAP` slots; the live count
1536        // must never exceed `CAP`, and the registry must return to
1537        // zero once every thread has joined.
1538        use std::sync::atomic::{AtomicUsize, Ordering};
1539
1540        const THREADS: usize = 16;
1541        const ITERS: usize = 200;
1542        const CAP_GLOBAL: usize = 4;
1543        const CAP_PER_PRINCIPAL: usize = 4;
1544
1545        let reg = StreamCapacityRegistry::new();
1546        let observed_max = Arc::new(AtomicUsize::new(0));
1547        let mut handles = Vec::new();
1548        for tid in 0..THREADS {
1549            let reg = Arc::clone(&reg);
1550            let observed_max = Arc::clone(&observed_max);
1551            // Two principals share the global cap, each capped at
1552            // `CAP_PER_PRINCIPAL` themselves.
1553            let principal = format!("p{}", tid % 2);
1554            handles.push(std::thread::spawn(move || {
1555                for _ in 0..ITERS {
1556                    if let Ok(guard) = reg.try_acquire(&principal, CAP_GLOBAL, CAP_PER_PRINCIPAL) {
1557                        let (live, _) = reg.snapshot();
1558                        observed_max.fetch_max(live, Ordering::SeqCst);
1559                        // Hold the slot just long enough to let other
1560                        // threads race against the cap.
1561                        std::thread::yield_now();
1562                        drop(guard);
1563                    }
1564                }
1565            }));
1566        }
1567        for h in handles {
1568            h.join().unwrap();
1569        }
1570        let (global_after, per_principal_after) = reg.snapshot();
1571        assert_eq!(global_after, 0, "global counter leaked");
1572        assert!(
1573            per_principal_after.is_empty(),
1574            "per-principal map leaked: {per_principal_after:?}"
1575        );
1576        assert!(
1577            observed_max.load(Ordering::SeqCst) <= CAP_GLOBAL,
1578            "global cap was breached: observed {} > {}",
1579            observed_max.load(Ordering::SeqCst),
1580            CAP_GLOBAL
1581        );
1582    }
1583
1584    // ──────── Issue #766 / S7 — resume coordinator ────────
1585
1586    #[test]
1587    fn assess_resumability_accepts_plain_select() {
1588        assert!(assess_resumability("SELECT id, name FROM t"));
1589        assert!(assess_resumability("select * from t where id > 5"));
1590        assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid"));
1591        assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid ASC"));
1592        assert!(assess_resumability(
1593            "SELECT a FROM t ORDER BY rid ASC LIMIT 10"
1594        ));
1595    }
1596
/// Aggregates, `GROUP BY`, `DISTINCT`, orderings other than ascending
/// `rid`, non-`SELECT` statements and joins must all be refused.
#[test]
fn assess_resumability_rejects_aggregates_and_unordered() {
    let not_resumable = [
        "SELECT COUNT(*) FROM t",
        "SELECT SUM(x) FROM t",
        "SELECT a, COUNT(b) FROM t GROUP BY a",
        "SELECT DISTINCT a FROM t",
        "SELECT a FROM t ORDER BY name",
        "SELECT a FROM t ORDER BY rid DESC",
        "SELECT a FROM t ORDER BY a, b",
        "INSERT INTO t (a) VALUES (1)",
        "SELECT a FROM t JOIN u ON t.id = u.id",
    ];
    for sql in not_resumable {
        assert!(!assess_resumability(sql), "expected non-resumable: {sql}");
    }
}
1611
/// A lease recorded with `(42, 1_000, 5_000)` is live up to 5_999,
/// expired from 6_000 onwards, and an unrecorded id is unknown.
#[test]
fn lease_registry_records_and_expires_against_ttl() {
    let reg = LeaseRegistry::new();
    reg.record(42, 1_000, 5_000);

    // (lease id, lookup time, expected outcome)
    let cases = [
        (42, 1_000, LeaseLookup::Live),
        (42, 5_999, LeaseLookup::Live),
        (42, 6_000, LeaseLookup::Expired),
        (99, 1_000, LeaseLookup::Unknown),
    ];
    for (lease_id, now_ms, expected) in cases {
        assert_eq!(reg.lookup(lease_id, now_ms), expected);
    }
}
1621
/// The same lines in the same order hash to the same digest; swapping
/// the order changes it. The hex digest is 64 characters long.
#[test]
fn prefix_hasher_is_order_sensitive_and_deterministic() {
    const ROW_1: &[u8] = b"{\"row\":{\"id\":1}}";
    const ROW_2: &[u8] = b"{\"row\":{\"id\":2}}";

    // Hash two lines with a fresh hasher, in the order given.
    let digest_of = |rows: [&[u8]; 2]| {
        let mut hasher = PrefixHasher::new();
        for row in rows {
            hasher.update(row);
        }
        hasher.finalize_hex()
    };

    let hash_a = digest_of([ROW_1, ROW_2]);
    let hash_b = digest_of([ROW_1, ROW_2]);
    assert_eq!(hash_a, hash_b);

    let swapped = digest_of([ROW_2, ROW_1]);
    assert_ne!(hash_a, swapped);
    assert_eq!(hash_a.len(), 64);
}
1641
1642    // ──────── Issue #767 / S8 — opaque lease handle + audit ────────
1643
/// Acceptance criterion #3 — the wire-visible lease handle is an opaque
/// hex string of `LEASE_HANDLE_BYTES * 2` characters that differs on
/// every open, while the internal `id` keeps increasing.
#[test]
fn lease_handle_is_128_bit_hex_and_unique_per_open() {
    let clock = FakeClock::new(0);
    let open = || open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
    let first = open();
    let second = open();

    let expected_hex_len = LEASE_HANDLE_BYTES * 2;
    assert_eq!(
        first.lease_handle.len(),
        expected_hex_len,
        "handle must be 128 bits hex-encoded: {}",
        first.lease_handle
    );
    let all_hex = first.lease_handle.chars().all(|c| c.is_ascii_hexdigit());
    assert!(all_hex, "handle must be hex: {}", first.lease_handle);
    assert_ne!(
        first.lease_handle, second.lease_handle,
        "handles must differ"
    );
    // Sanity — the internal id still grows from one open to the next.
    assert!(second.id > first.id);
}
1668
/// Defensive check against a mis-wired random source: 1024 draws must
/// yield 1024 distinct handles.
#[test]
fn generate_lease_handle_produces_high_entropy_distinct_values() {
    const DRAWS: usize = 1024;
    let distinct: std::collections::HashSet<_> =
        (0..DRAWS).map(|_| generate_lease_handle()).collect();
    // A set smaller than the number of draws means at least one repeat.
    assert_eq!(distinct.len(), DRAWS, "duplicate handle in CSPRNG sequence");
}
1682
/// A three-segment token whose payload decodes to `{"exp":1700000000}`
/// yields that expiry converted from seconds to milliseconds. The
/// signature segment is a placeholder.
#[test]
fn parse_jwt_exp_ms_extracts_seconds_to_ms() {
    let token = concat!(
        "eyJhbGciOiJIUzI1NiJ9",     // header
        ".",
        "eyJleHAiOjE3MDAwMDAwMDB9", // payload: base64url of {"exp":1700000000}
        ".",
        "sig",                      // signature placeholder
    );
    let exp_ms = parse_jwt_exp_ms(token);
    assert_eq!(exp_ms, Some(1_700_000_000_000));
}
1692
/// Tokens with no dots, too few segments, or three segments that carry
/// no usable payload all yield `None`.
#[test]
fn parse_jwt_exp_ms_returns_none_for_opaque_tokens() {
    for token in ["not-a-jwt", "only.two", "a.b.c"] {
        assert_eq!(parse_jwt_exp_ms(token), None);
    }
}
1699
1700    // ──────── Issue #807 / 750c — cursor registry ────────
1701
1702    fn register_default_cursor(reg: &CursorRegistry, now_ms: u64) -> String {
1703        reg.register(
1704            42,
1705            "acme",
1706            "bearer:abc",
1707            "SELECT id FROM users ORDER BY rid",
1708            None,
1709            None,
1710            now_ms,
1711            1_000,
1712        )
1713    }
1714
/// Each registration mints a distinct hex token of
/// `CURSOR_TOKEN_BYTES * 2` characters, and the registry counts both.
#[test]
fn cursor_token_is_192_bit_hex_and_unique_per_register() {
    let reg = CursorRegistry::default();
    let a = register_default_cursor(&reg, 0);
    let b = register_default_cursor(&reg, 0);

    let expected_hex_len = CURSOR_TOKEN_BYTES * 2;
    assert_eq!(
        a.len(),
        expected_hex_len,
        "token must be 192 bits hex-encoded: {a}"
    );
    let all_hex = a.chars().all(|c| c.is_ascii_hexdigit());
    assert!(all_hex, "token must be hex: {a}");
    assert_ne!(a, b, "tokens must differ across registrations");
    assert_eq!(reg.len(), 2);
}
1732
/// Happy path: the owning tenant + principal resumes before the TTL
/// runs out and gets back the original snapshot, query and expiry.
#[test]
fn cursor_resolves_for_owner_within_ttl() {
    let reg = CursorRegistry::default();
    let opened_at_ms = 1_000;
    let token = register_default_cursor(&reg, opened_at_ms);

    let resolved = reg.resolve(&token, "acme", "bearer:abc", 1_500);
    let resume = resolved.expect("live cursor resolves for its owner");

    assert_eq!(resume.snapshot_lsn, 42, "resume re-pins the same snapshot");
    assert_eq!(resume.query, "SELECT id FROM users ORDER BY rid");
    // Registered at 1_000 with the helper's 1_000 ms TTL.
    assert_eq!(resume.expires_at_ms, 2_000);
}
1745
/// Once the TTL has elapsed the owner gets a distinct `Expired`
/// rejection — both exactly at the boundary and well past it.
#[test]
fn cursor_rejects_after_ttl_for_owner() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    let resume_at = |now_ms| reg.resolve(&token, "acme", "bearer:abc", now_ms);

    let at_boundary = resume_at(1_000);
    assert_eq!(
        at_boundary,
        Err(CursorReject::Expired),
        "TTL boundary is inclusive — cursor is dead at opened_at + ttl"
    );

    let long_after = resume_at(5_000);
    assert_eq!(long_after, Err(CursorReject::Expired));
}
1762
/// Tenant isolation: a foreign tenant presenting a live token sees
/// `NotFound`, exactly as for a token that never existed, while the
/// real owner can still resume.
#[test]
fn cursor_cross_tenant_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    let resume_as = |tenant| reg.resolve(&token, tenant, "bearer:abc", 100);

    let foreign = resume_as("evil-corp");
    assert_eq!(
        foreign,
        Err(CursorReject::NotFound),
        "cross-tenant resume must mask existence as NotFound"
    );

    // The rightful tenant is unaffected by the foreign attempt.
    let owner = resume_as("acme");
    assert!(owner.is_ok());
}
1778
/// Principal isolation: another principal in the same tenant also sees
/// `NotFound` rather than a permission error, and the owner can still
/// resume afterwards.
#[test]
fn cursor_cross_principal_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    let resume_as = |principal| reg.resolve(&token, "acme", principal, 100);

    let other = resume_as("bearer:other");
    assert_eq!(
        other,
        Err(CursorReject::NotFound),
        "cross-principal resume must mask existence as NotFound"
    );

    let owner = resume_as("bearer:abc");
    assert!(owner.is_ok());
}
1792
/// Resolving a token against an empty registry is rejected as
/// `NotFound`.
#[test]
fn cursor_unknown_token_is_not_found() {
    let reg = CursorRegistry::default();
    assert!(reg.is_empty());

    let outcome = reg.resolve("deadbeef", "acme", "bearer:abc", 0);
    assert_eq!(outcome, Err(CursorReject::NotFound));
}
1802
/// A caller outside the owning scope must get `NotFound` even when the
/// token has already expired: answering `Expired` would confirm that the
/// token once existed.
#[test]
fn cursor_scope_is_checked_before_expiry() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    // 10_000 is far beyond the helper's 1_000 ms TTL.
    let well_past_ttl = 10_000;
    let outcome = reg.resolve(&token, "evil-corp", "bearer:abc", well_past_ttl);
    assert_eq!(
        outcome,
        Err(CursorReject::NotFound),
        "expired + wrong scope must mask as NotFound, not Expired"
    );
}
1816
1817    // ──────── Issue #808 / 750d — cancel + tombstone ────────
1818
/// Cancelling a live cursor must do two things:
///
/// 1. raise the shared cancel token, so a clone obtained before the
///    cancel (as an in-flight executor would hold) observes it, and
/// 2. tombstone the registry entry, so the owner's later resume is
///    refused with `Cancelled`.
#[test]
fn cancel_tombstones_cursor_and_raises_token() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    let live = reg
        .cancel_token_for(&token)
        .expect("freshly-minted cursor exposes its token");
    assert!(!live.is_cancelled(), "token starts un-cancelled");

    let returned = reg
        .cancel(&token, "acme", "bearer:abc")
        .expect("owner cancels its own cursor");
    assert!(returned.is_cancelled(), "cancel raises the returned token");
    assert!(
        live.is_cancelled(),
        "the handler-held clone observes the cancel"
    );

    // The tombstone half of the contract: without this check the test
    // would still pass if `cancel` raised the token but left the entry
    // resumable.
    assert_eq!(
        reg.resolve(&token, "acme", "bearer:abc", 100),
        Err(CursorReject::Cancelled),
        "cancel must tombstone the entry so the owner cannot resume it"
    );
}
1839
/// After the owner cancels, the owner's resume is refused with the
/// dedicated `Cancelled` reason — distinct from `Expired`, so the client
/// can tell a cancelled stream from one that aged out.
#[test]
fn cancelled_cursor_rejects_resume_with_cancelled_reason() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    let cancelled = reg.cancel(&token, "acme", "bearer:abc");
    cancelled.expect("owner cancels");

    let resume = reg.resolve(&token, "acme", "bearer:abc", 100);
    assert_eq!(
        resume,
        Err(CursorReject::Cancelled),
        "owner resuming a cancelled cursor sees Cancelled"
    );
}
1855
/// Cancelling the same cursor twice succeeds both times, and the token
/// returned by the second call is still cancelled.
#[test]
fn cancel_is_idempotent() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    let cancel_as_owner = || reg.cancel(&token, "acme", "bearer:abc");

    let first = cancel_as_owner();
    assert!(first.is_ok());

    // The entry is already tombstoned; a repeat must not turn into an error.
    let second = cancel_as_owner().expect("re-cancel is a no-op success");
    assert!(second.is_cancelled());
}
1867
/// A foreign tenant can neither cancel a cursor nor learn that it
/// exists: the attempt is rejected as `NotFound` and the owner can still
/// resume.
#[test]
fn cancel_cross_tenant_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    let foreign_cancel = reg.cancel(&token, "evil-corp", "bearer:abc");
    assert!(
        matches!(foreign_cancel, Err(CursorReject::NotFound)),
        "cross-tenant cancel must mask existence as NotFound"
    );

    // The rejected cancel left the owner's cursor resumable.
    let owner_resume = reg.resolve(&token, "acme", "bearer:abc", 100);
    assert!(owner_resume.is_ok());
}
1884
/// A different principal in the owning tenant cannot cancel the cursor:
/// the attempt is rejected as `NotFound` and the owner can still resume.
#[test]
fn cancel_cross_principal_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    let other_cancel = reg.cancel(&token, "acme", "bearer:other");
    assert!(
        matches!(other_cancel, Err(CursorReject::NotFound)),
        "cross-principal cancel must mask existence as NotFound"
    );

    let owner_resume = reg.resolve(&token, "acme", "bearer:abc", 100);
    assert!(owner_resume.is_ok());
}
1898
/// Cancelling a token the registry has never seen is rejected as
/// `NotFound`.
#[test]
fn cancel_unknown_token_is_not_found() {
    let reg = CursorRegistry::default();
    let outcome = reg.cancel("deadbeef", "acme", "bearer:abc");
    assert!(matches!(outcome, Err(CursorReject::NotFound)));
}
1907
/// 1024 freshly generated cursor tokens must all be distinct.
#[test]
fn generate_cursor_token_produces_high_entropy_distinct_values() {
    const DRAWS: usize = 1024;
    let distinct: std::collections::HashSet<_> =
        (0..DRAWS).map(|_| generate_cursor_token()).collect();
    // A set smaller than the number of draws means at least one repeat.
    assert_eq!(
        distinct.len(),
        DRAWS,
        "duplicate cursor token in CSPRNG sequence"
    );
}
1918
/// Acceptance criterion #5 — each close reason listed here maps to its
/// expected audit label.
#[test]
fn close_reason_as_str_covers_every_state_transition() {
    let labelled = [
        (CloseReason::Ok, "ok"),
        (CloseReason::Cancelled, "cancelled"),
        (CloseReason::Error, "error"),
        (CloseReason::SnapshotExpired, "snapshot_expired"),
        (CloseReason::CapacityRefused, "capacity_refused"),
        (CloseReason::IntegrityFailed, "integrity_failed"),
    ];
    labelled
        .into_iter()
        .for_each(|(reason, label)| assert_eq!(reason.as_str(), label));
}
1934
/// The default config carries the S2 caps: 256 streams globally and 32
/// per principal.
#[test]
fn stream_config_defaults_carry_s2_caps() {
    let defaults = StreamConfig::DEFAULT;
    assert_eq!(defaults.max_global_streams, 256);
    assert_eq!(defaults.max_per_principal_streams, 32);
}
1940
1941    // ──────── Issue #768 / S9 — pull-based driver ────────
1942
/// Test sink that remembers only the byte length of each flushed chunk,
/// so a test can check how many flushes happened and how large the
/// biggest one was without retaining the payloads.
struct SizeSink {
    sizes: std::cell::RefCell<Vec<usize>>,
}

impl SizeSink {
    /// Creates a sink with no recorded flushes.
    fn new() -> Self {
        Self {
            sizes: Default::default(),
        }
    }

    /// Number of chunks recorded so far.
    fn flushes(&self) -> usize {
        let recorded = self.sizes.borrow();
        recorded.len()
    }

    /// Length of the largest recorded chunk, or 0 when nothing was flushed.
    fn max_chunk(&self) -> usize {
        let recorded = self.sizes.borrow();
        recorded
            .iter()
            .fold(0, |largest, &len| std::cmp::max(largest, len))
    }
}
1961    fn size_capture<'a>(sink: &'a SizeSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1962        move |bytes: &[u8]| {
1963            sink.sizes.borrow_mut().push(bytes.len());
1964            Ok(())
1965        }
1966    }
1967
/// Acceptance #1: a one-million-row lazy source is pushed through a
/// single-page chunk buffer. The run must produce many flushes, and no
/// flushed chunk may exceed the buffer size plus one trailing line —
/// i.e. chunk size tracks the buffer, not the row count.
#[test]
fn drive_lines_streams_large_source_with_bounded_working_set() {
    const N: u64 = 1_000_000;

    let clock = FakeClock::new(0);
    // One page for default/min/max; row and latency caps are set high so
    // the byte cap is the one that triggers flushes.
    let cfg = StreamConfig {
        chunk_default_pages: 1,
        chunk_min_pages: 1,
        chunk_max_pages: 1,
        chunk_max_rows: 1_000_000,
        chunk_max_latency_ms: 1_000_000,
        ..StreamConfig::DEFAULT
    };
    let sink = SizeSink::new();
    let mut flush = size_capture(&sink);
    let mut producer = ChunkProducer::new(&cfg, &clock);

    // One NDJSON line per row id; also used below to size the longest line.
    let render = |i: u64| format!("{{\"row\":{{\"id\":{i}}}}}\n");

    // The range is handed over as-is — it is never collected into a Vec.
    let consumed = producer
        .drive_lines(0..N, |i: &u64| render(*i).into_bytes(), &mut flush)
        .unwrap();
    producer.finish(&mut flush).unwrap();

    assert_eq!(consumed, N);
    assert_eq!(producer.total_rows(), N);

    // Streaming means many flushes rather than one giant buffer.
    let flush_count = sink.flushes();
    assert!(
        flush_count > 1000,
        "expected the source to stream across many chunks, saw {}",
        flush_count
    );

    // Bound: the byte cap plus the longest possible trailing line.
    let longest_line = render(N - 1).len();
    let bound = cfg.production_buffer_bytes() + longest_line;
    let largest_chunk = sink.max_chunk();
    assert!(
        largest_chunk <= bound,
        "chunk {} exceeded bounded working set {}",
        largest_chunk,
        bound
    );
}
2019
/// Acceptance #2: the first chunk is released by the latency cap, not by
/// draining the source. Lines are pushed by hand so the fake clock can
/// move 20 ms before every push; with a 50 ms cap the first flush must
/// arrive within the first few rows and be attributed to latency.
#[test]
fn drive_lines_first_chunk_flushes_on_latency_before_source_drains() {
    let clock = FakeClock::new(0);
    // Byte and row caps are set large so they cannot trip before the
    // latency cap does.
    let cfg = StreamConfig {
        chunk_default_pages: 64,
        chunk_min_pages: 1,
        chunk_max_pages: 64,
        chunk_max_rows: 1_000_000,
        chunk_max_latency_ms: 50,
        ..StreamConfig::DEFAULT
    };
    let sink = SizeSink::new();
    let mut flush = size_capture(&sink);
    let mut producer = ChunkProducer::new(&cfg, &clock);

    // Number of rows pushed when the first flush was reported, if any.
    let mut first_flush_after: Option<u64> = None;
    for row in 0..1_000_000u64 {
        let line = format!("{{\"row\":{{\"id\":{row}}}}}\n");
        clock.advance(20);
        let flushed = producer.push_line(line.as_bytes(), &mut flush).unwrap();
        if flushed {
            first_flush_after = Some(row + 1);
            break;
        }
    }

    assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
    let rows_before_flush = first_flush_after.expect("a latency flush must occur");
    assert!(
        rows_before_flush <= 4,
        "first chunk flushed only after {rows_before_flush} rows; latency bound not honoured"
    );
    // The flush came long before the 1e6-row source could have drained.
    assert!(rows_before_flush < 1_000_000);
}
2065
/// `drive_lines` must emit exactly the chunks that the equivalent
/// sequence of hand-written `push_line` calls emits for the same 50
/// lines under the default config.
#[test]
fn drive_lines_parity_with_manual_push_line() {
    let clock = FakeClock::new(0);
    let cfg = StreamConfig::DEFAULT;

    let mut lines: Vec<Vec<u8>> = Vec::with_capacity(50);
    for i in 0..50 {
        lines.push(format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes());
    }

    // Run 1: let the driver pull the lines.
    let driven = CapturingSink::new();
    {
        let mut flush = capture(&driven);
        let mut producer = ChunkProducer::new(&cfg, &clock);
        producer
            .drive_lines(lines.iter().cloned(), |l: &Vec<u8>| l.clone(), &mut flush)
            .unwrap();
        producer.finish(&mut flush).unwrap();
    }

    // Run 2: push the same lines one by one.
    let manual = CapturingSink::new();
    {
        let mut flush = capture(&manual);
        let mut producer = ChunkProducer::new(&cfg, &clock);
        for line in lines.iter() {
            producer.push_line(line, &mut flush).unwrap();
        }
        producer.finish(&mut flush).unwrap();
    }

    assert_eq!(*driven.chunks.borrow(), *manual.chunks.borrow());
}
2098}