1use std::collections::HashMap;
23use std::sync::atomic::{AtomicU64, Ordering};
24use std::sync::{Arc, Mutex};
25use std::time::{SystemTime, UNIX_EPOCH};
26
27use crate::runtime::RedDBRuntime;
28use crate::storage::query::engine::cancel::CancelToken;
29use crate::storage::schema::types::Value;
30
/// Name of the KV collection that `StreamConfig::load` passes to `get_kv`
/// when looking up stream settings.
const RED_CONFIG_COLLECTION: &str = "red_config";

/// Size of one page in bytes (16 KiB). `StreamConfig::production_buffer_bytes`
/// multiplies `chunk_default_pages` by this value.
pub const PAGE_SIZE: usize = 16 * 1024;
36
/// Millisecond time source, abstracted behind a trait so that callers can be
/// handed either the real wall clock or a manually driven one.
pub trait Clock: Send + Sync {
    /// Current time in milliseconds.
    fn now_ms(&self) -> u64;
}

/// [`Clock`] implementation that reads the operating system's wall clock.
#[derive(Debug, Default)]
pub struct SystemClock;

impl Clock for SystemClock {
    /// Milliseconds elapsed since the Unix epoch, or `0` when the system
    /// time is reported as earlier than the epoch.
    fn now_ms(&self) -> u64 {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis() as u64,
            Err(_) => 0,
        }
    }
}

/// [`Clock`] whose reading only changes when [`FakeClock::advance`] is called.
#[derive(Debug)]
pub struct FakeClock {
    now_ms: AtomicU64,
}

impl FakeClock {
    /// Creates a clock that initially reports `now_ms`.
    pub fn new(now_ms: u64) -> Self {
        let now_ms = AtomicU64::new(now_ms);
        Self { now_ms }
    }

    /// Moves the clock forward by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        let _previous = self.now_ms.fetch_add(ms, Ordering::SeqCst);
    }
}

impl Clock for FakeClock {
    /// Returns the value set at construction plus every `advance` so far.
    fn now_ms(&self) -> u64 {
        self.now_ms.load(Ordering::SeqCst)
    }
}
77
/// Tunables for output streaming.
///
/// Built either from [`StreamConfig::DEFAULT`] or by [`StreamConfig::load`],
/// which overlays values stored in the `red_config` KV collection and then
/// normalizes them.
#[derive(Debug, Clone, Copy)]
pub struct StreamConfig {
    /// Snapshot time-to-live in milliseconds; `StreamLease::snapshot_expired`
    /// compares the lease age against it. Key: `stream.snapshot.ttl_ms`.
    pub snapshot_ttl_ms: u64,
    /// Page count multiplied by `PAGE_SIZE` to size the production buffer.
    /// Normalized into `[chunk_min_pages, chunk_max_pages]`.
    /// Key: `stream.chunk.default_pages`.
    pub chunk_default_pages: usize,
    /// Lower bound for `chunk_default_pages`; normalized to at least 1.
    /// Key: `stream.chunk.min_pages`.
    pub chunk_min_pages: usize,
    /// Upper bound for `chunk_default_pages`; normalized to at least
    /// `chunk_min_pages`. Key: `stream.chunk.max_pages`.
    pub chunk_max_pages: usize,
    /// Row count at which `ChunkProducer` flushes; normalized to at least 1.
    /// Key: `stream.chunk.max_rows`.
    pub chunk_max_rows: usize,
    /// Window age in milliseconds at which `ChunkProducer` flushes.
    /// Key: `stream.chunk.max_latency_ms`.
    pub chunk_max_latency_ms: u64,
    /// Global stream limit; normalized to at least 1. Key: `stream.max_global`.
    /// NOTE(review): presumably passed as `max_global` to
    /// `StreamCapacityRegistry::try_acquire` — confirm at the call site.
    pub max_global_streams: usize,
    /// Per-principal stream limit; normalized to at least 1.
    /// Key: `stream.max_per_principal`.
    /// NOTE(review): presumably passed as `max_per_principal` to
    /// `StreamCapacityRegistry::try_acquire` — confirm at the call site.
    pub max_per_principal_streams: usize,
    /// Verify mode parsed from the text value stored under
    /// `stream.integrity.default_verify`.
    pub default_verify: crate::runtime::integrity_tombstone::VerifyMode,
}

impl Default for StreamConfig {
    /// Returns [`StreamConfig::DEFAULT`].
    fn default() -> Self {
        Self::DEFAULT
    }
}
110
111impl StreamConfig {
112 pub const DEFAULT: StreamConfig = StreamConfig {
115 snapshot_ttl_ms: 60_000,
116 chunk_default_pages: 4,
117 chunk_min_pages: 1,
118 chunk_max_pages: 64,
119 chunk_max_rows: 1000,
120 chunk_max_latency_ms: 50,
121 max_global_streams: 256,
122 max_per_principal_streams: 32,
123 default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
124 };
125
126 pub fn load(runtime: &RedDBRuntime) -> Self {
131 let db = runtime.db();
132 let read_u64 = |key: &str| -> Option<u64> {
133 match db.get_kv(RED_CONFIG_COLLECTION, key) {
134 Some((Value::Integer(v), _)) if v >= 0 => Some(v as u64),
135 Some((Value::UnsignedInteger(v), _)) => Some(v),
136 Some((Value::Text(text), _)) => text.parse().ok(),
137 _ => None,
138 }
139 };
140
141 let mut cfg = Self::DEFAULT;
142 if let Some(v) = read_u64("stream.snapshot.ttl_ms") {
143 cfg.snapshot_ttl_ms = v;
144 }
145 if let Some(v) = read_u64("stream.chunk.default_pages") {
146 cfg.chunk_default_pages = v as usize;
147 }
148 if let Some(v) = read_u64("stream.chunk.min_pages") {
149 cfg.chunk_min_pages = v as usize;
150 }
151 if let Some(v) = read_u64("stream.chunk.max_pages") {
152 cfg.chunk_max_pages = v as usize;
153 }
154 if let Some(v) = read_u64("stream.chunk.max_rows") {
155 cfg.chunk_max_rows = v as usize;
156 }
157 if let Some(v) = read_u64("stream.chunk.max_latency_ms") {
158 cfg.chunk_max_latency_ms = v;
159 }
160 if let Some(v) = read_u64("stream.max_global") {
161 cfg.max_global_streams = v as usize;
162 }
163 if let Some(v) = read_u64("stream.max_per_principal") {
164 cfg.max_per_principal_streams = v as usize;
165 }
166 if let Some((Value::Text(text), _)) =
169 db.get_kv(RED_CONFIG_COLLECTION, "stream.integrity.default_verify")
170 {
171 cfg.default_verify = crate::runtime::integrity_tombstone::VerifyMode::parse(&text);
172 }
173 cfg.normalize();
174 cfg
175 }
176
177 fn normalize(&mut self) {
181 if self.chunk_min_pages == 0 {
182 self.chunk_min_pages = 1;
183 }
184 if self.chunk_max_pages < self.chunk_min_pages {
185 self.chunk_max_pages = self.chunk_min_pages;
186 }
187 if self.chunk_default_pages < self.chunk_min_pages {
188 self.chunk_default_pages = self.chunk_min_pages;
189 }
190 if self.chunk_default_pages > self.chunk_max_pages {
191 self.chunk_default_pages = self.chunk_max_pages;
192 }
193 if self.chunk_max_rows == 0 {
194 self.chunk_max_rows = 1;
195 }
196 if self.max_global_streams == 0 {
197 self.max_global_streams = 1;
198 }
199 if self.max_per_principal_streams == 0 {
200 self.max_per_principal_streams = 1;
201 }
202 }
203
204 pub fn production_buffer_bytes(&self) -> usize {
207 self.chunk_default_pages.saturating_mul(PAGE_SIZE)
208 }
209}
210
211static NEXT_LEASE_ID: AtomicU64 = AtomicU64::new(1);
219
220pub const LEASE_HANDLE_BYTES: usize = 16;
226
227pub fn generate_lease_handle() -> String {
231 let mut bytes = [0u8; LEASE_HANDLE_BYTES];
232 if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
233 let lo = NEXT_LEASE_ID.fetch_add(0, Ordering::SeqCst).to_le_bytes();
239 let now = crate::utils::now_unix_nanos().to_le_bytes();
240 bytes[..8].copy_from_slice(&lo);
241 bytes[8..].copy_from_slice(&now);
242 }
243 crate::utils::to_hex(&bytes)
244}
245
246pub fn parse_jwt_exp_ms(token: &str) -> Option<u64> {
256 let parts: Vec<&str> = token.split('.').collect();
257 if parts.len() != 3 {
258 return None;
259 }
260 let payload = base64url_decode_padded(parts[1])?;
261 let v: crate::json::Value = crate::json::from_slice(&payload).ok()?;
262 let exp_secs = v.get("exp").and_then(|n| n.as_f64())? as i64;
263 if exp_secs <= 0 {
264 return None;
265 }
266 Some((exp_secs as u64).saturating_mul(1000))
267}
268
269fn base64url_decode_padded(input: &str) -> Option<Vec<u8>> {
270 let mut s = String::with_capacity(input.len() + 4);
271 for ch in input.chars() {
272 match ch {
273 '-' => s.push('+'),
274 '_' => s.push('/'),
275 _ => s.push(ch),
276 }
277 }
278 while !s.len().is_multiple_of(4) {
279 s.push('=');
280 }
281 crate::wire::redwire::auth::base64_std_decode(&s)
282}
283
/// Why a stream ended. [`CloseReason::as_str`] gives the snake_case label
/// written into audit events.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    Ok,
    Cancelled,
    Error,
    SnapshotExpired,
    CapacityRefused,
    IntegrityFailed,
}

impl CloseReason {
    /// Stable snake_case label for this reason.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ok => "ok",
            Self::Cancelled => "cancelled",
            Self::Error => "error",
            Self::SnapshotExpired => "snapshot_expired",
            Self::CapacityRefused => "capacity_refused",
            Self::IntegrityFailed => "integrity_failed",
        }
    }
}
309
310#[derive(Debug)]
311pub struct StreamLease {
312 pub id: u64,
313 pub lease_handle: String,
317 pub snapshot_lsn: u64,
318 pub opened_at_ms: u64,
319 pub config: StreamConfig,
320}
321
322impl StreamLease {
323 pub fn snapshot_expired(&self, now_ms: u64) -> bool {
330 now_ms.saturating_sub(self.opened_at_ms) >= self.config.snapshot_ttl_ms
331 }
332}
333
/// Reasons `open_stream` can refuse to create a lease.
#[derive(Debug, PartialEq, Eq)]
pub enum OpenStreamError {
    /// The session has a transaction in progress.
    TransactionActive,
}

impl OpenStreamError {
    /// Machine-readable error code.
    pub fn code(&self) -> &'static str {
        match *self {
            Self::TransactionActive => "stream_in_transaction_unsupported",
        }
    }

    /// Human-readable description of the error.
    pub fn message(&self) -> &'static str {
        match *self {
            Self::TransactionActive => {
                "cannot open output stream while a transaction is active on this session"
            }
        }
    }
}
357
358pub fn open_stream(
363 config: StreamConfig,
364 snapshot_lsn: u64,
365 in_transaction: bool,
366 clock: &dyn Clock,
367) -> Result<StreamLease, OpenStreamError> {
368 if in_transaction {
369 return Err(OpenStreamError::TransactionActive);
370 }
371 Ok(StreamLease {
372 id: NEXT_LEASE_ID.fetch_add(1, Ordering::SeqCst),
373 lease_handle: generate_lease_handle(),
374 snapshot_lsn,
375 opened_at_ms: clock.now_ms(),
376 config,
377 })
378}
379
380pub struct ChunkProducer<'a> {
388 buf: Vec<u8>,
389 rows: usize,
390 window_started_ms: u64,
391 cap_bytes: usize,
392 cap_rows: usize,
393 cap_latency_ms: u64,
394 clock: &'a dyn Clock,
395 total_flushes: u64,
396 total_bytes: u64,
397 total_rows: u64,
398 last_flush_reason: Option<FlushReason>,
399}
400
401#[derive(Debug, Clone, Copy, PartialEq, Eq)]
402pub enum FlushReason {
403 Byte,
404 Row,
405 Latency,
406 Terminal,
407}
408
409impl<'a> ChunkProducer<'a> {
410 pub fn new(config: &StreamConfig, clock: &'a dyn Clock) -> Self {
411 let cap_bytes = config.production_buffer_bytes();
412 Self {
413 buf: Vec::with_capacity(cap_bytes),
414 rows: 0,
415 window_started_ms: clock.now_ms(),
416 cap_bytes,
417 cap_rows: config.chunk_max_rows,
418 cap_latency_ms: config.chunk_max_latency_ms,
419 clock,
420 total_flushes: 0,
421 total_bytes: 0,
422 total_rows: 0,
423 last_flush_reason: None,
424 }
425 }
426
427 pub fn push_line<F>(&mut self, line: &[u8], flush: &mut F) -> std::io::Result<bool>
430 where
431 F: FnMut(&[u8]) -> std::io::Result<()>,
432 {
433 self.buf.extend_from_slice(line);
434 self.rows += 1;
435 self.total_rows += 1;
436
437 if self.buf.len() >= self.cap_bytes {
438 self.flush(flush, FlushReason::Byte)?;
439 return Ok(true);
440 }
441 if self.rows >= self.cap_rows {
442 self.flush(flush, FlushReason::Row)?;
443 return Ok(true);
444 }
445 let elapsed = self.clock.now_ms().saturating_sub(self.window_started_ms);
446 if elapsed >= self.cap_latency_ms {
447 self.flush(flush, FlushReason::Latency)?;
448 return Ok(true);
449 }
450 Ok(false)
451 }
452
453 pub fn drive_lines<S, R, Enc, F>(
471 &mut self,
472 source: S,
473 mut encode: Enc,
474 flush: &mut F,
475 ) -> std::io::Result<u64>
476 where
477 S: IntoIterator<Item = R>,
478 Enc: FnMut(&R) -> Vec<u8>,
479 F: FnMut(&[u8]) -> std::io::Result<()>,
480 {
481 let mut count = 0u64;
482 for record in source {
483 let line = encode(&record);
484 self.push_line(&line, flush)?;
485 count += 1;
486 }
487 Ok(count)
488 }
489
490 pub fn finish<F>(&mut self, flush: &mut F) -> std::io::Result<()>
494 where
495 F: FnMut(&[u8]) -> std::io::Result<()>,
496 {
497 if !self.buf.is_empty() {
498 self.flush(flush, FlushReason::Terminal)?;
499 }
500 Ok(())
501 }
502
503 fn flush<F>(&mut self, flush: &mut F, reason: FlushReason) -> std::io::Result<()>
504 where
505 F: FnMut(&[u8]) -> std::io::Result<()>,
506 {
507 flush(&self.buf)?;
508 self.total_bytes += self.buf.len() as u64;
509 self.total_flushes += 1;
510 self.last_flush_reason = Some(reason);
511 self.buf.clear();
512 self.rows = 0;
513 self.window_started_ms = self.clock.now_ms();
514 Ok(())
515 }
516
517 pub fn total_flushes(&self) -> u64 {
518 self.total_flushes
519 }
520 pub fn total_bytes(&self) -> u64 {
521 self.total_bytes
522 }
523 pub fn total_rows(&self) -> u64 {
524 self.total_rows
525 }
526 pub fn last_flush_reason(&self) -> Option<FlushReason> {
527 self.last_flush_reason
528 }
529}
530
531pub fn write_chunked_response_header<W: std::io::Write>(
535 writer: &mut W,
536 status: u16,
537 content_type: &str,
538) -> std::io::Result<()> {
539 let mut header = format!(
547 "HTTP/1.1 {} {}\r\nContent-Type: {}\r\nTransfer-Encoding: chunked\r\nCache-Control: no-cache\r\nConnection: close\r\n",
548 status,
549 crate::server::transport::status_text(status),
550 content_type,
551 );
552 for (name, value) in crate::server::transport::CORS_HEADER_PAIRS {
556 header.push_str(name);
557 header.push_str(": ");
558 header.push_str(value);
559 header.push_str("\r\n");
560 }
561 header.push_str("\r\n");
562 writer.write_all(header.as_bytes())?;
563 writer.flush()
564}
565
/// Writes `bytes` as one HTTP chunk (hex length, CRLF, payload, CRLF) and
/// flushes the writer.
///
/// Empty input writes nothing and does not flush, because a zero-length
/// chunk is what terminates a chunked body.
pub fn write_chunk<W: std::io::Write>(writer: &mut W, bytes: &[u8]) -> std::io::Result<()> {
    if !bytes.is_empty() {
        let size_line = format!("{:x}\r\n", bytes.len());
        let frame: [&[u8]; 3] = [size_line.as_bytes(), bytes, b"\r\n"];
        for part in frame {
            writer.write_all(part)?;
        }
        writer.flush()?;
    }
    Ok(())
}
579
/// Writes the zero-length chunk that ends a chunked body (`0\r\n\r\n`) and
/// flushes the writer.
pub fn write_chunked_terminator<W: std::io::Write>(writer: &mut W) -> std::io::Result<()> {
    const TERMINATOR: &[u8] = b"0\r\n\r\n";
    writer
        .write_all(TERMINATOR)
        .and_then(|()| writer.flush())
}
585
/// Tracks how many streams are open in total and per principal, and hands
/// out guards that give the slot back when dropped.
#[derive(Debug, Default)]
pub struct StreamCapacityRegistry {
    inner: Mutex<CapacityInner>,
}

/// Counters protected by the registry mutex.
#[derive(Debug, Default)]
struct CapacityInner {
    /// Streams currently held across all principals.
    global_count: usize,
    /// Streams currently held per principal; entries are removed at zero.
    per_principal: HashMap<String, usize>,
}

/// Why [`StreamCapacityRegistry::try_acquire`] refused a slot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireError {
    /// The global limit was already reached.
    GlobalExhausted { limit: usize, current: usize },
    /// The named principal had already reached its own limit.
    PrincipalExhausted {
        principal: String,
        limit: usize,
        current: usize,
    },
}

impl AcquireError {
    /// Machine-readable error code.
    pub fn code(&self) -> &'static str {
        match self {
            Self::GlobalExhausted { .. } => "server_stream_capacity_exhausted",
            Self::PrincipalExhausted { .. } => "principal_stream_quota_exhausted",
        }
    }

    /// Human-readable description including the limit and the current count.
    pub fn message(&self) -> String {
        match self {
            Self::GlobalExhausted { limit, current } => {
                format!("server stream capacity exhausted (limit {limit}, current {current})")
            }
            Self::PrincipalExhausted {
                principal,
                limit,
                current,
            } => format!(
                "principal {principal} stream quota exhausted (limit {limit}, current {current})"
            ),
        }
    }
}

impl StreamCapacityRegistry {
    /// Creates an empty registry behind an `Arc`, as `try_acquire` requires.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Reserves one stream slot for `principal`.
    ///
    /// The global limit is checked before the per-principal limit. On success
    /// both counters are incremented and the returned guard releases them
    /// when dropped. A poisoned mutex is recovered rather than propagated.
    pub fn try_acquire(
        self: &Arc<Self>,
        principal: &str,
        max_global: usize,
        max_per_principal: usize,
    ) -> Result<StreamCapacityGuard, AcquireError> {
        let mut state = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let open_globally = state.global_count;
        if open_globally >= max_global {
            return Err(AcquireError::GlobalExhausted {
                limit: max_global,
                current: open_globally,
            });
        }

        let held = state.per_principal.get(principal).copied().unwrap_or(0);
        if held >= max_per_principal {
            return Err(AcquireError::PrincipalExhausted {
                principal: principal.to_string(),
                limit: max_per_principal,
                current: held,
            });
        }

        state.global_count = open_globally + 1;
        state.per_principal.insert(principal.to_string(), held + 1);

        Ok(StreamCapacityGuard {
            registry: Arc::clone(self),
            principal: principal.to_string(),
            released: false,
        })
    }

    /// Gives back one slot for `principal`. Counters never go below zero and
    /// a principal whose count reaches zero is removed from the map.
    fn release(&self, principal: &str) {
        let mut state = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        state.global_count = state.global_count.saturating_sub(1);

        let drained = match state.per_principal.get_mut(principal) {
            Some(count) => {
                *count = count.saturating_sub(1);
                *count == 0
            }
            None => false,
        };
        if drained {
            state.per_principal.remove(principal);
        }
    }

    /// Returns a copy of the global count and the per-principal counts.
    pub fn snapshot(&self) -> (usize, HashMap<String, usize>) {
        let state = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        (state.global_count, state.per_principal.clone())
    }
}

/// Holds one stream slot; dropping it returns the slot to the registry.
#[must_use = "dropping the guard immediately releases the stream slot"]
#[derive(Debug)]
pub struct StreamCapacityGuard {
    registry: Arc<StreamCapacityRegistry>,
    principal: String,
    /// Set once the slot has been handed back, so release runs at most once.
    released: bool,
}

impl StreamCapacityGuard {
    /// Principal the slot was acquired for.
    pub fn principal(&self) -> &str {
        &self.principal
    }
}

impl Drop for StreamCapacityGuard {
    fn drop(&mut self) {
        if self.released {
            return;
        }
        self.registry.release(&self.principal);
        self.released = true;
    }
}
734
/// Textual heuristic deciding whether a query is eligible for resumption.
///
/// Works on the upper-cased query text. Returns `true` only when all of the
/// following hold:
/// - after leading whitespace the text starts with `SELECT` followed by a
///   space or a newline;
/// - none of the `FORBIDDEN` fragments (grouping, `DISTINCT`, aggregates,
///   window functions, joins) occurs anywhere in the text;
/// - there is no `ORDER BY`, or the text between the first `ORDER BY` and the
///   following ` LIMIT ` / `;` is exactly `RID` or `RID ASC`.
pub fn assess_resumability(query: &str) -> bool {
    const FORBIDDEN: &[&str] = &[
        " GROUP BY ",
        " HAVING ",
        " DISTINCT ",
        "DISTINCT ",
        "COUNT(",
        "SUM(",
        "AVG(",
        "MIN(",
        "MAX(",
        "ARRAY_AGG(",
        "JSON_AGG(",
        "OVER(",
        " OVER (",
        " JOIN ",
    ];
    const ORDER_BY: &str = "ORDER BY";

    let upper = query.to_uppercase();

    let head = upper.trim_start();
    let is_select = head.starts_with("SELECT ") || head.starts_with("SELECT\n");
    if !is_select {
        return false;
    }

    if FORBIDDEN.iter().any(|kw| upper.contains(kw)) {
        return false;
    }

    let Some(at) = upper.find(ORDER_BY) else {
        return true;
    };

    // Narrow the ordering clause: cut at LIMIT first, then at a semicolon.
    let mut clause = &upper[at + ORDER_BY.len()..];
    if let Some(end) = clause.find(" LIMIT ") {
        clause = &clause[..end];
    }
    if let Some(end) = clause.find(';') {
        clause = &clause[..end];
    }

    matches!(clause.trim(), "RID" | "RID ASC")
}
787
/// Remembers, per snapshot LSN, when a lease was opened and how long it
/// lives, so a later lookup can tell a live lease from an expired or unknown
/// one.
#[derive(Debug, Default)]
pub struct LeaseRegistry {
    inner: Mutex<HashMap<u64, LeaseEntry>>,
}

/// Timing data stored for one snapshot LSN.
#[derive(Debug, Clone, Copy)]
struct LeaseEntry {
    opened_at_ms: u64,
    ttl_ms: u64,
}

/// Result of [`LeaseRegistry::lookup`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseLookup {
    /// Nothing was recorded for the LSN.
    Unknown,
    /// An entry exists but its age has reached its TTL.
    Expired,
    /// An entry exists and is still within its TTL.
    Live,
}

impl LeaseRegistry {
    /// Creates an empty registry behind an `Arc`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Stores the timing for `snapshot_lsn`, replacing any earlier entry
    /// recorded for the same LSN.
    pub fn record(&self, snapshot_lsn: u64, opened_at_ms: u64, ttl_ms: u64) {
        let entry = LeaseEntry {
            opened_at_ms,
            ttl_ms,
        };
        self.inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(snapshot_lsn, entry);
    }

    /// Classifies `snapshot_lsn` at time `now_ms`. An entry is expired once
    /// `now_ms - opened_at_ms` (saturating at zero) reaches its TTL.
    pub fn lookup(&self, snapshot_lsn: u64, now_ms: u64) -> LeaseLookup {
        let entries = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let Some(entry) = entries.get(&snapshot_lsn) else {
            return LeaseLookup::Unknown;
        };

        let age_ms = now_ms.saturating_sub(entry.opened_at_ms);
        if age_ms < entry.ttl_ms {
            LeaseLookup::Live
        } else {
            LeaseLookup::Expired
        }
    }

    /// Number of recorded entries, including expired ones.
    #[doc(hidden)]
    pub fn len(&self) -> usize {
        let entries = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        entries.len()
    }
}
856
857pub const CURSOR_TOKEN_BYTES: usize = 24;
864
865static NEXT_CURSOR_SALT: AtomicU64 = AtomicU64::new(1);
866
867pub fn generate_cursor_token() -> String {
872 let mut bytes = [0u8; CURSOR_TOKEN_BYTES];
873 if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
874 let salt = NEXT_CURSOR_SALT
875 .fetch_add(1, Ordering::SeqCst)
876 .to_le_bytes();
877 let now = crate::utils::now_unix_nanos().to_le_bytes();
878 bytes[..8].copy_from_slice(&salt);
879 bytes[8..16].copy_from_slice(&now);
880 }
881 crate::utils::to_hex(&bytes)
882}
883
/// Everything `CursorRegistry` stores for one registered cursor token.
#[derive(Debug, Clone)]
struct CursorEntry {
    /// Snapshot LSN supplied at registration.
    snapshot_lsn: u64,
    /// Tenant that registered the cursor; must match on `resolve`/`cancel`.
    tenant: String,
    /// Principal that registered the cursor; must match on `resolve`/`cancel`.
    principal: String,
    /// Query text supplied at registration.
    query: String,
    entity_types: Option<Vec<String>>,
    capabilities: Option<Vec<String>>,
    /// Opening time in milliseconds, as supplied at registration.
    opened_at_ms: u64,
    /// Lifetime in milliseconds, measured from `opened_at_ms`.
    ttl_ms: u64,
    /// Token created at registration; `cancel` triggers it and
    /// `cancel_token_for` hands out clones of it.
    cancel_token: CancelToken,
    /// Set by `cancel`; once set, `resolve` rejects the cursor.
    cancelled: bool,
}

/// Data returned by a successful `CursorRegistry::resolve`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorResume {
    pub snapshot_lsn: u64,
    pub query: String,
    pub entity_types: Option<Vec<String>>,
    pub capabilities: Option<Vec<String>>,
    /// `opened_at_ms + ttl_ms` of the stored entry, saturating on overflow.
    pub expires_at_ms: u64,
}

/// Why a cursor token was rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorReject {
    /// The token is not registered, or it is registered under a different
    /// tenant or principal.
    NotFound,
    /// The entry's age has reached its TTL.
    Expired,
    /// The cursor was cancelled.
    Cancelled,
}

/// Mutex-protected map from cursor token to its stored entry.
#[derive(Debug, Default)]
pub struct CursorRegistry {
    inner: Mutex<HashMap<String, CursorEntry>>,
}
952
953impl CursorRegistry {
954 pub fn new() -> Arc<Self> {
955 Arc::new(Self::default())
956 }
957
958 #[allow(clippy::too_many_arguments)]
962 pub fn register(
963 &self,
964 snapshot_lsn: u64,
965 tenant: &str,
966 principal: &str,
967 query: &str,
968 entity_types: Option<Vec<String>>,
969 capabilities: Option<Vec<String>>,
970 opened_at_ms: u64,
971 ttl_ms: u64,
972 ) -> String {
973 let token = generate_cursor_token();
974 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
975 inner.insert(
976 token.clone(),
977 CursorEntry {
978 snapshot_lsn,
979 tenant: tenant.to_string(),
980 principal: principal.to_string(),
981 query: query.to_string(),
982 entity_types,
983 capabilities,
984 opened_at_ms,
985 ttl_ms,
986 cancel_token: CancelToken::new(),
987 cancelled: false,
988 },
989 );
990 token
991 }
992
993 pub fn cancel_token_for(&self, token: &str) -> Option<CancelToken> {
998 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
999 inner.get(token).map(|e| e.cancel_token.clone())
1000 }
1001
1002 pub fn cancel(
1013 &self,
1014 token: &str,
1015 tenant: &str,
1016 principal: &str,
1017 ) -> Result<CancelToken, CursorReject> {
1018 let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1019 let entry = inner.get_mut(token).ok_or(CursorReject::NotFound)?;
1020 if entry.tenant != tenant || entry.principal != principal {
1021 return Err(CursorReject::NotFound);
1022 }
1023 entry.cancelled = true;
1024 entry.cancel_token.cancel();
1025 Ok(entry.cancel_token.clone())
1026 }
1027
1028 pub fn resolve(
1036 &self,
1037 token: &str,
1038 tenant: &str,
1039 principal: &str,
1040 now_ms: u64,
1041 ) -> Result<CursorResume, CursorReject> {
1042 let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
1043 let entry = inner.get(token).ok_or(CursorReject::NotFound)?;
1044 if entry.tenant != tenant || entry.principal != principal {
1048 return Err(CursorReject::NotFound);
1049 }
1050 if entry.cancelled {
1054 return Err(CursorReject::Cancelled);
1055 }
1056 if now_ms.saturating_sub(entry.opened_at_ms) >= entry.ttl_ms {
1057 return Err(CursorReject::Expired);
1058 }
1059 Ok(CursorResume {
1060 snapshot_lsn: entry.snapshot_lsn,
1061 query: entry.query.clone(),
1062 entity_types: entry.entity_types.clone(),
1063 capabilities: entry.capabilities.clone(),
1064 expires_at_ms: entry.opened_at_ms.saturating_add(entry.ttl_ms),
1065 })
1066 }
1067
1068 #[doc(hidden)]
1070 pub fn len(&self) -> usize {
1071 self.inner.lock().unwrap_or_else(|e| e.into_inner()).len()
1072 }
1073
1074 #[doc(hidden)]
1075 pub fn is_empty(&self) -> bool {
1076 self.len() == 0
1077 }
1078}
1079
1080#[derive(Debug, Default)]
1086pub struct PrefixHasher {
1087 inner: Option<sha2::Sha256>,
1088 rows: u64,
1089}
1090
1091impl PrefixHasher {
1092 pub fn new() -> Self {
1093 use sha2::Digest;
1094 Self {
1095 inner: Some(sha2::Sha256::new()),
1096 rows: 0,
1097 }
1098 }
1099
1100 pub fn update(&mut self, line: &[u8]) {
1101 use sha2::Digest;
1102 if let Some(h) = self.inner.as_mut() {
1103 h.update(line);
1104 }
1105 self.rows += 1;
1106 }
1107
1108 pub fn rows(&self) -> u64 {
1109 self.rows
1110 }
1111
1112 pub fn finalize_hex(mut self) -> String {
1115 use sha2::Digest;
1116 let hasher = self
1117 .inner
1118 .take()
1119 .expect("PrefixHasher::finalize_hex called twice");
1120 let digest = hasher.finalize();
1121 let mut out = String::with_capacity(64);
1122 for b in digest.iter() {
1123 out.push_str(&format!("{b:02x}"));
1124 }
1125 out
1126 }
1127}
1128
1129pub fn audit_stream_opened(
1135 runtime: &RedDBRuntime,
1136 lease_handle: &str,
1137 principal: &str,
1138 snapshot_lsn: u64,
1139 query_hash: &str,
1140) {
1141 use crate::json::{Map, Value as JsonValue};
1142 let mut detail = Map::new();
1143 detail.insert(
1144 "lease_handle".to_string(),
1145 JsonValue::String(lease_handle.to_string()),
1146 );
1147 detail.insert(
1148 "snapshot_lsn".to_string(),
1149 JsonValue::Number(snapshot_lsn as f64),
1150 );
1151 detail.insert(
1152 "query_hash".to_string(),
1153 JsonValue::String(query_hash.to_string()),
1154 );
1155 let event = crate::runtime::audit_log::AuditEvent::builder("stream.opened")
1156 .principal(principal)
1157 .resource(lease_handle.to_string())
1158 .outcome(crate::runtime::audit_log::Outcome::Success)
1159 .detail(JsonValue::Object(detail))
1160 .build();
1161 runtime.audit_log().record_event(event);
1162}
1163
1164pub fn audit_stream_closed(
1165 runtime: &RedDBRuntime,
1166 lease_handle: &str,
1167 principal: &str,
1168 reason: CloseReason,
1169 row_count: u64,
1170 bytes_written: u64,
1171) {
1172 use crate::json::{Map, Value as JsonValue};
1173 let mut stats = Map::new();
1174 stats.insert("row_count".to_string(), JsonValue::Number(row_count as f64));
1175 stats.insert(
1176 "bytes_written".to_string(),
1177 JsonValue::Number(bytes_written as f64),
1178 );
1179 let mut detail = Map::new();
1180 detail.insert(
1181 "lease_handle".to_string(),
1182 JsonValue::String(lease_handle.to_string()),
1183 );
1184 detail.insert(
1185 "reason".to_string(),
1186 JsonValue::String(reason.as_str().to_string()),
1187 );
1188 detail.insert("stats".to_string(), JsonValue::Object(stats));
1189 let outcome = match reason {
1190 CloseReason::Ok => crate::runtime::audit_log::Outcome::Success,
1191 CloseReason::CapacityRefused => crate::runtime::audit_log::Outcome::Denied,
1192 _ => crate::runtime::audit_log::Outcome::Error,
1193 };
1194 let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1195 .principal(principal)
1196 .resource(lease_handle.to_string())
1197 .outcome(outcome)
1198 .detail(JsonValue::Object(detail))
1199 .build();
1200 runtime.audit_log().record_event(event);
1201}
1202
1203pub fn audit_token_expired_during_lease(
1204 runtime: &RedDBRuntime,
1205 lease_handle: &str,
1206 principal: &str,
1207 token_expiry_ms: u64,
1208) {
1209 use crate::json::{Map, Value as JsonValue};
1210 let mut detail = Map::new();
1211 detail.insert(
1212 "lease_handle".to_string(),
1213 JsonValue::String(lease_handle.to_string()),
1214 );
1215 detail.insert(
1216 "token_expiry".to_string(),
1217 JsonValue::Number(token_expiry_ms as f64),
1218 );
1219 detail.insert("lease_continued".to_string(), JsonValue::Bool(true));
1220 let event = crate::runtime::audit_log::AuditEvent::builder("stream.token_expired_during_lease")
1221 .principal(principal)
1222 .resource(lease_handle.to_string())
1223 .outcome(crate::runtime::audit_log::Outcome::Success)
1224 .detail(JsonValue::Object(detail))
1225 .build();
1226 runtime.audit_log().record_event(event);
1227}
1228
1229pub fn audit_stream_capacity_refused(
1235 runtime: &RedDBRuntime,
1236 principal: &str,
1237 code: &str,
1238 limit: usize,
1239 current: usize,
1240) {
1241 use crate::json::{Map, Value as JsonValue};
1242 let mut detail = Map::new();
1243 detail.insert(
1244 "reason".to_string(),
1245 JsonValue::String(CloseReason::CapacityRefused.as_str().to_string()),
1246 );
1247 detail.insert("code".to_string(), JsonValue::String(code.to_string()));
1248 detail.insert("limit".to_string(), JsonValue::Number(limit as f64));
1249 detail.insert("current".to_string(), JsonValue::Number(current as f64));
1250 let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
1251 .principal(principal)
1252 .outcome(crate::runtime::audit_log::Outcome::Denied)
1253 .detail(JsonValue::Object(detail))
1254 .build();
1255 runtime.audit_log().record_event(event);
1256}
1257
1258pub fn system_clock() -> Arc<dyn Clock> {
1261 static INSTANCE: std::sync::OnceLock<Arc<dyn Clock>> = std::sync::OnceLock::new();
1262 Arc::clone(INSTANCE.get_or_init(|| Arc::new(SystemClock)))
1263}
1264
1265#[cfg(test)]
1266mod tests {
1267 use super::*;
1268
1269 #[test]
1270 fn open_stream_refuses_when_session_has_active_transaction() {
1271 let clock = FakeClock::new(0);
1272 let err = open_stream(StreamConfig::DEFAULT, 42, true, &clock).unwrap_err();
1273 assert_eq!(err, OpenStreamError::TransactionActive);
1274 assert_eq!(err.code(), "stream_in_transaction_unsupported");
1275 }
1276
1277 #[test]
1278 fn open_stream_succeeds_when_session_is_autocommit() {
1279 let clock = FakeClock::new(1_700_000_000_000);
1280 let lease = open_stream(StreamConfig::DEFAULT, 123, false, &clock).unwrap();
1281 assert_eq!(lease.snapshot_lsn, 123);
1282 assert_eq!(lease.opened_at_ms, 1_700_000_000_000);
1283 assert!(lease.id >= 1);
1284 }
1285
1286 #[test]
1287 fn lease_ids_are_unique_and_monotonic() {
1288 let clock = FakeClock::new(0);
1289 let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1290 let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
1291 assert!(b.id > a.id);
1292 }
1293
1294 #[test]
1295 fn snapshot_expired_uses_injected_clock_and_ttl() {
1296 let clock = FakeClock::new(0);
1299 let mut config = StreamConfig::DEFAULT;
1300 config.snapshot_ttl_ms = 5_000;
1301 let lease = open_stream(config, 0, false, &clock).unwrap();
1302
1303 assert!(!lease.snapshot_expired(clock.now_ms()));
1304 clock.advance(4_999);
1305 assert!(!lease.snapshot_expired(clock.now_ms()));
1306 clock.advance(1);
1307 assert!(lease.snapshot_expired(clock.now_ms()));
1308 }
1309
1310 #[test]
1311 fn stream_config_loads_defaults_when_kv_is_empty() {
1312 let cfg = StreamConfig::DEFAULT;
1314 assert_eq!(cfg.snapshot_ttl_ms, 60_000);
1315 assert_eq!(cfg.chunk_default_pages, 4);
1316 assert_eq!(cfg.chunk_min_pages, 1);
1317 assert_eq!(cfg.chunk_max_pages, 64);
1318 assert_eq!(cfg.chunk_max_rows, 1000);
1319 assert_eq!(cfg.chunk_max_latency_ms, 50);
1320 assert_eq!(cfg.production_buffer_bytes(), 64 * 1024);
1321 }
1322
1323 #[test]
1324 fn stream_config_normalize_clamps_inconsistent_inputs() {
1325 let mut cfg = StreamConfig {
1326 snapshot_ttl_ms: 1,
1327 chunk_default_pages: 100,
1328 chunk_min_pages: 0,
1329 chunk_max_pages: 8,
1330 chunk_max_rows: 0,
1331 chunk_max_latency_ms: 1,
1332 max_global_streams: 0,
1333 max_per_principal_streams: 0,
1334 default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
1335 };
1336 cfg.normalize();
1337 assert_eq!(cfg.chunk_min_pages, 1);
1338 assert_eq!(cfg.chunk_max_pages, 8);
1339 assert_eq!(cfg.chunk_default_pages, 8); assert!(cfg.chunk_max_rows >= 1);
1341 assert!(cfg.max_global_streams >= 1);
1342 assert!(cfg.max_per_principal_streams >= 1);
1343 }
1344
1345 struct CapturingSink {
1350 chunks: std::cell::RefCell<Vec<Vec<u8>>>,
1351 }
1352 impl CapturingSink {
1353 fn new() -> Self {
1354 Self {
1355 chunks: std::cell::RefCell::new(Vec::new()),
1356 }
1357 }
1358 fn len(&self) -> usize {
1359 self.chunks.borrow().len()
1360 }
1361 fn last_len(&self) -> Option<usize> {
1362 self.chunks.borrow().last().map(|c| c.len())
1363 }
1364 }
1365
1366 fn capture<'a>(sink: &'a CapturingSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1367 move |bytes: &[u8]| {
1368 sink.chunks.borrow_mut().push(bytes.to_vec());
1369 Ok(())
1370 }
1371 }
1372
1373 #[test]
1374 fn chunk_producer_flushes_on_byte_cap() {
1375 let clock = FakeClock::new(0);
1376 let cfg = StreamConfig {
1377 chunk_default_pages: 1, chunk_min_pages: 1,
1379 chunk_max_pages: 1,
1380 chunk_max_rows: 1_000_000,
1381 chunk_max_latency_ms: 1_000_000,
1382 ..StreamConfig::DEFAULT
1383 };
1384 let sink = CapturingSink::new();
1385 let mut producer = ChunkProducer::new(&cfg, &clock);
1386 let mut flush = capture(&sink);
1387
1388 producer
1389 .push_line(&vec![b'x'; 8 * 1024], &mut flush)
1390 .unwrap();
1391 assert_eq!(sink.len(), 0);
1392
1393 let triggered = producer
1394 .push_line(&vec![b'y'; 8 * 1024], &mut flush)
1395 .unwrap();
1396 assert!(triggered);
1397 assert_eq!(sink.len(), 1);
1398 assert_eq!(sink.last_len(), Some(16 * 1024));
1399 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Byte));
1400 }
1401
1402 #[test]
1403 fn chunk_producer_flushes_on_row_cap() {
1404 let clock = FakeClock::new(0);
1405 let cfg = StreamConfig {
1406 chunk_default_pages: 4, chunk_min_pages: 1,
1408 chunk_max_pages: 64,
1409 chunk_max_rows: 3,
1410 chunk_max_latency_ms: 1_000_000,
1411 ..StreamConfig::DEFAULT
1412 };
1413 let sink = CapturingSink::new();
1414 let mut producer = ChunkProducer::new(&cfg, &clock);
1415 let mut flush = capture(&sink);
1416 let row = b"{\"row\":{}}\n";
1417 producer.push_line(row, &mut flush).unwrap();
1418 producer.push_line(row, &mut flush).unwrap();
1419 assert_eq!(sink.len(), 0);
1420 let triggered = producer.push_line(row, &mut flush).unwrap();
1421 assert!(triggered);
1422 assert_eq!(sink.len(), 1);
1423 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Row));
1424 }
1425
1426 #[test]
1427 fn chunk_producer_flushes_on_latency_cap() {
1428 let clock = FakeClock::new(0);
1429 let cfg = StreamConfig {
1430 chunk_default_pages: 4,
1431 chunk_min_pages: 1,
1432 chunk_max_pages: 64,
1433 chunk_max_rows: 1_000_000,
1434 chunk_max_latency_ms: 50,
1435 ..StreamConfig::DEFAULT
1436 };
1437 let sink = CapturingSink::new();
1438 let mut producer = ChunkProducer::new(&cfg, &clock);
1439 let mut flush = capture(&sink);
1440 producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1441 assert_eq!(sink.len(), 0);
1442 clock.advance(60);
1443 let triggered = producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
1444 assert!(triggered);
1445 assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
1446 }
1447
/// Checks that `finish` emits whatever is still buffered.
///
/// With the default config, one pushed row followed by `finish` must produce
/// exactly one captured chunk and record `FlushReason::Terminal`.
#[test]
fn chunk_producer_finish_emits_terminal_flush() {
    let cfg = StreamConfig::DEFAULT;
    let clock = FakeClock::new(0);
    let sink = CapturingSink::new();
    let mut flush = capture(&sink);
    let mut producer = ChunkProducer::new(&cfg, &clock);

    producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
    producer.finish(&mut flush).unwrap();

    assert_eq!(sink.len(), 1);
    assert_eq!(producer.last_flush_reason(), Some(FlushReason::Terminal));
}
1460
/// Checks the bytes produced by the chunked-transfer helper functions.
///
/// A response header, one chunk and the terminator are written into an
/// in-memory buffer, and the resulting text is inspected.
#[test]
fn write_chunked_helpers_produce_well_formed_chunks() {
    let mut wire = Vec::<u8>::new();
    write_chunked_response_header(&mut wire, 200, "application/x-ndjson").unwrap();
    write_chunk(&mut wire, b"{\"row\":{}}\n").unwrap();
    write_chunked_terminator(&mut wire).unwrap();

    let text = String::from_utf8(wire).unwrap();
    assert!(text.starts_with("HTTP/1.1 200 OK\r\n"));
    assert!(text.contains("Transfer-Encoding: chunked\r\n"));
    // The payload is 11 bytes long, so its chunk-size line is hex `b`.
    assert!(text.contains("\r\nb\r\n{\"row\":{}}\n\r\n"));
    assert!(text.ends_with("0\r\n\r\n"));
}
1474
/// Checks the error returned when the global stream cap is reached.
///
/// With a global limit of 2 and two guards held, the third `try_acquire`
/// must fail with `AcquireError::GlobalExhausted { limit: 2, current: 2 }`,
/// whose code is `server_stream_capacity_exhausted`.
#[test]
fn capacity_registry_global_exhausted_returns_structured_error() {
    let reg = StreamCapacityRegistry::new();
    // Both guards are kept bound so their slots stay taken.
    let _first = reg.try_acquire("alice", 2, 32).unwrap();
    let _second = reg.try_acquire("alice", 2, 32).unwrap();

    let err = reg.try_acquire("alice", 2, 32).unwrap_err();
    let expected = AcquireError::GlobalExhausted {
        limit: 2,
        current: 2,
    };
    assert_eq!(err, expected);
    assert_eq!(err.code(), "server_stream_capacity_exhausted");
}
1492
/// Checks that the per-principal quota is enforced separately per principal.
///
/// With a per-principal limit of 2 and a global limit of 100, a third acquire
/// for `alice` must fail with `AcquireError::PrincipalExhausted`, while `bob`
/// can still acquire two guards.
#[test]
fn capacity_registry_per_principal_exhausted_independent_of_global() {
    let reg = StreamCapacityRegistry::new();
    let _alice_first = reg.try_acquire("alice", 100, 2).unwrap();
    let _alice_second = reg.try_acquire("alice", 100, 2).unwrap();

    let err = reg.try_acquire("alice", 100, 2).unwrap_err();
    let expected = AcquireError::PrincipalExhausted {
        principal: "alice".to_string(),
        limit: 2,
        current: 2,
    };
    assert_eq!(err, expected);
    assert_eq!(err.code(), "principal_stream_quota_exhausted");

    // Alice being at her quota must not block a different principal.
    let _bob_first = reg.try_acquire("bob", 100, 2).unwrap();
    let _bob_second = reg.try_acquire("bob", 100, 2).unwrap();
}
1516
/// Checks that dropping a guard releases its capacity.
///
/// With both limits set to 1, a second acquire fails while the first guard is
/// alive. After the guard is dropped, `snapshot` must report a global count of
/// zero and an empty per-principal collection, and a new acquire must succeed.
#[test]
fn capacity_registry_release_frees_both_counters() {
    let reg = StreamCapacityRegistry::new();

    let held = reg.try_acquire("alice", 1, 1).unwrap();
    assert!(reg.try_acquire("alice", 1, 1).is_err());

    drop(held);
    let (global, per_principal) = reg.snapshot();
    assert_eq!(global, 0);
    assert!(per_principal.is_empty());

    let _reacquired = reg.try_acquire("alice", 1, 1).unwrap();
}
1530
/// Stresses `StreamCapacityRegistry` from many threads at once.
///
/// Sixteen threads, alternating between the principals `p0` and `p1`, each try
/// 200 times to acquire a guard under a global cap of 4. Every successful
/// acquire records the live global count reported by `snapshot`. After all
/// threads join, the registry must be empty and the highest recorded count
/// must not exceed the global cap.
#[test]
fn capacity_registry_concurrent_acquire_release_does_not_over_issue() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    const THREADS: usize = 16;
    const ITERS: usize = 200;
    const CAP_GLOBAL: usize = 4;
    const CAP_PER_PRINCIPAL: usize = 4;

    let reg = StreamCapacityRegistry::new();
    let observed_max = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..THREADS)
        .map(|tid| {
            // Each worker gets its own handle to the shared registry and to
            // the shared high-water mark.
            let reg = Arc::clone(&reg);
            let observed_max = Arc::clone(&observed_max);
            let principal = format!("p{}", tid % 2);
            std::thread::spawn(move || {
                for _ in 0..ITERS {
                    if let Ok(guard) = reg.try_acquire(&principal, CAP_GLOBAL, CAP_PER_PRINCIPAL) {
                        let (live, _) = reg.snapshot();
                        observed_max.fetch_max(live, Ordering::SeqCst);
                        // Give other threads a chance to run while the guard
                        // is still held.
                        std::thread::yield_now();
                        drop(guard);
                    }
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    let (global_after, per_principal_after) = reg.snapshot();
    assert_eq!(global_after, 0, "global counter leaked");
    assert!(
        per_principal_after.is_empty(),
        "per-principal map leaked: {per_principal_after:?}"
    );

    let peak = observed_max.load(Ordering::SeqCst);
    assert!(
        peak <= CAP_GLOBAL,
        "global cap was breached: observed {} > {}",
        peak,
        CAP_GLOBAL
    );
}
1583
/// Lists query shapes that `assess_resumability` must accept.
///
/// Covered: plain projections, a `WHERE` filter, and `ORDER BY rid` with an
/// optional `ASC` and `LIMIT`.
#[test]
fn assess_resumability_accepts_plain_select() {
    // Projections with and without a filter.
    assert!(assess_resumability("SELECT id, name FROM t"));
    assert!(assess_resumability("select * from t where id > 5"));

    // Ascending order by `rid`, implicit or explicit, with an optional limit.
    assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid"));
    assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid ASC"));
    assert!(assess_resumability("SELECT a FROM t ORDER BY rid ASC LIMIT 10"));
}
1596
/// Lists query shapes that `assess_resumability` must reject.
///
/// Covered: aggregates, `GROUP BY`, `DISTINCT`, orderings other than
/// ascending `rid`, a non-`SELECT` statement, and a join.
#[test]
fn assess_resumability_rejects_aggregates_and_unordered() {
    // Aggregates, grouping and DISTINCT.
    assert!(!assess_resumability("SELECT COUNT(*) FROM t"));
    assert!(!assess_resumability("SELECT SUM(x) FROM t"));
    assert!(!assess_resumability("SELECT a, COUNT(b) FROM t GROUP BY a"));
    assert!(!assess_resumability("SELECT DISTINCT a FROM t"));

    // Orderings that are not a single ascending `rid`.
    assert!(!assess_resumability("SELECT a FROM t ORDER BY name"));
    assert!(!assess_resumability("SELECT a FROM t ORDER BY rid DESC"));
    assert!(!assess_resumability("SELECT a FROM t ORDER BY a, b"));

    // A write statement and a join.
    assert!(!assess_resumability("INSERT INTO t (a) VALUES (1)"));
    assert!(!assess_resumability("SELECT a FROM t JOIN u ON t.id = u.id"));
}
1611
/// Checks `LeaseRegistry::lookup` around the TTL boundary.
///
/// Lease 42 is recorded with the arguments `1_000` and `5_000`. Lookups at
/// 1_000 and 5_999 must be `Live`, a lookup at 6_000 must be `Expired`, and a
/// lease id that was never recorded must be `Unknown`.
#[test]
fn lease_registry_records_and_expires_against_ttl() {
    let reg = LeaseRegistry::new();
    reg.record(42, 1_000, 5_000);

    // Still live at the start and one tick before 6_000.
    assert_eq!(reg.lookup(42, 1_000), LeaseLookup::Live);
    assert_eq!(reg.lookup(42, 5_999), LeaseLookup::Live);

    // Exactly 6_000 already counts as expired.
    assert_eq!(reg.lookup(42, 6_000), LeaseLookup::Expired);

    // An id that was never recorded.
    assert_eq!(reg.lookup(99, 1_000), LeaseLookup::Unknown);
}
1621
/// Checks that `PrefixHasher` is deterministic and order-sensitive.
///
/// Two hashers fed the same rows in the same order must agree, a hasher fed
/// the rows in reverse order must differ, and the hex digest must be 64
/// characters long.
#[test]
fn prefix_hasher_is_order_sensitive_and_deterministic() {
    let mut forward = PrefixHasher::new();
    forward.update(b"{\"row\":{\"id\":1}}");
    forward.update(b"{\"row\":{\"id\":2}}");
    let hash_a = forward.finalize_hex();

    // Same rows, same order: the digest must repeat.
    let mut repeat = PrefixHasher::new();
    repeat.update(b"{\"row\":{\"id\":1}}");
    repeat.update(b"{\"row\":{\"id\":2}}");
    let hash_b = repeat.finalize_hex();
    assert_eq!(hash_a, hash_b);

    // Same rows, swapped order: the digest must change.
    let mut swapped = PrefixHasher::new();
    swapped.update(b"{\"row\":{\"id\":2}}");
    swapped.update(b"{\"row\":{\"id\":1}}");
    let hash_swapped = swapped.finalize_hex();
    assert_ne!(hash_a, hash_swapped);

    assert_eq!(hash_a.len(), 64);
}
1641
/// Checks the shape and uniqueness of the lease handle from `open_stream`.
///
/// Two streams are opened with identical arguments. The first handle must be
/// `LEASE_HANDLE_BYTES * 2` hex characters, the two handles must differ, and
/// the second stream's `id` must be greater than the first's.
#[test]
fn lease_handle_is_128_bit_hex_and_unique_per_open() {
    let clock = FakeClock::new(0);
    let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
    let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();

    let expected_len = LEASE_HANDLE_BYTES * 2;
    assert_eq!(
        a.lease_handle.len(),
        expected_len,
        "handle must be 128 bits hex-encoded: {}",
        a.lease_handle
    );

    let all_hex = a.lease_handle.chars().all(|c| c.is_ascii_hexdigit());
    assert!(all_hex, "handle must be hex: {}", a.lease_handle);

    assert_ne!(a.lease_handle, b.lease_handle, "handles must differ");
    assert!(b.id > a.id);
}
1668
/// Checks that 1024 consecutive lease handles are all distinct.
///
/// Each handle is inserted into a `HashSet`; a failed insert means a
/// duplicate was generated.
#[test]
fn generate_lease_handle_produces_high_entropy_distinct_values() {
    let mut seen = std::collections::HashSet::new();
    for _ in 0..1024 {
        let handle = generate_lease_handle();
        let is_new = seen.insert(handle);
        assert!(is_new, "duplicate handle in CSPRNG sequence");
    }
}
1682
/// Checks that `parse_jwt_exp_ms` returns the `exp` claim in milliseconds.
///
/// The middle segment of the token is the base64 form of
/// `{"exp":1700000000}`; the expected result is that value times 1000.
#[test]
fn parse_jwt_exp_ms_extracts_seconds_to_ms() {
    let token = "eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MDAwMDAwMDB9.sig";
    let exp_ms = parse_jwt_exp_ms(token);
    assert_eq!(exp_ms, Some(1_700_000_000_000));
}
1692
/// Checks that `parse_jwt_exp_ms` yields `None` for tokens with no usable
/// `exp` claim.
#[test]
fn parse_jwt_exp_ms_returns_none_for_opaque_tokens() {
    // No dot-separated segments at all.
    assert_eq!(parse_jwt_exp_ms("not-a-jwt"), None);
    // Only two segments.
    assert_eq!(parse_jwt_exp_ms("only.two"), None);
    // Three segments, but the middle one is just `b`.
    assert_eq!(parse_jwt_exp_ms("a.b.c"), None);
}
1699
1700 fn register_default_cursor(reg: &CursorRegistry, now_ms: u64) -> String {
1703 reg.register(
1704 42,
1705 "acme",
1706 "bearer:abc",
1707 "SELECT id FROM users ORDER BY rid",
1708 None,
1709 None,
1710 now_ms,
1711 1_000,
1712 )
1713 }
1714
/// Checks the shape and uniqueness of tokens minted by `CursorRegistry`.
///
/// Two identical registrations must give different tokens. The first token
/// must be `CURSOR_TOKEN_BYTES * 2` hex characters long, and the registry
/// must then hold two entries.
#[test]
fn cursor_token_is_192_bit_hex_and_unique_per_register() {
    let reg = CursorRegistry::default();
    let a = register_default_cursor(&reg, 0);
    let b = register_default_cursor(&reg, 0);

    assert_eq!(
        a.len(),
        CURSOR_TOKEN_BYTES * 2,
        "token must be 192 bits hex-encoded: {a}"
    );
    assert!(
        a.chars().all(|c| c.is_ascii_hexdigit()),
        "token must be hex: {a}"
    );
    assert_ne!(a, b, "tokens must differ across registrations");
    assert_eq!(reg.len(), 2);
}
1732
/// Checks that the owner can resolve a cursor before it expires.
///
/// The cursor is registered at 1_000 ms and resolved at 1_500 ms. The resume
/// record must carry snapshot LSN 42, the registered query, and an expiry of
/// 2_000 ms.
#[test]
fn cursor_resolves_for_owner_within_ttl() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 1_000);

    let resume = reg
        .resolve(&token, "acme", "bearer:abc", 1_500)
        .expect("live cursor resolves for its owner");

    assert_eq!(resume.snapshot_lsn, 42, "resume re-pins the same snapshot");
    assert_eq!(resume.query, "SELECT id FROM users ORDER BY rid");
    assert_eq!(resume.expires_at_ms, 2_000);
}
1745
/// Checks that the owner gets `Expired` once the TTL has run out.
///
/// The cursor is registered at 0 ms and expires at 1_000 ms. Resolving at
/// exactly 1_000 ms, and again at 5_000 ms, must return
/// `CursorReject::Expired`.
#[test]
fn cursor_rejects_after_ttl_for_owner() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    // Exactly at the expiry time.
    assert_eq!(
        reg.resolve(&token, "acme", "bearer:abc", 1_000),
        Err(CursorReject::Expired),
        "TTL boundary is inclusive — cursor is dead at opened_at + ttl"
    );
    // Well past the expiry time.
    assert_eq!(
        reg.resolve(&token, "acme", "bearer:abc", 5_000),
        Err(CursorReject::Expired)
    );
}
1762
/// Checks that resolving with the wrong tenant returns `NotFound`.
///
/// The cursor must still resolve for its owner afterwards.
#[test]
fn cursor_cross_tenant_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    assert_eq!(
        reg.resolve(&token, "evil-corp", "bearer:abc", 100),
        Err(CursorReject::NotFound),
        "cross-tenant resume must mask existence as NotFound"
    );
    // The rejected attempt must not affect the owner.
    assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
}
1778
/// Checks that resolving with the right tenant but the wrong principal
/// returns `NotFound`.
///
/// The cursor must still resolve for its owner afterwards.
#[test]
fn cursor_cross_principal_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    assert_eq!(
        reg.resolve(&token, "acme", "bearer:other", 100),
        Err(CursorReject::NotFound),
        "cross-principal resume must mask existence as NotFound"
    );
    // The rejected attempt must not affect the owner.
    assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
}
1792
/// Checks that resolving a token against an empty registry returns
/// `NotFound`.
#[test]
fn cursor_unknown_token_is_not_found() {
    let reg = CursorRegistry::default();
    assert!(reg.is_empty());

    let outcome = reg.resolve("deadbeef", "acme", "bearer:abc", 0);
    assert_eq!(outcome, Err(CursorReject::NotFound));
}
1802
/// Checks that a wrong-tenant resolve of an expired cursor returns
/// `NotFound`, not `Expired`.
///
/// The cursor is registered at 0 ms and resolved at 10_000 ms, well past its
/// 1_000 ms expiry, by a tenant that does not own it.
#[test]
fn cursor_scope_is_checked_before_expiry() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    assert_eq!(
        reg.resolve(&token, "evil-corp", "bearer:abc", 10_000),
        Err(CursorReject::NotFound),
        "expired + wrong scope must mask as NotFound, not Expired"
    );
}
1816
/// Checks that cancelling a cursor raises its cancel token.
///
/// A token obtained from `cancel_token_for` before the cancel must start
/// un-cancelled. After the owner calls `cancel`, both the returned token and
/// the earlier one must report cancelled.
#[test]
fn cancel_tombstones_cursor_and_raises_token() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    // Obtained before the cancel.
    let live = reg
        .cancel_token_for(&token)
        .expect("freshly-minted cursor exposes its token");
    assert!(!live.is_cancelled(), "token starts un-cancelled");

    let returned = reg
        .cancel(&token, "acme", "bearer:abc")
        .expect("owner cancels its own cursor");
    assert!(returned.is_cancelled(), "cancel raises the returned token");
    assert!(
        live.is_cancelled(),
        "the handler-held clone observes the cancel"
    );
}
1839
/// Checks that the owner gets `Cancelled` when resolving a cursor they
/// cancelled.
#[test]
fn cancelled_cursor_rejects_resume_with_cancelled_reason() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    reg.cancel(&token, "acme", "bearer:abc")
        .expect("owner cancels");

    assert_eq!(
        reg.resolve(&token, "acme", "bearer:abc", 100),
        Err(CursorReject::Cancelled),
        "owner resuming a cancelled cursor sees Cancelled"
    );
}
1855
/// Checks that cancelling the same cursor twice succeeds both times.
///
/// The token returned by the second call must report cancelled.
#[test]
fn cancel_is_idempotent() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    assert!(reg.cancel(&token, "acme", "bearer:abc").is_ok());

    let second = reg
        .cancel(&token, "acme", "bearer:abc")
        .expect("re-cancel is a no-op success");
    assert!(second.is_cancelled());
}
1867
/// Checks that cancelling with the wrong tenant returns `NotFound`.
///
/// The cursor must still resolve for its owner afterwards.
#[test]
fn cancel_cross_tenant_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    let outcome = reg.cancel(&token, "evil-corp", "bearer:abc");
    assert!(
        matches!(outcome, Err(CursorReject::NotFound)),
        "cross-tenant cancel must mask existence as NotFound"
    );
    // The rejected cancel must leave the cursor usable by its owner.
    assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
}
1884
/// Checks that cancelling with the right tenant but the wrong principal
/// returns `NotFound`.
///
/// The cursor must still resolve for its owner afterwards.
#[test]
fn cancel_cross_principal_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);

    let outcome = reg.cancel(&token, "acme", "bearer:other");
    assert!(
        matches!(outcome, Err(CursorReject::NotFound)),
        "cross-principal cancel must mask existence as NotFound"
    );
    // The rejected cancel must leave the cursor usable by its owner.
    assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
}
1898
/// Checks that cancelling a token against an empty registry returns
/// `NotFound`.
#[test]
fn cancel_unknown_token_is_not_found() {
    let reg = CursorRegistry::default();
    let outcome = reg.cancel("deadbeef", "acme", "bearer:abc");
    assert!(matches!(outcome, Err(CursorReject::NotFound)));
}
1907
/// Checks that 1024 consecutive cursor tokens are all distinct.
///
/// Each token is inserted into a `HashSet`; a failed insert means a duplicate
/// was generated.
#[test]
fn generate_cursor_token_produces_high_entropy_distinct_values() {
    let mut seen = std::collections::HashSet::new();
    for _ in 0..1024 {
        let token = generate_cursor_token();
        let is_new = seen.insert(token);
        assert!(is_new, "duplicate cursor token in CSPRNG sequence");
    }
}
1918
/// Checks the string returned by `as_str` for each listed `CloseReason`
/// variant.
#[test]
fn close_reason_as_str_covers_every_state_transition() {
    let expectations = [
        (CloseReason::Ok, "ok"),
        (CloseReason::Cancelled, "cancelled"),
        (CloseReason::Error, "error"),
        (CloseReason::SnapshotExpired, "snapshot_expired"),
        (CloseReason::CapacityRefused, "capacity_refused"),
        (CloseReason::IntegrityFailed, "integrity_failed"),
    ];
    for (reason, label) in expectations {
        assert_eq!(reason.as_str(), label);
    }
}
1934
/// Checks the default stream caps: 256 streams globally and 32 per principal.
#[test]
fn stream_config_defaults_carry_s2_caps() {
    let defaults = StreamConfig::DEFAULT;
    assert_eq!(defaults.max_global_streams, 256);
    assert_eq!(defaults.max_per_principal_streams, 32);
}
1940
/// Test sink that stores only the byte length of each flushed chunk.
struct SizeSink {
    /// One entry per recorded flush; `RefCell` allows recording through `&self`.
    sizes: std::cell::RefCell<Vec<usize>>,
}

impl SizeSink {
    /// Creates a sink with no recorded flushes.
    fn new() -> Self {
        let sizes = std::cell::RefCell::new(Vec::new());
        Self { sizes }
    }

    /// Returns how many flushes have been recorded.
    fn flushes(&self) -> usize {
        let recorded = self.sizes.borrow();
        recorded.len()
    }

    /// Returns the largest recorded chunk length, or 0 if none were recorded.
    fn max_chunk(&self) -> usize {
        let recorded = self.sizes.borrow();
        let largest = recorded.iter().copied().max();
        largest.unwrap_or(0)
    }
}
1961 fn size_capture<'a>(sink: &'a SizeSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
1962 move |bytes: &[u8]| {
1963 sink.sizes.borrow_mut().push(bytes.len());
1964 Ok(())
1965 }
1966 }
1967
/// Drives one million generated rows through `drive_lines` and checks that
/// the output is split into many bounded chunks.
///
/// The config pins the chunk size to one page and sets the row and latency
/// caps very high. The test requires that every row is consumed, that more
/// than 1000 flushes occur, and that no chunk exceeds
/// `production_buffer_bytes()` plus the longest generated line.
#[test]
fn drive_lines_streams_large_source_with_bounded_working_set() {
    const N: u64 = 1_000_000;

    let cfg = StreamConfig {
        chunk_max_pages: 1,
        chunk_min_pages: 1,
        chunk_default_pages: 1,
        chunk_max_latency_ms: 1_000_000,
        chunk_max_rows: 1_000_000,
        ..StreamConfig::DEFAULT
    };
    let clock = FakeClock::new(0);
    let sink = SizeSink::new();
    let mut flush = size_capture(&sink);
    let mut producer = ChunkProducer::new(&cfg, &clock);

    // The source is a lazy range; each row is rendered only when pulled.
    let consumed = producer
        .drive_lines(
            0..N,
            |i: &u64| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes(),
            &mut flush,
        )
        .unwrap();
    producer.finish(&mut flush).unwrap();

    assert_eq!(consumed, N);
    assert_eq!(producer.total_rows(), N);

    let flush_count = sink.flushes();
    assert!(
        flush_count > 1000,
        "expected the source to stream across many chunks, saw {}",
        flush_count
    );

    // The longest line is the one rendered for the largest id.
    let max_line = format!("{{\"row\":{{\"id\":{}}}}}\n", N - 1).len();
    let budget = cfg.production_buffer_bytes() + max_line;
    let largest = sink.max_chunk();
    assert!(
        largest <= budget,
        "chunk {} exceeded bounded working set {}",
        largest,
        budget
    );
}
2019
/// Checks that the latency cap produces an early first flush.
///
/// The config uses 64-page chunks, a very large row cap, and a 50 ms latency
/// cap. The fake clock advances 20 ms before each push, so the first flush
/// must be a latency flush and must happen within the first four rows.
#[test]
fn drive_lines_first_chunk_flushes_on_latency_before_source_drains() {
    let cfg = StreamConfig {
        chunk_max_latency_ms: 50,
        chunk_max_rows: 1_000_000,
        chunk_max_pages: 64,
        chunk_min_pages: 1,
        chunk_default_pages: 64,
        ..StreamConfig::DEFAULT
    };
    let clock = FakeClock::new(0);
    let sink = SizeSink::new();
    let mut producer = ChunkProducer::new(&cfg, &clock);

    // Push rows until the first flush, remembering how many rows it took.
    let mut first_flush_after: Option<u64> = None;
    for row in 0..1_000_000u64 {
        let line = format!("{{\"row\":{{\"id\":{row}}}}}\n");
        clock.advance(20);
        let mut flush = size_capture(&sink);
        if producer.push_line(line.as_bytes(), &mut flush).unwrap() {
            first_flush_after = Some(row + 1);
            break;
        }
    }

    assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
    let rows_before_flush = first_flush_after.expect("a latency flush must occur");
    assert!(
        rows_before_flush <= 4,
        "first chunk flushed only after {rows_before_flush} rows; latency bound not honoured"
    );
    assert!(rows_before_flush < 1_000_000);
}
2065
/// Checks that `drive_lines` and a manual `push_line` loop emit the same
/// chunks.
///
/// The same 50 pre-rendered lines go through two separate producers that
/// share the default config and the same fake clock. The chunks captured by
/// the two sinks must be equal.
#[test]
fn drive_lines_parity_with_manual_push_line() {
    let cfg = StreamConfig::DEFAULT;
    let clock = FakeClock::new(0);

    let lines: Vec<Vec<u8>> = (0..50)
        .map(|i| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes())
        .collect();

    // Path 1: the producer pulls the lines itself through `drive_lines`.
    let driven = CapturingSink::new();
    {
        let mut flush = capture(&driven);
        let mut producer = ChunkProducer::new(&cfg, &clock);
        producer
            .drive_lines(lines.iter().cloned(), |l: &Vec<u8>| l.clone(), &mut flush)
            .unwrap();
        producer.finish(&mut flush).unwrap();
    }

    // Path 2: the test pushes every line by hand.
    let manual = CapturingSink::new();
    {
        let mut flush = capture(&manual);
        let mut producer = ChunkProducer::new(&cfg, &clock);
        for line in &lines {
            producer.push_line(line, &mut flush).unwrap();
        }
        producer.finish(&mut flush).unwrap();
    }

    let driven_chunks = driven.chunks.borrow();
    let manual_chunks = manual.chunks.borrow();
    assert_eq!(*driven_chunks, *manual_chunks);
}
2098}