Skip to main content

reddb_server/server/
output_stream.rs

1//! Output streaming primitives for issue #760 (PRD #759 / ADR 0029) — the
2//! "shim slice" of bidirectional streaming.
3//!
4//! Scope of this module:
5//!   - [`StreamConfig`]: capture the `stream.*` namespace from `red_config`
6//!     at lease open and freeze it for the lease's lifetime (acceptance
7//!     criterion: KV mutations mid-stream do not retroactively change
8//!     behaviour).
9//!   - [`StreamLease`]: an internal, unforwarded handle bound to a
10//!     snapshot LSN and a frozen config. No external surface yet (S2 will
11//!     add quotas + per-principal accounting).
12//!   - [`open_stream`]: refuses with `stream_in_transaction_unsupported`
13//!     when the caller already has an active `BEGIN` on the session.
14//!   - [`ChunkProducer`]: page-aligned (N × 16 KiB) production buffer
15//!     that flushes on the first of byte / row / latency cap.
16//!   - [`Clock`]: trait-injected time source so TTL expiry is testable.
17//!
18//! The HTTP NDJSON wire framing built on top of these primitives lives in
19//! `handlers_query::handle_query_ndjson_stream` and is dispatched from
20//! `routing::try_route_streaming`.
21
22use std::collections::HashMap;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::sync::{Arc, Mutex};
25use std::time::{SystemTime, UNIX_EPOCH};
26
27use crate::runtime::RedDBRuntime;
28use crate::storage::query::engine::cancel::CancelToken;
29use crate::storage::schema::types::Value;
30
/// Name of the KV collection that [`StreamConfig::load`] reads the
/// `stream.*` keys from.
const RED_CONFIG_COLLECTION: &str = "red_config";

/// Engine page size in bytes (16 KiB). The production buffer is always a
/// whole multiple of this value.
/// Matches `storage::engine::PAGE_SIZE`.
pub const PAGE_SIZE: usize = 16 * 1024;
36
/// Source of "now" in milliseconds, abstracted behind a trait so the
/// time-dependent paths (lease TTL, latency-cap flushes) can be driven
/// deterministically. [`SystemClock`] reads wall time; [`FakeClock`] is
/// a manually-advanced stand-in for tests.
pub trait Clock: Send + Sync {
    /// Current time in milliseconds.
    fn now_ms(&self) -> u64;
}

/// Wall-clock [`Clock`]: milliseconds elapsed since the Unix epoch.
#[derive(Debug, Default)]
pub struct SystemClock;

impl Clock for SystemClock {
    fn now_ms(&self) -> u64 {
        // `duration_since` fails when the system clock sits before the
        // epoch; report 0 in that case instead of surfacing an error.
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis() as u64,
            Err(_) => 0,
        }
    }
}

/// Manually-driven [`Clock`]. Time only moves when [`FakeClock::advance`]
/// is called, and the value is held in an atomic so a shared reference
/// is enough to advance it.
#[derive(Debug)]
pub struct FakeClock {
    now_ms: AtomicU64,
}

impl FakeClock {
    /// Create a clock whose first reading is `now_ms`.
    pub fn new(now_ms: u64) -> Self {
        let now_ms = AtomicU64::new(now_ms);
        Self { now_ms }
    }

    /// Move the clock forward by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        self.now_ms.fetch_add(ms, Ordering::SeqCst);
    }
}

impl Clock for FakeClock {
    fn now_ms(&self) -> u64 {
        let current = &self.now_ms;
        current.load(Ordering::SeqCst)
    }
}
77
/// Frozen snapshot of the `stream.*` namespace at lease open. Acceptance
/// criterion: a `red_config` mutation while a stream is running does not
/// retroactively change the running stream's behaviour — that is, the
/// per-lease config is value-typed (`Copy`) and not a back-reference.
#[derive(Debug, Clone, Copy)]
pub struct StreamConfig {
    /// Lease lifetime in milliseconds; `StreamLease::snapshot_expired`
    /// reports `true` once this much time has elapsed since open.
    pub snapshot_ttl_ms: u64,
    /// Production buffer size in pages; `production_buffer_bytes`
    /// multiplies this by `PAGE_SIZE`.
    pub chunk_default_pages: usize,
    /// Lower bound that `normalize` clamps `chunk_default_pages` to.
    pub chunk_min_pages: usize,
    /// Upper bound that `normalize` clamps `chunk_default_pages` to.
    pub chunk_max_pages: usize,
    /// Row cap: the producer flushes once this many lines are buffered.
    pub chunk_max_rows: usize,
    /// Latency cap in milliseconds: the producer flushes once the
    /// current buffering window has been open this long.
    pub chunk_max_latency_ms: u64,
    /// Process-wide concurrent stream cap (issue #761 / S2).
    /// Acquiring the (N+1)th slot is refused with
    /// `server_stream_capacity_exhausted`.
    pub max_global_streams: usize,
    /// Per-principal concurrent stream cap (issue #761 / S2).
    /// Acquiring the (M+1)th slot for a single principal is refused
    /// with `principal_stream_quota_exhausted` even when the global
    /// counter still has room.
    pub max_per_principal_streams: usize,
    /// Issue #765 / S6 — default end-to-end verification mode for input
    /// streams when the open frame does not request one
    /// (`stream.integrity.default_verify`, `"none"` per ADR 0029).
    pub default_verify: crate::runtime::integrity_tombstone::VerifyMode,
}

/// `StreamConfig::default()` is exactly [`StreamConfig::DEFAULT`].
impl Default for StreamConfig {
    fn default() -> Self {
        Self::DEFAULT
    }
}
110
111impl StreamConfig {
112    /// Defaults match ADR 0029. `snapshot_ttl_ms` is the only key
113    /// observable mid-stream; the chunk caps only affect framing.
114    pub const DEFAULT: StreamConfig = StreamConfig {
115        snapshot_ttl_ms: 60_000,
116        chunk_default_pages: 4,
117        chunk_min_pages: 1,
118        chunk_max_pages: 64,
119        chunk_max_rows: 1000,
120        chunk_max_latency_ms: 50,
121        max_global_streams: 256,
122        max_per_principal_streams: 32,
123        default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
124    };
125
126    /// Read every `stream.*` key from `red_config` over the runtime's KV
127    /// store. Missing keys fall back to [`Self::DEFAULT`]. Unparseable or
128    /// out-of-range values fall back silently — bad config never gets to
129    /// terminate a request that would otherwise succeed.
130    pub fn load(runtime: &RedDBRuntime) -> Self {
131        let db = runtime.db();
132        let read_u64 = |key: &str| -> Option<u64> {
133            match db.get_kv(RED_CONFIG_COLLECTION, key) {
134                Some((Value::Integer(v), _)) if v >= 0 => Some(v as u64),
135                Some((Value::UnsignedInteger(v), _)) => Some(v),
136                Some((Value::Text(text), _)) => text.parse().ok(),
137                _ => None,
138            }
139        };
140
141        let mut cfg = Self::DEFAULT;
142        if let Some(v) = read_u64("stream.snapshot.ttl_ms") {
143            cfg.snapshot_ttl_ms = v;
144        }
145        if let Some(v) = read_u64("stream.chunk.default_pages") {
146            cfg.chunk_default_pages = v as usize;
147        }
148        if let Some(v) = read_u64("stream.chunk.min_pages") {
149            cfg.chunk_min_pages = v as usize;
150        }
151        if let Some(v) = read_u64("stream.chunk.max_pages") {
152            cfg.chunk_max_pages = v as usize;
153        }
154        if let Some(v) = read_u64("stream.chunk.max_rows") {
155            cfg.chunk_max_rows = v as usize;
156        }
157        if let Some(v) = read_u64("stream.chunk.max_latency_ms") {
158            cfg.chunk_max_latency_ms = v;
159        }
160        if let Some(v) = read_u64("stream.max_global") {
161            cfg.max_global_streams = v as usize;
162        }
163        if let Some(v) = read_u64("stream.max_per_principal") {
164            cfg.max_per_principal_streams = v as usize;
165        }
166        // Issue #765 / S6 — opt-in integrity default. Stored as a Text key
167        // (`"none"` / `"sha256"`); anything else falls back to `None`.
168        if let Some((Value::Text(text), _)) =
169            db.get_kv(RED_CONFIG_COLLECTION, "stream.integrity.default_verify")
170        {
171            cfg.default_verify = crate::runtime::integrity_tombstone::VerifyMode::parse(&text);
172        }
173        cfg.normalize();
174        cfg
175    }
176
177    /// Clamp interrelated fields to a self-consistent state. Floors come
178    /// from ADR 0029 ("hard floor 1 page"); ceilings prevent zero-row /
179    /// zero-byte caps that would prevent the producer from ever flushing.
180    fn normalize(&mut self) {
181        if self.chunk_min_pages == 0 {
182            self.chunk_min_pages = 1;
183        }
184        if self.chunk_max_pages < self.chunk_min_pages {
185            self.chunk_max_pages = self.chunk_min_pages;
186        }
187        if self.chunk_default_pages < self.chunk_min_pages {
188            self.chunk_default_pages = self.chunk_min_pages;
189        }
190        if self.chunk_default_pages > self.chunk_max_pages {
191            self.chunk_default_pages = self.chunk_max_pages;
192        }
193        if self.chunk_max_rows == 0 {
194            self.chunk_max_rows = 1;
195        }
196        if self.max_global_streams == 0 {
197            self.max_global_streams = 1;
198        }
199        if self.max_per_principal_streams == 0 {
200            self.max_per_principal_streams = 1;
201        }
202    }
203
204    /// Page-aligned production buffer size in bytes. Acceptance criterion:
205    /// "production buffer is always N × 16 KiB".
206    pub fn production_buffer_bytes(&self) -> usize {
207        self.chunk_default_pages.saturating_mul(PAGE_SIZE)
208    }
209}
210
/// Monotonic, process-local lease id counter. The id is internal — the
/// bearer token still authenticates the open, the lease only identifies
/// the stream for audit and termination (ADR 0029 "Authorization").
/// `open_stream` draws ids with `fetch_add(1)`, so the first lease gets
/// id 1.
///
/// Issue #767 / S8 — the internal id is **never** forwarded on the
/// wire. The client receives [`StreamLease::lease_handle`], a 128-bit
/// opaque random value, in `OpenAck` / the `{"end": …}` envelope.
static NEXT_LEASE_ID: AtomicU64 = AtomicU64::new(1);

/// Issue #767 / S8 — size in bytes of the wire-visible lease handle:
/// 128 bits drawn from the OS CSPRNG, hex-encoded so it survives
/// JSON / header transport.
/// Opacity property: a client cannot derive the internal lease id
/// from the handle (the two are independent), and the handle is not
/// sequential across opens.
pub const LEASE_HANDLE_BYTES: usize = 16;
226
227/// Generate an opaque 128-bit lease handle (32-char hex). Per ADR 0029,
228/// the handle is the only identifier the wire sees; the internal
229/// `StreamLease::id` stays server-side.
230pub fn generate_lease_handle() -> String {
231    let mut bytes = [0u8; LEASE_HANDLE_BYTES];
232    if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
233        // CSPRNG failure is exceedingly rare on supported platforms;
234        // fall back to a high-entropy mix so the open path never
235        // terminates a stream that would otherwise succeed. The
236        // fallback is *not* secure unguessable — production paths
237        // will hit /dev/urandom successfully.
238        let lo = NEXT_LEASE_ID.fetch_add(0, Ordering::SeqCst).to_le_bytes();
239        let now = crate::utils::now_unix_nanos().to_le_bytes();
240        bytes[..8].copy_from_slice(&lo);
241        bytes[8..].copy_from_slice(&now);
242    }
243    crate::utils::to_hex(&bytes)
244}
245
246/// Issue #767 / S8 — non-validating JWT `exp` claim extractor. Returns
247/// the expiry time in milliseconds since the epoch when `token` is a
248/// JWT carrying an integer `exp` claim, otherwise `None`.
249///
250/// The extractor does **not** validate the JWT signature, issuer, or
251/// audience — those gates run during `is_authorized` at OpenStream.
252/// This helper is consulted *after* the bearer has been accepted, to
253/// decide whether the lease will outlive the bearer credential and an
254/// audit event should be emitted accordingly.
255pub fn parse_jwt_exp_ms(token: &str) -> Option<u64> {
256    let parts: Vec<&str> = token.split('.').collect();
257    if parts.len() != 3 {
258        return None;
259    }
260    let payload = base64url_decode_padded(parts[1])?;
261    let v: crate::json::Value = crate::json::from_slice(&payload).ok()?;
262    let exp_secs = v.get("exp").and_then(|n| n.as_f64())? as i64;
263    if exp_secs <= 0 {
264        return None;
265    }
266    Some((exp_secs as u64).saturating_mul(1000))
267}
268
269fn base64url_decode_padded(input: &str) -> Option<Vec<u8>> {
270    let mut s = String::with_capacity(input.len() + 4);
271    for ch in input.chars() {
272        match ch {
273            '-' => s.push('+'),
274            '_' => s.push('/'),
275            _ => s.push(ch),
276        }
277    }
278    while !s.len().is_multiple_of(4) {
279        s.push('=');
280    }
281    crate::wire::redwire::auth::base64_std_decode(&s)
282}
283
/// Issue #767 / S8 — why a stream ended, as reported in `stream.closed`.
/// The audit emit site picks the variant once it has observed how the
/// stream exited.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    Ok,
    Cancelled,
    Error,
    SnapshotExpired,
    CapacityRefused,
    IntegrityFailed,
}

impl CloseReason {
    /// Stable snake_case label for this reason.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ok => "ok",
            Self::Cancelled => "cancelled",
            Self::Error => "error",
            Self::SnapshotExpired => "snapshot_expired",
            Self::CapacityRefused => "capacity_refused",
            Self::IntegrityFailed => "integrity_failed",
        }
    }
}
309
/// Handle for one open output stream, produced by `open_stream`.
#[derive(Debug)]
pub struct StreamLease {
    /// Internal lease id, drawn from the process-local `NEXT_LEASE_ID`
    /// counter when the lease is opened.
    pub id: u64,
    /// Issue #767 / S8 — opaque handle handed to the client. 32-char
    /// hex (128 bits of CSPRNG output). The internal `id` is not
    /// derivable from this value.
    pub lease_handle: String,
    /// Snapshot LSN supplied by the caller of `open_stream`.
    pub snapshot_lsn: u64,
    /// Clock reading (milliseconds) taken when the lease was opened.
    pub opened_at_ms: u64,
    /// Configuration captured by value at open.
    pub config: StreamConfig,
}
321
322impl StreamLease {
323    /// `true` once `snapshot_ttl_ms` has elapsed since the lease was opened.
324    /// The shim slice materialises the result first, so expiry in practice
325    /// only fires for streams that take longer than `ttl_ms` to drain to
326    /// the client. The check is wired in so the wire envelope can carry
327    /// `snapshot_expired` exactly the way later slices (pull-based
328    /// executors) will need it.
329    pub fn snapshot_expired(&self, now_ms: u64) -> bool {
330        now_ms.saturating_sub(self.opened_at_ms) >= self.config.snapshot_ttl_ms
331    }
332}
333
/// Reasons `open_stream` can refuse to issue a lease.
#[derive(Debug, PartialEq, Eq)]
pub enum OpenStreamError {
    /// Acceptance criterion #4: `OpenStream` against a session with an
    /// active `BEGIN` is refused with this code. The wire shape carries
    /// it back as `{"error": {"code": "stream_in_transaction_unsupported"}}`.
    TransactionActive,
}

impl OpenStreamError {
    /// Machine-readable error code for the wire envelope.
    pub fn code(&self) -> &'static str {
        match *self {
            Self::TransactionActive => "stream_in_transaction_unsupported",
        }
    }

    /// Human-readable explanation paired with [`Self::code`].
    pub fn message(&self) -> &'static str {
        match *self {
            Self::TransactionActive => {
                "cannot open output stream while a transaction is active on this session"
            }
        }
    }
}
357
358/// Issue a lease. The caller is responsible for binding `snapshot_lsn`
359/// to the same MVCC view the underlying executor will read from; in the
360/// shim slice this is `runtime.cdc_current_lsn()` captured before
361/// `execute_query`.
362pub fn open_stream(
363    config: StreamConfig,
364    snapshot_lsn: u64,
365    in_transaction: bool,
366    clock: &dyn Clock,
367) -> Result<StreamLease, OpenStreamError> {
368    if in_transaction {
369        return Err(OpenStreamError::TransactionActive);
370    }
371    Ok(StreamLease {
372        id: NEXT_LEASE_ID.fetch_add(1, Ordering::SeqCst),
373        lease_handle: generate_lease_handle(),
374        snapshot_lsn,
375        opened_at_ms: clock.now_ms(),
376        config,
377    })
378}
379
/// Page-aligned chunk producer. The producer accumulates byte-encoded
/// rows in an N × 16 KiB buffer; on the first of byte / row / latency
/// cap it forwards the buffer to the supplied flush closure, which the
/// transport layer turns into a chunked-encoding frame.
///
/// The struct does not know about HTTP, NDJSON, or chunked transfer —
/// it is wire-agnostic so the gRPC and RedWire paths can reuse it.
pub struct ChunkProducer<'a> {
    /// Bytes appended since the last flush.
    buf: Vec<u8>,
    /// Lines appended since the last flush.
    rows: usize,
    /// Clock reading taken at construction or at the last flush; the
    /// latency cap is measured from here.
    window_started_ms: u64,
    /// Byte cap: flush once `buf` holds at least this many bytes.
    cap_bytes: usize,
    /// Row cap: flush once `rows` reaches this count.
    cap_rows: usize,
    /// Latency cap in milliseconds for the current window.
    cap_latency_ms: u64,
    /// Time source used for the latency cap.
    clock: &'a dyn Clock,
    /// Number of successful flushes so far.
    total_flushes: u64,
    /// Bytes handed to the flush closure so far (buffered bytes excluded).
    total_bytes: u64,
    /// Lines pushed so far, whether or not they have been flushed yet.
    total_rows: u64,
    /// Cap that triggered the most recent flush, if any flush happened.
    last_flush_reason: Option<FlushReason>,
}
400
/// Which condition caused a [`ChunkProducer`] flush.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// The buffer reached the byte cap.
    Byte,
    /// The buffered line count reached the row cap.
    Row,
    /// The current window has been open for at least the latency cap.
    Latency,
    /// `ChunkProducer::finish` pushed out the remaining tail.
    Terminal,
}
408
409impl<'a> ChunkProducer<'a> {
410    pub fn new(config: &StreamConfig, clock: &'a dyn Clock) -> Self {
411        let cap_bytes = config.production_buffer_bytes();
412        Self {
413            buf: Vec::with_capacity(cap_bytes),
414            rows: 0,
415            window_started_ms: clock.now_ms(),
416            cap_bytes,
417            cap_rows: config.chunk_max_rows,
418            cap_latency_ms: config.chunk_max_latency_ms,
419            clock,
420            total_flushes: 0,
421            total_bytes: 0,
422            total_rows: 0,
423            last_flush_reason: None,
424        }
425    }
426
427    /// Append one already-encoded line (NDJSON: bytes + `\n`). Returns
428    /// `true` if the append triggered a flush.
429    pub fn push_line<F>(&mut self, line: &[u8], flush: &mut F) -> std::io::Result<bool>
430    where
431        F: FnMut(&[u8]) -> std::io::Result<()>,
432    {
433        self.buf.extend_from_slice(line);
434        self.rows += 1;
435        self.total_rows += 1;
436
437        if self.buf.len() >= self.cap_bytes {
438            self.flush(flush, FlushReason::Byte)?;
439            return Ok(true);
440        }
441        if self.rows >= self.cap_rows {
442            self.flush(flush, FlushReason::Row)?;
443            return Ok(true);
444        }
445        let elapsed = self.clock.now_ms().saturating_sub(self.window_started_ms);
446        if elapsed >= self.cap_latency_ms {
447            self.flush(flush, FlushReason::Latency)?;
448            return Ok(true);
449        }
450        Ok(false)
451    }
452
453    /// Issue #768 / S9 — drive a pull-based line source into the
454    /// production buffer. The producer *pulls* one encoded line at a
455    /// time from `source` and routes it through [`push_line`], so the
456    /// resident working set is the page-aligned buffer plus the single
457    /// line currently in hand — never the full result set. Pair this
458    /// with the pull-based scan iterators
459    /// (`parallel_scan::parallel_scan_rows`,
460    /// `bitmap_scan::execute_bitmap_scan_stream`) whose records are
461    /// encoded lazily by `encode`.
462    ///
463    /// Returns the number of lines consumed. Flush caps (byte / row /
464    /// latency) fire mid-drain exactly as they would for hand-driven
465    /// [`push_line`] calls, so first-line latency stays bounded by
466    /// `chunk.max_latency_ms` regardless of how many lines the source
467    /// will ultimately yield.
468    ///
469    /// [`push_line`]: ChunkProducer::push_line
470    pub fn drive_lines<S, R, Enc, F>(
471        &mut self,
472        source: S,
473        mut encode: Enc,
474        flush: &mut F,
475    ) -> std::io::Result<u64>
476    where
477        S: IntoIterator<Item = R>,
478        Enc: FnMut(&R) -> Vec<u8>,
479        F: FnMut(&[u8]) -> std::io::Result<()>,
480    {
481        let mut count = 0u64;
482        for record in source {
483            let line = encode(&record);
484            self.push_line(&line, flush)?;
485            count += 1;
486        }
487        Ok(count)
488    }
489
490    /// Force-flush any buffered bytes — used after the final NDJSON line
491    /// (`{"end": …}`) to push the tail of the buffer before closing the
492    /// connection.
493    pub fn finish<F>(&mut self, flush: &mut F) -> std::io::Result<()>
494    where
495        F: FnMut(&[u8]) -> std::io::Result<()>,
496    {
497        if !self.buf.is_empty() {
498            self.flush(flush, FlushReason::Terminal)?;
499        }
500        Ok(())
501    }
502
503    fn flush<F>(&mut self, flush: &mut F, reason: FlushReason) -> std::io::Result<()>
504    where
505        F: FnMut(&[u8]) -> std::io::Result<()>,
506    {
507        flush(&self.buf)?;
508        self.total_bytes += self.buf.len() as u64;
509        self.total_flushes += 1;
510        self.last_flush_reason = Some(reason);
511        self.buf.clear();
512        self.rows = 0;
513        self.window_started_ms = self.clock.now_ms();
514        Ok(())
515    }
516
517    pub fn total_flushes(&self) -> u64 {
518        self.total_flushes
519    }
520    pub fn total_bytes(&self) -> u64 {
521        self.total_bytes
522    }
523    pub fn total_rows(&self) -> u64 {
524        self.total_rows
525    }
526    pub fn last_flush_reason(&self) -> Option<FlushReason> {
527        self.last_flush_reason
528    }
529}
530
531/// HTTP chunked transfer encoding helpers. We do not pull in `hyper` for
532/// the streaming path; the existing SSE handler also hand-rolls its own
533/// HTTP framing, and matching that style keeps the diff narrow.
534pub fn write_chunked_response_header<W: std::io::Write>(
535    writer: &mut W,
536    status: u16,
537    content_type: &str,
538) -> std::io::Result<()> {
539    // Streaming responses write their own head here instead of going
540    // through `HttpResponse::to_http_bytes`, so they must repeat the
541    // same permissive CORS posture — otherwise a browser that can call
542    // the JSON endpoints cross-origin would still be blocked on the
543    // NDJSON/SSE streaming routes (graph/vector results). API auth is
544    // by header/API key, never cookies, so wildcard origin is safe and
545    // Allow-Credentials is deliberately never sent.
546    let header = format!(
547        "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nTransfer-Encoding: chunked\r\nCache-Control: no-cache\r\nConnection: close\r\n\
548         Access-Control-Allow-Origin: *\r\n\
549         Access-Control-Allow-Methods: GET, POST, PUT, PATCH, DELETE, OPTIONS\r\n\
550         Access-Control-Allow-Headers: *\r\n\
551         Access-Control-Max-Age: 86400\r\n\r\n",
552        status,
553        crate::server::transport::status_text(status),
554        content_type,
555    );
556    writer.write_all(header.as_bytes())?;
557    writer.flush()
558}
559
/// Write `bytes` as a single HTTP chunk — lowercase hex length, CRLF,
/// payload, CRLF — and flush the writer. An empty payload writes nothing
/// at all, because a zero-length chunk is the body terminator and that
/// belongs to [`write_chunked_terminator`].
pub fn write_chunk<W: std::io::Write>(writer: &mut W, bytes: &[u8]) -> std::io::Result<()> {
    if !bytes.is_empty() {
        let size_line = format!("{:x}\r\n", bytes.len());
        writer.write_all(size_line.as_bytes())?;
        writer.write_all(bytes)?;
        writer.write_all(b"\r\n")?;
        writer.flush()?;
    }
    Ok(())
}
573
/// Write the zero-length chunk (`0\r\n\r\n`) that ends a chunked body,
/// then flush the writer.
pub fn write_chunked_terminator<W: std::io::Write>(writer: &mut W) -> std::io::Result<()> {
    const TERMINATOR: &[u8] = b"0\r\n\r\n";
    writer.write_all(TERMINATOR).and_then(|()| writer.flush())
}
579
/// Issue #761 / S2 — process-wide stream capacity registry. Holds two
/// counters: a global concurrent-stream count and a per-principal map.
/// Both are decremented when the [`StreamCapacityGuard`] handed back
/// from a successful `try_acquire` is dropped, so the release path
/// covers every normal exit (success, mid-stream error, snapshot
/// expiry, client disconnect that drops the writer chain, panic
/// unwind through the stack frame holding the guard).
#[derive(Debug, Default)]
pub struct StreamCapacityRegistry {
    /// Both counters live behind one mutex so they are always read and
    /// updated together.
    inner: Mutex<CapacityInner>,
}

/// Counter state guarded by the registry's mutex.
#[derive(Debug, Default)]
struct CapacityInner {
    /// Number of slots currently held across all principals.
    global_count: usize,
    /// Slots currently held per principal; `release` removes an entry
    /// once its count drops to zero.
    per_principal: HashMap<String, usize>,
}
597
/// Ways [`StreamCapacityRegistry::try_acquire`] can refuse a slot. Every
/// variant reports which cap fired and the live counter value at the
/// moment of refusal, so clients can back off intelligently (the HTTP
/// layer surfaces these inside the structured 429 body).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireError {
    /// `stream.max_global` exceeded. Per acceptance criterion #1.
    GlobalExhausted { limit: usize, current: usize },
    /// `stream.max_per_principal` exceeded for `principal`. Per
    /// acceptance criterion #2. The principal is surfaced verbatim;
    /// callers must escape it on the wire.
    PrincipalExhausted {
        principal: String,
        limit: usize,
        current: usize,
    },
}

impl AcquireError {
    /// Machine-readable error code for the refusal.
    pub fn code(&self) -> &'static str {
        match self {
            Self::GlobalExhausted { .. } => "server_stream_capacity_exhausted",
            Self::PrincipalExhausted { .. } => "principal_stream_quota_exhausted",
        }
    }

    /// Human-readable description including the limit and the current
    /// count (and, for the per-principal case, the unescaped principal).
    pub fn message(&self) -> String {
        match self {
            Self::GlobalExhausted { limit, current } => format!(
                "server stream capacity exhausted (limit {}, current {})",
                limit, current
            ),
            Self::PrincipalExhausted {
                principal,
                limit,
                current,
            } => format!(
                "principal {} stream quota exhausted (limit {}, current {})",
                principal, limit, current
            ),
        }
    }
}
639
640impl StreamCapacityRegistry {
641    pub fn new() -> Arc<Self> {
642        Arc::new(Self::default())
643    }
644
645    /// Attempt to acquire one slot. Both caps are checked under the
646    /// same lock, so a concurrent acquire+release pair cannot over-
647    /// issue beyond either ceiling (acceptance criterion #5).
648    pub fn try_acquire(
649        self: &Arc<Self>,
650        principal: &str,
651        max_global: usize,
652        max_per_principal: usize,
653    ) -> Result<StreamCapacityGuard, AcquireError> {
654        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
655        if inner.global_count >= max_global {
656            return Err(AcquireError::GlobalExhausted {
657                limit: max_global,
658                current: inner.global_count,
659            });
660        }
661        let current = inner.per_principal.get(principal).copied().unwrap_or(0);
662        if current >= max_per_principal {
663            return Err(AcquireError::PrincipalExhausted {
664                principal: principal.to_string(),
665                limit: max_per_principal,
666                current,
667            });
668        }
669        inner.global_count += 1;
670        inner
671            .per_principal
672            .insert(principal.to_string(), current + 1);
673        Ok(StreamCapacityGuard {
674            registry: Arc::clone(self),
675            principal: principal.to_string(),
676            released: false,
677        })
678    }
679
680    fn release(&self, principal: &str) {
681        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
682        if inner.global_count > 0 {
683            inner.global_count -= 1;
684        }
685        if let Some(count) = inner.per_principal.get_mut(principal) {
686            if *count > 0 {
687                *count -= 1;
688            }
689            if *count == 0 {
690                inner.per_principal.remove(principal);
691            }
692        }
693    }
694
695    /// Visible for tests and audit handlers.
696    pub fn snapshot(&self) -> (usize, HashMap<String, usize>) {
697        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
698        (inner.global_count, inner.per_principal.clone())
699    }
700}
701
702/// RAII slot returned by [`StreamCapacityRegistry::try_acquire`].
703/// Decrements both counters on drop — acceptance criterion #4
704/// ("releasing a slot on stream end (any reason) decrements both
705/// counters atomically").
706#[must_use = "dropping the guard immediately releases the stream slot"]
707#[derive(Debug)]
708pub struct StreamCapacityGuard {
709    registry: Arc<StreamCapacityRegistry>,
710    principal: String,
711    released: bool,
712}
713
714impl StreamCapacityGuard {
715    pub fn principal(&self) -> &str {
716        &self.principal
717    }
718}
719
720impl Drop for StreamCapacityGuard {
721    fn drop(&mut self) {
722        if !self.released {
723            self.registry.release(&self.principal);
724            self.released = true;
725        }
726    }
727}
728
729// ──────── Issue #766 / S7 — resume coordinator ────────
730
/// Resumability assessment of a query plan. The shim slice runs a
/// textual classifier over the SQL string: a query is resumable iff
/// it has a stable total order over a unique key. By default we
/// promise RID ASC; an explicit `ORDER BY rid` (or `ORDER BY rid ASC`)
/// is also resumable. Anything that aggregates / groups / windows /
/// joins or orders on anything else is not.
///
/// Matching is case-insensitive and whitespace-insensitive: the query is
/// upper-cased and every run of whitespace (spaces, tabs, newlines) is
/// collapsed to a single space before the keyword checks run. This means
/// a multi-line `GROUP BY` / `JOIN` / `ORDER  BY name` is classified the
/// same way as its single-line spelling instead of slipping past the
/// space-delimited patterns.
///
/// The classifier is deliberately conservative: keywords are matched as
/// plain substrings, so a forbidden token inside a string literal or
/// identifier also makes the query non-resumable.
///
/// Returns `true` when the query is considered resumable.
pub fn assess_resumability(query: &str) -> bool {
    // Canonical form: upper-case, single spaces, no leading/trailing
    // whitespace. All indices below refer to this string.
    let normalized = query
        .to_uppercase()
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ");

    if !normalized.starts_with("SELECT ") {
        return false;
    }

    const FORBIDDEN: &[&str] = &[
        " GROUP BY ",
        " HAVING ",
        "DISTINCT ",
        "COUNT(",
        "SUM(",
        "AVG(",
        "MIN(",
        "MAX(",
        "ARRAY_AGG(",
        "JSON_AGG(",
        "OVER(",
        " OVER (",
        " JOIN ",
    ];
    if FORBIDDEN.iter().any(|kw| normalized.contains(kw)) {
        return false;
    }

    // No explicit ordering: the default RID ASC promise applies.
    let Some(order_at) = normalized.find("ORDER BY") else {
        return true;
    };

    // The ORDER BY clause ends at a trailing LIMIT or at the statement
    // terminator, whichever comes first.
    let tail = &normalized[order_at + "ORDER BY".len()..];
    let clause_end = [tail.find(" LIMIT "), tail.find(';')]
        .into_iter()
        .flatten()
        .min()
        .unwrap_or(tail.len());
    let clause = tail[..clause_end].trim();

    matches!(clause, "RID" | "RID ASC")
}
781
/// Resume-eligibility ledger. Maps each `snapshot_lsn` to the time its
/// lease was opened and the TTL it was opened with, so a resume request
/// can be checked against TTL without trusting the wall clock on the
/// client. The shim slice does not implement true MVCC pinning — the
/// registry's role is to make `snapshot_expired` deterministic and
/// testable.
#[derive(Debug, Default)]
pub struct LeaseRegistry {
    inner: Mutex<HashMap<u64, LeaseEntry>>,
}

/// What the ledger remembers about one snapshot LSN.
#[derive(Debug, Clone, Copy)]
struct LeaseEntry {
    /// Open time in milliseconds, as passed to `record`.
    opened_at_ms: u64,
    /// Time-to-live in milliseconds, as passed to `record`.
    ttl_ms: u64,
}

/// Outcome of [`LeaseRegistry::lookup`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseLookup {
    /// No lease ever recorded for this snapshot LSN.
    Unknown,
    /// Lease exists but its TTL has elapsed.
    Expired,
    /// Lease exists and is still within TTL.
    Live,
}

impl LeaseRegistry {
    /// Create an empty, shareable ledger.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Lock the ledger, recovering the map if a previous holder panicked.
    fn entries(&self) -> std::sync::MutexGuard<'_, HashMap<u64, LeaseEntry>> {
        self.inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Record a freshly-opened lease. Recording the same `snapshot_lsn`
    /// again overwrites the earlier entry, refreshing its open time and
    /// TTL (the client cannot observe lease identity through the
    /// snapshot LSN alone, so this matches "the latest open wins"
    /// semantics).
    pub fn record(&self, snapshot_lsn: u64, opened_at_ms: u64, ttl_ms: u64) {
        let entry = LeaseEntry {
            opened_at_ms,
            ttl_ms,
        };
        self.entries().insert(snapshot_lsn, entry);
    }

    /// Resume-time lookup: is the lease for `snapshot_lsn` unknown,
    /// expired, or still live as of `now_ms`? A lease is expired once at
    /// least `ttl_ms` has passed since it was opened; a `now_ms` earlier
    /// than the open time counts as zero elapsed.
    pub fn lookup(&self, snapshot_lsn: u64, now_ms: u64) -> LeaseLookup {
        let entries = self.entries();
        let Some(entry) = entries.get(&snapshot_lsn) else {
            return LeaseLookup::Unknown;
        };
        let age_ms = now_ms.saturating_sub(entry.opened_at_ms);
        if age_ms < entry.ttl_ms {
            LeaseLookup::Live
        } else {
            LeaseLookup::Expired
        }
    }

    /// Number of snapshot LSNs currently recorded.
    /// Visible for tests / audit.
    #[doc(hidden)]
    pub fn len(&self) -> usize {
        self.entries().len()
    }
}
850
851/// Issue #807 / PRD #750 — opaque cursor token. 192 bits drawn from the
852/// OS CSPRNG, hex-encoded so it survives JSON transport. The token is the
853/// only cursor identifier the wire sees; the pinned snapshot, scope, and
854/// query stay server-side in the [`CursorRegistry`]. Opacity property: a
855/// client cannot derive the pinned snapshot LSN or another principal's
856/// token from the value, and tokens are not sequential across opens.
857pub const CURSOR_TOKEN_BYTES: usize = 24;
858
859static NEXT_CURSOR_SALT: AtomicU64 = AtomicU64::new(1);
860
861/// Generate an opaque 192-bit cursor token (48-char hex). Falls back to a
862/// high-entropy time/counter mix if the CSPRNG is unavailable so a fresh
863/// open never fails to mint a token (the fallback is not unguessable, but
864/// production paths reach `/dev/urandom` successfully).
865pub fn generate_cursor_token() -> String {
866    let mut bytes = [0u8; CURSOR_TOKEN_BYTES];
867    if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
868        let salt = NEXT_CURSOR_SALT
869            .fetch_add(1, Ordering::SeqCst)
870            .to_le_bytes();
871        let now = crate::utils::now_unix_nanos().to_le_bytes();
872        bytes[..8].copy_from_slice(&salt);
873        bytes[8..16].copy_from_slice(&now);
874    }
875    crate::utils::to_hex(&bytes)
876}
877
878/// Server-side cursor record. Pins the read snapshot and the query so a
879/// resume replays the same point-in-time view, and binds the entry to the
880/// `(tenant, principal)` that opened it for authorization scoping.
881#[derive(Debug, Clone)]
882struct CursorEntry {
883    snapshot_lsn: u64,
884    tenant: String,
885    principal: String,
886    query: String,
887    entity_types: Option<Vec<String>>,
888    capabilities: Option<Vec<String>>,
889    opened_at_ms: u64,
890    ttl_ms: u64,
891    /// Issue #808 / 750d — shared cancel token observed by the executor for
892    /// this stream. The cancel endpoint and client-disconnect detection
893    /// raise it; the engine iterators poll it. Cloned out to the handler at
894    /// open so the live stream and a later cancel request share one flag.
895    cancel_token: CancelToken,
896    /// Issue #808 / 750d — tombstone marker. Set once the stream is
897    /// cancelled (explicitly or by disconnect); a tombstoned cursor can
898    /// never be resumed, only reported as cleanly cancelled to its owner.
899    cancelled: bool,
900}
901
/// Pinned read state handed back to the handler when a resume token
/// resolves cleanly. The handler re-runs `query` against `snapshot_lsn`
/// to re-stream the view the original cursor referenced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorResume {
    /// Snapshot LSN pinned when the cursor was opened.
    pub snapshot_lsn: u64,
    /// Query text recorded when the cursor was opened.
    pub query: String,
    pub entity_types: Option<Vec<String>>,
    pub capabilities: Option<Vec<String>>,
    /// Millisecond timestamp at which the cursor stops being resumable.
    pub expires_at_ms: u64,
}
913
/// Reason a cursor token was refused. The handler turns each variant
/// into a structured wire error. [`CursorReject::NotFound`] is
/// intentionally shared by "never issued" and "belongs to another
/// tenant/principal", so an unauthorized caller cannot distinguish a
/// foreign cursor from a nonexistent one.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorReject {
    /// The token is unknown, or it belongs to a different
    /// tenant/principal (masked so existence never leaks).
    NotFound,
    /// The caller owns the token but its TTL has elapsed.
    Expired,
    /// Issue #808 / 750d — the caller owns the token but the stream was
    /// cancelled (explicitly or by client disconnect) and tombstoned.
    /// The resume is refused with a clean, structured error instead of
    /// replaying a stream the client already abandoned.
    Cancelled,
}
931
932/// Issue #807 / PRD #750 — process-wide cursor registry for `/query/stream`.
933///
934/// On a fresh stream the server mints an opaque token, pins the read
935/// snapshot LSN, and records it here scoped to the requesting
936/// `(tenant, principal)` with a TTL. A later resume request presents the
937/// token; [`resolve`](CursorRegistry::resolve) returns the pinned query +
938/// snapshot only when the token is live AND the caller's scope matches.
939///
940/// In-memory is acceptable for this first cut (per the issue): the registry
941/// is shared via `Arc` so cloned server handles enforce against one map.
942#[derive(Debug, Default)]
943pub struct CursorRegistry {
944    inner: Mutex<HashMap<String, CursorEntry>>,
945}
946
947impl CursorRegistry {
948    pub fn new() -> Arc<Self> {
949        Arc::new(Self::default())
950    }
951
952    /// Mint + record a cursor for a freshly-opened stream, returning the
953    /// opaque token handed to the client. The entry expires at
954    /// `opened_at_ms + ttl_ms` (saturating).
955    #[allow(clippy::too_many_arguments)]
956    pub fn register(
957        &self,
958        snapshot_lsn: u64,
959        tenant: &str,
960        principal: &str,
961        query: &str,
962        entity_types: Option<Vec<String>>,
963        capabilities: Option<Vec<String>>,
964        opened_at_ms: u64,
965        ttl_ms: u64,
966    ) -> String {
967        let token = generate_cursor_token();
968        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
969        inner.insert(
970            token.clone(),
971            CursorEntry {
972                snapshot_lsn,
973                tenant: tenant.to_string(),
974                principal: principal.to_string(),
975                query: query.to_string(),
976                entity_types,
977                capabilities,
978                opened_at_ms,
979                ttl_ms,
980                cancel_token: CancelToken::new(),
981                cancelled: false,
982            },
983        );
984        token
985    }
986
987    /// Issue #808 / 750d — clone out the shared cancel token for a cursor
988    /// the caller just minted, so the live streaming handler can poll the
989    /// same flag a later cancel request raises. No scope check: the only
990    /// caller is the handler that owns the freshly-registered token.
991    pub fn cancel_token_for(&self, token: &str) -> Option<CancelToken> {
992        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
993        inner.get(token).map(|e| e.cancel_token.clone())
994    }
995
996    /// Issue #808 / 750d — cancel + tombstone a cursor on behalf of
997    /// `(tenant, principal)`. Raises the shared cancel token (so an
998    /// in-flight stream observing it stops) and marks the entry cancelled
999    /// so a later resume is refused with [`CursorReject::Cancelled`].
1000    ///
1001    /// Scope is checked exactly as in [`resolve`](CursorRegistry::resolve):
1002    /// a token owned by a different tenant/principal — or one that never
1003    /// existed — returns [`CursorReject::NotFound`] so an unauthorized
1004    /// caller cannot cancel, or even confirm the existence of, a foreign
1005    /// stream. Cancelling an already-cancelled cursor is idempotent.
1006    pub fn cancel(
1007        &self,
1008        token: &str,
1009        tenant: &str,
1010        principal: &str,
1011    ) -> Result<CancelToken, CursorReject> {
1012        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1013        let entry = inner.get_mut(token).ok_or(CursorReject::NotFound)?;
1014        if entry.tenant != tenant || entry.principal != principal {
1015            return Err(CursorReject::NotFound);
1016        }
1017        entry.cancelled = true;
1018        entry.cancel_token.cancel();
1019        Ok(entry.cancel_token.clone())
1020    }
1021
1022    /// Resolve a resume token for `(tenant, principal)` as of `now_ms`.
1023    ///
1024    /// Scope is checked **before** expiry: a token owned by a different
1025    /// tenant or principal resolves to [`CursorReject::NotFound`] —
1026    /// identical to a token that was never issued — so an unauthorized
1027    /// caller learns nothing about its existence. Only the rightful owner
1028    /// can observe [`CursorReject::Expired`].
1029    pub fn resolve(
1030        &self,
1031        token: &str,
1032        tenant: &str,
1033        principal: &str,
1034        now_ms: u64,
1035    ) -> Result<CursorResume, CursorReject> {
1036        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1037        let entry = inner.get(token).ok_or(CursorReject::NotFound)?;
1038        // Scope gate first — a mismatch must be indistinguishable from
1039        // absence so a cross-tenant/cross-principal probe cannot confirm
1040        // the token exists.
1041        if entry.tenant != tenant || entry.principal != principal {
1042            return Err(CursorReject::NotFound);
1043        }
1044        // Issue #808 / 750d — a tombstoned (cancelled) cursor is refused
1045        // with a dedicated reason before the TTL check, so the owner learns
1046        // the stream was cancelled rather than seeing a generic expiry.
1047        if entry.cancelled {
1048            return Err(CursorReject::Cancelled);
1049        }
1050        if now_ms.saturating_sub(entry.opened_at_ms) >= entry.ttl_ms {
1051            return Err(CursorReject::Expired);
1052        }
1053        Ok(CursorResume {
1054            snapshot_lsn: entry.snapshot_lsn,
1055            query: entry.query.clone(),
1056            entity_types: entry.entity_types.clone(),
1057            capabilities: entry.capabilities.clone(),
1058            expires_at_ms: entry.opened_at_ms.saturating_add(entry.ttl_ms),
1059        })
1060    }
1061
1062    /// Visible for tests / audit.
1063    #[doc(hidden)]
1064    pub fn len(&self) -> usize {
1065        self.inner.lock().unwrap_or_else(|e| e.into_inner()).len()
1066    }
1067
1068    #[doc(hidden)]
1069    pub fn is_empty(&self) -> bool {
1070        self.len() == 0
1071    }
1072}
1073
1074/// Incremental SHA-256 hasher over emitted row lines, hex-encoded on
1075/// finalize. The wire contract is that the server hashes the exact
1076/// byte sequence of each row line (without trailing newline) in the
1077/// order the rows are emitted; the client stores the resulting digest
1078/// from the `end` envelope and replays it on resume.
1079#[derive(Debug, Default)]
1080pub struct PrefixHasher {
1081    inner: Option<sha2::Sha256>,
1082    rows: u64,
1083}
1084
1085impl PrefixHasher {
1086    pub fn new() -> Self {
1087        use sha2::Digest;
1088        Self {
1089            inner: Some(sha2::Sha256::new()),
1090            rows: 0,
1091        }
1092    }
1093
1094    pub fn update(&mut self, line: &[u8]) {
1095        use sha2::Digest;
1096        if let Some(h) = self.inner.as_mut() {
1097            h.update(line);
1098        }
1099        self.rows += 1;
1100    }
1101
1102    pub fn rows(&self) -> u64 {
1103        self.rows
1104    }
1105
1106    /// Hex-encoded digest of everything fed so far. Consumes the
1107    /// hasher (a `PrefixHasher` is single-use).
1108    pub fn finalize_hex(mut self) -> String {
1109        use sha2::Digest;
1110        let hasher = self
1111            .inner
1112            .take()
1113            .expect("PrefixHasher::finalize_hex called twice");
1114        let digest = hasher.finalize();
1115        let mut out = String::with_capacity(64);
1116        for b in digest.iter() {
1117            out.push_str(&format!("{b:02x}"));
1118        }
1119        out
1120    }
1121}
1122
1123/// Issue #767 / S8 — audit emission helpers. Each helper builds an
1124/// `AuditEvent` shaped to the brief and forwards it to the runtime's
1125/// audit log. The helpers are intentionally side-effect-only (no
1126/// return); audit emission must never terminate a stream that would
1127/// otherwise succeed.
1128pub fn audit_stream_opened(
1129    runtime: &RedDBRuntime,
1130    lease_handle: &str,
1131    principal: &str,
1132    snapshot_lsn: u64,
1133    query_hash: &str,
1134) {
1135    use crate::json::{Map, Value as JsonValue};
1136    let mut detail = Map::new();
1137    detail.insert(
1138        "lease_handle".to_string(),
1139        JsonValue::String(lease_handle.to_string()),
1140    );
1141    detail.insert(
1142        "snapshot_lsn".to_string(),
1143        JsonValue::Number(snapshot_lsn as f64),
1144    );
1145    detail.insert(
1146        "query_hash".to_string(),
1147        JsonValue::String(query_hash.to_string()),
1148    );
1149    let event = crate::runtime::audit_log::AuditEvent::builder("stream.opened")
1150        .principal(principal)
1151        .resource(lease_handle.to_string())
1152        .outcome(crate::runtime::audit_log::Outcome::Success)
1153        .detail(JsonValue::Object(detail))
1154        .build();
1155    runtime.audit_log().record_event(event);
1156}
1157
1158pub fn audit_stream_closed(
1159    runtime: &RedDBRuntime,
1160    lease_handle: &str,
1161    principal: &str,
1162    reason: CloseReason,
1163    row_count: u64,
1164    bytes_written: u64,
1165) {
1166    use crate::json::{Map, Value as JsonValue};
1167    let mut stats = Map::new();
1168    stats.insert("row_count".to_string(), JsonValue::Number(row_count as f64));
1169    stats.insert(
1170        "bytes_written".to_string(),
1171        JsonValue::Number(bytes_written as f64),
1172    );
1173    let mut detail = Map::new();
1174    detail.insert(
1175        "lease_handle".to_string(),
1176        JsonValue::String(lease_handle.to_string()),
1177    );
1178    detail.insert(
1179        "reason".to_string(),
1180        JsonValue::String(reason.as_str().to_string()),
1181    );
1182    detail.insert("stats".to_string(), JsonValue::Object(stats));
1183    let outcome = match reason {
1184        CloseReason::Ok => crate::runtime::audit_log::Outcome::Success,
1185        CloseReason::CapacityRefused => crate::runtime::audit_log::Outcome::Denied,
1186        _ => crate::runtime::audit_log::Outcome::Error,
1187    };
1188    let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1189        .principal(principal)
1190        .resource(lease_handle.to_string())
1191        .outcome(outcome)
1192        .detail(JsonValue::Object(detail))
1193        .build();
1194    runtime.audit_log().record_event(event);
1195}
1196
1197pub fn audit_token_expired_during_lease(
1198    runtime: &RedDBRuntime,
1199    lease_handle: &str,
1200    principal: &str,
1201    token_expiry_ms: u64,
1202) {
1203    use crate::json::{Map, Value as JsonValue};
1204    let mut detail = Map::new();
1205    detail.insert(
1206        "lease_handle".to_string(),
1207        JsonValue::String(lease_handle.to_string()),
1208    );
1209    detail.insert(
1210        "token_expiry".to_string(),
1211        JsonValue::Number(token_expiry_ms as f64),
1212    );
1213    detail.insert("lease_continued".to_string(), JsonValue::Bool(true));
1214    let event = crate::runtime::audit_log::AuditEvent::builder("stream.token_expired_during_lease")
1215        .principal(principal)
1216        .resource(lease_handle.to_string())
1217        .outcome(crate::runtime::audit_log::Outcome::Success)
1218        .detail(JsonValue::Object(detail))
1219        .build();
1220    runtime.audit_log().record_event(event);
1221}
1222
1223/// Capacity refusal emits a `stream.closed` event with no lease handle
1224/// (the open never produced one) and `reason: capacity_refused`. The
1225/// brief lists capacity_refused alongside the other close reasons; we
1226/// keep the wire shape identical so downstream audit-log consumers
1227/// don't have to special-case it.
1228pub fn audit_stream_capacity_refused(
1229    runtime: &RedDBRuntime,
1230    principal: &str,
1231    code: &str,
1232    limit: usize,
1233    current: usize,
1234) {
1235    use crate::json::{Map, Value as JsonValue};
1236    let mut detail = Map::new();
1237    detail.insert(
1238        "reason".to_string(),
1239        JsonValue::String(CloseReason::CapacityRefused.as_str().to_string()),
1240    );
1241    detail.insert("code".to_string(), JsonValue::String(code.to_string()));
1242    detail.insert("limit".to_string(), JsonValue::Number(limit as f64));
1243    detail.insert("current".to_string(), JsonValue::Number(current as f64));
1244    let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1245        .principal(principal)
1246        .outcome(crate::runtime::audit_log::Outcome::Denied)
1247        .detail(JsonValue::Object(detail))
1248        .build();
1249    runtime.audit_log().record_event(event);
1250}
1251
1252/// Borrow the lease registry's clock by default — the routing handler
1253/// uses this, tests inject their own.
1254pub fn system_clock() -> Arc<dyn Clock> {
1255    static INSTANCE: std::sync::OnceLock<Arc<dyn Clock>> = std::sync::OnceLock::new();
1256    Arc::clone(INSTANCE.get_or_init(|| Arc::new(SystemClock)))
1257}
1258
1259#[cfg(test)]
1260mod tests {
1261    use super::*;
1262
1263    #[test]
1264    fn open_stream_refuses_when_session_has_active_transaction() {
1265        let clock = FakeClock::new(0);
1266        let err = open_stream(StreamConfig::DEFAULT, 42, true, &clock).unwrap_err();
1267        assert_eq!(err, OpenStreamError::TransactionActive);
1268        assert_eq!(err.code(), "stream_in_transaction_unsupported");
1269    }
1270
1271    #[test]
1272    fn open_stream_succeeds_when_session_is_autocommit() {
1273        let clock = FakeClock::new(1_700_000_000_000);
1274        let lease = open_stream(StreamConfig::DEFAULT, 123, false, &clock).unwrap();
1275        assert_eq!(lease.snapshot_lsn, 123);
1276        assert_eq!(lease.opened_at_ms, 1_700_000_000_000);
1277        assert!(lease.id >= 1);
1278    }
1279
1280    #[test]
1281    fn lease_ids_are_unique_and_monotonic() {
1282        let clock = FakeClock::new(0);
1283        let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1284        let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1285        assert!(b.id > a.id);
1286    }
1287
1288    #[test]
1289    fn snapshot_expired_uses_injected_clock_and_ttl() {
1290        // TTL fake-clock test: advance past ttl_ms and snapshot_expired
1291        // flips. Acceptance criterion #5.
1292        let clock = FakeClock::new(0);
1293        let mut config = StreamConfig::DEFAULT;
1294        config.snapshot_ttl_ms = 5_000;
1295        let lease = open_stream(config, 0, false, &clock).unwrap();
1296
1297        assert!(!lease.snapshot_expired(clock.now_ms()));
1298        clock.advance(4_999);
1299        assert!(!lease.snapshot_expired(clock.now_ms()));
1300        clock.advance(1);
1301        assert!(lease.snapshot_expired(clock.now_ms()));
1302    }
1303
1304    #[test]
1305    fn stream_config_loads_defaults_when_kv_is_empty() {
1306        // Without runtime, just sanity-check the defaults match ADR 0029.
1307        let cfg = StreamConfig::DEFAULT;
1308        assert_eq!(cfg.snapshot_ttl_ms, 60_000);
1309        assert_eq!(cfg.chunk_default_pages, 4);
1310        assert_eq!(cfg.chunk_min_pages, 1);
1311        assert_eq!(cfg.chunk_max_pages, 64);
1312        assert_eq!(cfg.chunk_max_rows, 1000);
1313        assert_eq!(cfg.chunk_max_latency_ms, 50);
1314        assert_eq!(cfg.production_buffer_bytes(), 64 * 1024);
1315    }
1316
1317    #[test]
1318    fn stream_config_normalize_clamps_inconsistent_inputs() {
1319        let mut cfg = StreamConfig {
1320            snapshot_ttl_ms: 1,
1321            chunk_default_pages: 100,
1322            chunk_min_pages: 0,
1323            chunk_max_pages: 8,
1324            chunk_max_rows: 0,
1325            chunk_max_latency_ms: 1,
1326            max_global_streams: 0,
1327            max_per_principal_streams: 0,
1328            default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
1329        };
1330        cfg.normalize();
1331        assert_eq!(cfg.chunk_min_pages, 1);
1332        assert_eq!(cfg.chunk_max_pages, 8);
1333        assert_eq!(cfg.chunk_default_pages, 8); // clamped down to max
1334        assert!(cfg.chunk_max_rows >= 1);
1335        assert!(cfg.max_global_streams >= 1);
1336        assert!(cfg.max_per_principal_streams >= 1);
1337    }
1338
1339    /// Test sink that accumulates flushed chunks into an interior-mutable
1340    /// `Vec`. Avoids the closure-capture-mutable-then-borrow-immutable
1341    /// dance that the borrow checker rejects when assertions are
1342    /// interleaved with `push_line` calls.
1343    struct CapturingSink {
1344        chunks: std::cell::RefCell<Vec<Vec<u8>>>,
1345    }
1346    impl CapturingSink {
1347        fn new() -> Self {
1348            Self {
1349                chunks: std::cell::RefCell::new(Vec::new()),
1350            }
1351        }
1352        fn len(&self) -> usize {
1353            self.chunks.borrow().len()
1354        }
1355        fn last_len(&self) -> Option<usize> {
1356            self.chunks.borrow().last().map(|c| c.len())
1357        }
1358    }
1359
1360    fn capture<'a>(sink: &'a CapturingSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1361        move |bytes: &[u8]| {
1362            sink.chunks.borrow_mut().push(bytes.to_vec());
1363            Ok(())
1364        }
1365    }
1366
1367    #[test]
1368    fn chunk_producer_flushes_on_byte_cap() {
1369        let clock = FakeClock::new(0);
1370        let cfg = StreamConfig {
1371            chunk_default_pages: 1, // 16 KiB
1372            chunk_min_pages: 1,
1373            chunk_max_pages: 1,
1374            chunk_max_rows: 1_000_000,
1375            chunk_max_latency_ms: 1_000_000,
1376            ..StreamConfig::DEFAULT
1377        };
1378        let sink = CapturingSink::new();
1379        let mut producer = ChunkProducer::new(&cfg, &clock);
1380        let mut flush = capture(&sink);
1381
1382        producer
1383            .push_line(&vec![b'x'; 8 * 1024], &mut flush)
1384            .unwrap();
1385        assert_eq!(sink.len(), 0);
1386
1387        let triggered = producer
1388            .push_line(&vec![b'y'; 8 * 1024], &mut flush)
1389            .unwrap();
1390        assert!(triggered);
1391        assert_eq!(sink.len(), 1);
1392        assert_eq!(sink.last_len(), Some(16 * 1024));
1393        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Byte));
1394    }
1395
1396    #[test]
1397    fn chunk_producer_flushes_on_row_cap() {
1398        let clock = FakeClock::new(0);
1399        let cfg = StreamConfig {
1400            chunk_default_pages: 4, // 64 KiB — well above any test row size
1401            chunk_min_pages: 1,
1402            chunk_max_pages: 64,
1403            chunk_max_rows: 3,
1404            chunk_max_latency_ms: 1_000_000,
1405            ..StreamConfig::DEFAULT
1406        };
1407        let sink = CapturingSink::new();
1408        let mut producer = ChunkProducer::new(&cfg, &clock);
1409        let mut flush = capture(&sink);
1410        let row = b"{\"row\":{}}\n";
1411        producer.push_line(row, &mut flush).unwrap();
1412        producer.push_line(row, &mut flush).unwrap();
1413        assert_eq!(sink.len(), 0);
1414        let triggered = producer.push_line(row, &mut flush).unwrap();
1415        assert!(triggered);
1416        assert_eq!(sink.len(), 1);
1417        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Row));
1418    }
1419
1420    #[test]
1421    fn chunk_producer_flushes_on_latency_cap() {
1422        let clock = FakeClock::new(0);
1423        let cfg = StreamConfig {
1424            chunk_default_pages: 4,
1425            chunk_min_pages: 1,
1426            chunk_max_pages: 64,
1427            chunk_max_rows: 1_000_000,
1428            chunk_max_latency_ms: 50,
1429            ..StreamConfig::DEFAULT
1430        };
1431        let sink = CapturingSink::new();
1432        let mut producer = ChunkProducer::new(&cfg, &clock);
1433        let mut flush = capture(&sink);
1434        producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1435        assert_eq!(sink.len(), 0);
1436        clock.advance(60);
1437        let triggered = producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1438        assert!(triggered);
1439        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
1440    }
1441
1442    #[test]
1443    fn chunk_producer_finish_emits_terminal_flush() {
1444        let clock = FakeClock::new(0);
1445        let cfg = StreamConfig::DEFAULT;
1446        let sink = CapturingSink::new();
1447        let mut producer = ChunkProducer::new(&cfg, &clock);
1448        let mut flush = capture(&sink);
1449        producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1450        producer.finish(&mut flush).unwrap();
1451        assert_eq!(sink.len(), 1);
1452        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Terminal));
1453    }
1454
1455    #[test]
1456    fn write_chunked_helpers_produce_well_formed_chunks() {
1457        let mut buf: Vec<u8> = Vec::new();
1458        write_chunked_response_header(&mut buf, 200, "application/x-ndjson").unwrap();
1459        write_chunk(&mut buf, b"{\"row\":{}}\n").unwrap();
1460        write_chunked_terminator(&mut buf).unwrap();
1461        let text = String::from_utf8(buf).unwrap();
1462        assert!(text.starts_with("HTTP/1.1 200 OK\r\n"));
1463        assert!(text.contains("Transfer-Encoding: chunked\r\n"));
1464        // 11 bytes in `{"row":{}}\n` → hex 'b'.
1465        assert!(text.contains("\r\nb\r\n{\"row\":{}}\n\r\n"));
1466        assert!(text.ends_with("0\r\n\r\n"));
1467    }
1468
1469    // ──────── Issue #761 / S2 — capacity registry ────────
1470
1471    #[test]
1472    fn capacity_registry_global_exhausted_returns_structured_error() {
1473        let reg = StreamCapacityRegistry::new();
1474        let _g1 = reg.try_acquire("alice", 2, 32).unwrap();
1475        let _g2 = reg.try_acquire("alice", 2, 32).unwrap();
1476        let err = reg.try_acquire("alice", 2, 32).unwrap_err();
1477        assert_eq!(
1478            err,
1479            AcquireError::GlobalExhausted {
1480                limit: 2,
1481                current: 2,
1482            }
1483        );
1484        assert_eq!(err.code(), "server_stream_capacity_exhausted");
1485    }
1486
1487    #[test]
1488    fn capacity_registry_per_principal_exhausted_independent_of_global() {
1489        // Acceptance criterion #2: per-principal cap fires even when
1490        // global has room. Acceptance criterion #3: counters are
1491        // independent across principals.
1492        let reg = StreamCapacityRegistry::new();
1493        let _a1 = reg.try_acquire("alice", 100, 2).unwrap();
1494        let _a2 = reg.try_acquire("alice", 100, 2).unwrap();
1495        let err = reg.try_acquire("alice", 100, 2).unwrap_err();
1496        assert_eq!(
1497            err,
1498            AcquireError::PrincipalExhausted {
1499                principal: "alice".to_string(),
1500                limit: 2,
1501                current: 2,
1502            }
1503        );
1504        assert_eq!(err.code(), "principal_stream_quota_exhausted");
1505
1506        // Bob is unaffected by Alice's quota.
1507        let _b1 = reg.try_acquire("bob", 100, 2).unwrap();
1508        let _b2 = reg.try_acquire("bob", 100, 2).unwrap();
1509    }
1510
1511    #[test]
1512    fn capacity_registry_release_frees_both_counters() {
1513        // Acceptance criterion #4: drop releases both counters.
1514        let reg = StreamCapacityRegistry::new();
1515        let g1 = reg.try_acquire("alice", 1, 1).unwrap();
1516        assert!(reg.try_acquire("alice", 1, 1).is_err());
1517        drop(g1);
1518        let (global, per_principal) = reg.snapshot();
1519        assert_eq!(global, 0);
1520        assert!(per_principal.is_empty());
1521        // Slot is now reclaimable.
1522        let _g2 = reg.try_acquire("alice", 1, 1).unwrap();
1523    }
1524
1525    #[test]
1526    fn capacity_registry_concurrent_acquire_release_does_not_over_issue() {
1527        // Acceptance criterion #5: stress coverage. Spawn `THREADS`
1528        // threads each running `ITERS` acquire+release cycles against
1529        // a registry sized to fit only `CAP` slots; the live count
1530        // must never exceed `CAP`, and the registry must return to
1531        // zero once every thread has joined.
1532        use std::sync::atomic::{AtomicUsize, Ordering};
1533
1534        const THREADS: usize = 16;
1535        const ITERS: usize = 200;
1536        const CAP_GLOBAL: usize = 4;
1537        const CAP_PER_PRINCIPAL: usize = 4;
1538
1539        let reg = StreamCapacityRegistry::new();
1540        let observed_max = Arc::new(AtomicUsize::new(0));
1541        let mut handles = Vec::new();
1542        for tid in 0..THREADS {
1543            let reg = Arc::clone(&reg);
1544            let observed_max = Arc::clone(&observed_max);
1545            // Two principals share the global cap, each capped at
1546            // `CAP_PER_PRINCIPAL` themselves.
1547            let principal = format!("p{}", tid % 2);
1548            handles.push(std::thread::spawn(move || {
1549                for _ in 0..ITERS {
1550                    if let Ok(guard) = reg.try_acquire(&principal, CAP_GLOBAL, CAP_PER_PRINCIPAL) {
1551                        let (live, _) = reg.snapshot();
1552                        observed_max.fetch_max(live, Ordering::SeqCst);
1553                        // Hold the slot just long enough to let other
1554                        // threads race against the cap.
1555                        std::thread::yield_now();
1556                        drop(guard);
1557                    }
1558                }
1559            }));
1560        }
1561        for h in handles {
1562            h.join().unwrap();
1563        }
1564        let (global_after, per_principal_after) = reg.snapshot();
1565        assert_eq!(global_after, 0, "global counter leaked");
1566        assert!(
1567            per_principal_after.is_empty(),
1568            "per-principal map leaked: {per_principal_after:?}"
1569        );
1570        assert!(
1571            observed_max.load(Ordering::SeqCst) <= CAP_GLOBAL,
1572            "global cap was breached: observed {} > {}",
1573            observed_max.load(Ordering::SeqCst),
1574            CAP_GLOBAL
1575        );
1576    }
1577
1578    // ──────── Issue #766 / S7 — resume coordinator ────────
1579
1580    #[test]
1581    fn assess_resumability_accepts_plain_select() {
1582        assert!(assess_resumability("SELECT id, name FROM t"));
1583        assert!(assess_resumability("select * from t where id > 5"));
1584        assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid"));
1585        assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid ASC"));
1586        assert!(assess_resumability(
1587            "SELECT a FROM t ORDER BY rid ASC LIMIT 10"
1588        ));
1589    }
1590
1591    #[test]
1592    fn assess_resumability_rejects_aggregates_and_unordered() {
1593        assert!(!assess_resumability("SELECT COUNT(*) FROM t"));
1594        assert!(!assess_resumability("SELECT SUM(x) FROM t"));
1595        assert!(!assess_resumability("SELECT a, COUNT(b) FROM t GROUP BY a"));
1596        assert!(!assess_resumability("SELECT DISTINCT a FROM t"));
1597        assert!(!assess_resumability("SELECT a FROM t ORDER BY name"));
1598        assert!(!assess_resumability("SELECT a FROM t ORDER BY rid DESC"));
1599        assert!(!assess_resumability("SELECT a FROM t ORDER BY a, b"));
1600        assert!(!assess_resumability("INSERT INTO t (a) VALUES (1)"));
1601        assert!(!assess_resumability(
1602            "SELECT a FROM t JOIN u ON t.id = u.id"
1603        ));
1604    }
1605
1606    #[test]
1607    fn lease_registry_records_and_expires_against_ttl() {
1608        let reg = LeaseRegistry::new();
1609        reg.record(42, 1_000, 5_000);
1610        assert_eq!(reg.lookup(42, 1_000), LeaseLookup::Live);
1611        assert_eq!(reg.lookup(42, 5_999), LeaseLookup::Live);
1612        assert_eq!(reg.lookup(42, 6_000), LeaseLookup::Expired);
1613        assert_eq!(reg.lookup(99, 1_000), LeaseLookup::Unknown);
1614    }
1615
1616    #[test]
1617    fn prefix_hasher_is_order_sensitive_and_deterministic() {
1618        let mut a = PrefixHasher::new();
1619        a.update(b"{\"row\":{\"id\":1}}");
1620        a.update(b"{\"row\":{\"id\":2}}");
1621        let hash_a = a.finalize_hex();
1622
1623        let mut b = PrefixHasher::new();
1624        b.update(b"{\"row\":{\"id\":1}}");
1625        b.update(b"{\"row\":{\"id\":2}}");
1626        let hash_b = b.finalize_hex();
1627        assert_eq!(hash_a, hash_b);
1628
1629        let mut c = PrefixHasher::new();
1630        c.update(b"{\"row\":{\"id\":2}}");
1631        c.update(b"{\"row\":{\"id\":1}}");
1632        assert_ne!(hash_a, c.finalize_hex());
1633        assert_eq!(hash_a.len(), 64);
1634    }
1635
1636    // ──────── Issue #767 / S8 — opaque lease handle + audit ────────
1637
1638    #[test]
1639    fn lease_handle_is_128_bit_hex_and_unique_per_open() {
1640        // Acceptance criterion #3 — wire-visible handle is opaque,
1641        // 128-bit, non-sequential. The internal monotonic `id` is
1642        // independent of the handle.
1643        let clock = FakeClock::new(0);
1644        let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1645        let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1646        assert_eq!(
1647            a.lease_handle.len(),
1648            LEASE_HANDLE_BYTES * 2,
1649            "handle must be 128 bits hex-encoded: {}",
1650            a.lease_handle
1651        );
1652        assert!(
1653            a.lease_handle.chars().all(|c| c.is_ascii_hexdigit()),
1654            "handle must be hex: {}",
1655            a.lease_handle
1656        );
1657        assert_ne!(a.lease_handle, b.lease_handle, "handles must differ");
1658        // Sanity — internal id remains monotonic, handle is not
1659        // derivable from it (handle bytes don't encode the id).
1660        assert!(b.id > a.id);
1661    }
1662
1663    #[test]
1664    fn generate_lease_handle_produces_high_entropy_distinct_values() {
1665        // Defensive — large sample to surface a hypothetical CSPRNG
1666        // mis-wiring. 1024 draws with zero collisions on 128 bits is
1667        // statistically certain when the source is sound.
1668        let mut seen = std::collections::HashSet::new();
1669        for _ in 0..1024 {
1670            assert!(
1671                seen.insert(generate_lease_handle()),
1672                "duplicate handle in CSPRNG sequence"
1673            );
1674        }
1675    }
1676
1677    #[test]
1678    fn parse_jwt_exp_ms_extracts_seconds_to_ms() {
1679        // Synthetic JWT — header.payload.signature. We only consult
1680        // the payload (no signature verification).
1681        // Payload: {"exp":1700000000}
1682        // Base64url of that JSON: eyJleHAiOjE3MDAwMDAwMDB9
1683        let token = "eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MDAwMDAwMDB9.sig";
1684        assert_eq!(parse_jwt_exp_ms(token), Some(1_700_000_000_000));
1685    }
1686
1687    #[test]
1688    fn parse_jwt_exp_ms_returns_none_for_opaque_tokens() {
1689        assert_eq!(parse_jwt_exp_ms("not-a-jwt"), None);
1690        assert_eq!(parse_jwt_exp_ms("only.two"), None);
1691        assert_eq!(parse_jwt_exp_ms("a.b.c"), None);
1692    }
1693
1694    // ──────── Issue #807 / 750c — cursor registry ────────
1695
1696    fn register_default_cursor(reg: &CursorRegistry, now_ms: u64) -> String {
1697        reg.register(
1698            42,
1699            "acme",
1700            "bearer:abc",
1701            "SELECT id FROM users ORDER BY rid",
1702            None,
1703            None,
1704            now_ms,
1705            1_000,
1706        )
1707    }
1708
1709    #[test]
1710    fn cursor_token_is_192_bit_hex_and_unique_per_register() {
1711        let reg = CursorRegistry::default();
1712        let a = register_default_cursor(&reg, 0);
1713        let b = register_default_cursor(&reg, 0);
1714        assert_eq!(
1715            a.len(),
1716            CURSOR_TOKEN_BYTES * 2,
1717            "token must be 192 bits hex-encoded: {a}"
1718        );
1719        assert!(
1720            a.chars().all(|c| c.is_ascii_hexdigit()),
1721            "token must be hex: {a}"
1722        );
1723        assert_ne!(a, b, "tokens must differ across registrations");
1724        assert_eq!(reg.len(), 2);
1725    }
1726
1727    #[test]
1728    fn cursor_resolves_for_owner_within_ttl() {
1729        // Happy-path resume — same tenant + principal, before TTL.
1730        let reg = CursorRegistry::default();
1731        let token = register_default_cursor(&reg, 1_000);
1732        let resume = reg
1733            .resolve(&token, "acme", "bearer:abc", 1_500)
1734            .expect("live cursor resolves for its owner");
1735        assert_eq!(resume.snapshot_lsn, 42, "resume re-pins the same snapshot");
1736        assert_eq!(resume.query, "SELECT id FROM users ORDER BY rid");
1737        assert_eq!(resume.expires_at_ms, 2_000);
1738    }
1739
1740    #[test]
1741    fn cursor_rejects_after_ttl_for_owner() {
1742        // TTL expiry — owner sees a distinct `Expired` so they can
1743        // re-issue, but the cursor no longer resolves.
1744        let reg = CursorRegistry::default();
1745        let token = register_default_cursor(&reg, 0);
1746        assert_eq!(
1747            reg.resolve(&token, "acme", "bearer:abc", 1_000),
1748            Err(CursorReject::Expired),
1749            "TTL boundary is inclusive — cursor is dead at opened_at + ttl"
1750        );
1751        assert_eq!(
1752            reg.resolve(&token, "acme", "bearer:abc", 5_000),
1753            Err(CursorReject::Expired)
1754        );
1755    }
1756
1757    #[test]
1758    fn cursor_cross_tenant_is_masked_as_not_found() {
1759        // Tenant isolation — a different tenant cannot tell the token
1760        // from one that never existed (no existence leak), even while the
1761        // cursor is still live for its real owner.
1762        let reg = CursorRegistry::default();
1763        let token = register_default_cursor(&reg, 0);
1764        assert_eq!(
1765            reg.resolve(&token, "evil-corp", "bearer:abc", 100),
1766            Err(CursorReject::NotFound),
1767            "cross-tenant resume must mask existence as NotFound"
1768        );
1769        // Still live for the rightful tenant.
1770        assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1771    }
1772
1773    #[test]
1774    fn cursor_cross_principal_is_masked_as_not_found() {
1775        // Principal isolation — same tenant, different principal is also
1776        // masked as NotFound rather than leaking a permission error.
1777        let reg = CursorRegistry::default();
1778        let token = register_default_cursor(&reg, 0);
1779        assert_eq!(
1780            reg.resolve(&token, "acme", "bearer:other", 100),
1781            Err(CursorReject::NotFound),
1782            "cross-principal resume must mask existence as NotFound"
1783        );
1784        assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1785    }
1786
1787    #[test]
1788    fn cursor_unknown_token_is_not_found() {
1789        let reg = CursorRegistry::default();
1790        assert!(reg.is_empty());
1791        assert_eq!(
1792            reg.resolve("deadbeef", "acme", "bearer:abc", 0),
1793            Err(CursorReject::NotFound)
1794        );
1795    }
1796
1797    #[test]
1798    fn cursor_scope_is_checked_before_expiry() {
1799        // An unauthorized caller hitting an *expired* token must still see
1800        // NotFound, never Expired — Expired would confirm the token once
1801        // existed. Scope precedes expiry in `resolve`.
1802        let reg = CursorRegistry::default();
1803        let token = register_default_cursor(&reg, 0);
1804        assert_eq!(
1805            reg.resolve(&token, "evil-corp", "bearer:abc", 10_000),
1806            Err(CursorReject::NotFound),
1807            "expired + wrong scope must mask as NotFound, not Expired"
1808        );
1809    }
1810
1811    // ──────── Issue #808 / 750d — cancel + tombstone ────────
1812
1813    #[test]
1814    fn cancel_tombstones_cursor_and_raises_token() {
1815        // Cancelling a live cursor raises its shared token (so an in-flight
1816        // executor observing it stops) and tombstones the entry.
1817        let reg = CursorRegistry::default();
1818        let token = register_default_cursor(&reg, 0);
1819        let live = reg
1820            .cancel_token_for(&token)
1821            .expect("freshly-minted cursor exposes its token");
1822        assert!(!live.is_cancelled(), "token starts un-cancelled");
1823
1824        let returned = reg
1825            .cancel(&token, "acme", "bearer:abc")
1826            .expect("owner cancels its own cursor");
1827        assert!(returned.is_cancelled(), "cancel raises the returned token");
1828        assert!(
1829            live.is_cancelled(),
1830            "the handler-held clone observes the cancel"
1831        );
1832    }
1833
1834    #[test]
1835    fn cancelled_cursor_rejects_resume_with_cancelled_reason() {
1836        // A tombstoned cursor refuses resume with a dedicated `Cancelled`
1837        // reason for its owner — distinct from Expired so the client learns
1838        // the stream was cancelled, not aged out.
1839        let reg = CursorRegistry::default();
1840        let token = register_default_cursor(&reg, 0);
1841        reg.cancel(&token, "acme", "bearer:abc")
1842            .expect("owner cancels");
1843        assert_eq!(
1844            reg.resolve(&token, "acme", "bearer:abc", 100),
1845            Err(CursorReject::Cancelled),
1846            "owner resuming a cancelled cursor sees Cancelled"
1847        );
1848    }
1849
1850    #[test]
1851    fn cancel_is_idempotent() {
1852        let reg = CursorRegistry::default();
1853        let token = register_default_cursor(&reg, 0);
1854        assert!(reg.cancel(&token, "acme", "bearer:abc").is_ok());
1855        // Second cancel still succeeds (already tombstoned) and stays cancelled.
1856        let second = reg
1857            .cancel(&token, "acme", "bearer:abc")
1858            .expect("re-cancel is a no-op success");
1859        assert!(second.is_cancelled());
1860    }
1861
1862    #[test]
1863    fn cancel_cross_tenant_is_masked_as_not_found() {
1864        // An unauthorized caller cannot cancel — or confirm the existence
1865        // of — a foreign cursor.
1866        let reg = CursorRegistry::default();
1867        let token = register_default_cursor(&reg, 0);
1868        assert!(
1869            matches!(
1870                reg.cancel(&token, "evil-corp", "bearer:abc"),
1871                Err(CursorReject::NotFound)
1872            ),
1873            "cross-tenant cancel must mask existence as NotFound"
1874        );
1875        // The rightful owner's cursor is untouched — still resumable.
1876        assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1877    }
1878
1879    #[test]
1880    fn cancel_cross_principal_is_masked_as_not_found() {
1881        let reg = CursorRegistry::default();
1882        let token = register_default_cursor(&reg, 0);
1883        assert!(
1884            matches!(
1885                reg.cancel(&token, "acme", "bearer:other"),
1886                Err(CursorReject::NotFound)
1887            ),
1888            "cross-principal cancel must mask existence as NotFound"
1889        );
1890        assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1891    }
1892
1893    #[test]
1894    fn cancel_unknown_token_is_not_found() {
1895        let reg = CursorRegistry::default();
1896        assert!(matches!(
1897            reg.cancel("deadbeef", "acme", "bearer:abc"),
1898            Err(CursorReject::NotFound)
1899        ));
1900    }
1901
1902    #[test]
1903    fn generate_cursor_token_produces_high_entropy_distinct_values() {
1904        let mut seen = std::collections::HashSet::new();
1905        for _ in 0..1024 {
1906            assert!(
1907                seen.insert(generate_cursor_token()),
1908                "duplicate cursor token in CSPRNG sequence"
1909            );
1910        }
1911    }
1912
1913    #[test]
1914    fn close_reason_as_str_covers_every_state_transition() {
1915        // Acceptance criterion #5 — every state transition in the
1916        // brief is representable in the audit close-reason taxonomy.
1917        for (variant, expected) in [
1918            (CloseReason::Ok, "ok"),
1919            (CloseReason::Cancelled, "cancelled"),
1920            (CloseReason::Error, "error"),
1921            (CloseReason::SnapshotExpired, "snapshot_expired"),
1922            (CloseReason::CapacityRefused, "capacity_refused"),
1923            (CloseReason::IntegrityFailed, "integrity_failed"),
1924        ] {
1925            assert_eq!(variant.as_str(), expected);
1926        }
1927    }
1928
1929    #[test]
1930    fn stream_config_defaults_carry_s2_caps() {
1931        assert_eq!(StreamConfig::DEFAULT.max_global_streams, 256);
1932        assert_eq!(StreamConfig::DEFAULT.max_per_principal_streams, 32);
1933    }
1934
1935    // ──────── Issue #768 / S9 — pull-based driver ────────
1936
1937    /// Sink that records every flushed chunk's length so a test can
1938    /// assert the resident working set stayed bounded.
1939    struct SizeSink {
1940        sizes: std::cell::RefCell<Vec<usize>>,
1941    }
1942    impl SizeSink {
1943        fn new() -> Self {
1944            Self {
1945                sizes: std::cell::RefCell::new(Vec::new()),
1946            }
1947        }
1948        fn flushes(&self) -> usize {
1949            self.sizes.borrow().len()
1950        }
1951        fn max_chunk(&self) -> usize {
1952            self.sizes.borrow().iter().copied().max().unwrap_or(0)
1953        }
1954    }
1955    fn size_capture<'a>(sink: &'a SizeSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1956        move |bytes: &[u8]| {
1957            sink.sizes.borrow_mut().push(bytes.len());
1958            Ok(())
1959        }
1960    }
1961
1962    #[test]
1963    fn drive_lines_streams_large_source_with_bounded_working_set() {
1964        // Acceptance #1: a huge scan flows through the chunk buffer
1965        // without ever materialising the full result set. The source
1966        // is a *lazy* range mapped to records — collecting it would
1967        // allocate N rows, but `drive_lines` pulls one at a time. We
1968        // assert every flushed chunk stays within a small multiple of
1969        // the page buffer, i.e. memory tracks the buffer, not N.
1970        let clock = FakeClock::new(0);
1971        let cfg = StreamConfig {
1972            chunk_default_pages: 1, // 16 KiB buffer
1973            chunk_min_pages: 1,
1974            chunk_max_pages: 1,
1975            chunk_max_rows: 1_000_000, // don't let the row cap dominate
1976            chunk_max_latency_ms: 1_000_000,
1977            ..StreamConfig::DEFAULT
1978        };
1979        let sink = SizeSink::new();
1980        let mut producer = ChunkProducer::new(&cfg, &clock);
1981        let mut flush = size_capture(&sink);
1982
1983        const N: u64 = 1_000_000;
1984        // Lazy source: never collected into a Vec.
1985        let source = 0..N;
1986        let consumed = producer
1987            .drive_lines(
1988                source,
1989                |i: &u64| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes(),
1990                &mut flush,
1991            )
1992            .unwrap();
1993        producer.finish(&mut flush).unwrap();
1994
1995        assert_eq!(consumed, N);
1996        assert_eq!(producer.total_rows(), N);
1997        // Many flushes occurred (streamed), not one giant buffer.
1998        assert!(
1999            sink.flushes() > 1000,
2000            "expected the source to stream across many chunks, saw {}",
2001            sink.flushes()
2002        );
2003        // No chunk exceeded the byte cap plus one trailing line — the
2004        // resident buffer is bounded independent of N.
2005        let max_line = format!("{{\"row\":{{\"id\":{}}}}}\n", N - 1).len();
2006        assert!(
2007            sink.max_chunk() <= cfg.production_buffer_bytes() + max_line,
2008            "chunk {} exceeded bounded working set {}",
2009            sink.max_chunk(),
2010            cfg.production_buffer_bytes() + max_line
2011        );
2012    }
2013
2014    #[test]
2015    fn drive_lines_first_chunk_flushes_on_latency_before_source_drains() {
2016        // Acceptance #2: first-row latency is bounded by the latency
2017        // cap, not by full materialisation. The source yields rows
2018        // whose pull advances the fake clock; the first chunk must
2019        // flush as soon as the latency window elapses, long before the
2020        // (large) source is exhausted.
2021        let clock = FakeClock::new(0);
2022        let cfg = StreamConfig {
2023            chunk_default_pages: 64, // large byte cap — won't trip first
2024            chunk_min_pages: 1,
2025            chunk_max_pages: 64,
2026            chunk_max_rows: 1_000_000, // large row cap — won't trip first
2027            chunk_max_latency_ms: 50,
2028            ..StreamConfig::DEFAULT
2029        };
2030        let sink = SizeSink::new();
2031        let mut producer = ChunkProducer::new(&cfg, &clock);
2032
2033        // Drive manually so we can advance the clock between pulls and
2034        // observe the first flush. Each pull advances 20 ms; the 50 ms
2035        // latency cap trips on the third row.
2036        let mut first_flush_after: Option<u64> = None;
2037        let mut row = 0u64;
2038        while row < 1_000_000 {
2039            let line = format!("{{\"row\":{{\"id\":{row}}}}}\n");
2040            clock.advance(20);
2041            let mut flush = size_capture(&sink);
2042            let flushed = producer.push_line(line.as_bytes(), &mut flush).unwrap();
2043            row += 1;
2044            if flushed {
2045                first_flush_after = Some(row);
2046                break;
2047            }
2048        }
2049
2050        assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
2051        let rows_before_flush = first_flush_after.expect("a latency flush must occur");
2052        assert!(
2053            rows_before_flush <= 4,
2054            "first chunk flushed only after {rows_before_flush} rows; latency bound not honoured"
2055        );
2056        // Crucially, the source was nowhere near drained (1e6 rows).
2057        assert!(rows_before_flush < 1_000_000);
2058    }
2059
2060    #[test]
2061    fn drive_lines_parity_with_manual_push_line() {
2062        // The driver must produce byte-identical output to hand-rolled
2063        // push_line calls — the chunk producer's framing is unchanged.
2064        let clock = FakeClock::new(0);
2065        let cfg = StreamConfig::DEFAULT;
2066
2067        let lines: Vec<Vec<u8>> = (0..50)
2068            .map(|i| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes())
2069            .collect();
2070
2071        let driven = CapturingSink::new();
2072        {
2073            let mut p = ChunkProducer::new(&cfg, &clock);
2074            let mut flush = capture(&driven);
2075            p.drive_lines(lines.iter().cloned(), |l: &Vec<u8>| l.clone(), &mut flush)
2076                .unwrap();
2077            p.finish(&mut flush).unwrap();
2078        }
2079
2080        let manual = CapturingSink::new();
2081        {
2082            let mut p = ChunkProducer::new(&cfg, &clock);
2083            let mut flush = capture(&manual);
2084            for l in &lines {
2085                p.push_line(l, &mut flush).unwrap();
2086            }
2087            p.finish(&mut flush).unwrap();
2088        }
2089
2090        assert_eq!(*driven.chunks.borrow(), *manual.chunks.borrow());
2091    }
2092}