1use std::collections::HashMap;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::sync::{Arc, Mutex};
25use std::time::{SystemTime, UNIX_EPOCH};
26
27use crate::runtime::RedDBRuntime;
28use crate::storage::query::engine::cancel::CancelToken;
29use crate::storage::schema::types::Value;
30
31const RED_CONFIG_COLLECTION: &str = "red_config";
32
33pub const PAGE_SIZE: usize = 16 * 1024;
36
37pub trait Clock: Send + Sync {
40 fn now_ms(&self) -> u64;
41}
42
43#[derive(Debug, Default)]
44pub struct SystemClock;
45
46impl Clock for SystemClock {
47 fn now_ms(&self) -> u64 {
48 SystemTime::now()
49 .duration_since(UNIX_EPOCH)
50 .map(|d| d.as_millis() as u64)
51 .unwrap_or(0)
52 }
53}
54
55#[derive(Debug)]
56pub struct FakeClock {
57 now_ms: AtomicU64,
58}
59
60impl FakeClock {
61 pub fn new(now_ms: u64) -> Self {
62 Self {
63 now_ms: AtomicU64::new(now_ms),
64 }
65 }
66
67 pub fn advance(&self, ms: u64) {
68 self.now_ms.fetch_add(ms, Ordering::SeqCst);
69 }
70}
71
72impl Clock for FakeClock {
73 fn now_ms(&self) -> u64 {
74 self.now_ms.load(Ordering::SeqCst)
75 }
76}
77
78#[derive(Debug, Clone, Copy)]
83pub struct StreamConfig {
84 pub snapshot_ttl_ms: u64,
85 pub chunk_default_pages: usize,
86 pub chunk_min_pages: usize,
87 pub chunk_max_pages: usize,
88 pub chunk_max_rows: usize,
89 pub chunk_max_latency_ms: u64,
90 pub max_global_streams: usize,
94 pub max_per_principal_streams: usize,
99 pub default_verify: crate::runtime::integrity_tombstone::VerifyMode,
103}
104
105impl Default for StreamConfig {
106 fn default() -> Self {
107 Self::DEFAULT
108 }
109}
110
111impl StreamConfig {
112 pub const DEFAULT: StreamConfig = StreamConfig {
115 snapshot_ttl_ms: 60_000,
116 chunk_default_pages: 4,
117 chunk_min_pages: 1,
118 chunk_max_pages: 64,
119 chunk_max_rows: 1000,
120 chunk_max_latency_ms: 50,
121 max_global_streams: 256,
122 max_per_principal_streams: 32,
123 default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
124 };
125
126 pub fn load(runtime: &RedDBRuntime) -> Self {
131 let db = runtime.db();
132 let read_u64 = |key: &str| -> Option<u64> {
133 match db.get_kv(RED_CONFIG_COLLECTION, key) {
134 Some((Value::Integer(v), _)) if v >= 0 => Some(v as u64),
135 Some((Value::UnsignedInteger(v), _)) => Some(v),
136 Some((Value::Text(text), _)) => text.parse().ok(),
137 _ => None,
138 }
139 };
140
141 let mut cfg = Self::DEFAULT;
142 if let Some(v) = read_u64("stream.snapshot.ttl_ms") {
143 cfg.snapshot_ttl_ms = v;
144 }
145 if let Some(v) = read_u64("stream.chunk.default_pages") {
146 cfg.chunk_default_pages = v as usize;
147 }
148 if let Some(v) = read_u64("stream.chunk.min_pages") {
149 cfg.chunk_min_pages = v as usize;
150 }
151 if let Some(v) = read_u64("stream.chunk.max_pages") {
152 cfg.chunk_max_pages = v as usize;
153 }
154 if let Some(v) = read_u64("stream.chunk.max_rows") {
155 cfg.chunk_max_rows = v as usize;
156 }
157 if let Some(v) = read_u64("stream.chunk.max_latency_ms") {
158 cfg.chunk_max_latency_ms = v;
159 }
160 if let Some(v) = read_u64("stream.max_global") {
161 cfg.max_global_streams = v as usize;
162 }
163 if let Some(v) = read_u64("stream.max_per_principal") {
164 cfg.max_per_principal_streams = v as usize;
165 }
166 if let Some((Value::Text(text), _)) =
169 db.get_kv(RED_CONFIG_COLLECTION, "stream.integrity.default_verify")
170 {
171 cfg.default_verify = crate::runtime::integrity_tombstone::VerifyMode::parse(&text);
172 }
173 cfg.normalize();
174 cfg
175 }
176
177 fn normalize(&mut self) {
181 if self.chunk_min_pages == 0 {
182 self.chunk_min_pages = 1;
183 }
184 if self.chunk_max_pages < self.chunk_min_pages {
185 self.chunk_max_pages = self.chunk_min_pages;
186 }
187 if self.chunk_default_pages < self.chunk_min_pages {
188 self.chunk_default_pages = self.chunk_min_pages;
189 }
190 if self.chunk_default_pages > self.chunk_max_pages {
191 self.chunk_default_pages = self.chunk_max_pages;
192 }
193 if self.chunk_max_rows == 0 {
194 self.chunk_max_rows = 1;
195 }
196 if self.max_global_streams == 0 {
197 self.max_global_streams = 1;
198 }
199 if self.max_per_principal_streams == 0 {
200 self.max_per_principal_streams = 1;
201 }
202 }
203
204 pub fn production_buffer_bytes(&self) -> usize {
207 self.chunk_default_pages.saturating_mul(PAGE_SIZE)
208 }
209}
210
211static NEXT_LEASE_ID: AtomicU64 = AtomicU64::new(1);
219
220pub const LEASE_HANDLE_BYTES: usize = 16;
226
227pub fn generate_lease_handle() -> String {
231 let mut bytes = [0u8; LEASE_HANDLE_BYTES];
232 if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
233 let lo = NEXT_LEASE_ID.fetch_add(0, Ordering::SeqCst).to_le_bytes();
239 let now = crate::utils::now_unix_nanos().to_le_bytes();
240 bytes[..8].copy_from_slice(&lo);
241 bytes[8..].copy_from_slice(&now);
242 }
243 crate::utils::to_hex(&bytes)
244}
245
246pub fn parse_jwt_exp_ms(token: &str) -> Option<u64> {
256 let parts: Vec<&str> = token.split('.').collect();
257 if parts.len() != 3 {
258 return None;
259 }
260 let payload = base64url_decode_padded(parts[1])?;
261 let v: crate::json::Value = crate::json::from_slice(&payload).ok()?;
262 let exp_secs = v.get("exp").and_then(|n| n.as_f64())? as i64;
263 if exp_secs <= 0 {
264 return None;
265 }
266 Some((exp_secs as u64).saturating_mul(1000))
267}
268
269fn base64url_decode_padded(input: &str) -> Option<Vec<u8>> {
270 let mut s = String::with_capacity(input.len() + 4);
271 for ch in input.chars() {
272 match ch {
273 '-' => s.push('+'),
274 '_' => s.push('/'),
275 _ => s.push(ch),
276 }
277 }
278 while !s.len().is_multiple_of(4) {
279 s.push('=');
280 }
281 crate::wire::redwire::auth::base64_std_decode(&s)
282}
283
284#[derive(Debug, Clone, Copy, PartialEq, Eq)]
288pub enum CloseReason {
289 Ok,
290 Cancelled,
291 Error,
292 SnapshotExpired,
293 CapacityRefused,
294 IntegrityFailed,
295}
296
297impl CloseReason {
298 pub fn as_str(&self) -> &'static str {
299 match self {
300 CloseReason::Ok => "ok",
301 CloseReason::Cancelled => "cancelled",
302 CloseReason::Error => "error",
303 CloseReason::SnapshotExpired => "snapshot_expired",
304 CloseReason::CapacityRefused => "capacity_refused",
305 CloseReason::IntegrityFailed => "integrity_failed",
306 }
307 }
308}
309
310#[derive(Debug)]
311pub struct StreamLease {
312 pub id: u64,
313 pub lease_handle: String,
317 pub snapshot_lsn: u64,
318 pub opened_at_ms: u64,
319 pub config: StreamConfig,
320}
321
322impl StreamLease {
323 pub fn snapshot_expired(&self, now_ms: u64) -> bool {
330 now_ms.saturating_sub(self.opened_at_ms) >= self.config.snapshot_ttl_ms
331 }
332}
333
334#[derive(Debug, PartialEq, Eq)]
335pub enum OpenStreamError {
336 TransactionActive,
340}
341
342impl OpenStreamError {
343 pub fn code(&self) -> &'static str {
344 match self {
345 OpenStreamError::TransactionActive => "stream_in_transaction_unsupported",
346 }
347 }
348
349 pub fn message(&self) -> &'static str {
350 match self {
351 OpenStreamError::TransactionActive => {
352 "cannot open output stream while a transaction is active on this session"
353 }
354 }
355 }
356}
357
358pub fn open_stream(
363 config: StreamConfig,
364 snapshot_lsn: u64,
365 in_transaction: bool,
366 clock: &dyn Clock,
367) -> Result<StreamLease, OpenStreamError> {
368 if in_transaction {
369 return Err(OpenStreamError::TransactionActive);
370 }
371 Ok(StreamLease {
372 id: NEXT_LEASE_ID.fetch_add(1, Ordering::SeqCst),
373 lease_handle: generate_lease_handle(),
374 snapshot_lsn,
375 opened_at_ms: clock.now_ms(),
376 config,
377 })
378}
379
380pub struct ChunkProducer<'a> {
388 buf: Vec<u8>,
389 rows: usize,
390 window_started_ms: u64,
391 cap_bytes: usize,
392 cap_rows: usize,
393 cap_latency_ms: u64,
394 clock: &'a dyn Clock,
395 total_flushes: u64,
396 total_bytes: u64,
397 total_rows: u64,
398 last_flush_reason: Option<FlushReason>,
399}
400
401#[derive(Debug, Clone, Copy, PartialEq, Eq)]
402pub enum FlushReason {
403 Byte,
404 Row,
405 Latency,
406 Terminal,
407}
408
409impl<'a> ChunkProducer<'a> {
410 pub fn new(config: &StreamConfig, clock: &'a dyn Clock) -> Self {
411 let cap_bytes = config.production_buffer_bytes();
412 Self {
413 buf: Vec::with_capacity(cap_bytes),
414 rows: 0,
415 window_started_ms: clock.now_ms(),
416 cap_bytes,
417 cap_rows: config.chunk_max_rows,
418 cap_latency_ms: config.chunk_max_latency_ms,
419 clock,
420 total_flushes: 0,
421 total_bytes: 0,
422 total_rows: 0,
423 last_flush_reason: None,
424 }
425 }
426
427 pub fn push_line<F>(&mut self, line: &[u8], flush: &mut F) -> std::io::Result<bool>
430 where
431 F: FnMut(&[u8]) -> std::io::Result<()>,
432 {
433 self.buf.extend_from_slice(line);
434 self.rows += 1;
435 self.total_rows += 1;
436
437 if self.buf.len() >= self.cap_bytes {
438 self.flush(flush, FlushReason::Byte)?;
439 return Ok(true);
440 }
441 if self.rows >= self.cap_rows {
442 self.flush(flush, FlushReason::Row)?;
443 return Ok(true);
444 }
445 let elapsed = self.clock.now_ms().saturating_sub(self.window_started_ms);
446 if elapsed >= self.cap_latency_ms {
447 self.flush(flush, FlushReason::Latency)?;
448 return Ok(true);
449 }
450 Ok(false)
451 }
452
453 pub fn drive_lines<S, R, Enc, F>(
471 &mut self,
472 source: S,
473 mut encode: Enc,
474 flush: &mut F,
475 ) -> std::io::Result<u64>
476 where
477 S: IntoIterator<Item = R>,
478 Enc: FnMut(&R) -> Vec<u8>,
479 F: FnMut(&[u8]) -> std::io::Result<()>,
480 {
481 let mut count = 0u64;
482 for record in source {
483 let line = encode(&record);
484 self.push_line(&line, flush)?;
485 count += 1;
486 }
487 Ok(count)
488 }
489
490 pub fn finish<F>(&mut self, flush: &mut F) -> std::io::Result<()>
494 where
495 F: FnMut(&[u8]) -> std::io::Result<()>,
496 {
497 if !self.buf.is_empty() {
498 self.flush(flush, FlushReason::Terminal)?;
499 }
500 Ok(())
501 }
502
503 fn flush<F>(&mut self, flush: &mut F, reason: FlushReason) -> std::io::Result<()>
504 where
505 F: FnMut(&[u8]) -> std::io::Result<()>,
506 {
507 flush(&self.buf)?;
508 self.total_bytes += self.buf.len() as u64;
509 self.total_flushes += 1;
510 self.last_flush_reason = Some(reason);
511 self.buf.clear();
512 self.rows = 0;
513 self.window_started_ms = self.clock.now_ms();
514 Ok(())
515 }
516
517 pub fn total_flushes(&self) -> u64 {
518 self.total_flushes
519 }
520 pub fn total_bytes(&self) -> u64 {
521 self.total_bytes
522 }
523 pub fn total_rows(&self) -> u64 {
524 self.total_rows
525 }
526 pub fn last_flush_reason(&self) -> Option<FlushReason> {
527 self.last_flush_reason
528 }
529}
530
531pub fn write_chunked_response_header<W: std::io::Write>(
535 writer: &mut W,
536 status: u16,
537 content_type: &str,
538) -> std::io::Result<()> {
539 let header = format!(
547 "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nTransfer-Encoding: chunked\r\nCache-Control: no-cache\r\nConnection: close\r\n\
548 Access-Control-Allow-Origin: *\r\n\
549 Access-Control-Allow-Methods: GET, POST, PUT, PATCH, DELETE, OPTIONS\r\n\
550 Access-Control-Allow-Headers: *\r\n\
551 Access-Control-Max-Age: 86400\r\n\r\n",
552 status,
553 crate::server::transport::status_text(status),
554 content_type,
555 );
556 writer.write_all(header.as_bytes())?;
557 writer.flush()
558}
559
560pub fn write_chunk<W: std::io::Write>(writer: &mut W, bytes: &[u8]) -> std::io::Result<()> {
564 if bytes.is_empty() {
565 return Ok(());
566 }
567 let size = format!("{:x}\r\n", bytes.len());
568 writer.write_all(size.as_bytes())?;
569 writer.write_all(bytes)?;
570 writer.write_all(b"\r\n")?;
571 writer.flush()
572}
573
574pub fn write_chunked_terminator<W: std::io::Write>(writer: &mut W) -> std::io::Result<()> {
576 writer.write_all(b"0\r\n\r\n")?;
577 writer.flush()
578}
579
580#[derive(Debug, Default)]
588pub struct StreamCapacityRegistry {
589 inner: Mutex<CapacityInner>,
590}
591
592#[derive(Debug, Default)]
593struct CapacityInner {
594 global_count: usize,
595 per_principal: HashMap<String, usize>,
596}
597
598#[derive(Debug, Clone, PartialEq, Eq)]
603pub enum AcquireError {
604 GlobalExhausted { limit: usize, current: usize },
606 PrincipalExhausted {
610 principal: String,
611 limit: usize,
612 current: usize,
613 },
614}
615
616impl AcquireError {
617 pub fn code(&self) -> &'static str {
618 match self {
619 AcquireError::GlobalExhausted { .. } => "server_stream_capacity_exhausted",
620 AcquireError::PrincipalExhausted { .. } => "principal_stream_quota_exhausted",
621 }
622 }
623
624 pub fn message(&self) -> String {
625 match self {
626 AcquireError::GlobalExhausted { limit, current } => {
627 format!("server stream capacity exhausted (limit {limit}, current {current})")
628 }
629 AcquireError::PrincipalExhausted {
630 principal,
631 limit,
632 current,
633 } => format!(
634 "principal {principal} stream quota exhausted (limit {limit}, current {current})"
635 ),
636 }
637 }
638}
639
640impl StreamCapacityRegistry {
641 pub fn new() -> Arc<Self> {
642 Arc::new(Self::default())
643 }
644
645 pub fn try_acquire(
649 self: &Arc<Self>,
650 principal: &str,
651 max_global: usize,
652 max_per_principal: usize,
653 ) -> Result<StreamCapacityGuard, AcquireError> {
654 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
655 if inner.global_count >= max_global {
656 return Err(AcquireError::GlobalExhausted {
657 limit: max_global,
658 current: inner.global_count,
659 });
660 }
661 let current = inner.per_principal.get(principal).copied().unwrap_or(0);
662 if current >= max_per_principal {
663 return Err(AcquireError::PrincipalExhausted {
664 principal: principal.to_string(),
665 limit: max_per_principal,
666 current,
667 });
668 }
669 inner.global_count += 1;
670 inner
671 .per_principal
672 .insert(principal.to_string(), current + 1);
673 Ok(StreamCapacityGuard {
674 registry: Arc::clone(self),
675 principal: principal.to_string(),
676 released: false,
677 })
678 }
679
680 fn release(&self, principal: &str) {
681 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
682 if inner.global_count > 0 {
683 inner.global_count -= 1;
684 }
685 if let Some(count) = inner.per_principal.get_mut(principal) {
686 if *count > 0 {
687 *count -= 1;
688 }
689 if *count == 0 {
690 inner.per_principal.remove(principal);
691 }
692 }
693 }
694
695 pub fn snapshot(&self) -> (usize, HashMap<String, usize>) {
697 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
698 (inner.global_count, inner.per_principal.clone())
699 }
700}
701
702#[must_use = "dropping the guard immediately releases the stream slot"]
707#[derive(Debug)]
708pub struct StreamCapacityGuard {
709 registry: Arc<StreamCapacityRegistry>,
710 principal: String,
711 released: bool,
712}
713
714impl StreamCapacityGuard {
715 pub fn principal(&self) -> &str {
716 &self.principal
717 }
718}
719
720impl Drop for StreamCapacityGuard {
721 fn drop(&mut self) {
722 if !self.released {
723 self.registry.release(&self.principal);
724 self.released = true;
725 }
726 }
727}
728
729pub fn assess_resumability(query: &str) -> bool {
738 let upper = query.to_uppercase();
739 let trimmed = upper.trim_start();
740 if !trimmed.starts_with("SELECT ") && !trimmed.starts_with("SELECT\n") {
741 return false;
742 }
743 const FORBIDDEN: &[&str] = &[
744 " GROUP BY ",
745 " HAVING ",
746 " DISTINCT ",
747 "DISTINCT ",
748 "COUNT(",
749 "SUM(",
750 "AVG(",
751 "MIN(",
752 "MAX(",
753 "ARRAY_AGG(",
754 "JSON_AGG(",
755 "OVER(",
756 " OVER (",
757 " JOIN ",
758 ];
759 for kw in FORBIDDEN {
760 if upper.contains(kw) {
761 return false;
762 }
763 }
764 if let Some(idx) = upper.find("ORDER BY") {
765 let tail = &upper[idx + "ORDER BY".len()..];
766 let mut clause = tail.to_string();
768 if let Some(lim) = clause.find(" LIMIT ") {
769 clause.truncate(lim);
770 }
771 if let Some(semi) = clause.find(';') {
772 clause.truncate(semi);
773 }
774 let clause = clause.trim();
775 if !matches!(clause, "RID" | "RID ASC") {
776 return false;
777 }
778 }
779 true
780}
781
782#[derive(Debug, Default)]
788pub struct LeaseRegistry {
789 inner: Mutex<HashMap<u64, LeaseEntry>>,
790}
791
792#[derive(Debug, Clone, Copy)]
793struct LeaseEntry {
794 opened_at_ms: u64,
795 ttl_ms: u64,
796}
797
798#[derive(Debug, Clone, Copy, PartialEq, Eq)]
799pub enum LeaseLookup {
800 Unknown,
802 Expired,
804 Live,
806}
807
808impl LeaseRegistry {
809 pub fn new() -> Arc<Self> {
810 Arc::new(Self::default())
811 }
812
813 pub fn record(&self, snapshot_lsn: u64, opened_at_ms: u64, ttl_ms: u64) {
818 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
819 inner.insert(
820 snapshot_lsn,
821 LeaseEntry {
822 opened_at_ms,
823 ttl_ms,
824 },
825 );
826 }
827
828 pub fn lookup(&self, snapshot_lsn: u64, now_ms: u64) -> LeaseLookup {
831 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
832 match inner.get(&snapshot_lsn) {
833 None => LeaseLookup::Unknown,
834 Some(entry) => {
835 if now_ms.saturating_sub(entry.opened_at_ms) >= entry.ttl_ms {
836 LeaseLookup::Expired
837 } else {
838 LeaseLookup::Live
839 }
840 }
841 }
842 }
843
844 #[doc(hidden)]
846 pub fn len(&self) -> usize {
847 self.inner.lock().unwrap_or_else(|e| e.into_inner()).len()
848 }
849}
850
851pub const CURSOR_TOKEN_BYTES: usize = 24;
858
859static NEXT_CURSOR_SALT: AtomicU64 = AtomicU64::new(1);
860
861pub fn generate_cursor_token() -> String {
866 let mut bytes = [0u8; CURSOR_TOKEN_BYTES];
867 if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
868 let salt = NEXT_CURSOR_SALT
869 .fetch_add(1, Ordering::SeqCst)
870 .to_le_bytes();
871 let now = crate::utils::now_unix_nanos().to_le_bytes();
872 bytes[..8].copy_from_slice(&salt);
873 bytes[8..16].copy_from_slice(&now);
874 }
875 crate::utils::to_hex(&bytes)
876}
877
878#[derive(Debug, Clone)]
882struct CursorEntry {
883 snapshot_lsn: u64,
884 tenant: String,
885 principal: String,
886 query: String,
887 entity_types: Option<Vec<String>>,
888 capabilities: Option<Vec<String>>,
889 opened_at_ms: u64,
890 ttl_ms: u64,
891 cancel_token: CancelToken,
896 cancelled: bool,
900}
901
902#[derive(Debug, Clone, PartialEq, Eq)]
906pub struct CursorResume {
907 pub snapshot_lsn: u64,
908 pub query: String,
909 pub entity_types: Option<Vec<String>>,
910 pub capabilities: Option<Vec<String>>,
911 pub expires_at_ms: u64,
912}
913
914#[derive(Debug, Clone, Copy, PartialEq, Eq)]
919pub enum CursorReject {
920 NotFound,
923 Expired,
925 Cancelled,
930}
931
932#[derive(Debug, Default)]
943pub struct CursorRegistry {
944 inner: Mutex<HashMap<String, CursorEntry>>,
945}
946
947impl CursorRegistry {
948 pub fn new() -> Arc<Self> {
949 Arc::new(Self::default())
950 }
951
952 #[allow(clippy::too_many_arguments)]
956 pub fn register(
957 &self,
958 snapshot_lsn: u64,
959 tenant: &str,
960 principal: &str,
961 query: &str,
962 entity_types: Option<Vec<String>>,
963 capabilities: Option<Vec<String>>,
964 opened_at_ms: u64,
965 ttl_ms: u64,
966 ) -> String {
967 let token = generate_cursor_token();
968 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
969 inner.insert(
970 token.clone(),
971 CursorEntry {
972 snapshot_lsn,
973 tenant: tenant.to_string(),
974 principal: principal.to_string(),
975 query: query.to_string(),
976 entity_types,
977 capabilities,
978 opened_at_ms,
979 ttl_ms,
980 cancel_token: CancelToken::new(),
981 cancelled: false,
982 },
983 );
984 token
985 }
986
987 pub fn cancel_token_for(&self, token: &str) -> Option<CancelToken> {
992 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
993 inner.get(token).map(|e| e.cancel_token.clone())
994 }
995
996 pub fn cancel(
1007 &self,
1008 token: &str,
1009 tenant: &str,
1010 principal: &str,
1011 ) -> Result<CancelToken, CursorReject> {
1012 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1013 let entry = inner.get_mut(token).ok_or(CursorReject::NotFound)?;
1014 if entry.tenant != tenant || entry.principal != principal {
1015 return Err(CursorReject::NotFound);
1016 }
1017 entry.cancelled = true;
1018 entry.cancel_token.cancel();
1019 Ok(entry.cancel_token.clone())
1020 }
1021
1022 pub fn resolve(
1030 &self,
1031 token: &str,
1032 tenant: &str,
1033 principal: &str,
1034 now_ms: u64,
1035 ) -> Result<CursorResume, CursorReject> {
1036 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1037 let entry = inner.get(token).ok_or(CursorReject::NotFound)?;
1038 if entry.tenant != tenant || entry.principal != principal {
1042 return Err(CursorReject::NotFound);
1043 }
1044 if entry.cancelled {
1048 return Err(CursorReject::Cancelled);
1049 }
1050 if now_ms.saturating_sub(entry.opened_at_ms) >= entry.ttl_ms {
1051 return Err(CursorReject::Expired);
1052 }
1053 Ok(CursorResume {
1054 snapshot_lsn: entry.snapshot_lsn,
1055 query: entry.query.clone(),
1056 entity_types: entry.entity_types.clone(),
1057 capabilities: entry.capabilities.clone(),
1058 expires_at_ms: entry.opened_at_ms.saturating_add(entry.ttl_ms),
1059 })
1060 }
1061
1062 #[doc(hidden)]
1064 pub fn len(&self) -> usize {
1065 self.inner.lock().unwrap_or_else(|e| e.into_inner()).len()
1066 }
1067
1068 #[doc(hidden)]
1069 pub fn is_empty(&self) -> bool {
1070 self.len() == 0
1071 }
1072}
1073
1074#[derive(Debug, Default)]
1080pub struct PrefixHasher {
1081 inner: Option<sha2::Sha256>,
1082 rows: u64,
1083}
1084
1085impl PrefixHasher {
1086 pub fn new() -> Self {
1087 use sha2::Digest;
1088 Self {
1089 inner: Some(sha2::Sha256::new()),
1090 rows: 0,
1091 }
1092 }
1093
1094 pub fn update(&mut self, line: &[u8]) {
1095 use sha2::Digest;
1096 if let Some(h) = self.inner.as_mut() {
1097 h.update(line);
1098 }
1099 self.rows += 1;
1100 }
1101
1102 pub fn rows(&self) -> u64 {
1103 self.rows
1104 }
1105
1106 pub fn finalize_hex(mut self) -> String {
1109 use sha2::Digest;
1110 let hasher = self
1111 .inner
1112 .take()
1113 .expect("PrefixHasher::finalize_hex called twice");
1114 let digest = hasher.finalize();
1115 let mut out = String::with_capacity(64);
1116 for b in digest.iter() {
1117 out.push_str(&format!("{b:02x}"));
1118 }
1119 out
1120 }
1121}
1122
1123pub fn audit_stream_opened(
1129 runtime: &RedDBRuntime,
1130 lease_handle: &str,
1131 principal: &str,
1132 snapshot_lsn: u64,
1133 query_hash: &str,
1134) {
1135 use crate::json::{Map, Value as JsonValue};
1136 let mut detail = Map::new();
1137 detail.insert(
1138 "lease_handle".to_string(),
1139 JsonValue::String(lease_handle.to_string()),
1140 );
1141 detail.insert(
1142 "snapshot_lsn".to_string(),
1143 JsonValue::Number(snapshot_lsn as f64),
1144 );
1145 detail.insert(
1146 "query_hash".to_string(),
1147 JsonValue::String(query_hash.to_string()),
1148 );
1149 let event = crate::runtime::audit_log::AuditEvent::builder("stream.opened")
1150 .principal(principal)
1151 .resource(lease_handle.to_string())
1152 .outcome(crate::runtime::audit_log::Outcome::Success)
1153 .detail(JsonValue::Object(detail))
1154 .build();
1155 runtime.audit_log().record_event(event);
1156}
1157
1158pub fn audit_stream_closed(
1159 runtime: &RedDBRuntime,
1160 lease_handle: &str,
1161 principal: &str,
1162 reason: CloseReason,
1163 row_count: u64,
1164 bytes_written: u64,
1165) {
1166 use crate::json::{Map, Value as JsonValue};
1167 let mut stats = Map::new();
1168 stats.insert("row_count".to_string(), JsonValue::Number(row_count as f64));
1169 stats.insert(
1170 "bytes_written".to_string(),
1171 JsonValue::Number(bytes_written as f64),
1172 );
1173 let mut detail = Map::new();
1174 detail.insert(
1175 "lease_handle".to_string(),
1176 JsonValue::String(lease_handle.to_string()),
1177 );
1178 detail.insert(
1179 "reason".to_string(),
1180 JsonValue::String(reason.as_str().to_string()),
1181 );
1182 detail.insert("stats".to_string(), JsonValue::Object(stats));
1183 let outcome = match reason {
1184 CloseReason::Ok => crate::runtime::audit_log::Outcome::Success,
1185 CloseReason::CapacityRefused => crate::runtime::audit_log::Outcome::Denied,
1186 _ => crate::runtime::audit_log::Outcome::Error,
1187 };
1188 let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1189 .principal(principal)
1190 .resource(lease_handle.to_string())
1191 .outcome(outcome)
1192 .detail(JsonValue::Object(detail))
1193 .build();
1194 runtime.audit_log().record_event(event);
1195}
1196
1197pub fn audit_token_expired_during_lease(
1198 runtime: &RedDBRuntime,
1199 lease_handle: &str,
1200 principal: &str,
1201 token_expiry_ms: u64,
1202) {
1203 use crate::json::{Map, Value as JsonValue};
1204 let mut detail = Map::new();
1205 detail.insert(
1206 "lease_handle".to_string(),
1207 JsonValue::String(lease_handle.to_string()),
1208 );
1209 detail.insert(
1210 "token_expiry".to_string(),
1211 JsonValue::Number(token_expiry_ms as f64),
1212 );
1213 detail.insert("lease_continued".to_string(), JsonValue::Bool(true));
1214 let event = crate::runtime::audit_log::AuditEvent::builder("stream.token_expired_during_lease")
1215 .principal(principal)
1216 .resource(lease_handle.to_string())
1217 .outcome(crate::runtime::audit_log::Outcome::Success)
1218 .detail(JsonValue::Object(detail))
1219 .build();
1220 runtime.audit_log().record_event(event);
1221}
1222
1223pub fn audit_stream_capacity_refused(
1229 runtime: &RedDBRuntime,
1230 principal: &str,
1231 code: &str,
1232 limit: usize,
1233 current: usize,
1234) {
1235 use crate::json::{Map, Value as JsonValue};
1236 let mut detail = Map::new();
1237 detail.insert(
1238 "reason".to_string(),
1239 JsonValue::String(CloseReason::CapacityRefused.as_str().to_string()),
1240 );
1241 detail.insert("code".to_string(), JsonValue::String(code.to_string()));
1242 detail.insert("limit".to_string(), JsonValue::Number(limit as f64));
1243 detail.insert("current".to_string(), JsonValue::Number(current as f64));
1244 let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1245 .principal(principal)
1246 .outcome(crate::runtime::audit_log::Outcome::Denied)
1247 .detail(JsonValue::Object(detail))
1248 .build();
1249 runtime.audit_log().record_event(event);
1250}
1251
1252pub fn system_clock() -> Arc<dyn Clock> {
1255 static INSTANCE: std::sync::OnceLock<Arc<dyn Clock>> = std::sync::OnceLock::new();
1256 Arc::clone(INSTANCE.get_or_init(|| Arc::new(SystemClock)))
1257}
1258
1259#[cfg(test)]
1260mod tests {
1261 use super::*;
1262
1263 #[test]
1264 fn open_stream_refuses_when_session_has_active_transaction() {
1265 let clock = FakeClock::new(0);
1266 let err = open_stream(StreamConfig::DEFAULT, 42, true, &clock).unwrap_err();
1267 assert_eq!(err, OpenStreamError::TransactionActive);
1268 assert_eq!(err.code(), "stream_in_transaction_unsupported");
1269 }
1270
1271 #[test]
1272 fn open_stream_succeeds_when_session_is_autocommit() {
1273 let clock = FakeClock::new(1_700_000_000_000);
1274 let lease = open_stream(StreamConfig::DEFAULT, 123, false, &clock).unwrap();
1275 assert_eq!(lease.snapshot_lsn, 123);
1276 assert_eq!(lease.opened_at_ms, 1_700_000_000_000);
1277 assert!(lease.id >= 1);
1278 }
1279
1280 #[test]
1281 fn lease_ids_are_unique_and_monotonic() {
1282 let clock = FakeClock::new(0);
1283 let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1284 let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1285 assert!(b.id > a.id);
1286 }
1287
1288 #[test]
1289 fn snapshot_expired_uses_injected_clock_and_ttl() {
1290 let clock = FakeClock::new(0);
1293 let mut config = StreamConfig::DEFAULT;
1294 config.snapshot_ttl_ms = 5_000;
1295 let lease = open_stream(config, 0, false, &clock).unwrap();
1296
1297 assert!(!lease.snapshot_expired(clock.now_ms()));
1298 clock.advance(4_999);
1299 assert!(!lease.snapshot_expired(clock.now_ms()));
1300 clock.advance(1);
1301 assert!(lease.snapshot_expired(clock.now_ms()));
1302 }
1303
1304 #[test]
1305 fn stream_config_loads_defaults_when_kv_is_empty() {
1306 let cfg = StreamConfig::DEFAULT;
1308 assert_eq!(cfg.snapshot_ttl_ms, 60_000);
1309 assert_eq!(cfg.chunk_default_pages, 4);
1310 assert_eq!(cfg.chunk_min_pages, 1);
1311 assert_eq!(cfg.chunk_max_pages, 64);
1312 assert_eq!(cfg.chunk_max_rows, 1000);
1313 assert_eq!(cfg.chunk_max_latency_ms, 50);
1314 assert_eq!(cfg.production_buffer_bytes(), 64 * 1024);
1315 }
1316
1317 #[test]
1318 fn stream_config_normalize_clamps_inconsistent_inputs() {
1319 let mut cfg = StreamConfig {
1320 snapshot_ttl_ms: 1,
1321 chunk_default_pages: 100,
1322 chunk_min_pages: 0,
1323 chunk_max_pages: 8,
1324 chunk_max_rows: 0,
1325 chunk_max_latency_ms: 1,
1326 max_global_streams: 0,
1327 max_per_principal_streams: 0,
1328 default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
1329 };
1330 cfg.normalize();
1331 assert_eq!(cfg.chunk_min_pages, 1);
1332 assert_eq!(cfg.chunk_max_pages, 8);
1333 assert_eq!(cfg.chunk_default_pages, 8); assert!(cfg.chunk_max_rows >= 1);
1335 assert!(cfg.max_global_streams >= 1);
1336 assert!(cfg.max_per_principal_streams >= 1);
1337 }
1338
1339 struct CapturingSink {
1344 chunks: std::cell::RefCell<Vec<Vec<u8>>>,
1345 }
1346 impl CapturingSink {
1347 fn new() -> Self {
1348 Self {
1349 chunks: std::cell::RefCell::new(Vec::new()),
1350 }
1351 }
1352 fn len(&self) -> usize {
1353 self.chunks.borrow().len()
1354 }
1355 fn last_len(&self) -> Option<usize> {
1356 self.chunks.borrow().last().map(|c| c.len())
1357 }
1358 }
1359
1360 fn capture<'a>(sink: &'a CapturingSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1361 move |bytes: &[u8]| {
1362 sink.chunks.borrow_mut().push(bytes.to_vec());
1363 Ok(())
1364 }
1365 }
1366
1367 #[test]
1368 fn chunk_producer_flushes_on_byte_cap() {
1369 let clock = FakeClock::new(0);
1370 let cfg = StreamConfig {
1371 chunk_default_pages: 1, chunk_min_pages: 1,
1373 chunk_max_pages: 1,
1374 chunk_max_rows: 1_000_000,
1375 chunk_max_latency_ms: 1_000_000,
1376 ..StreamConfig::DEFAULT
1377 };
1378 let sink = CapturingSink::new();
1379 let mut producer = ChunkProducer::new(&cfg, &clock);
1380 let mut flush = capture(&sink);
1381
1382 producer
1383 .push_line(&vec![b'x'; 8 * 1024], &mut flush)
1384 .unwrap();
1385 assert_eq!(sink.len(), 0);
1386
1387 let triggered = producer
1388 .push_line(&vec![b'y'; 8 * 1024], &mut flush)
1389 .unwrap();
1390 assert!(triggered);
1391 assert_eq!(sink.len(), 1);
1392 assert_eq!(sink.last_len(), Some(16 * 1024));
1393 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Byte));
1394 }
1395
1396 #[test]
1397 fn chunk_producer_flushes_on_row_cap() {
1398 let clock = FakeClock::new(0);
1399 let cfg = StreamConfig {
1400 chunk_default_pages: 4, chunk_min_pages: 1,
1402 chunk_max_pages: 64,
1403 chunk_max_rows: 3,
1404 chunk_max_latency_ms: 1_000_000,
1405 ..StreamConfig::DEFAULT
1406 };
1407 let sink = CapturingSink::new();
1408 let mut producer = ChunkProducer::new(&cfg, &clock);
1409 let mut flush = capture(&sink);
1410 let row = b"{\"row\":{}}\n";
1411 producer.push_line(row, &mut flush).unwrap();
1412 producer.push_line(row, &mut flush).unwrap();
1413 assert_eq!(sink.len(), 0);
1414 let triggered = producer.push_line(row, &mut flush).unwrap();
1415 assert!(triggered);
1416 assert_eq!(sink.len(), 1);
1417 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Row));
1418 }
1419
1420 #[test]
1421 fn chunk_producer_flushes_on_latency_cap() {
1422 let clock = FakeClock::new(0);
1423 let cfg = StreamConfig {
1424 chunk_default_pages: 4,
1425 chunk_min_pages: 1,
1426 chunk_max_pages: 64,
1427 chunk_max_rows: 1_000_000,
1428 chunk_max_latency_ms: 50,
1429 ..StreamConfig::DEFAULT
1430 };
1431 let sink = CapturingSink::new();
1432 let mut producer = ChunkProducer::new(&cfg, &clock);
1433 let mut flush = capture(&sink);
1434 producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1435 assert_eq!(sink.len(), 0);
1436 clock.advance(60);
1437 let triggered = producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1438 assert!(triggered);
1439 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
1440 }
1441
1442 #[test]
1443 fn chunk_producer_finish_emits_terminal_flush() {
1444 let clock = FakeClock::new(0);
1445 let cfg = StreamConfig::DEFAULT;
1446 let sink = CapturingSink::new();
1447 let mut producer = ChunkProducer::new(&cfg, &clock);
1448 let mut flush = capture(&sink);
1449 producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1450 producer.finish(&mut flush).unwrap();
1451 assert_eq!(sink.len(), 1);
1452 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Terminal));
1453 }
1454
1455 #[test]
1456 fn write_chunked_helpers_produce_well_formed_chunks() {
1457 let mut buf: Vec<u8> = Vec::new();
1458 write_chunked_response_header(&mut buf, 200, "application/x-ndjson").unwrap();
1459 write_chunk(&mut buf, b"{\"row\":{}}\n").unwrap();
1460 write_chunked_terminator(&mut buf).unwrap();
1461 let text = String::from_utf8(buf).unwrap();
1462 assert!(text.starts_with("HTTP/1.1 200 OK\r\n"));
1463 assert!(text.contains("Transfer-Encoding: chunked\r\n"));
1464 assert!(text.contains("\r\nb\r\n{\"row\":{}}\n\r\n"));
1466 assert!(text.ends_with("0\r\n\r\n"));
1467 }
1468
1469 #[test]
1472 fn capacity_registry_global_exhausted_returns_structured_error() {
1473 let reg = StreamCapacityRegistry::new();
1474 let _g1 = reg.try_acquire("alice", 2, 32).unwrap();
1475 let _g2 = reg.try_acquire("alice", 2, 32).unwrap();
1476 let err = reg.try_acquire("alice", 2, 32).unwrap_err();
1477 assert_eq!(
1478 err,
1479 AcquireError::GlobalExhausted {
1480 limit: 2,
1481 current: 2,
1482 }
1483 );
1484 assert_eq!(err.code(), "server_stream_capacity_exhausted");
1485 }
1486
1487 #[test]
1488 fn capacity_registry_per_principal_exhausted_independent_of_global() {
1489 let reg = StreamCapacityRegistry::new();
1493 let _a1 = reg.try_acquire("alice", 100, 2).unwrap();
1494 let _a2 = reg.try_acquire("alice", 100, 2).unwrap();
1495 let err = reg.try_acquire("alice", 100, 2).unwrap_err();
1496 assert_eq!(
1497 err,
1498 AcquireError::PrincipalExhausted {
1499 principal: "alice".to_string(),
1500 limit: 2,
1501 current: 2,
1502 }
1503 );
1504 assert_eq!(err.code(), "principal_stream_quota_exhausted");
1505
1506 let _b1 = reg.try_acquire("bob", 100, 2).unwrap();
1508 let _b2 = reg.try_acquire("bob", 100, 2).unwrap();
1509 }
1510
1511 #[test]
1512 fn capacity_registry_release_frees_both_counters() {
1513 let reg = StreamCapacityRegistry::new();
1515 let g1 = reg.try_acquire("alice", 1, 1).unwrap();
1516 assert!(reg.try_acquire("alice", 1, 1).is_err());
1517 drop(g1);
1518 let (global, per_principal) = reg.snapshot();
1519 assert_eq!(global, 0);
1520 assert!(per_principal.is_empty());
1521 let _g2 = reg.try_acquire("alice", 1, 1).unwrap();
1523 }
1524
1525 #[test]
1526 fn capacity_registry_concurrent_acquire_release_does_not_over_issue() {
1527 use std::sync::atomic::{AtomicUsize, Ordering};
1533
1534 const THREADS: usize = 16;
1535 const ITERS: usize = 200;
1536 const CAP_GLOBAL: usize = 4;
1537 const CAP_PER_PRINCIPAL: usize = 4;
1538
1539 let reg = StreamCapacityRegistry::new();
1540 let observed_max = Arc::new(AtomicUsize::new(0));
1541 let mut handles = Vec::new();
1542 for tid in 0..THREADS {
1543 let reg = Arc::clone(®);
1544 let observed_max = Arc::clone(&observed_max);
1545 let principal = format!("p{}", tid % 2);
1548 handles.push(std::thread::spawn(move || {
1549 for _ in 0..ITERS {
1550 if let Ok(guard) = reg.try_acquire(&principal, CAP_GLOBAL, CAP_PER_PRINCIPAL) {
1551 let (live, _) = reg.snapshot();
1552 observed_max.fetch_max(live, Ordering::SeqCst);
1553 std::thread::yield_now();
1556 drop(guard);
1557 }
1558 }
1559 }));
1560 }
1561 for h in handles {
1562 h.join().unwrap();
1563 }
1564 let (global_after, per_principal_after) = reg.snapshot();
1565 assert_eq!(global_after, 0, "global counter leaked");
1566 assert!(
1567 per_principal_after.is_empty(),
1568 "per-principal map leaked: {per_principal_after:?}"
1569 );
1570 assert!(
1571 observed_max.load(Ordering::SeqCst) <= CAP_GLOBAL,
1572 "global cap was breached: observed {} > {}",
1573 observed_max.load(Ordering::SeqCst),
1574 CAP_GLOBAL
1575 );
1576 }
1577
1578 #[test]
1581 fn assess_resumability_accepts_plain_select() {
1582 assert!(assess_resumability("SELECT id, name FROM t"));
1583 assert!(assess_resumability("select * from t where id > 5"));
1584 assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid"));
1585 assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid ASC"));
1586 assert!(assess_resumability(
1587 "SELECT a FROM t ORDER BY rid ASC LIMIT 10"
1588 ));
1589 }
1590
1591 #[test]
1592 fn assess_resumability_rejects_aggregates_and_unordered() {
1593 assert!(!assess_resumability("SELECT COUNT(*) FROM t"));
1594 assert!(!assess_resumability("SELECT SUM(x) FROM t"));
1595 assert!(!assess_resumability("SELECT a, COUNT(b) FROM t GROUP BY a"));
1596 assert!(!assess_resumability("SELECT DISTINCT a FROM t"));
1597 assert!(!assess_resumability("SELECT a FROM t ORDER BY name"));
1598 assert!(!assess_resumability("SELECT a FROM t ORDER BY rid DESC"));
1599 assert!(!assess_resumability("SELECT a FROM t ORDER BY a, b"));
1600 assert!(!assess_resumability("INSERT INTO t (a) VALUES (1)"));
1601 assert!(!assess_resumability(
1602 "SELECT a FROM t JOIN u ON t.id = u.id"
1603 ));
1604 }
1605
1606 #[test]
1607 fn lease_registry_records_and_expires_against_ttl() {
1608 let reg = LeaseRegistry::new();
1609 reg.record(42, 1_000, 5_000);
1610 assert_eq!(reg.lookup(42, 1_000), LeaseLookup::Live);
1611 assert_eq!(reg.lookup(42, 5_999), LeaseLookup::Live);
1612 assert_eq!(reg.lookup(42, 6_000), LeaseLookup::Expired);
1613 assert_eq!(reg.lookup(99, 1_000), LeaseLookup::Unknown);
1614 }
1615
1616 #[test]
1617 fn prefix_hasher_is_order_sensitive_and_deterministic() {
1618 let mut a = PrefixHasher::new();
1619 a.update(b"{\"row\":{\"id\":1}}");
1620 a.update(b"{\"row\":{\"id\":2}}");
1621 let hash_a = a.finalize_hex();
1622
1623 let mut b = PrefixHasher::new();
1624 b.update(b"{\"row\":{\"id\":1}}");
1625 b.update(b"{\"row\":{\"id\":2}}");
1626 let hash_b = b.finalize_hex();
1627 assert_eq!(hash_a, hash_b);
1628
1629 let mut c = PrefixHasher::new();
1630 c.update(b"{\"row\":{\"id\":2}}");
1631 c.update(b"{\"row\":{\"id\":1}}");
1632 assert_ne!(hash_a, c.finalize_hex());
1633 assert_eq!(hash_a.len(), 64);
1634 }
1635
1636 #[test]
1639 fn lease_handle_is_128_bit_hex_and_unique_per_open() {
1640 let clock = FakeClock::new(0);
1644 let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1645 let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1646 assert_eq!(
1647 a.lease_handle.len(),
1648 LEASE_HANDLE_BYTES * 2,
1649 "handle must be 128 bits hex-encoded: {}",
1650 a.lease_handle
1651 );
1652 assert!(
1653 a.lease_handle.chars().all(|c| c.is_ascii_hexdigit()),
1654 "handle must be hex: {}",
1655 a.lease_handle
1656 );
1657 assert_ne!(a.lease_handle, b.lease_handle, "handles must differ");
1658 assert!(b.id > a.id);
1661 }
1662
1663 #[test]
1664 fn generate_lease_handle_produces_high_entropy_distinct_values() {
1665 let mut seen = std::collections::HashSet::new();
1669 for _ in 0..1024 {
1670 assert!(
1671 seen.insert(generate_lease_handle()),
1672 "duplicate handle in CSPRNG sequence"
1673 );
1674 }
1675 }
1676
1677 #[test]
1678 fn parse_jwt_exp_ms_extracts_seconds_to_ms() {
1679 let token = "eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MDAwMDAwMDB9.sig";
1684 assert_eq!(parse_jwt_exp_ms(token), Some(1_700_000_000_000));
1685 }
1686
1687 #[test]
1688 fn parse_jwt_exp_ms_returns_none_for_opaque_tokens() {
1689 assert_eq!(parse_jwt_exp_ms("not-a-jwt"), None);
1690 assert_eq!(parse_jwt_exp_ms("only.two"), None);
1691 assert_eq!(parse_jwt_exp_ms("a.b.c"), None);
1692 }
1693
1694 fn register_default_cursor(reg: &CursorRegistry, now_ms: u64) -> String {
1697 reg.register(
1698 42,
1699 "acme",
1700 "bearer:abc",
1701 "SELECT id FROM users ORDER BY rid",
1702 None,
1703 None,
1704 now_ms,
1705 1_000,
1706 )
1707 }
1708
1709 #[test]
1710 fn cursor_token_is_192_bit_hex_and_unique_per_register() {
1711 let reg = CursorRegistry::default();
1712 let a = register_default_cursor(®, 0);
1713 let b = register_default_cursor(®, 0);
1714 assert_eq!(
1715 a.len(),
1716 CURSOR_TOKEN_BYTES * 2,
1717 "token must be 192 bits hex-encoded: {a}"
1718 );
1719 assert!(
1720 a.chars().all(|c| c.is_ascii_hexdigit()),
1721 "token must be hex: {a}"
1722 );
1723 assert_ne!(a, b, "tokens must differ across registrations");
1724 assert_eq!(reg.len(), 2);
1725 }
1726
1727 #[test]
1728 fn cursor_resolves_for_owner_within_ttl() {
1729 let reg = CursorRegistry::default();
1731 let token = register_default_cursor(®, 1_000);
1732 let resume = reg
1733 .resolve(&token, "acme", "bearer:abc", 1_500)
1734 .expect("live cursor resolves for its owner");
1735 assert_eq!(resume.snapshot_lsn, 42, "resume re-pins the same snapshot");
1736 assert_eq!(resume.query, "SELECT id FROM users ORDER BY rid");
1737 assert_eq!(resume.expires_at_ms, 2_000);
1738 }
1739
1740 #[test]
1741 fn cursor_rejects_after_ttl_for_owner() {
1742 let reg = CursorRegistry::default();
1745 let token = register_default_cursor(®, 0);
1746 assert_eq!(
1747 reg.resolve(&token, "acme", "bearer:abc", 1_000),
1748 Err(CursorReject::Expired),
1749 "TTL boundary is inclusive — cursor is dead at opened_at + ttl"
1750 );
1751 assert_eq!(
1752 reg.resolve(&token, "acme", "bearer:abc", 5_000),
1753 Err(CursorReject::Expired)
1754 );
1755 }
1756
1757 #[test]
1758 fn cursor_cross_tenant_is_masked_as_not_found() {
1759 let reg = CursorRegistry::default();
1763 let token = register_default_cursor(®, 0);
1764 assert_eq!(
1765 reg.resolve(&token, "evil-corp", "bearer:abc", 100),
1766 Err(CursorReject::NotFound),
1767 "cross-tenant resume must mask existence as NotFound"
1768 );
1769 assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1771 }
1772
1773 #[test]
1774 fn cursor_cross_principal_is_masked_as_not_found() {
1775 let reg = CursorRegistry::default();
1778 let token = register_default_cursor(®, 0);
1779 assert_eq!(
1780 reg.resolve(&token, "acme", "bearer:other", 100),
1781 Err(CursorReject::NotFound),
1782 "cross-principal resume must mask existence as NotFound"
1783 );
1784 assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1785 }
1786
1787 #[test]
1788 fn cursor_unknown_token_is_not_found() {
1789 let reg = CursorRegistry::default();
1790 assert!(reg.is_empty());
1791 assert_eq!(
1792 reg.resolve("deadbeef", "acme", "bearer:abc", 0),
1793 Err(CursorReject::NotFound)
1794 );
1795 }
1796
1797 #[test]
1798 fn cursor_scope_is_checked_before_expiry() {
1799 let reg = CursorRegistry::default();
1803 let token = register_default_cursor(®, 0);
1804 assert_eq!(
1805 reg.resolve(&token, "evil-corp", "bearer:abc", 10_000),
1806 Err(CursorReject::NotFound),
1807 "expired + wrong scope must mask as NotFound, not Expired"
1808 );
1809 }
1810
1811 #[test]
1814 fn cancel_tombstones_cursor_and_raises_token() {
1815 let reg = CursorRegistry::default();
1818 let token = register_default_cursor(®, 0);
1819 let live = reg
1820 .cancel_token_for(&token)
1821 .expect("freshly-minted cursor exposes its token");
1822 assert!(!live.is_cancelled(), "token starts un-cancelled");
1823
1824 let returned = reg
1825 .cancel(&token, "acme", "bearer:abc")
1826 .expect("owner cancels its own cursor");
1827 assert!(returned.is_cancelled(), "cancel raises the returned token");
1828 assert!(
1829 live.is_cancelled(),
1830 "the handler-held clone observes the cancel"
1831 );
1832 }
1833
1834 #[test]
1835 fn cancelled_cursor_rejects_resume_with_cancelled_reason() {
1836 let reg = CursorRegistry::default();
1840 let token = register_default_cursor(®, 0);
1841 reg.cancel(&token, "acme", "bearer:abc")
1842 .expect("owner cancels");
1843 assert_eq!(
1844 reg.resolve(&token, "acme", "bearer:abc", 100),
1845 Err(CursorReject::Cancelled),
1846 "owner resuming a cancelled cursor sees Cancelled"
1847 );
1848 }
1849
1850 #[test]
1851 fn cancel_is_idempotent() {
1852 let reg = CursorRegistry::default();
1853 let token = register_default_cursor(®, 0);
1854 assert!(reg.cancel(&token, "acme", "bearer:abc").is_ok());
1855 let second = reg
1857 .cancel(&token, "acme", "bearer:abc")
1858 .expect("re-cancel is a no-op success");
1859 assert!(second.is_cancelled());
1860 }
1861
1862 #[test]
1863 fn cancel_cross_tenant_is_masked_as_not_found() {
1864 let reg = CursorRegistry::default();
1867 let token = register_default_cursor(®, 0);
1868 assert!(
1869 matches!(
1870 reg.cancel(&token, "evil-corp", "bearer:abc"),
1871 Err(CursorReject::NotFound)
1872 ),
1873 "cross-tenant cancel must mask existence as NotFound"
1874 );
1875 assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1877 }
1878
1879 #[test]
1880 fn cancel_cross_principal_is_masked_as_not_found() {
1881 let reg = CursorRegistry::default();
1882 let token = register_default_cursor(®, 0);
1883 assert!(
1884 matches!(
1885 reg.cancel(&token, "acme", "bearer:other"),
1886 Err(CursorReject::NotFound)
1887 ),
1888 "cross-principal cancel must mask existence as NotFound"
1889 );
1890 assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
1891 }
1892
1893 #[test]
1894 fn cancel_unknown_token_is_not_found() {
1895 let reg = CursorRegistry::default();
1896 assert!(matches!(
1897 reg.cancel("deadbeef", "acme", "bearer:abc"),
1898 Err(CursorReject::NotFound)
1899 ));
1900 }
1901
1902 #[test]
1903 fn generate_cursor_token_produces_high_entropy_distinct_values() {
1904 let mut seen = std::collections::HashSet::new();
1905 for _ in 0..1024 {
1906 assert!(
1907 seen.insert(generate_cursor_token()),
1908 "duplicate cursor token in CSPRNG sequence"
1909 );
1910 }
1911 }
1912
1913 #[test]
1914 fn close_reason_as_str_covers_every_state_transition() {
1915 for (variant, expected) in [
1918 (CloseReason::Ok, "ok"),
1919 (CloseReason::Cancelled, "cancelled"),
1920 (CloseReason::Error, "error"),
1921 (CloseReason::SnapshotExpired, "snapshot_expired"),
1922 (CloseReason::CapacityRefused, "capacity_refused"),
1923 (CloseReason::IntegrityFailed, "integrity_failed"),
1924 ] {
1925 assert_eq!(variant.as_str(), expected);
1926 }
1927 }
1928
1929 #[test]
1930 fn stream_config_defaults_carry_s2_caps() {
1931 assert_eq!(StreamConfig::DEFAULT.max_global_streams, 256);
1932 assert_eq!(StreamConfig::DEFAULT.max_per_principal_streams, 32);
1933 }
1934
1935 struct SizeSink {
1940 sizes: std::cell::RefCell<Vec<usize>>,
1941 }
1942 impl SizeSink {
1943 fn new() -> Self {
1944 Self {
1945 sizes: std::cell::RefCell::new(Vec::new()),
1946 }
1947 }
1948 fn flushes(&self) -> usize {
1949 self.sizes.borrow().len()
1950 }
1951 fn max_chunk(&self) -> usize {
1952 self.sizes.borrow().iter().copied().max().unwrap_or(0)
1953 }
1954 }
1955 fn size_capture<'a>(sink: &'a SizeSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1956 move |bytes: &[u8]| {
1957 sink.sizes.borrow_mut().push(bytes.len());
1958 Ok(())
1959 }
1960 }
1961
1962 #[test]
1963 fn drive_lines_streams_large_source_with_bounded_working_set() {
1964 let clock = FakeClock::new(0);
1971 let cfg = StreamConfig {
1972 chunk_default_pages: 1, chunk_min_pages: 1,
1974 chunk_max_pages: 1,
1975 chunk_max_rows: 1_000_000, chunk_max_latency_ms: 1_000_000,
1977 ..StreamConfig::DEFAULT
1978 };
1979 let sink = SizeSink::new();
1980 let mut producer = ChunkProducer::new(&cfg, &clock);
1981 let mut flush = size_capture(&sink);
1982
1983 const N: u64 = 1_000_000;
1984 let source = 0..N;
1986 let consumed = producer
1987 .drive_lines(
1988 source,
1989 |i: &u64| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes(),
1990 &mut flush,
1991 )
1992 .unwrap();
1993 producer.finish(&mut flush).unwrap();
1994
1995 assert_eq!(consumed, N);
1996 assert_eq!(producer.total_rows(), N);
1997 assert!(
1999 sink.flushes() > 1000,
2000 "expected the source to stream across many chunks, saw {}",
2001 sink.flushes()
2002 );
2003 let max_line = format!("{{\"row\":{{\"id\":{}}}}}\n", N - 1).len();
2006 assert!(
2007 sink.max_chunk() <= cfg.production_buffer_bytes() + max_line,
2008 "chunk {} exceeded bounded working set {}",
2009 sink.max_chunk(),
2010 cfg.production_buffer_bytes() + max_line
2011 );
2012 }
2013
2014 #[test]
2015 fn drive_lines_first_chunk_flushes_on_latency_before_source_drains() {
2016 let clock = FakeClock::new(0);
2022 let cfg = StreamConfig {
2023 chunk_default_pages: 64, chunk_min_pages: 1,
2025 chunk_max_pages: 64,
2026 chunk_max_rows: 1_000_000, chunk_max_latency_ms: 50,
2028 ..StreamConfig::DEFAULT
2029 };
2030 let sink = SizeSink::new();
2031 let mut producer = ChunkProducer::new(&cfg, &clock);
2032
2033 let mut first_flush_after: Option<u64> = None;
2037 let mut row = 0u64;
2038 while row < 1_000_000 {
2039 let line = format!("{{\"row\":{{\"id\":{row}}}}}\n");
2040 clock.advance(20);
2041 let mut flush = size_capture(&sink);
2042 let flushed = producer.push_line(line.as_bytes(), &mut flush).unwrap();
2043 row += 1;
2044 if flushed {
2045 first_flush_after = Some(row);
2046 break;
2047 }
2048 }
2049
2050 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
2051 let rows_before_flush = first_flush_after.expect("a latency flush must occur");
2052 assert!(
2053 rows_before_flush <= 4,
2054 "first chunk flushed only after {rows_before_flush} rows; latency bound not honoured"
2055 );
2056 assert!(rows_before_flush < 1_000_000);
2058 }
2059
2060 #[test]
2061 fn drive_lines_parity_with_manual_push_line() {
2062 let clock = FakeClock::new(0);
2065 let cfg = StreamConfig::DEFAULT;
2066
2067 let lines: Vec<Vec<u8>> = (0..50)
2068 .map(|i| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes())
2069 .collect();
2070
2071 let driven = CapturingSink::new();
2072 {
2073 let mut p = ChunkProducer::new(&cfg, &clock);
2074 let mut flush = capture(&driven);
2075 p.drive_lines(lines.iter().cloned(), |l: &Vec<u8>| l.clone(), &mut flush)
2076 .unwrap();
2077 p.finish(&mut flush).unwrap();
2078 }
2079
2080 let manual = CapturingSink::new();
2081 {
2082 let mut p = ChunkProducer::new(&cfg, &clock);
2083 let mut flush = capture(&manual);
2084 for l in &lines {
2085 p.push_line(l, &mut flush).unwrap();
2086 }
2087 p.finish(&mut flush).unwrap();
2088 }
2089
2090 assert_eq!(*driven.chunks.borrow(), *manual.chunks.borrow());
2091 }
2092}