1use std::collections::HashMap;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::sync::{Arc, Mutex};
25use std::time::{SystemTime, UNIX_EPOCH};
26
27use crate::runtime::RedDBRuntime;
28use crate::storage::query::engine::cancel::CancelToken;
29use crate::storage::schema::types::Value;
30
/// Name of the key-value collection that [`StreamConfig::load`] reads its overrides from.
const RED_CONFIG_COLLECTION: &str = "red_config";
32
/// Size of one page in bytes (16 KiB). [`StreamConfig::production_buffer_bytes`] multiplies the
/// configured page count by this value.
pub const PAGE_SIZE: usize = 16 * 1024;
36
/// Source of the current time in milliseconds.
///
/// Code in this file takes a `&dyn Clock` so that it can be driven either by
/// [`SystemClock`] or by the manually advanced [`FakeClock`].
pub trait Clock: Send + Sync {
    /// Returns the current time in milliseconds.
    fn now_ms(&self) -> u64;
}
42
/// [`Clock`] implementation backed by [`SystemTime::now`].
#[derive(Debug, Default)]
pub struct SystemClock;
45
46impl Clock for SystemClock {
47 fn now_ms(&self) -> u64 {
48 SystemTime::now()
49 .duration_since(UNIX_EPOCH)
50 .map(|d| d.as_millis() as u64)
51 .unwrap_or(0)
52 }
53}
54
/// Manually driven clock: the reported time only changes when [`FakeClock::advance`] is called.
#[derive(Debug)]
pub struct FakeClock {
    /// Current fake time in milliseconds.
    now_ms: AtomicU64,
}

impl FakeClock {
    /// Creates a clock whose current time is `now_ms`.
    pub fn new(now_ms: u64) -> Self {
        let now_ms = AtomicU64::new(now_ms);
        Self { now_ms }
    }

    /// Moves the clock forward by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        // The previous value returned by `fetch_add` is not needed.
        let _previous = self.now_ms.fetch_add(ms, Ordering::SeqCst);
    }
}
71
72impl Clock for FakeClock {
73 fn now_ms(&self) -> u64 {
74 self.now_ms.load(Ordering::SeqCst)
75 }
76}
77
/// Tunables for output streams. Built-in values live in [`StreamConfig::DEFAULT`];
/// [`StreamConfig::load`] overlays per-key overrides on top of them.
#[derive(Debug, Clone, Copy)]
pub struct StreamConfig {
    /// Lease age, in milliseconds, at which [`StreamLease::snapshot_expired`] reports expiry.
    pub snapshot_ttl_ms: u64,
    /// Page count used by [`StreamConfig::production_buffer_bytes`] to size the chunk buffer.
    pub chunk_default_pages: usize,
    /// Lower bound that `chunk_default_pages` is clamped to when the config is normalised.
    pub chunk_min_pages: usize,
    /// Upper bound that `chunk_default_pages` is clamped to when the config is normalised.
    pub chunk_max_pages: usize,
    /// Row count at which [`ChunkProducer`] flushes its buffer.
    pub chunk_max_rows: usize,
    /// Window age, in milliseconds, at which [`ChunkProducer`] flushes its buffer.
    pub chunk_max_latency_ms: u64,
    /// Loaded from the `stream.max_global` key.
    // NOTE(review): presumably passed as `max_global` to
    // `StreamCapacityRegistry::try_acquire` — confirm against the caller.
    pub max_global_streams: usize,
    /// Loaded from the `stream.max_per_principal` key.
    // NOTE(review): presumably passed as `max_per_principal` to
    // `StreamCapacityRegistry::try_acquire` — confirm against the caller.
    pub max_per_principal_streams: usize,
    /// Verify mode loaded from the `stream.integrity.default_verify` key.
    pub default_verify: crate::runtime::integrity_tombstone::VerifyMode,
}
104
105impl Default for StreamConfig {
106 fn default() -> Self {
107 Self::DEFAULT
108 }
109}
110
111impl StreamConfig {
112 pub const DEFAULT: StreamConfig = StreamConfig {
115 snapshot_ttl_ms: 60_000,
116 chunk_default_pages: 4,
117 chunk_min_pages: 1,
118 chunk_max_pages: 64,
119 chunk_max_rows: 1000,
120 chunk_max_latency_ms: 50,
121 max_global_streams: 256,
122 max_per_principal_streams: 32,
123 default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
124 };
125
126 pub fn load(runtime: &RedDBRuntime) -> Self {
131 let db = runtime.db();
132 let read_u64 = |key: &str| -> Option<u64> {
133 match db.get_kv(RED_CONFIG_COLLECTION, key) {
134 Some((Value::Integer(v), _)) if v >= 0 => Some(v as u64),
135 Some((Value::UnsignedInteger(v), _)) => Some(v),
136 Some((Value::Text(text), _)) => text.parse().ok(),
137 _ => None,
138 }
139 };
140
141 let mut cfg = Self::DEFAULT;
142 if let Some(v) = read_u64("stream.snapshot.ttl_ms") {
143 cfg.snapshot_ttl_ms = v;
144 }
145 if let Some(v) = read_u64("stream.chunk.default_pages") {
146 cfg.chunk_default_pages = v as usize;
147 }
148 if let Some(v) = read_u64("stream.chunk.min_pages") {
149 cfg.chunk_min_pages = v as usize;
150 }
151 if let Some(v) = read_u64("stream.chunk.max_pages") {
152 cfg.chunk_max_pages = v as usize;
153 }
154 if let Some(v) = read_u64("stream.chunk.max_rows") {
155 cfg.chunk_max_rows = v as usize;
156 }
157 if let Some(v) = read_u64("stream.chunk.max_latency_ms") {
158 cfg.chunk_max_latency_ms = v;
159 }
160 if let Some(v) = read_u64("stream.max_global") {
161 cfg.max_global_streams = v as usize;
162 }
163 if let Some(v) = read_u64("stream.max_per_principal") {
164 cfg.max_per_principal_streams = v as usize;
165 }
166 if let Some((Value::Text(text), _)) =
169 db.get_kv(RED_CONFIG_COLLECTION, "stream.integrity.default_verify")
170 {
171 cfg.default_verify = crate::runtime::integrity_tombstone::VerifyMode::parse(&text);
172 }
173 cfg.normalize();
174 cfg
175 }
176
177 fn normalize(&mut self) {
181 if self.chunk_min_pages == 0 {
182 self.chunk_min_pages = 1;
183 }
184 if self.chunk_max_pages < self.chunk_min_pages {
185 self.chunk_max_pages = self.chunk_min_pages;
186 }
187 if self.chunk_default_pages < self.chunk_min_pages {
188 self.chunk_default_pages = self.chunk_min_pages;
189 }
190 if self.chunk_default_pages > self.chunk_max_pages {
191 self.chunk_default_pages = self.chunk_max_pages;
192 }
193 if self.chunk_max_rows == 0 {
194 self.chunk_max_rows = 1;
195 }
196 if self.max_global_streams == 0 {
197 self.max_global_streams = 1;
198 }
199 if self.max_per_principal_streams == 0 {
200 self.max_per_principal_streams = 1;
201 }
202 }
203
204 pub fn production_buffer_bytes(&self) -> usize {
207 self.chunk_default_pages.saturating_mul(PAGE_SIZE)
208 }
209}
210
/// Process-wide counter, starting at 1. [`open_stream`] takes lease ids from it and
/// [`generate_lease_handle`] uses it in its fallback path.
static NEXT_LEASE_ID: AtomicU64 = AtomicU64::new(1);
219
/// Number of raw bytes in a lease handle before [`generate_lease_handle`] hex-encodes them.
pub const LEASE_HANDLE_BYTES: usize = 16;
226
227pub fn generate_lease_handle() -> String {
231 let mut bytes = [0u8; LEASE_HANDLE_BYTES];
232 if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
233 let lo = NEXT_LEASE_ID.fetch_add(0, Ordering::SeqCst).to_le_bytes();
239 let now = crate::utils::now_unix_nanos().to_le_bytes();
240 bytes[..8].copy_from_slice(&lo);
241 bytes[8..].copy_from_slice(&now);
242 }
243 crate::utils::to_hex(&bytes)
244}
245
246pub fn parse_jwt_exp_ms(token: &str) -> Option<u64> {
256 let parts: Vec<&str> = token.split('.').collect();
257 if parts.len() != 3 {
258 return None;
259 }
260 let payload = base64url_decode_padded(parts[1])?;
261 let v: crate::json::Value = crate::json::from_slice(&payload).ok()?;
262 let exp_secs = v.get("exp").and_then(|n| n.as_f64())? as i64;
263 if exp_secs <= 0 {
264 return None;
265 }
266 Some((exp_secs as u64).saturating_mul(1000))
267}
268
269fn base64url_decode_padded(input: &str) -> Option<Vec<u8>> {
270 let mut s = String::with_capacity(input.len() + 4);
271 for ch in input.chars() {
272 match ch {
273 '-' => s.push('+'),
274 '_' => s.push('/'),
275 _ => s.push(ch),
276 }
277 }
278 while !s.len().is_multiple_of(4) {
279 s.push('=');
280 }
281 crate::wire::redwire::auth::base64_std_decode(&s)
282}
283
/// Why a stream was closed; [`CloseReason::as_str`] gives the snake_case label used in audit
/// events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    Ok,
    Cancelled,
    Error,
    SnapshotExpired,
    CapacityRefused,
    IntegrityFailed,
}

impl CloseReason {
    /// Stable snake_case label for this reason.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ok => "ok",
            Self::Cancelled => "cancelled",
            Self::Error => "error",
            Self::SnapshotExpired => "snapshot_expired",
            Self::CapacityRefused => "capacity_refused",
            Self::IntegrityFailed => "integrity_failed",
        }
    }
}
309
310#[derive(Debug)]
311pub struct StreamLease {
312 pub id: u64,
313 pub lease_handle: String,
317 pub snapshot_lsn: u64,
318 pub opened_at_ms: u64,
319 pub config: StreamConfig,
320}
321
322impl StreamLease {
323 pub fn snapshot_expired(&self, now_ms: u64) -> bool {
330 now_ms.saturating_sub(self.opened_at_ms) >= self.config.snapshot_ttl_ms
331 }
332}
333
/// Reasons [`open_stream`] can refuse to open a stream.
#[derive(Debug, PartialEq, Eq)]
pub enum OpenStreamError {
    /// The session currently has a transaction active.
    TransactionActive,
}

impl OpenStreamError {
    /// Machine-readable error code.
    pub fn code(&self) -> &'static str {
        // Irrefutable while the enum has one variant; adding a variant makes this a compile
        // error, which forces the mapping to be revisited.
        let Self::TransactionActive = self;
        "stream_in_transaction_unsupported"
    }

    /// Human-readable description of the error.
    pub fn message(&self) -> &'static str {
        let Self::TransactionActive = self;
        "cannot open output stream while a transaction is active on this session"
    }
}
357
358pub fn open_stream(
363 config: StreamConfig,
364 snapshot_lsn: u64,
365 in_transaction: bool,
366 clock: &dyn Clock,
367) -> Result<StreamLease, OpenStreamError> {
368 if in_transaction {
369 return Err(OpenStreamError::TransactionActive);
370 }
371 Ok(StreamLease {
372 id: NEXT_LEASE_ID.fetch_add(1, Ordering::SeqCst),
373 lease_handle: generate_lease_handle(),
374 snapshot_lsn,
375 opened_at_ms: clock.now_ms(),
376 config,
377 })
378}
379
/// Accumulates encoded lines in a buffer and hands the buffer to a caller-supplied flush
/// closure once a byte, row, or latency cap is reached (or when `finish` is called).
pub struct ChunkProducer<'a> {
    /// Bytes accumulated since the last flush.
    buf: Vec<u8>,
    /// Lines accumulated since the last flush.
    rows: usize,
    /// Clock reading taken at construction and after every flush.
    window_started_ms: u64,
    /// Buffer length at which a flush is triggered.
    cap_bytes: usize,
    /// Row count at which a flush is triggered.
    cap_rows: usize,
    /// Window age, in milliseconds, at which a flush is triggered.
    cap_latency_ms: u64,
    /// Time source used for the latency window.
    clock: &'a dyn Clock,
    /// Number of successful flushes so far.
    total_flushes: u64,
    /// Total bytes handed to the flush closure so far.
    total_bytes: u64,
    /// Total lines pushed so far (counted at push time, not at flush time).
    total_rows: u64,
    /// Reason recorded by the most recent successful flush, if any.
    last_flush_reason: Option<FlushReason>,
}
400
/// What caused a [`ChunkProducer`] flush.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// The buffer length reached the byte cap.
    Byte,
    /// The buffered row count reached the row cap.
    Row,
    /// The current window's age reached the latency cap.
    Latency,
    /// `finish` flushed the remaining buffered bytes.
    Terminal,
}
408
409impl<'a> ChunkProducer<'a> {
410 pub fn new(config: &StreamConfig, clock: &'a dyn Clock) -> Self {
411 let cap_bytes = config.production_buffer_bytes();
412 Self {
413 buf: Vec::with_capacity(cap_bytes),
414 rows: 0,
415 window_started_ms: clock.now_ms(),
416 cap_bytes,
417 cap_rows: config.chunk_max_rows,
418 cap_latency_ms: config.chunk_max_latency_ms,
419 clock,
420 total_flushes: 0,
421 total_bytes: 0,
422 total_rows: 0,
423 last_flush_reason: None,
424 }
425 }
426
427 pub fn push_line<F>(&mut self, line: &[u8], flush: &mut F) -> std::io::Result<bool>
430 where
431 F: FnMut(&[u8]) -> std::io::Result<()>,
432 {
433 self.buf.extend_from_slice(line);
434 self.rows += 1;
435 self.total_rows += 1;
436
437 if self.buf.len() >= self.cap_bytes {
438 self.flush(flush, FlushReason::Byte)?;
439 return Ok(true);
440 }
441 if self.rows >= self.cap_rows {
442 self.flush(flush, FlushReason::Row)?;
443 return Ok(true);
444 }
445 let elapsed = self.clock.now_ms().saturating_sub(self.window_started_ms);
446 if elapsed >= self.cap_latency_ms {
447 self.flush(flush, FlushReason::Latency)?;
448 return Ok(true);
449 }
450 Ok(false)
451 }
452
453 pub fn drive_lines<S, R, Enc, F>(
471 &mut self,
472 source: S,
473 mut encode: Enc,
474 flush: &mut F,
475 ) -> std::io::Result<u64>
476 where
477 S: IntoIterator<Item = R>,
478 Enc: FnMut(&R) -> Vec<u8>,
479 F: FnMut(&[u8]) -> std::io::Result<()>,
480 {
481 let mut count = 0u64;
482 for record in source {
483 let line = encode(&record);
484 self.push_line(&line, flush)?;
485 count += 1;
486 }
487 Ok(count)
488 }
489
490 pub fn finish<F>(&mut self, flush: &mut F) -> std::io::Result<()>
494 where
495 F: FnMut(&[u8]) -> std::io::Result<()>,
496 {
497 if !self.buf.is_empty() {
498 self.flush(flush, FlushReason::Terminal)?;
499 }
500 Ok(())
501 }
502
503 fn flush<F>(&mut self, flush: &mut F, reason: FlushReason) -> std::io::Result<()>
504 where
505 F: FnMut(&[u8]) -> std::io::Result<()>,
506 {
507 flush(&self.buf)?;
508 self.total_bytes += self.buf.len() as u64;
509 self.total_flushes += 1;
510 self.last_flush_reason = Some(reason);
511 self.buf.clear();
512 self.rows = 0;
513 self.window_started_ms = self.clock.now_ms();
514 Ok(())
515 }
516
517 pub fn total_flushes(&self) -> u64 {
518 self.total_flushes
519 }
520 pub fn total_bytes(&self) -> u64 {
521 self.total_bytes
522 }
523 pub fn total_rows(&self) -> u64 {
524 self.total_rows
525 }
526 pub fn last_flush_reason(&self) -> Option<FlushReason> {
527 self.last_flush_reason
528 }
529}
530
531pub fn write_chunked_response_header<W: std::io::Write>(
535 writer: &mut W,
536 status: u16,
537 content_type: &str,
538) -> std::io::Result<()> {
539 let header = format!(
540 "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nTransfer-Encoding: chunked\r\nCache-Control: no-cache\r\nConnection: close\r\n\r\n",
541 status,
542 crate::server::transport::status_text(status),
543 content_type,
544 );
545 writer.write_all(header.as_bytes())?;
546 writer.flush()
547}
548
/// Writes `bytes` as one HTTP chunk — hex length, CRLF, payload, CRLF — and flushes.
///
/// An empty payload is skipped entirely (nothing written, no flush): a zero-length chunk is
/// what [`write_chunked_terminator`] emits to end the body.
pub fn write_chunk<W: std::io::Write>(writer: &mut W, bytes: &[u8]) -> std::io::Result<()> {
    if !bytes.is_empty() {
        let size_line = format!("{:x}\r\n", bytes.len());
        writer.write_all(size_line.as_bytes())?;
        writer.write_all(bytes)?;
        writer.write_all(b"\r\n")?;
        writer.flush()?;
    }
    Ok(())
}
562
/// Writes the zero-length chunk (`0\r\n\r\n`) that ends a chunked body, then flushes.
pub fn write_chunked_terminator<W: std::io::Write>(writer: &mut W) -> std::io::Result<()> {
    writer
        .write_all(b"0\r\n\r\n")
        .and_then(|()| writer.flush())
}
568
/// Tracks how many stream slots are held, globally and per principal. Slots are taken with
/// `try_acquire` and given back when the returned [`StreamCapacityGuard`] is dropped.
#[derive(Debug, Default)]
pub struct StreamCapacityRegistry {
    /// Counters, guarded by a mutex.
    inner: Mutex<CapacityInner>,
}
580
/// Counter state behind the [`StreamCapacityRegistry`] mutex.
#[derive(Debug, Default)]
struct CapacityInner {
    /// Slots currently held across all principals.
    global_count: usize,
    /// Slots currently held per principal; entries are removed when their count reaches zero.
    per_principal: HashMap<String, usize>,
}
586
/// Why `StreamCapacityRegistry::try_acquire` refused a slot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireError {
    /// The global slot count had already reached `limit`.
    GlobalExhausted { limit: usize, current: usize },
    /// The named principal's slot count had already reached `limit`.
    PrincipalExhausted {
        principal: String,
        limit: usize,
        current: usize,
    },
}

impl AcquireError {
    /// Machine-readable error code.
    pub fn code(&self) -> &'static str {
        match *self {
            Self::GlobalExhausted { .. } => "server_stream_capacity_exhausted",
            Self::PrincipalExhausted { .. } => "principal_stream_quota_exhausted",
        }
    }

    /// Human-readable description including the limit and the count observed at refusal.
    pub fn message(&self) -> String {
        match self {
            Self::GlobalExhausted { limit, current } => {
                format!("server stream capacity exhausted (limit {limit}, current {current})")
            }
            Self::PrincipalExhausted {
                principal,
                limit,
                current,
            } => {
                format!(
                    "principal {principal} stream quota exhausted (limit {limit}, current {current})"
                )
            }
        }
    }
}
628
629impl StreamCapacityRegistry {
630 pub fn new() -> Arc<Self> {
631 Arc::new(Self::default())
632 }
633
634 pub fn try_acquire(
638 self: &Arc<Self>,
639 principal: &str,
640 max_global: usize,
641 max_per_principal: usize,
642 ) -> Result<StreamCapacityGuard, AcquireError> {
643 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
644 if inner.global_count >= max_global {
645 return Err(AcquireError::GlobalExhausted {
646 limit: max_global,
647 current: inner.global_count,
648 });
649 }
650 let current = inner.per_principal.get(principal).copied().unwrap_or(0);
651 if current >= max_per_principal {
652 return Err(AcquireError::PrincipalExhausted {
653 principal: principal.to_string(),
654 limit: max_per_principal,
655 current,
656 });
657 }
658 inner.global_count += 1;
659 inner
660 .per_principal
661 .insert(principal.to_string(), current + 1);
662 Ok(StreamCapacityGuard {
663 registry: Arc::clone(self),
664 principal: principal.to_string(),
665 released: false,
666 })
667 }
668
669 fn release(&self, principal: &str) {
670 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
671 if inner.global_count > 0 {
672 inner.global_count -= 1;
673 }
674 if let Some(count) = inner.per_principal.get_mut(principal) {
675 if *count > 0 {
676 *count -= 1;
677 }
678 if *count == 0 {
679 inner.per_principal.remove(principal);
680 }
681 }
682 }
683
684 pub fn snapshot(&self) -> (usize, HashMap<String, usize>) {
686 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
687 (inner.global_count, inner.per_principal.clone())
688 }
689}
690
/// Holds one stream slot obtained from `StreamCapacityRegistry::try_acquire`; dropping the
/// guard releases the slot.
#[must_use = "dropping the guard immediately releases the stream slot"]
#[derive(Debug)]
pub struct StreamCapacityGuard {
    /// Registry the slot was taken from.
    registry: Arc<StreamCapacityRegistry>,
    /// Principal the slot is counted against.
    principal: String,
    /// Set once the slot has been released, so `Drop` releases at most once.
    released: bool,
}
702
703impl StreamCapacityGuard {
704 pub fn principal(&self) -> &str {
705 &self.principal
706 }
707}
708
709impl Drop for StreamCapacityGuard {
710 fn drop(&mut self) {
711 if !self.released {
712 self.registry.release(&self.principal);
713 self.released = true;
714 }
715 }
716}
717
/// Textual heuristic deciding whether `query` is treated as resumable.
///
/// The query is upper-cased and must start (after leading whitespace) with `SELECT ` or
/// `SELECT\n`. It is rejected if it contains any of the forbidden fragments (grouping,
/// `DISTINCT`, aggregate and window calls, `JOIN`). If it has an `ORDER BY`, the text between
/// `ORDER BY` and the first ` LIMIT ` / `;` must be exactly `RID` or `RID ASC`.
pub fn assess_resumability(query: &str) -> bool {
    const FORBIDDEN: &[&str] = &[
        " GROUP BY ",
        " HAVING ",
        " DISTINCT ",
        "DISTINCT ",
        "COUNT(",
        "SUM(",
        "AVG(",
        "MIN(",
        "MAX(",
        "ARRAY_AGG(",
        "JSON_AGG(",
        "OVER(",
        " OVER (",
        " JOIN ",
    ];
    const ORDER_BY: &str = "ORDER BY";

    let upper = query.to_uppercase();
    let head = upper.trim_start();
    let is_select = head.starts_with("SELECT ") || head.starts_with("SELECT\n");
    if !is_select || FORBIDDEN.iter().any(|kw| upper.contains(*kw)) {
        return false;
    }

    // No ORDER BY at all is acceptable.
    let Some(pos) = upper.find(ORDER_BY) else {
        return true;
    };

    // Cut the ordering clause at the first " LIMIT ", then at the first ';' within what is left.
    let clause = &upper[pos + ORDER_BY.len()..];
    let clause = clause.find(" LIMIT ").map_or(clause, |cut| &clause[..cut]);
    let clause = clause.find(';').map_or(clause, |cut| &clause[..cut]);
    matches!(clause.trim(), "RID" | "RID ASC")
}
770
/// Remembers, per snapshot LSN, when a lease was opened and how long it lives.
#[derive(Debug, Default)]
pub struct LeaseRegistry {
    /// Lease records keyed by snapshot LSN, guarded by a mutex.
    inner: Mutex<HashMap<u64, LeaseEntry>>,
}

/// Opening time and time-to-live of one recorded lease.
#[derive(Debug, Clone, Copy)]
struct LeaseEntry {
    opened_at_ms: u64,
    ttl_ms: u64,
}

/// Result of [`LeaseRegistry::lookup`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseLookup {
    /// No lease is recorded for the LSN.
    Unknown,
    /// A lease is recorded but its age has reached its TTL.
    Expired,
    /// A lease is recorded and its age is still below its TTL.
    Live,
}

impl LeaseRegistry {
    /// Creates an empty registry wrapped in an `Arc`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Records (or overwrites) the lease for `snapshot_lsn`.
    pub fn record(&self, snapshot_lsn: u64, opened_at_ms: u64, ttl_ms: u64) {
        let entry = LeaseEntry {
            opened_at_ms,
            ttl_ms,
        };
        let mut leases = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        leases.insert(snapshot_lsn, entry);
    }

    /// Classifies the lease for `snapshot_lsn` as of `now_ms`.
    ///
    /// A `now_ms` earlier than the recorded opening time counts as age zero.
    pub fn lookup(&self, snapshot_lsn: u64, now_ms: u64) -> LeaseLookup {
        let leases = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let Some(lease) = leases.get(&snapshot_lsn) else {
            return LeaseLookup::Unknown;
        };
        let age_ms = now_ms.saturating_sub(lease.opened_at_ms);
        if age_ms < lease.ttl_ms {
            LeaseLookup::Live
        } else {
            LeaseLookup::Expired
        }
    }

    /// Number of recorded leases.
    #[doc(hidden)]
    pub fn len(&self) -> usize {
        let leases = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        leases.len()
    }
}
839
/// Number of raw bytes in a cursor token before [`generate_cursor_token`] hex-encodes them.
pub const CURSOR_TOKEN_BYTES: usize = 24;
847
/// Process-wide counter, starting at 1, that [`generate_cursor_token`] advances in its
/// fallback path.
static NEXT_CURSOR_SALT: AtomicU64 = AtomicU64::new(1);
849
850pub fn generate_cursor_token() -> String {
855 let mut bytes = [0u8; CURSOR_TOKEN_BYTES];
856 if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
857 let salt = NEXT_CURSOR_SALT
858 .fetch_add(1, Ordering::SeqCst)
859 .to_le_bytes();
860 let now = crate::utils::now_unix_nanos().to_le_bytes();
861 bytes[..8].copy_from_slice(&salt);
862 bytes[8..16].copy_from_slice(&now);
863 }
864 crate::utils::to_hex(&bytes)
865}
866
/// State stored in [`CursorRegistry`] for one registered cursor token.
#[derive(Debug, Clone)]
struct CursorEntry {
    /// Snapshot LSN supplied at registration.
    snapshot_lsn: u64,
    /// Tenant that registered the cursor; compared on `cancel` and `resolve`.
    tenant: String,
    /// Principal that registered the cursor; compared on `cancel` and `resolve`.
    principal: String,
    /// Query text supplied at registration.
    query: String,
    /// Optional entity-type list supplied at registration.
    entity_types: Option<Vec<String>>,
    /// Optional capability list supplied at registration.
    capabilities: Option<Vec<String>>,
    /// Opening time in milliseconds supplied at registration.
    opened_at_ms: u64,
    /// Time-to-live in milliseconds supplied at registration.
    ttl_ms: u64,
    /// Cancel token created at registration; `cancel` triggers it.
    cancel_token: CancelToken,
    /// Set by `cancel`; makes `resolve` answer `CursorReject::Cancelled`.
    cancelled: bool,
}
890
/// What `CursorRegistry::resolve` hands back for a valid cursor token.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorResume {
    /// Snapshot LSN the cursor was registered with.
    pub snapshot_lsn: u64,
    /// Query text the cursor was registered with.
    pub query: String,
    /// Entity-type list the cursor was registered with, if any.
    pub entity_types: Option<Vec<String>>,
    /// Capability list the cursor was registered with, if any.
    pub capabilities: Option<Vec<String>>,
    /// `opened_at_ms + ttl_ms` of the cursor, saturating on overflow.
    pub expires_at_ms: u64,
}
902
/// Why `CursorRegistry::cancel` or `CursorRegistry::resolve` rejected a token.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorReject {
    /// The token is not registered, or it belongs to a different tenant or principal.
    NotFound,
    /// The cursor's age has reached its TTL.
    Expired,
    /// The cursor was cancelled.
    Cancelled,
}
920
/// Registry of cursor tokens and the state needed to resume or cancel them.
#[derive(Debug, Default)]
pub struct CursorRegistry {
    /// Cursor state keyed by token, guarded by a mutex.
    inner: Mutex<HashMap<String, CursorEntry>>,
}
935
936impl CursorRegistry {
937 pub fn new() -> Arc<Self> {
938 Arc::new(Self::default())
939 }
940
941 #[allow(clippy::too_many_arguments)]
945 pub fn register(
946 &self,
947 snapshot_lsn: u64,
948 tenant: &str,
949 principal: &str,
950 query: &str,
951 entity_types: Option<Vec<String>>,
952 capabilities: Option<Vec<String>>,
953 opened_at_ms: u64,
954 ttl_ms: u64,
955 ) -> String {
956 let token = generate_cursor_token();
957 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
958 inner.insert(
959 token.clone(),
960 CursorEntry {
961 snapshot_lsn,
962 tenant: tenant.to_string(),
963 principal: principal.to_string(),
964 query: query.to_string(),
965 entity_types,
966 capabilities,
967 opened_at_ms,
968 ttl_ms,
969 cancel_token: CancelToken::new(),
970 cancelled: false,
971 },
972 );
973 token
974 }
975
976 pub fn cancel_token_for(&self, token: &str) -> Option<CancelToken> {
981 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
982 inner.get(token).map(|e| e.cancel_token.clone())
983 }
984
985 pub fn cancel(
996 &self,
997 token: &str,
998 tenant: &str,
999 principal: &str,
1000 ) -> Result<CancelToken, CursorReject> {
1001 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1002 let entry = inner.get_mut(token).ok_or(CursorReject::NotFound)?;
1003 if entry.tenant != tenant || entry.principal != principal {
1004 return Err(CursorReject::NotFound);
1005 }
1006 entry.cancelled = true;
1007 entry.cancel_token.cancel();
1008 Ok(entry.cancel_token.clone())
1009 }
1010
1011 pub fn resolve(
1019 &self,
1020 token: &str,
1021 tenant: &str,
1022 principal: &str,
1023 now_ms: u64,
1024 ) -> Result<CursorResume, CursorReject> {
1025 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1026 let entry = inner.get(token).ok_or(CursorReject::NotFound)?;
1027 if entry.tenant != tenant || entry.principal != principal {
1031 return Err(CursorReject::NotFound);
1032 }
1033 if entry.cancelled {
1037 return Err(CursorReject::Cancelled);
1038 }
1039 if now_ms.saturating_sub(entry.opened_at_ms) >= entry.ttl_ms {
1040 return Err(CursorReject::Expired);
1041 }
1042 Ok(CursorResume {
1043 snapshot_lsn: entry.snapshot_lsn,
1044 query: entry.query.clone(),
1045 entity_types: entry.entity_types.clone(),
1046 capabilities: entry.capabilities.clone(),
1047 expires_at_ms: entry.opened_at_ms.saturating_add(entry.ttl_ms),
1048 })
1049 }
1050
1051 #[doc(hidden)]
1053 pub fn len(&self) -> usize {
1054 self.inner.lock().unwrap_or_else(|e| e.into_inner()).len()
1055 }
1056
1057 #[doc(hidden)]
1058 pub fn is_empty(&self) -> bool {
1059 self.len() == 0
1060 }
1061}
1062
/// Incremental SHA-256 over the lines fed to it, together with a count of those lines.
#[derive(Debug, Default)]
pub struct PrefixHasher {
    /// Running hasher. `new` stores `Some`; the derived `Default` leaves it `None`.
    inner: Option<sha2::Sha256>,
    /// Number of `update` calls so far.
    rows: u64,
}
1073
1074impl PrefixHasher {
1075 pub fn new() -> Self {
1076 use sha2::Digest;
1077 Self {
1078 inner: Some(sha2::Sha256::new()),
1079 rows: 0,
1080 }
1081 }
1082
1083 pub fn update(&mut self, line: &[u8]) {
1084 use sha2::Digest;
1085 if let Some(h) = self.inner.as_mut() {
1086 h.update(line);
1087 }
1088 self.rows += 1;
1089 }
1090
1091 pub fn rows(&self) -> u64 {
1092 self.rows
1093 }
1094
1095 pub fn finalize_hex(mut self) -> String {
1098 use sha2::Digest;
1099 let hasher = self
1100 .inner
1101 .take()
1102 .expect("PrefixHasher::finalize_hex called twice");
1103 let digest = hasher.finalize();
1104 let mut out = String::with_capacity(64);
1105 for b in digest.iter() {
1106 out.push_str(&format!("{b:02x}"));
1107 }
1108 out
1109 }
1110}
1111
1112pub fn audit_stream_opened(
1118 runtime: &RedDBRuntime,
1119 lease_handle: &str,
1120 principal: &str,
1121 snapshot_lsn: u64,
1122 query_hash: &str,
1123) {
1124 use crate::json::{Map, Value as JsonValue};
1125 let mut detail = Map::new();
1126 detail.insert(
1127 "lease_handle".to_string(),
1128 JsonValue::String(lease_handle.to_string()),
1129 );
1130 detail.insert(
1131 "snapshot_lsn".to_string(),
1132 JsonValue::Number(snapshot_lsn as f64),
1133 );
1134 detail.insert(
1135 "query_hash".to_string(),
1136 JsonValue::String(query_hash.to_string()),
1137 );
1138 let event = crate::runtime::audit_log::AuditEvent::builder("stream.opened")
1139 .principal(principal)
1140 .resource(lease_handle.to_string())
1141 .outcome(crate::runtime::audit_log::Outcome::Success)
1142 .detail(JsonValue::Object(detail))
1143 .build();
1144 runtime.audit_log().record_event(event);
1145}
1146
1147pub fn audit_stream_closed(
1148 runtime: &RedDBRuntime,
1149 lease_handle: &str,
1150 principal: &str,
1151 reason: CloseReason,
1152 row_count: u64,
1153 bytes_written: u64,
1154) {
1155 use crate::json::{Map, Value as JsonValue};
1156 let mut stats = Map::new();
1157 stats.insert("row_count".to_string(), JsonValue::Number(row_count as f64));
1158 stats.insert(
1159 "bytes_written".to_string(),
1160 JsonValue::Number(bytes_written as f64),
1161 );
1162 let mut detail = Map::new();
1163 detail.insert(
1164 "lease_handle".to_string(),
1165 JsonValue::String(lease_handle.to_string()),
1166 );
1167 detail.insert(
1168 "reason".to_string(),
1169 JsonValue::String(reason.as_str().to_string()),
1170 );
1171 detail.insert("stats".to_string(), JsonValue::Object(stats));
1172 let outcome = match reason {
1173 CloseReason::Ok => crate::runtime::audit_log::Outcome::Success,
1174 CloseReason::CapacityRefused => crate::runtime::audit_log::Outcome::Denied,
1175 _ => crate::runtime::audit_log::Outcome::Error,
1176 };
1177 let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1178 .principal(principal)
1179 .resource(lease_handle.to_string())
1180 .outcome(outcome)
1181 .detail(JsonValue::Object(detail))
1182 .build();
1183 runtime.audit_log().record_event(event);
1184}
1185
1186pub fn audit_token_expired_during_lease(
1187 runtime: &RedDBRuntime,
1188 lease_handle: &str,
1189 principal: &str,
1190 token_expiry_ms: u64,
1191) {
1192 use crate::json::{Map, Value as JsonValue};
1193 let mut detail = Map::new();
1194 detail.insert(
1195 "lease_handle".to_string(),
1196 JsonValue::String(lease_handle.to_string()),
1197 );
1198 detail.insert(
1199 "token_expiry".to_string(),
1200 JsonValue::Number(token_expiry_ms as f64),
1201 );
1202 detail.insert("lease_continued".to_string(), JsonValue::Bool(true));
1203 let event = crate::runtime::audit_log::AuditEvent::builder("stream.token_expired_during_lease")
1204 .principal(principal)
1205 .resource(lease_handle.to_string())
1206 .outcome(crate::runtime::audit_log::Outcome::Success)
1207 .detail(JsonValue::Object(detail))
1208 .build();
1209 runtime.audit_log().record_event(event);
1210}
1211
1212pub fn audit_stream_capacity_refused(
1218 runtime: &RedDBRuntime,
1219 principal: &str,
1220 code: &str,
1221 limit: usize,
1222 current: usize,
1223) {
1224 use crate::json::{Map, Value as JsonValue};
1225 let mut detail = Map::new();
1226 detail.insert(
1227 "reason".to_string(),
1228 JsonValue::String(CloseReason::CapacityRefused.as_str().to_string()),
1229 );
1230 detail.insert("code".to_string(), JsonValue::String(code.to_string()));
1231 detail.insert("limit".to_string(), JsonValue::Number(limit as f64));
1232 detail.insert("current".to_string(), JsonValue::Number(current as f64));
1233 let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1234 .principal(principal)
1235 .outcome(crate::runtime::audit_log::Outcome::Denied)
1236 .detail(JsonValue::Object(detail))
1237 .build();
1238 runtime.audit_log().record_event(event);
1239}
1240
1241pub fn system_clock() -> Arc<dyn Clock> {
1244 static INSTANCE: std::sync::OnceLock<Arc<dyn Clock>> = std::sync::OnceLock::new();
1245 Arc::clone(INSTANCE.get_or_init(|| Arc::new(SystemClock)))
1246}
1247
1248#[cfg(test)]
1249mod tests {
1250 use super::*;
1251
1252 #[test]
1253 fn open_stream_refuses_when_session_has_active_transaction() {
1254 let clock = FakeClock::new(0);
1255 let err = open_stream(StreamConfig::DEFAULT, 42, true, &clock).unwrap_err();
1256 assert_eq!(err, OpenStreamError::TransactionActive);
1257 assert_eq!(err.code(), "stream_in_transaction_unsupported");
1258 }
1259
1260 #[test]
1261 fn open_stream_succeeds_when_session_is_autocommit() {
1262 let clock = FakeClock::new(1_700_000_000_000);
1263 let lease = open_stream(StreamConfig::DEFAULT, 123, false, &clock).unwrap();
1264 assert_eq!(lease.snapshot_lsn, 123);
1265 assert_eq!(lease.opened_at_ms, 1_700_000_000_000);
1266 assert!(lease.id >= 1);
1267 }
1268
1269 #[test]
1270 fn lease_ids_are_unique_and_monotonic() {
1271 let clock = FakeClock::new(0);
1272 let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1273 let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1274 assert!(b.id > a.id);
1275 }
1276
1277 #[test]
1278 fn snapshot_expired_uses_injected_clock_and_ttl() {
1279 let clock = FakeClock::new(0);
1282 let mut config = StreamConfig::DEFAULT;
1283 config.snapshot_ttl_ms = 5_000;
1284 let lease = open_stream(config, 0, false, &clock).unwrap();
1285
1286 assert!(!lease.snapshot_expired(clock.now_ms()));
1287 clock.advance(4_999);
1288 assert!(!lease.snapshot_expired(clock.now_ms()));
1289 clock.advance(1);
1290 assert!(lease.snapshot_expired(clock.now_ms()));
1291 }
1292
1293 #[test]
1294 fn stream_config_loads_defaults_when_kv_is_empty() {
1295 let cfg = StreamConfig::DEFAULT;
1297 assert_eq!(cfg.snapshot_ttl_ms, 60_000);
1298 assert_eq!(cfg.chunk_default_pages, 4);
1299 assert_eq!(cfg.chunk_min_pages, 1);
1300 assert_eq!(cfg.chunk_max_pages, 64);
1301 assert_eq!(cfg.chunk_max_rows, 1000);
1302 assert_eq!(cfg.chunk_max_latency_ms, 50);
1303 assert_eq!(cfg.production_buffer_bytes(), 64 * 1024);
1304 }
1305
1306 #[test]
1307 fn stream_config_normalize_clamps_inconsistent_inputs() {
1308 let mut cfg = StreamConfig {
1309 snapshot_ttl_ms: 1,
1310 chunk_default_pages: 100,
1311 chunk_min_pages: 0,
1312 chunk_max_pages: 8,
1313 chunk_max_rows: 0,
1314 chunk_max_latency_ms: 1,
1315 max_global_streams: 0,
1316 max_per_principal_streams: 0,
1317 default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
1318 };
1319 cfg.normalize();
1320 assert_eq!(cfg.chunk_min_pages, 1);
1321 assert_eq!(cfg.chunk_max_pages, 8);
1322 assert_eq!(cfg.chunk_default_pages, 8); assert!(cfg.chunk_max_rows >= 1);
1324 assert!(cfg.max_global_streams >= 1);
1325 assert!(cfg.max_per_principal_streams >= 1);
1326 }
1327
1328 struct CapturingSink {
1333 chunks: std::cell::RefCell<Vec<Vec<u8>>>,
1334 }
1335 impl CapturingSink {
1336 fn new() -> Self {
1337 Self {
1338 chunks: std::cell::RefCell::new(Vec::new()),
1339 }
1340 }
1341 fn len(&self) -> usize {
1342 self.chunks.borrow().len()
1343 }
1344 fn last_len(&self) -> Option<usize> {
1345 self.chunks.borrow().last().map(|c| c.len())
1346 }
1347 }
1348
1349 fn capture<'a>(sink: &'a CapturingSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1350 move |bytes: &[u8]| {
1351 sink.chunks.borrow_mut().push(bytes.to_vec());
1352 Ok(())
1353 }
1354 }
1355
1356 #[test]
1357 fn chunk_producer_flushes_on_byte_cap() {
1358 let clock = FakeClock::new(0);
1359 let cfg = StreamConfig {
1360 chunk_default_pages: 1, chunk_min_pages: 1,
1362 chunk_max_pages: 1,
1363 chunk_max_rows: 1_000_000,
1364 chunk_max_latency_ms: 1_000_000,
1365 ..StreamConfig::DEFAULT
1366 };
1367 let sink = CapturingSink::new();
1368 let mut producer = ChunkProducer::new(&cfg, &clock);
1369 let mut flush = capture(&sink);
1370
1371 producer
1372 .push_line(&vec![b'x'; 8 * 1024], &mut flush)
1373 .unwrap();
1374 assert_eq!(sink.len(), 0);
1375
1376 let triggered = producer
1377 .push_line(&vec![b'y'; 8 * 1024], &mut flush)
1378 .unwrap();
1379 assert!(triggered);
1380 assert_eq!(sink.len(), 1);
1381 assert_eq!(sink.last_len(), Some(16 * 1024));
1382 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Byte));
1383 }
1384
1385 #[test]
1386 fn chunk_producer_flushes_on_row_cap() {
1387 let clock = FakeClock::new(0);
1388 let cfg = StreamConfig {
1389 chunk_default_pages: 4, chunk_min_pages: 1,
1391 chunk_max_pages: 64,
1392 chunk_max_rows: 3,
1393 chunk_max_latency_ms: 1_000_000,
1394 ..StreamConfig::DEFAULT
1395 };
1396 let sink = CapturingSink::new();
1397 let mut producer = ChunkProducer::new(&cfg, &clock);
1398 let mut flush = capture(&sink);
1399 let row = b"{\"row\":{}}\n";
1400 producer.push_line(row, &mut flush).unwrap();
1401 producer.push_line(row, &mut flush).unwrap();
1402 assert_eq!(sink.len(), 0);
1403 let triggered = producer.push_line(row, &mut flush).unwrap();
1404 assert!(triggered);
1405 assert_eq!(sink.len(), 1);
1406 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Row));
1407 }
1408
1409 #[test]
1410 fn chunk_producer_flushes_on_latency_cap() {
1411 let clock = FakeClock::new(0);
1412 let cfg = StreamConfig {
1413 chunk_default_pages: 4,
1414 chunk_min_pages: 1,
1415 chunk_max_pages: 64,
1416 chunk_max_rows: 1_000_000,
1417 chunk_max_latency_ms: 50,
1418 ..StreamConfig::DEFAULT
1419 };
1420 let sink = CapturingSink::new();
1421 let mut producer = ChunkProducer::new(&cfg, &clock);
1422 let mut flush = capture(&sink);
1423 producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1424 assert_eq!(sink.len(), 0);
1425 clock.advance(60);
1426 let triggered = producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1427 assert!(triggered);
1428 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
1429 }
1430
1431 #[test]
1432 fn chunk_producer_finish_emits_terminal_flush() {
1433 let clock = FakeClock::new(0);
1434 let cfg = StreamConfig::DEFAULT;
1435 let sink = CapturingSink::new();
1436 let mut producer = ChunkProducer::new(&cfg, &clock);
1437 let mut flush = capture(&sink);
1438 producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1439 producer.finish(&mut flush).unwrap();
1440 assert_eq!(sink.len(), 1);
1441 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Terminal));
1442 }
1443
1444 #[test]
1445 fn write_chunked_helpers_produce_well_formed_chunks() {
1446 let mut buf: Vec<u8> = Vec::new();
1447 write_chunked_response_header(&mut buf, 200, "application/x-ndjson").unwrap();
1448 write_chunk(&mut buf, b"{\"row\":{}}\n").unwrap();
1449 write_chunked_terminator(&mut buf).unwrap();
1450 let text = String::from_utf8(buf).unwrap();
1451 assert!(text.starts_with("HTTP/1.1 200 OK\r\n"));
1452 assert!(text.contains("Transfer-Encoding: chunked\r\n"));
1453 assert!(text.contains("\r\nb\r\n{\"row\":{}}\n\r\n"));
1455 assert!(text.ends_with("0\r\n\r\n"));
1456 }
1457
1458 #[test]
1461 fn capacity_registry_global_exhausted_returns_structured_error() {
1462 let reg = StreamCapacityRegistry::new();
1463 let _g1 = reg.try_acquire("alice", 2, 32).unwrap();
1464 let _g2 = reg.try_acquire("alice", 2, 32).unwrap();
1465 let err = reg.try_acquire("alice", 2, 32).unwrap_err();
1466 assert_eq!(
1467 err,
1468 AcquireError::GlobalExhausted {
1469 limit: 2,
1470 current: 2,
1471 }
1472 );
1473 assert_eq!(err.code(), "server_stream_capacity_exhausted");
1474 }
1475
1476 #[test]
1477 fn capacity_registry_per_principal_exhausted_independent_of_global() {
1478 let reg = StreamCapacityRegistry::new();
1482 let _a1 = reg.try_acquire("alice", 100, 2).unwrap();
1483 let _a2 = reg.try_acquire("alice", 100, 2).unwrap();
1484 let err = reg.try_acquire("alice", 100, 2).unwrap_err();
1485 assert_eq!(
1486 err,
1487 AcquireError::PrincipalExhausted {
1488 principal: "alice".to_string(),
1489 limit: 2,
1490 current: 2,
1491 }
1492 );
1493 assert_eq!(err.code(), "principal_stream_quota_exhausted");
1494
1495 let _b1 = reg.try_acquire("bob", 100, 2).unwrap();
1497 let _b2 = reg.try_acquire("bob", 100, 2).unwrap();
1498 }
1499
1500 #[test]
1501 fn capacity_registry_release_frees_both_counters() {
1502 let reg = StreamCapacityRegistry::new();
1504 let g1 = reg.try_acquire("alice", 1, 1).unwrap();
1505 assert!(reg.try_acquire("alice", 1, 1).is_err());
1506 drop(g1);
1507 let (global, per_principal) = reg.snapshot();
1508 assert_eq!(global, 0);
1509 assert!(per_principal.is_empty());
1510 let _g2 = reg.try_acquire("alice", 1, 1).unwrap();
1512 }
1513
1514 #[test]
1515 fn capacity_registry_concurrent_acquire_release_does_not_over_issue() {
1516 use std::sync::atomic::{AtomicUsize, Ordering};
1522
1523 const THREADS: usize = 16;
1524 const ITERS: usize = 200;
1525 const CAP_GLOBAL: usize = 4;
1526 const CAP_PER_PRINCIPAL: usize = 4;
1527
1528 let reg = StreamCapacityRegistry::new();
1529 let observed_max = Arc::new(AtomicUsize::new(0));
1530 let mut handles = Vec::new();
1531 for tid in 0..THREADS {
1532 let reg = Arc::clone(®);
1533 let observed_max = Arc::clone(&observed_max);
1534 let principal = format!("p{}", tid % 2);
1537 handles.push(std::thread::spawn(move || {
1538 for _ in 0..ITERS {
1539 if let Ok(guard) = reg.try_acquire(&principal, CAP_GLOBAL, CAP_PER_PRINCIPAL) {
1540 let (live, _) = reg.snapshot();
1541 observed_max.fetch_max(live, Ordering::SeqCst);
1542 std::thread::yield_now();
1545 drop(guard);
1546 }
1547 }
1548 }));
1549 }
1550 for h in handles {
1551 h.join().unwrap();
1552 }
1553 let (global_after, per_principal_after) = reg.snapshot();
1554 assert_eq!(global_after, 0, "global counter leaked");
1555 assert!(
1556 per_principal_after.is_empty(),
1557 "per-principal map leaked: {per_principal_after:?}"
1558 );
1559 assert!(
1560 observed_max.load(Ordering::SeqCst) <= CAP_GLOBAL,
1561 "global cap was breached: observed {} > {}",
1562 observed_max.load(Ordering::SeqCst),
1563 CAP_GLOBAL
1564 );
1565 }
1566
1567 #[test]
1570 fn assess_resumability_accepts_plain_select() {
1571 assert!(assess_resumability("SELECT id, name FROM t"));
1572 assert!(assess_resumability("select * from t where id > 5"));
1573 assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid"));
1574 assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid ASC"));
1575 assert!(assess_resumability(
1576 "SELECT a FROM t ORDER BY rid ASC LIMIT 10"
1577 ));
1578 }
1579
1580 #[test]
1581 fn assess_resumability_rejects_aggregates_and_unordered() {
1582 assert!(!assess_resumability("SELECT COUNT(*) FROM t"));
1583 assert!(!assess_resumability("SELECT SUM(x) FROM t"));
1584 assert!(!assess_resumability("SELECT a, COUNT(b) FROM t GROUP BY a"));
1585 assert!(!assess_resumability("SELECT DISTINCT a FROM t"));
1586 assert!(!assess_resumability("SELECT a FROM t ORDER BY name"));
1587 assert!(!assess_resumability("SELECT a FROM t ORDER BY rid DESC"));
1588 assert!(!assess_resumability("SELECT a FROM t ORDER BY a, b"));
1589 assert!(!assess_resumability("INSERT INTO t (a) VALUES (1)"));
1590 assert!(!assess_resumability(
1591 "SELECT a FROM t JOIN u ON t.id = u.id"
1592 ));
1593 }
1594
1595 #[test]
1596 fn lease_registry_records_and_expires_against_ttl() {
1597 let reg = LeaseRegistry::new();
1598 reg.record(42, 1_000, 5_000);
1599 assert_eq!(reg.lookup(42, 1_000), LeaseLookup::Live);
1600 assert_eq!(reg.lookup(42, 5_999), LeaseLookup::Live);
1601 assert_eq!(reg.lookup(42, 6_000), LeaseLookup::Expired);
1602 assert_eq!(reg.lookup(99, 1_000), LeaseLookup::Unknown);
1603 }
1604
1605 #[test]
1606 fn prefix_hasher_is_order_sensitive_and_deterministic() {
1607 let mut a = PrefixHasher::new();
1608 a.update(b"{\"row\":{\"id\":1}}");
1609 a.update(b"{\"row\":{\"id\":2}}");
1610 let hash_a = a.finalize_hex();
1611
1612 let mut b = PrefixHasher::new();
1613 b.update(b"{\"row\":{\"id\":1}}");
1614 b.update(b"{\"row\":{\"id\":2}}");
1615 let hash_b = b.finalize_hex();
1616 assert_eq!(hash_a, hash_b);
1617
1618 let mut c = PrefixHasher::new();
1619 c.update(b"{\"row\":{\"id\":2}}");
1620 c.update(b"{\"row\":{\"id\":1}}");
1621 assert_ne!(hash_a, c.finalize_hex());
1622 assert_eq!(hash_a.len(), 64);
1623 }
1624
1625 #[test]
1628 fn lease_handle_is_128_bit_hex_and_unique_per_open() {
1629 let clock = FakeClock::new(0);
1633 let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1634 let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1635 assert_eq!(
1636 a.lease_handle.len(),
1637 LEASE_HANDLE_BYTES * 2,
1638 "handle must be 128 bits hex-encoded: {}",
1639 a.lease_handle
1640 );
1641 assert!(
1642 a.lease_handle.chars().all(|c| c.is_ascii_hexdigit()),
1643 "handle must be hex: {}",
1644 a.lease_handle
1645 );
1646 assert_ne!(a.lease_handle, b.lease_handle, "handles must differ");
1647 assert!(b.id > a.id);
1650 }
1651
1652 #[test]
1653 fn generate_lease_handle_produces_high_entropy_distinct_values() {
1654 let mut seen = std::collections::HashSet::new();
1658 for _ in 0..1024 {
1659 assert!(
1660 seen.insert(generate_lease_handle()),
1661 "duplicate handle in CSPRNG sequence"
1662 );
1663 }
1664 }
1665
1666 #[test]
1667 fn parse_jwt_exp_ms_extracts_seconds_to_ms() {
1668 let token = "eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MDAwMDAwMDB9.sig";
1673 assert_eq!(parse_jwt_exp_ms(token), Some(1_700_000_000_000));
1674 }
1675
1676 #[test]
1677 fn parse_jwt_exp_ms_returns_none_for_opaque_tokens() {
1678 assert_eq!(parse_jwt_exp_ms("not-a-jwt"), None);
1679 assert_eq!(parse_jwt_exp_ms("only.two"), None);
1680 assert_eq!(parse_jwt_exp_ms("a.b.c"), None);
1681 }
1682
1683 fn register_default_cursor(reg: &CursorRegistry, now_ms: u64) -> String {
1686 reg.register(
1687 42,
1688 "acme",
1689 "bearer:abc",
1690 "SELECT id FROM users ORDER BY rid",
1691 None,
1692 None,
1693 now_ms,
1694 1_000,
1695 )
1696 }
1697
1698 #[test]
1699 fn cursor_token_is_192_bit_hex_and_unique_per_register() {
1700 let reg = CursorRegistry::default();
1701 let a = register_default_cursor(®, 0);
1702 let b = register_default_cursor(®, 0);
1703 assert_eq!(
1704 a.len(),
1705 CURSOR_TOKEN_BYTES * 2,
1706 "token must be 192 bits hex-encoded: {a}"
1707 );
1708 assert!(
1709 a.chars().all(|c| c.is_ascii_hexdigit()),
1710 "token must be hex: {a}"
1711 );
1712 assert_ne!(a, b, "tokens must differ across registrations");
1713 assert_eq!(reg.len(), 2);
1714 }
1715
1716 #[test]
1717 fn cursor_resolves_for_owner_within_ttl() {
1718 let reg = CursorRegistry::default();
1720 let token = register_default_cursor(®, 1_000);
1721 let resume = reg
1722 .resolve(&token, "acme", "bearer:abc", 1_500)
1723 .expect("live cursor resolves for its owner");
1724 assert_eq!(resume.snapshot_lsn, 42, "resume re-pins the same snapshot");
1725 assert_eq!(resume.query, "SELECT id FROM users ORDER BY rid");
1726 assert_eq!(resume.expires_at_ms, 2_000);
1727 }
1728
1729 #[test]
1730 fn cursor_rejects_after_ttl_for_owner() {
1731 let reg = CursorRegistry::default();
1734 let token = register_default_cursor(®, 0);
1735 assert_eq!(
1736 reg.resolve(&token, "acme", "bearer:abc", 1_000),
1737 Err(CursorReject::Expired),
1738 "TTL boundary is inclusive — cursor is dead at opened_at + ttl"
1739 );
1740 assert_eq!(
1741 reg.resolve(&token, "acme", "bearer:abc", 5_000),
1742 Err(CursorReject::Expired)
1743 );
1744 }
1745
1746 #[test]
1747 fn cursor_cross_tenant_is_masked_as_not_found() {
1748 let reg = CursorRegistry::default();
1752 let token = register_default_cursor(®, 0);
1753 assert_eq!(
1754 reg.resolve(&token, "evil-corp", "bearer:abc", 100),
1755 Err(CursorReject::NotFound),
1756 "cross-tenant resume must mask existence as NotFound"
1757 );
1758 assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1760 }
1761
1762 #[test]
1763 fn cursor_cross_principal_is_masked_as_not_found() {
1764 let reg = CursorRegistry::default();
1767 let token = register_default_cursor(®, 0);
1768 assert_eq!(
1769 reg.resolve(&token, "acme", "bearer:other", 100),
1770 Err(CursorReject::NotFound),
1771 "cross-principal resume must mask existence as NotFound"
1772 );
1773 assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1774 }
1775
1776 #[test]
1777 fn cursor_unknown_token_is_not_found() {
1778 let reg = CursorRegistry::default();
1779 assert!(reg.is_empty());
1780 assert_eq!(
1781 reg.resolve("deadbeef", "acme", "bearer:abc", 0),
1782 Err(CursorReject::NotFound)
1783 );
1784 }
1785
1786 #[test]
1787 fn cursor_scope_is_checked_before_expiry() {
1788 let reg = CursorRegistry::default();
1792 let token = register_default_cursor(®, 0);
1793 assert_eq!(
1794 reg.resolve(&token, "evil-corp", "bearer:abc", 10_000),
1795 Err(CursorReject::NotFound),
1796 "expired + wrong scope must mask as NotFound, not Expired"
1797 );
1798 }
1799
1800 #[test]
1803 fn cancel_tombstones_cursor_and_raises_token() {
1804 let reg = CursorRegistry::default();
1807 let token = register_default_cursor(®, 0);
1808 let live = reg
1809 .cancel_token_for(&token)
1810 .expect("freshly-minted cursor exposes its token");
1811 assert!(!live.is_cancelled(), "token starts un-cancelled");
1812
1813 let returned = reg
1814 .cancel(&token, "acme", "bearer:abc")
1815 .expect("owner cancels its own cursor");
1816 assert!(returned.is_cancelled(), "cancel raises the returned token");
1817 assert!(
1818 live.is_cancelled(),
1819 "the handler-held clone observes the cancel"
1820 );
1821 }
1822
1823 #[test]
1824 fn cancelled_cursor_rejects_resume_with_cancelled_reason() {
1825 let reg = CursorRegistry::default();
1829 let token = register_default_cursor(®, 0);
1830 reg.cancel(&token, "acme", "bearer:abc")
1831 .expect("owner cancels");
1832 assert_eq!(
1833 reg.resolve(&token, "acme", "bearer:abc", 100),
1834 Err(CursorReject::Cancelled),
1835 "owner resuming a cancelled cursor sees Cancelled"
1836 );
1837 }
1838
1839 #[test]
1840 fn cancel_is_idempotent() {
1841 let reg = CursorRegistry::default();
1842 let token = register_default_cursor(®, 0);
1843 assert!(reg.cancel(&token, "acme", "bearer:abc").is_ok());
1844 let second = reg
1846 .cancel(&token, "acme", "bearer:abc")
1847 .expect("re-cancel is a no-op success");
1848 assert!(second.is_cancelled());
1849 }
1850
1851 #[test]
1852 fn cancel_cross_tenant_is_masked_as_not_found() {
1853 let reg = CursorRegistry::default();
1856 let token = register_default_cursor(®, 0);
1857 assert!(
1858 matches!(
1859 reg.cancel(&token, "evil-corp", "bearer:abc"),
1860 Err(CursorReject::NotFound)
1861 ),
1862 "cross-tenant cancel must mask existence as NotFound"
1863 );
1864 assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1866 }
1867
1868 #[test]
1869 fn cancel_cross_principal_is_masked_as_not_found() {
1870 let reg = CursorRegistry::default();
1871 let token = register_default_cursor(®, 0);
1872 assert!(
1873 matches!(
1874 reg.cancel(&token, "acme", "bearer:other"),
1875 Err(CursorReject::NotFound)
1876 ),
1877 "cross-principal cancel must mask existence as NotFound"
1878 );
1879 assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1880 }
1881
1882 #[test]
1883 fn cancel_unknown_token_is_not_found() {
1884 let reg = CursorRegistry::default();
1885 assert!(matches!(
1886 reg.cancel("deadbeef", "acme", "bearer:abc"),
1887 Err(CursorReject::NotFound)
1888 ));
1889 }
1890
1891 #[test]
1892 fn generate_cursor_token_produces_high_entropy_distinct_values() {
1893 let mut seen = std::collections::HashSet::new();
1894 for _ in 0..1024 {
1895 assert!(
1896 seen.insert(generate_cursor_token()),
1897 "duplicate cursor token in CSPRNG sequence"
1898 );
1899 }
1900 }
1901
1902 #[test]
1903 fn close_reason_as_str_covers_every_state_transition() {
1904 for (variant, expected) in [
1907 (CloseReason::Ok, "ok"),
1908 (CloseReason::Cancelled, "cancelled"),
1909 (CloseReason::Error, "error"),
1910 (CloseReason::SnapshotExpired, "snapshot_expired"),
1911 (CloseReason::CapacityRefused, "capacity_refused"),
1912 (CloseReason::IntegrityFailed, "integrity_failed"),
1913 ] {
1914 assert_eq!(variant.as_str(), expected);
1915 }
1916 }
1917
1918 #[test]
1919 fn stream_config_defaults_carry_s2_caps() {
1920 assert_eq!(StreamConfig::DEFAULT.max_global_streams, 256);
1921 assert_eq!(StreamConfig::DEFAULT.max_per_principal_streams, 32);
1922 }
1923
1924 struct SizeSink {
1929 sizes: std::cell::RefCell<Vec<usize>>,
1930 }
1931 impl SizeSink {
1932 fn new() -> Self {
1933 Self {
1934 sizes: std::cell::RefCell::new(Vec::new()),
1935 }
1936 }
1937 fn flushes(&self) -> usize {
1938 self.sizes.borrow().len()
1939 }
1940 fn max_chunk(&self) -> usize {
1941 self.sizes.borrow().iter().copied().max().unwrap_or(0)
1942 }
1943 }
1944 fn size_capture<'a>(sink: &'a SizeSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1945 move |bytes: &[u8]| {
1946 sink.sizes.borrow_mut().push(bytes.len());
1947 Ok(())
1948 }
1949 }
1950
1951 #[test]
1952 fn drive_lines_streams_large_source_with_bounded_working_set() {
1953 let clock = FakeClock::new(0);
1960 let cfg = StreamConfig {
1961 chunk_default_pages: 1, chunk_min_pages: 1,
1963 chunk_max_pages: 1,
1964 chunk_max_rows: 1_000_000, chunk_max_latency_ms: 1_000_000,
1966 ..StreamConfig::DEFAULT
1967 };
1968 let sink = SizeSink::new();
1969 let mut producer = ChunkProducer::new(&cfg, &clock);
1970 let mut flush = size_capture(&sink);
1971
1972 const N: u64 = 1_000_000;
1973 let source = 0..N;
1975 let consumed = producer
1976 .drive_lines(
1977 source,
1978 |i: &u64| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes(),
1979 &mut flush,
1980 )
1981 .unwrap();
1982 producer.finish(&mut flush).unwrap();
1983
1984 assert_eq!(consumed, N);
1985 assert_eq!(producer.total_rows(), N);
1986 assert!(
1988 sink.flushes() > 1000,
1989 "expected the source to stream across many chunks, saw {}",
1990 sink.flushes()
1991 );
1992 let max_line = format!("{{\"row\":{{\"id\":{}}}}}\n", N - 1).len();
1995 assert!(
1996 sink.max_chunk() <= cfg.production_buffer_bytes() + max_line,
1997 "chunk {} exceeded bounded working set {}",
1998 sink.max_chunk(),
1999 cfg.production_buffer_bytes() + max_line
2000 );
2001 }
2002
2003 #[test]
2004 fn drive_lines_first_chunk_flushes_on_latency_before_source_drains() {
2005 let clock = FakeClock::new(0);
2011 let cfg = StreamConfig {
2012 chunk_default_pages: 64, chunk_min_pages: 1,
2014 chunk_max_pages: 64,
2015 chunk_max_rows: 1_000_000, chunk_max_latency_ms: 50,
2017 ..StreamConfig::DEFAULT
2018 };
2019 let sink = SizeSink::new();
2020 let mut producer = ChunkProducer::new(&cfg, &clock);
2021
2022 let mut first_flush_after: Option<u64> = None;
2026 let mut row = 0u64;
2027 while row < 1_000_000 {
2028 let line = format!("{{\"row\":{{\"id\":{row}}}}}\n");
2029 clock.advance(20);
2030 let mut flush = size_capture(&sink);
2031 let flushed = producer.push_line(line.as_bytes(), &mut flush).unwrap();
2032 row += 1;
2033 if flushed {
2034 first_flush_after = Some(row);
2035 break;
2036 }
2037 }
2038
2039 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
2040 let rows_before_flush = first_flush_after.expect("a latency flush must occur");
2041 assert!(
2042 rows_before_flush <= 4,
2043 "first chunk flushed only after {rows_before_flush} rows; latency bound not honoured"
2044 );
2045 assert!(rows_before_flush < 1_000_000);
2047 }
2048
2049 #[test]
2050 fn drive_lines_parity_with_manual_push_line() {
2051 let clock = FakeClock::new(0);
2054 let cfg = StreamConfig::DEFAULT;
2055
2056 let lines: Vec<Vec<u8>> = (0..50)
2057 .map(|i| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes())
2058 .collect();
2059
2060 let driven = CapturingSink::new();
2061 {
2062 let mut p = ChunkProducer::new(&cfg, &clock);
2063 let mut flush = capture(&driven);
2064 p.drive_lines(lines.iter().cloned(), |l: &Vec<u8>| l.clone(), &mut flush)
2065 .unwrap();
2066 p.finish(&mut flush).unwrap();
2067 }
2068
2069 let manual = CapturingSink::new();
2070 {
2071 let mut p = ChunkProducer::new(&cfg, &clock);
2072 let mut flush = capture(&manual);
2073 for l in &lines {
2074 p.push_line(l, &mut flush).unwrap();
2075 }
2076 p.finish(&mut flush).unwrap();
2077 }
2078
2079 assert_eq!(*driven.chunks.borrow(), *manual.chunks.borrow());
2080 }
2081}