use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::runtime::RedDBRuntime;
use crate::storage::query::engine::cancel::CancelToken;
use crate::storage::schema::types::Value;
/// Name of the key-value collection that `StreamConfig::load` reads its
/// `stream.*` overrides from.
const RED_CONFIG_COLLECTION: &str = "red_config";
/// Size of one page in bytes (16 KiB). `StreamConfig::production_buffer_bytes`
/// multiplies a page count by this value.
pub const PAGE_SIZE: usize = 16 * 1024;
/// Millisecond time source used by the streaming code.
///
/// Taking `&dyn Clock` instead of reading the system time directly lets
/// callers substitute a deterministic implementation.
pub trait Clock: Send + Sync {
/// Returns the current time in milliseconds, as defined by the implementation.
fn now_ms(&self) -> u64;
}
/// [`Clock`] backed by the operating system's wall clock.
#[derive(Debug, Default)]
pub struct SystemClock;

impl Clock for SystemClock {
    /// Milliseconds elapsed since the Unix epoch.
    ///
    /// Returns `0` when the system time is reported as earlier than the epoch.
    fn now_ms(&self) -> u64 {
        match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis() as u64,
            Err(_) => 0,
        }
    }
}
/// Manually driven time source: the reading only changes when
/// [`FakeClock::advance`] is called.
#[derive(Debug)]
pub struct FakeClock {
    /// Current reading in milliseconds.
    now_ms: AtomicU64,
}

impl FakeClock {
    /// Creates a clock whose initial reading is `now_ms`.
    pub fn new(now_ms: u64) -> Self {
        let now_ms = AtomicU64::new(now_ms);
        FakeClock { now_ms }
    }

    /// Moves the clock forward by `ms` milliseconds.
    pub fn advance(&self, ms: u64) {
        // The previous reading returned by `fetch_add` is not needed.
        let _previous = self.now_ms.fetch_add(ms, Ordering::SeqCst);
    }
}
impl Clock for FakeClock {
fn now_ms(&self) -> u64 {
self.now_ms.load(Ordering::SeqCst)
}
}
/// Tunables for output streaming. Built-in values live in
/// `StreamConfig::DEFAULT`; `StreamConfig::load` overlays `stream.*` keys
/// read from the config collection.
#[derive(Debug, Clone, Copy)]
pub struct StreamConfig {
/// Lease age, in milliseconds, at which `StreamLease::snapshot_expired`
/// starts returning `true`.
pub snapshot_ttl_ms: u64,
/// Page count used to size the producer buffer
/// (see `production_buffer_bytes`).
pub chunk_default_pages: usize,
/// Lower bound `normalize` applies to `chunk_default_pages` (itself raised to at least 1).
pub chunk_min_pages: usize,
/// Upper bound `normalize` applies to `chunk_default_pages`.
pub chunk_max_pages: usize,
/// Row count at which `ChunkProducer` flushes.
pub chunk_max_rows: usize,
/// Window age, in milliseconds, at which `ChunkProducer` flushes.
pub chunk_max_latency_ms: u64,
/// Loaded from `stream.max_global`; presumably the server-wide limit handed
/// to `StreamCapacityRegistry::try_acquire` — TODO confirm at the call site.
pub max_global_streams: usize,
/// Loaded from `stream.max_per_principal`; presumably the per-principal
/// limit handed to `StreamCapacityRegistry::try_acquire` — TODO confirm.
pub max_per_principal_streams: usize,
/// Verification mode loaded from `stream.integrity.default_verify`.
pub default_verify: crate::runtime::integrity_tombstone::VerifyMode,
}
impl Default for StreamConfig {
fn default() -> Self {
Self::DEFAULT
}
}
impl StreamConfig {
    /// Built-in configuration used when no override is stored.
    pub const DEFAULT: StreamConfig = StreamConfig {
        snapshot_ttl_ms: 60_000,
        chunk_default_pages: 4,
        chunk_min_pages: 1,
        chunk_max_pages: 64,
        chunk_max_rows: 1000,
        chunk_max_latency_ms: 50,
        max_global_streams: 256,
        max_per_principal_streams: 32,
        default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
    };

    /// Builds a configuration from [`StreamConfig::DEFAULT`] overlaid with any
    /// `stream.*` keys found in the config collection, then normalizes it.
    ///
    /// Numeric keys accept non-negative integers, unsigned integers, or text
    /// that parses as `u64`; anything else leaves the default in place.
    /// Values too large for `usize` saturate to `usize::MAX` rather than
    /// wrapping, which only matters where `usize` is narrower than 64 bits.
    pub fn load(runtime: &RedDBRuntime) -> Self {
        let db = runtime.db();
        let read_u64 = |key: &str| -> Option<u64> {
            match db.get_kv(RED_CONFIG_COLLECTION, key) {
                Some((Value::Integer(v), _)) if v >= 0 => Some(v as u64),
                Some((Value::UnsignedInteger(v), _)) => Some(v),
                Some((Value::Text(text), _)) => text.parse().ok(),
                _ => None,
            }
        };
        // Saturating narrowing: an oversized value must not wrap to a small one.
        let read_usize = |key: &str| -> Option<usize> {
            read_u64(key).map(|v| usize::try_from(v).unwrap_or(usize::MAX))
        };

        let mut cfg = Self::DEFAULT;
        if let Some(v) = read_u64("stream.snapshot.ttl_ms") {
            cfg.snapshot_ttl_ms = v;
        }
        if let Some(v) = read_usize("stream.chunk.default_pages") {
            cfg.chunk_default_pages = v;
        }
        if let Some(v) = read_usize("stream.chunk.min_pages") {
            cfg.chunk_min_pages = v;
        }
        if let Some(v) = read_usize("stream.chunk.max_pages") {
            cfg.chunk_max_pages = v;
        }
        if let Some(v) = read_usize("stream.chunk.max_rows") {
            cfg.chunk_max_rows = v;
        }
        if let Some(v) = read_u64("stream.chunk.max_latency_ms") {
            cfg.chunk_max_latency_ms = v;
        }
        if let Some(v) = read_usize("stream.max_global") {
            cfg.max_global_streams = v;
        }
        if let Some(v) = read_usize("stream.max_per_principal") {
            cfg.max_per_principal_streams = v;
        }
        if let Some((Value::Text(text), _)) =
            db.get_kv(RED_CONFIG_COLLECTION, "stream.integrity.default_verify")
        {
            cfg.default_verify = crate::runtime::integrity_tombstone::VerifyMode::parse(&text);
        }
        cfg.normalize();
        cfg
    }

    /// Repairs inconsistent values in place.
    ///
    /// Ensures `1 <= chunk_min_pages <= chunk_max_pages`, moves
    /// `chunk_default_pages` into that range, and raises zero row/stream
    /// limits to 1.
    fn normalize(&mut self) {
        self.chunk_min_pages = self.chunk_min_pages.max(1);
        self.chunk_max_pages = self.chunk_max_pages.max(self.chunk_min_pages);
        // min <= max is guaranteed by the line above, so `clamp` cannot panic.
        self.chunk_default_pages = self
            .chunk_default_pages
            .clamp(self.chunk_min_pages, self.chunk_max_pages);
        self.chunk_max_rows = self.chunk_max_rows.max(1);
        self.max_global_streams = self.max_global_streams.max(1);
        self.max_per_principal_streams = self.max_per_principal_streams.max(1);
    }

    /// Producer buffer size in bytes: `chunk_default_pages * PAGE_SIZE`,
    /// saturating on overflow.
    pub fn production_buffer_bytes(&self) -> usize {
        self.chunk_default_pages.saturating_mul(PAGE_SIZE)
    }
}
/// Process-wide counter that hands out lease ids. It is also used as the
/// uniqueness component of the lease-handle fallback path.
static NEXT_LEASE_ID: AtomicU64 = AtomicU64::new(1);
/// Number of raw bytes in a lease handle; the hex form is twice as long.
pub const LEASE_HANDLE_BYTES: usize = 16;

/// Produces a hex-encoded lease handle of `LEASE_HANDLE_BYTES` bytes.
///
/// The bytes come from the OS random source. If that source reports an error,
/// the handle is instead built from a counter value (first 8 bytes) and the
/// current nanosecond timestamp (last 8 bytes).
pub fn generate_lease_handle() -> String {
    let mut bytes = [0u8; LEASE_HANDLE_BYTES];
    if crate::crypto::os_random::fill_bytes(&mut bytes).is_err() {
        // Advance the counter so two fallback handles created within the same
        // nanosecond still differ. Adding 0 would only read the counter and
        // leave uniqueness to the timestamp alone.
        let lo = NEXT_LEASE_ID.fetch_add(1, Ordering::SeqCst).to_le_bytes();
        let now = crate::utils::now_unix_nanos().to_le_bytes();
        bytes[..8].copy_from_slice(&lo);
        bytes[8..].copy_from_slice(&now);
    }
    crate::utils::to_hex(&bytes)
}
/// Extracts the `exp` claim from a JWT-shaped token and converts it from
/// seconds to milliseconds.
///
/// Returns `None` when the token does not have exactly three dot-separated
/// segments, when the middle segment cannot be decoded or parsed as JSON, or
/// when `exp` is missing, non-numeric, or not positive. The signature is not
/// inspected.
pub fn parse_jwt_exp_ms(token: &str) -> Option<u64> {
    let mut segments = token.split('.');
    // Exactly three segments: header, payload, signature.
    let (Some(_header), Some(payload_b64), Some(_signature), None) = (
        segments.next(),
        segments.next(),
        segments.next(),
        segments.next(),
    ) else {
        return None;
    };

    let payload = base64url_decode_padded(payload_b64)?;
    let claims: crate::json::Value = crate::json::from_slice(&payload).ok()?;
    let exp_secs = claims.get("exp").and_then(|n| n.as_f64())? as i64;
    if exp_secs > 0 {
        Some((exp_secs as u64).saturating_mul(1000))
    } else {
        None
    }
}
/// Decodes base64url text by translating it to the standard alphabet
/// (`-` becomes `+`, `_` becomes `/`), padding with `=` to a multiple of four
/// bytes, and handing it to the standard base64 decoder.
fn base64url_decode_padded(input: &str) -> Option<Vec<u8>> {
    let mut standard: String = input
        .chars()
        .map(|ch| match ch {
            '-' => '+',
            '_' => '/',
            other => other,
        })
        .collect();

    let remainder = standard.len() % 4;
    if remainder != 0 {
        standard.extend(std::iter::repeat('=').take(4 - remainder));
    }
    crate::wire::redwire::auth::base64_std_decode(&standard)
}
/// Why a stream ended. Recorded in the `stream.closed` audit event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    Ok,
    Cancelled,
    Error,
    SnapshotExpired,
    CapacityRefused,
    IntegrityFailed,
}

impl CloseReason {
    /// Stable snake_case label for this reason.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Ok => "ok",
            Self::Cancelled => "cancelled",
            Self::Error => "error",
            Self::SnapshotExpired => "snapshot_expired",
            Self::CapacityRefused => "capacity_refused",
            Self::IntegrityFailed => "integrity_failed",
        }
    }
}
/// State captured when a stream is opened.
#[derive(Debug)]
pub struct StreamLease {
    /// Value drawn from the process-wide lease counter.
    pub id: u64,
    /// Hex handle produced by `generate_lease_handle`.
    pub lease_handle: String,
    /// Snapshot LSN supplied by the caller of `open_stream`.
    pub snapshot_lsn: u64,
    /// Clock reading, in milliseconds, taken when the lease was created.
    pub opened_at_ms: u64,
    /// Configuration the lease was opened with.
    pub config: StreamConfig,
}

impl StreamLease {
    /// Returns `true` once the lease age at `now_ms` has reached the
    /// configured snapshot TTL. A `now_ms` earlier than `opened_at_ms` counts
    /// as age zero.
    pub fn snapshot_expired(&self, now_ms: u64) -> bool {
        let age_ms = now_ms.saturating_sub(self.opened_at_ms);
        age_ms >= self.config.snapshot_ttl_ms
    }
}
/// Reasons `open_stream` refuses to create a lease.
#[derive(Debug, PartialEq, Eq)]
pub enum OpenStreamError {
    /// The session has an active transaction.
    TransactionActive,
}

impl OpenStreamError {
    /// Machine-readable error code.
    pub fn code(&self) -> &'static str {
        match *self {
            Self::TransactionActive => "stream_in_transaction_unsupported",
        }
    }

    /// Human-readable explanation of the error.
    pub fn message(&self) -> &'static str {
        match *self {
            Self::TransactionActive => {
                "cannot open output stream while a transaction is active on this session"
            }
        }
    }
}
/// Creates a [`StreamLease`] for a new output stream.
///
/// Fails with [`OpenStreamError::TransactionActive`] when `in_transaction` is
/// set. Otherwise the lease receives the next id from the process-wide
/// counter, a freshly generated handle, and `clock`'s current reading as its
/// open time.
pub fn open_stream(
    config: StreamConfig,
    snapshot_lsn: u64,
    in_transaction: bool,
    clock: &dyn Clock,
) -> Result<StreamLease, OpenStreamError> {
    if in_transaction {
        return Err(OpenStreamError::TransactionActive);
    }

    let id = NEXT_LEASE_ID.fetch_add(1, Ordering::SeqCst);
    let lease_handle = generate_lease_handle();
    let opened_at_ms = clock.now_ms();
    Ok(StreamLease {
        id,
        lease_handle,
        snapshot_lsn,
        opened_at_ms,
        config,
    })
}
/// Buffers encoded lines and passes the buffer to a caller-supplied flush
/// callback once a byte, row, or latency cap is reached.
pub struct ChunkProducer<'a> {
/// Bytes accumulated since the last flush.
buf: Vec<u8>,
/// Lines accumulated since the last flush.
rows: usize,
/// Clock reading, in milliseconds, at construction or at the last flush.
window_started_ms: u64,
/// Buffer length that triggers a `Byte` flush.
cap_bytes: usize,
/// Line count that triggers a `Row` flush.
cap_rows: usize,
/// Window age, in milliseconds, that triggers a `Latency` flush.
cap_latency_ms: u64,
/// Time source for the latency window.
clock: &'a dyn Clock,
/// Number of successful flushes so far.
total_flushes: u64,
/// Bytes passed to the flush callback so far.
total_bytes: u64,
/// Lines pushed so far (counted on push, not on flush).
total_rows: u64,
/// Reason recorded by the most recent successful flush, if any.
last_flush_reason: Option<FlushReason>,
}
/// Which condition caused a `ChunkProducer` flush.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
/// The buffer reached the byte cap.
Byte,
/// The buffered line count reached the row cap.
Row,
/// The current window reached the latency cap.
Latency,
/// `finish` flushed the remaining buffered bytes.
Terminal,
}
impl<'a> ChunkProducer<'a> {
pub fn new(config: &StreamConfig, clock: &'a dyn Clock) -> Self {
let cap_bytes = config.production_buffer_bytes();
Self {
buf: Vec::with_capacity(cap_bytes),
rows: 0,
window_started_ms: clock.now_ms(),
cap_bytes,
cap_rows: config.chunk_max_rows,
cap_latency_ms: config.chunk_max_latency_ms,
clock,
total_flushes: 0,
total_bytes: 0,
total_rows: 0,
last_flush_reason: None,
}
}
pub fn push_line<F>(&mut self, line: &[u8], flush: &mut F) -> std::io::Result<bool>
where
F: FnMut(&[u8]) -> std::io::Result<()>,
{
self.buf.extend_from_slice(line);
self.rows += 1;
self.total_rows += 1;
if self.buf.len() >= self.cap_bytes {
self.flush(flush, FlushReason::Byte)?;
return Ok(true);
}
if self.rows >= self.cap_rows {
self.flush(flush, FlushReason::Row)?;
return Ok(true);
}
let elapsed = self.clock.now_ms().saturating_sub(self.window_started_ms);
if elapsed >= self.cap_latency_ms {
self.flush(flush, FlushReason::Latency)?;
return Ok(true);
}
Ok(false)
}
pub fn drive_lines<S, R, Enc, F>(
&mut self,
source: S,
mut encode: Enc,
flush: &mut F,
) -> std::io::Result<u64>
where
S: IntoIterator<Item = R>,
Enc: FnMut(&R) -> Vec<u8>,
F: FnMut(&[u8]) -> std::io::Result<()>,
{
let mut count = 0u64;
for record in source {
let line = encode(&record);
self.push_line(&line, flush)?;
count += 1;
}
Ok(count)
}
pub fn finish<F>(&mut self, flush: &mut F) -> std::io::Result<()>
where
F: FnMut(&[u8]) -> std::io::Result<()>,
{
if !self.buf.is_empty() {
self.flush(flush, FlushReason::Terminal)?;
}
Ok(())
}
fn flush<F>(&mut self, flush: &mut F, reason: FlushReason) -> std::io::Result<()>
where
F: FnMut(&[u8]) -> std::io::Result<()>,
{
flush(&self.buf)?;
self.total_bytes += self.buf.len() as u64;
self.total_flushes += 1;
self.last_flush_reason = Some(reason);
self.buf.clear();
self.rows = 0;
self.window_started_ms = self.clock.now_ms();
Ok(())
}
pub fn total_flushes(&self) -> u64 {
self.total_flushes
}
pub fn total_bytes(&self) -> u64 {
self.total_bytes
}
pub fn total_rows(&self) -> u64 {
self.total_rows
}
pub fn last_flush_reason(&self) -> Option<FlushReason> {
self.last_flush_reason
}
}
/// Writes an HTTP/1.1 response head announcing a chunked body, then flushes.
///
/// The head carries the given status and content type, disables caching,
/// closes the connection after the response, and includes permissive CORS
/// headers.
pub fn write_chunked_response_header<W: std::io::Write>(
    writer: &mut W,
    status: u16,
    content_type: &str,
) -> std::io::Result<()> {
    let reason = crate::server::transport::status_text(status);
    let header = format!(
        "HTTP/1.1 {status} {reason}\r\n\
         Content-Type: {content_type}\r\n\
         Transfer-Encoding: chunked\r\n\
         Cache-Control: no-cache\r\n\
         Connection: close\r\n\
         Access-Control-Allow-Origin: *\r\n\
         Access-Control-Allow-Methods: GET, POST, PUT, PATCH, DELETE, OPTIONS\r\n\
         Access-Control-Allow-Headers: *\r\n\
         Access-Control-Max-Age: 86400\r\n\r\n"
    );
    writer.write_all(header.as_bytes())?;
    writer.flush()
}
/// Writes `bytes` as one HTTP chunk (hex length, CRLF, payload, CRLF) and
/// flushes.
///
/// Empty input writes nothing, because a zero-length chunk would terminate
/// the chunked body.
pub fn write_chunk<W: std::io::Write>(writer: &mut W, bytes: &[u8]) -> std::io::Result<()> {
    if !bytes.is_empty() {
        let size_line = format!("{:x}\r\n", bytes.len());
        let parts: [&[u8]; 3] = [size_line.as_bytes(), bytes, b"\r\n"];
        for part in parts {
            writer.write_all(part)?;
        }
        writer.flush()?;
    }
    Ok(())
}
/// Writes the zero-length chunk that ends a chunked body, then flushes.
pub fn write_chunked_terminator<W: std::io::Write>(writer: &mut W) -> std::io::Result<()> {
    const TERMINATOR: &[u8] = b"0\r\n\r\n";
    writer.write_all(TERMINATOR).and_then(|()| writer.flush())
}
/// Counts live streams globally and per principal, and refuses new ones once
/// a caller-supplied limit is reached.
#[derive(Debug, Default)]
pub struct StreamCapacityRegistry {
    inner: Mutex<CapacityInner>,
}

/// Counters guarded by the registry mutex.
#[derive(Debug, Default)]
struct CapacityInner {
    /// Live streams across all principals.
    global_count: usize,
    /// Live streams per principal; entries are removed when they reach zero.
    per_principal: HashMap<String, usize>,
}

/// Why [`StreamCapacityRegistry::try_acquire`] refused a slot.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AcquireError {
    /// The global count had already reached `limit`.
    GlobalExhausted { limit: usize, current: usize },
    /// The principal's count had already reached `limit`.
    PrincipalExhausted {
        principal: String,
        limit: usize,
        current: usize,
    },
}

impl AcquireError {
    /// Machine-readable error code.
    pub fn code(&self) -> &'static str {
        match self {
            Self::GlobalExhausted { .. } => "server_stream_capacity_exhausted",
            Self::PrincipalExhausted { .. } => "principal_stream_quota_exhausted",
        }
    }

    /// Human-readable description including the limit and current count.
    pub fn message(&self) -> String {
        match self {
            Self::GlobalExhausted { limit, current } => {
                format!("server stream capacity exhausted (limit {limit}, current {current})")
            }
            Self::PrincipalExhausted {
                principal,
                limit,
                current,
            } => {
                format!(
                    "principal {principal} stream quota exhausted (limit {limit}, current {current})"
                )
            }
        }
    }
}

impl StreamCapacityRegistry {
    /// Creates an empty registry behind an `Arc`, as the guards need a shared
    /// handle back to it.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Reserves one stream slot for `principal`.
    ///
    /// The global limit is checked before the per-principal one. On success
    /// both counters are incremented and the returned guard releases the slot
    /// when dropped. A poisoned mutex is recovered rather than propagated.
    pub fn try_acquire(
        self: &Arc<Self>,
        principal: &str,
        max_global: usize,
        max_per_principal: usize,
    ) -> Result<StreamCapacityGuard, AcquireError> {
        let mut state = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        let live_global = state.global_count;
        if live_global >= max_global {
            return Err(AcquireError::GlobalExhausted {
                limit: max_global,
                current: live_global,
            });
        }

        let live_for_principal = state.per_principal.get(principal).map_or(0, |n| *n);
        if live_for_principal >= max_per_principal {
            return Err(AcquireError::PrincipalExhausted {
                principal: principal.to_owned(),
                limit: max_per_principal,
                current: live_for_principal,
            });
        }

        state.global_count = live_global + 1;
        *state
            .per_principal
            .entry(principal.to_owned())
            .or_insert(0) = live_for_principal + 1;

        Ok(StreamCapacityGuard {
            registry: Arc::clone(self),
            principal: principal.to_owned(),
            released: false,
        })
    }

    /// Returns one slot for `principal`. Counters never go below zero, and a
    /// principal whose count reaches zero is removed from the map.
    fn release(&self, principal: &str) {
        let mut state = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());

        state.global_count = state.global_count.saturating_sub(1);

        let drained = match state.per_principal.get_mut(principal) {
            Some(count) => {
                *count = count.saturating_sub(1);
                *count == 0
            }
            None => false,
        };
        if drained {
            state.per_principal.remove(principal);
        }
    }

    /// Copies out the current global count and per-principal map.
    pub fn snapshot(&self) -> (usize, HashMap<String, usize>) {
        let state = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        (state.global_count, state.per_principal.clone())
    }
}

/// Holds one stream slot and returns it to the registry on drop.
#[must_use = "dropping the guard immediately releases the stream slot"]
#[derive(Debug)]
pub struct StreamCapacityGuard {
    registry: Arc<StreamCapacityRegistry>,
    principal: String,
    /// Set once the slot has been returned, so it is released at most once.
    released: bool,
}

impl StreamCapacityGuard {
    /// Principal the slot was acquired for.
    pub fn principal(&self) -> &str {
        self.principal.as_str()
    }
}

impl Drop for StreamCapacityGuard {
    fn drop(&mut self) {
        if self.released {
            return;
        }
        self.registry.release(&self.principal);
        self.released = true;
    }
}
/// Heuristically decides whether `query` can be resumed from a cursor.
///
/// A query qualifies only if it is a `SELECT` containing none of the
/// aggregate, grouping, distinct, window, or join keywords listed below, and
/// whose `ORDER BY` clause (if present) is exactly `rid` or `rid ASC`.
/// Matching is textual and case-insensitive. It errs toward `false`, so a
/// keyword inside a string literal or identifier also disqualifies the query.
///
/// Whitespace is collapsed before matching, so keywords separated by
/// newlines, tabs, or repeated spaces (`"...\nJOIN u"`, `"ORDER  BY name"`,
/// `"COUNT (*)"`) are detected the same way as their single-space forms.
pub fn assess_resumability(query: &str) -> bool {
    const FORBIDDEN: &[&str] = &[
        " GROUP BY ",
        " HAVING ",
        " DISTINCT ",
        "DISTINCT ",
        "COUNT(",
        "SUM(",
        "AVG(",
        "MIN(",
        "MAX(",
        "ARRAY_AGG(",
        "JSON_AGG(",
        "OVER(",
        " OVER (",
        " JOIN ",
    ];
    const ORDER_BY: &str = "ORDER BY";

    // Upper-case, trim, and squeeze every whitespace run to a single space so
    // the space-delimited patterns below cannot be bypassed by formatting.
    let normalized = query
        .to_uppercase()
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ");
    if !normalized.starts_with("SELECT ") {
        return false;
    }

    // Second view with the space before "(" removed, so `COUNT (` is seen as `COUNT(`.
    let compact = normalized.replace(" (", "(");
    let has_forbidden = FORBIDDEN
        .iter()
        .any(|kw| normalized.contains(kw) || compact.contains(kw));
    if has_forbidden {
        return false;
    }

    let Some(idx) = normalized.find(ORDER_BY) else {
        return true;
    };
    // Isolate the ordering expression: everything after ORDER BY, up to LIMIT or `;`.
    let mut clause = &normalized[idx + ORDER_BY.len()..];
    if let Some(lim) = clause.find(" LIMIT ") {
        clause = &clause[..lim];
    }
    if let Some(semi) = clause.find(';') {
        clause = &clause[..semi];
    }
    matches!(clause.trim(), "RID" | "RID ASC")
}
/// Remembers when each snapshot LSN was leased and for how long, so a later
/// lookup can tell whether that lease is unknown, live, or expired.
#[derive(Debug, Default)]
pub struct LeaseRegistry {
    inner: Mutex<HashMap<u64, LeaseEntry>>,
}

/// Open time and TTL stored for one snapshot LSN.
#[derive(Debug, Clone, Copy)]
struct LeaseEntry {
    opened_at_ms: u64,
    ttl_ms: u64,
}

/// Result of [`LeaseRegistry::lookup`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseLookup {
    /// No lease was ever recorded for the LSN.
    Unknown,
    /// A lease exists but its age has reached the TTL.
    Expired,
    /// A lease exists and is younger than its TTL.
    Live,
}

impl LeaseRegistry {
    /// Creates an empty registry behind an `Arc`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Stores (or overwrites) the lease for `snapshot_lsn`.
    pub fn record(&self, snapshot_lsn: u64, opened_at_ms: u64, ttl_ms: u64) {
        let entry = LeaseEntry {
            opened_at_ms,
            ttl_ms,
        };
        self.inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(snapshot_lsn, entry);
    }

    /// Classifies the lease for `snapshot_lsn` as of `now_ms`.
    ///
    /// A `now_ms` earlier than the recorded open time counts as age zero.
    pub fn lookup(&self, snapshot_lsn: u64, now_ms: u64) -> LeaseLookup {
        let leases = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let Some(entry) = leases.get(&snapshot_lsn) else {
            return LeaseLookup::Unknown;
        };
        let age_ms = now_ms.saturating_sub(entry.opened_at_ms);
        if age_ms < entry.ttl_ms {
            LeaseLookup::Live
        } else {
            LeaseLookup::Expired
        }
    }

    /// Number of recorded leases, expired ones included.
    #[doc(hidden)]
    pub fn len(&self) -> usize {
        let leases = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        leases.len()
    }
}
/// Number of raw bytes in a cursor token; the hex form is twice as long.
pub const CURSOR_TOKEN_BYTES: usize = 24;
/// Counter mixed into cursor tokens when the OS random source fails.
static NEXT_CURSOR_SALT: AtomicU64 = AtomicU64::new(1);

/// Produces a hex-encoded cursor token of `CURSOR_TOKEN_BYTES` bytes.
///
/// The bytes come from the OS random source. If that source reports an error,
/// the first 8 bytes are taken from a process-wide counter, the next 8 from
/// the current nanosecond timestamp, and the remaining bytes stay zero.
pub fn generate_cursor_token() -> String {
    let mut token = [0u8; CURSOR_TOKEN_BYTES];
    if crate::crypto::os_random::fill_bytes(&mut token).is_err() {
        let salt = NEXT_CURSOR_SALT.fetch_add(1, Ordering::SeqCst);
        let nanos = crate::utils::now_unix_nanos();
        let (salt_slot, rest) = token.split_at_mut(8);
        salt_slot.copy_from_slice(&salt.to_le_bytes());
        rest[..8].copy_from_slice(&nanos.to_le_bytes());
    }
    crate::utils::to_hex(&token)
}
/// State stored per cursor token inside `CursorRegistry`.
#[derive(Debug, Clone)]
struct CursorEntry {
/// Snapshot LSN handed back by `resolve`.
snapshot_lsn: u64,
/// Tenant that registered the cursor; `cancel` and `resolve` require a match.
tenant: String,
/// Principal that registered the cursor; `cancel` and `resolve` require a match.
principal: String,
/// Query text handed back by `resolve`.
query: String,
/// Optional list stored at registration and returned unchanged by `resolve`.
entity_types: Option<Vec<String>>,
/// Optional list stored at registration and returned unchanged by `resolve`.
capabilities: Option<Vec<String>>,
/// Registration time in milliseconds, as supplied by the caller.
opened_at_ms: u64,
/// Age in milliseconds at which `resolve` reports the cursor as expired.
ttl_ms: u64,
/// Token signalled by `cancel`; clones are handed out by `cancel_token_for`.
cancel_token: CancelToken,
/// Set by `cancel`; makes `resolve` return `CursorReject::Cancelled`.
cancelled: bool,
}
/// What `CursorRegistry::resolve` returns for a valid cursor: the stored
/// query context plus the cursor's expiry time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorResume {
/// Snapshot LSN recorded at registration.
pub snapshot_lsn: u64,
/// Query text recorded at registration.
pub query: String,
/// Entity-type list recorded at registration, if any.
pub entity_types: Option<Vec<String>>,
/// Capability list recorded at registration, if any.
pub capabilities: Option<Vec<String>>,
/// `opened_at_ms + ttl_ms`, saturating on overflow.
pub expires_at_ms: u64,
}
/// Why a cursor token was refused by `CursorRegistry`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorReject {
/// The token is unknown, or it belongs to a different tenant or principal.
NotFound,
/// The cursor's age has reached its TTL.
Expired,
/// The cursor was cancelled.
Cancelled,
}
/// Maps cursor tokens to the query context needed to resume a stream.
#[derive(Debug, Default)]
pub struct CursorRegistry {
    inner: Mutex<HashMap<String, CursorEntry>>,
}

impl CursorRegistry {
    /// Creates an empty registry behind an `Arc`.
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    /// Stores a new cursor and returns its freshly generated token.
    // Every argument is a distinct piece of cursor state, hence the long list.
    #[allow(clippy::too_many_arguments)]
    pub fn register(
        &self,
        snapshot_lsn: u64,
        tenant: &str,
        principal: &str,
        query: &str,
        entity_types: Option<Vec<String>>,
        capabilities: Option<Vec<String>>,
        opened_at_ms: u64,
        ttl_ms: u64,
    ) -> String {
        let token = generate_cursor_token();
        let entry = CursorEntry {
            snapshot_lsn,
            tenant: tenant.to_owned(),
            principal: principal.to_owned(),
            query: query.to_owned(),
            entity_types,
            capabilities,
            opened_at_ms,
            ttl_ms,
            cancel_token: CancelToken::new(),
            cancelled: false,
        };
        self.inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .insert(token.clone(), entry);
        token
    }

    /// Returns a clone of the cancel token stored for `token`, if the cursor
    /// exists. No tenant or principal check is performed here.
    pub fn cancel_token_for(&self, token: &str) -> Option<CancelToken> {
        let cursors = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let entry = cursors.get(token)?;
        Some(entry.cancel_token.clone())
    }

    /// Marks the cursor as cancelled and signals its cancel token.
    ///
    /// Returns [`CursorReject::NotFound`] both for unknown tokens and for
    /// tokens owned by a different tenant or principal, so the two cases
    /// cannot be told apart by the caller.
    pub fn cancel(
        &self,
        token: &str,
        tenant: &str,
        principal: &str,
    ) -> Result<CancelToken, CursorReject> {
        let mut cursors = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        match cursors.get_mut(token) {
            Some(entry) if entry.tenant == tenant && entry.principal == principal => {
                entry.cancelled = true;
                entry.cancel_token.cancel();
                Ok(entry.cancel_token.clone())
            }
            _ => Err(CursorReject::NotFound),
        }
    }

    /// Looks up the resume context for `token` as of `now_ms`.
    ///
    /// Checks run in this order: ownership (mismatch reports `NotFound`),
    /// cancellation, then expiry.
    pub fn resolve(
        &self,
        token: &str,
        tenant: &str,
        principal: &str,
        now_ms: u64,
    ) -> Result<CursorResume, CursorReject> {
        let cursors = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let entry = match cursors.get(token) {
            Some(entry) if entry.tenant == tenant && entry.principal == principal => entry,
            _ => return Err(CursorReject::NotFound),
        };
        if entry.cancelled {
            return Err(CursorReject::Cancelled);
        }
        let age_ms = now_ms.saturating_sub(entry.opened_at_ms);
        if age_ms >= entry.ttl_ms {
            return Err(CursorReject::Expired);
        }
        Ok(CursorResume {
            snapshot_lsn: entry.snapshot_lsn,
            query: entry.query.clone(),
            entity_types: entry.entity_types.clone(),
            capabilities: entry.capabilities.clone(),
            expires_at_ms: entry.opened_at_ms.saturating_add(entry.ttl_ms),
        })
    }

    /// Number of stored cursors, cancelled and expired ones included.
    #[doc(hidden)]
    pub fn len(&self) -> usize {
        let cursors = self
            .inner
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        cursors.len()
    }

    /// `true` when no cursor is stored.
    #[doc(hidden)]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Incremental SHA-256 over the lines fed to it, with a count of how many
/// lines were fed.
///
/// The hasher is stored directly rather than in an `Option`, so a value built
/// through `Default` is as usable as one built through [`PrefixHasher::new`]
/// and [`PrefixHasher::finalize_hex`] has no panic path.
#[derive(Debug, Default)]
pub struct PrefixHasher {
    inner: sha2::Sha256,
    rows: u64,
}

impl PrefixHasher {
    /// Creates a hasher with no input and a row count of zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Feeds `line` into the hash and counts it as one row.
    ///
    /// No separator is added between lines; the digest covers the plain
    /// concatenation of all inputs in call order.
    pub fn update(&mut self, line: &[u8]) {
        use sha2::Digest;
        self.inner.update(line);
        self.rows += 1;
    }

    /// Number of `update` calls so far.
    pub fn rows(&self) -> u64 {
        self.rows
    }

    /// Consumes the hasher and returns the digest as 64 lowercase hex characters.
    pub fn finalize_hex(self) -> String {
        use sha2::Digest;
        const HEX: &[u8; 16] = b"0123456789abcdef";

        let digest = self.inner.finalize();
        let mut out = String::with_capacity(64);
        // Two nibble lookups per byte avoid a temporary `String` per byte.
        for &byte in digest.iter() {
            out.push(HEX[usize::from(byte >> 4)] as char);
            out.push(HEX[usize::from(byte & 0x0f)] as char);
        }
        out
    }
}
/// Records a successful `stream.opened` audit event.
///
/// The lease handle is used as the event resource. The detail object carries
/// `lease_handle`, `snapshot_lsn` (as a JSON number), and `query_hash`.
pub fn audit_stream_opened(
    runtime: &RedDBRuntime,
    lease_handle: &str,
    principal: &str,
    snapshot_lsn: u64,
    query_hash: &str,
) {
    use crate::json::{Map, Value as JsonValue};
    use crate::runtime::audit_log::{AuditEvent, Outcome};

    let mut detail = Map::new();
    for (key, value) in [
        ("lease_handle", JsonValue::String(lease_handle.to_owned())),
        ("snapshot_lsn", JsonValue::Number(snapshot_lsn as f64)),
        ("query_hash", JsonValue::String(query_hash.to_owned())),
    ] {
        detail.insert(key.to_string(), value);
    }

    let event = AuditEvent::builder("stream.opened")
        .principal(principal)
        .resource(lease_handle.to_owned())
        .outcome(Outcome::Success)
        .detail(JsonValue::Object(detail))
        .build();
    runtime.audit_log().record_event(event);
}
pub fn audit_stream_closed(
runtime: &RedDBRuntime,
lease_handle: &str,
principal: &str,
reason: CloseReason,
row_count: u64,
bytes_written: u64,
) {
use crate::json::{Map, Value as JsonValue};
let mut stats = Map::new();
stats.insert("row_count".to_string(), JsonValue::Number(row_count as f64));
stats.insert(
"bytes_written".to_string(),
JsonValue::Number(bytes_written as f64),
);
let mut detail = Map::new();
detail.insert(
"lease_handle".to_string(),
JsonValue::String(lease_handle.to_string()),
);
detail.insert(
"reason".to_string(),
JsonValue::String(reason.as_str().to_string()),
);
detail.insert("stats".to_string(), JsonValue::Object(stats));
let outcome = match reason {
CloseReason::Ok => crate::runtime::audit_log::Outcome::Success,
CloseReason::CapacityRefused => crate::runtime::audit_log::Outcome::Denied,
_ => crate::runtime::audit_log::Outcome::Error,
};
let event = crate::runtime::audit_log::AuditEvent::builder("stream.closed")
.principal(principal)
.resource(lease_handle.to_string())
.outcome(outcome)
.detail(JsonValue::Object(detail))
.build();
runtime.audit_log().record_event(event);
}
/// Records a `stream.token_expired_during_lease` audit event with a
/// `Success` outcome.
///
/// The detail object carries `lease_handle`, `token_expiry` (the given
/// millisecond value as a JSON number), and `lease_continued: true`.
pub fn audit_token_expired_during_lease(
    runtime: &RedDBRuntime,
    lease_handle: &str,
    principal: &str,
    token_expiry_ms: u64,
) {
    use crate::json::{Map, Value as JsonValue};
    use crate::runtime::audit_log::{AuditEvent, Outcome};

    let mut detail = Map::new();
    for (key, value) in [
        ("lease_handle", JsonValue::String(lease_handle.to_owned())),
        ("token_expiry", JsonValue::Number(token_expiry_ms as f64)),
        ("lease_continued", JsonValue::Bool(true)),
    ] {
        detail.insert(key.to_string(), value);
    }

    let event = AuditEvent::builder("stream.token_expired_during_lease")
        .principal(principal)
        .resource(lease_handle.to_owned())
        .outcome(Outcome::Success)
        .detail(JsonValue::Object(detail))
        .build();
    runtime.audit_log().record_event(event);
}
/// Records a `stream.closed` audit event with a `Denied` outcome for a stream
/// that was refused a capacity slot.
///
/// No resource is attached to the event. The detail object carries `reason`
/// (the capacity-refused label), `code`, `limit`, and `current`.
pub fn audit_stream_capacity_refused(
    runtime: &RedDBRuntime,
    principal: &str,
    code: &str,
    limit: usize,
    current: usize,
) {
    use crate::json::{Map, Value as JsonValue};
    use crate::runtime::audit_log::{AuditEvent, Outcome};

    let reason = CloseReason::CapacityRefused.as_str();
    let mut detail = Map::new();
    for (key, value) in [
        ("reason", JsonValue::String(reason.to_owned())),
        ("code", JsonValue::String(code.to_owned())),
        ("limit", JsonValue::Number(limit as f64)),
        ("current", JsonValue::Number(current as f64)),
    ] {
        detail.insert(key.to_string(), value);
    }

    let event = AuditEvent::builder("stream.closed")
        .principal(principal)
        .outcome(Outcome::Denied)
        .detail(JsonValue::Object(detail))
        .build();
    runtime.audit_log().record_event(event);
}
/// Returns a handle to the shared [`SystemClock`], created on first use.
pub fn system_clock() -> Arc<dyn Clock> {
    static SHARED: std::sync::OnceLock<Arc<dyn Clock>> = std::sync::OnceLock::new();
    let shared = SHARED.get_or_init(|| Arc::new(SystemClock));
    Arc::clone(shared)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn open_stream_refuses_when_session_has_active_transaction() {
let clock = FakeClock::new(0);
let err = open_stream(StreamConfig::DEFAULT, 42, true, &clock).unwrap_err();
assert_eq!(err, OpenStreamError::TransactionActive);
assert_eq!(err.code(), "stream_in_transaction_unsupported");
}
#[test]
fn open_stream_succeeds_when_session_is_autocommit() {
let clock = FakeClock::new(1_700_000_000_000);
let lease = open_stream(StreamConfig::DEFAULT, 123, false, &clock).unwrap();
assert_eq!(lease.snapshot_lsn, 123);
assert_eq!(lease.opened_at_ms, 1_700_000_000_000);
assert!(lease.id >= 1);
}
#[test]
fn lease_ids_are_unique_and_monotonic() {
let clock = FakeClock::new(0);
let a = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
let b = open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
assert!(b.id > a.id);
}
#[test]
fn snapshot_expired_uses_injected_clock_and_ttl() {
let clock = FakeClock::new(0);
let mut config = StreamConfig::DEFAULT;
config.snapshot_ttl_ms = 5_000;
let lease = open_stream(config, 0, false, &clock).unwrap();
assert!(!lease.snapshot_expired(clock.now_ms()));
clock.advance(4_999);
assert!(!lease.snapshot_expired(clock.now_ms()));
clock.advance(1);
assert!(lease.snapshot_expired(clock.now_ms()));
}
#[test]
fn stream_config_loads_defaults_when_kv_is_empty() {
let cfg = StreamConfig::DEFAULT;
assert_eq!(cfg.snapshot_ttl_ms, 60_000);
assert_eq!(cfg.chunk_default_pages, 4);
assert_eq!(cfg.chunk_min_pages, 1);
assert_eq!(cfg.chunk_max_pages, 64);
assert_eq!(cfg.chunk_max_rows, 1000);
assert_eq!(cfg.chunk_max_latency_ms, 50);
assert_eq!(cfg.production_buffer_bytes(), 64 * 1024);
}
#[test]
fn stream_config_normalize_clamps_inconsistent_inputs() {
let mut cfg = StreamConfig {
snapshot_ttl_ms: 1,
chunk_default_pages: 100,
chunk_min_pages: 0,
chunk_max_pages: 8,
chunk_max_rows: 0,
chunk_max_latency_ms: 1,
max_global_streams: 0,
max_per_principal_streams: 0,
default_verify: crate::runtime::integrity_tombstone::VerifyMode::None,
};
cfg.normalize();
assert_eq!(cfg.chunk_min_pages, 1);
assert_eq!(cfg.chunk_max_pages, 8);
assert_eq!(cfg.chunk_default_pages, 8); assert!(cfg.chunk_max_rows >= 1);
assert!(cfg.max_global_streams >= 1);
assert!(cfg.max_per_principal_streams >= 1);
}
struct CapturingSink {
chunks: std::cell::RefCell<Vec<Vec<u8>>>,
}
impl CapturingSink {
fn new() -> Self {
Self {
chunks: std::cell::RefCell::new(Vec::new()),
}
}
fn len(&self) -> usize {
self.chunks.borrow().len()
}
fn last_len(&self) -> Option<usize> {
self.chunks.borrow().last().map(|c| c.len())
}
}
fn capture<'a>(sink: &'a CapturingSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
move |bytes: &[u8]| {
sink.chunks.borrow_mut().push(bytes.to_vec());
Ok(())
}
}
#[test]
fn chunk_producer_flushes_on_byte_cap() {
let clock = FakeClock::new(0);
let cfg = StreamConfig {
chunk_default_pages: 1, chunk_min_pages: 1,
chunk_max_pages: 1,
chunk_max_rows: 1_000_000,
chunk_max_latency_ms: 1_000_000,
..StreamConfig::DEFAULT
};
let sink = CapturingSink::new();
let mut producer = ChunkProducer::new(&cfg, &clock);
let mut flush = capture(&sink);
producer
.push_line(&vec![b'x'; 8 * 1024], &mut flush)
.unwrap();
assert_eq!(sink.len(), 0);
let triggered = producer
.push_line(&vec![b'y'; 8 * 1024], &mut flush)
.unwrap();
assert!(triggered);
assert_eq!(sink.len(), 1);
assert_eq!(sink.last_len(), Some(16 * 1024));
assert_eq!(producer.last_flush_reason(), Some(FlushReason::Byte));
}
#[test]
fn chunk_producer_flushes_on_row_cap() {
let clock = FakeClock::new(0);
let cfg = StreamConfig {
chunk_default_pages: 4, chunk_min_pages: 1,
chunk_max_pages: 64,
chunk_max_rows: 3,
chunk_max_latency_ms: 1_000_000,
..StreamConfig::DEFAULT
};
let sink = CapturingSink::new();
let mut producer = ChunkProducer::new(&cfg, &clock);
let mut flush = capture(&sink);
let row = b"{\"row\":{}}\n";
producer.push_line(row, &mut flush).unwrap();
producer.push_line(row, &mut flush).unwrap();
assert_eq!(sink.len(), 0);
let triggered = producer.push_line(row, &mut flush).unwrap();
assert!(triggered);
assert_eq!(sink.len(), 1);
assert_eq!(producer.last_flush_reason(), Some(FlushReason::Row));
}
#[test]
fn chunk_producer_flushes_on_latency_cap() {
let clock = FakeClock::new(0);
let cfg = StreamConfig {
chunk_default_pages: 4,
chunk_min_pages: 1,
chunk_max_pages: 64,
chunk_max_rows: 1_000_000,
chunk_max_latency_ms: 50,
..StreamConfig::DEFAULT
};
let sink = CapturingSink::new();
let mut producer = ChunkProducer::new(&cfg, &clock);
let mut flush = capture(&sink);
producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
assert_eq!(sink.len(), 0);
clock.advance(60);
let triggered = producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
assert!(triggered);
assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
}
#[test]
fn chunk_producer_finish_emits_terminal_flush() {
let clock = FakeClock::new(0);
let cfg = StreamConfig::DEFAULT;
let sink = CapturingSink::new();
let mut producer = ChunkProducer::new(&cfg, &clock);
let mut flush = capture(&sink);
producer.push_line(b"{\"row\":{}}\n", &mut flush).unwrap();
producer.finish(&mut flush).unwrap();
assert_eq!(sink.len(), 1);
assert_eq!(producer.last_flush_reason(), Some(FlushReason::Terminal));
}
#[test]
fn write_chunked_helpers_produce_well_formed_chunks() {
let mut buf: Vec<u8> = Vec::new();
write_chunked_response_header(&mut buf, 200, "application/x-ndjson").unwrap();
write_chunk(&mut buf, b"{\"row\":{}}\n").unwrap();
write_chunked_terminator(&mut buf).unwrap();
let text = String::from_utf8(buf).unwrap();
assert!(text.starts_with("HTTP/1.1 200 OK\r\n"));
assert!(text.contains("Transfer-Encoding: chunked\r\n"));
assert!(text.contains("\r\nb\r\n{\"row\":{}}\n\r\n"));
assert!(text.ends_with("0\r\n\r\n"));
}
#[test]
fn capacity_registry_global_exhausted_returns_structured_error() {
let reg = StreamCapacityRegistry::new();
let _g1 = reg.try_acquire("alice", 2, 32).unwrap();
let _g2 = reg.try_acquire("alice", 2, 32).unwrap();
let err = reg.try_acquire("alice", 2, 32).unwrap_err();
assert_eq!(
err,
AcquireError::GlobalExhausted {
limit: 2,
current: 2,
}
);
assert_eq!(err.code(), "server_stream_capacity_exhausted");
}
#[test]
fn capacity_registry_per_principal_exhausted_independent_of_global() {
let reg = StreamCapacityRegistry::new();
let _a1 = reg.try_acquire("alice", 100, 2).unwrap();
let _a2 = reg.try_acquire("alice", 100, 2).unwrap();
let err = reg.try_acquire("alice", 100, 2).unwrap_err();
assert_eq!(
err,
AcquireError::PrincipalExhausted {
principal: "alice".to_string(),
limit: 2,
current: 2,
}
);
assert_eq!(err.code(), "principal_stream_quota_exhausted");
let _b1 = reg.try_acquire("bob", 100, 2).unwrap();
let _b2 = reg.try_acquire("bob", 100, 2).unwrap();
}
#[test]
fn capacity_registry_release_frees_both_counters() {
let reg = StreamCapacityRegistry::new();
let g1 = reg.try_acquire("alice", 1, 1).unwrap();
assert!(reg.try_acquire("alice", 1, 1).is_err());
drop(g1);
let (global, per_principal) = reg.snapshot();
assert_eq!(global, 0);
assert!(per_principal.is_empty());
let _g2 = reg.try_acquire("alice", 1, 1).unwrap();
}
/// Hammers the registry from several threads and checks that no slot leaks
/// and that the observed live count never exceeds the global cap.
#[test]
fn capacity_registry_concurrent_acquire_release_does_not_over_issue() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    const THREADS: usize = 16;
    const ITERS: usize = 200;
    const CAP_GLOBAL: usize = 4;
    const CAP_PER_PRINCIPAL: usize = 4;
    let reg = StreamCapacityRegistry::new();
    let observed_max = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();
    for tid in 0..THREADS {
        // Each worker gets its own handle to the shared registry.
        let reg = Arc::clone(&reg);
        let observed_max = Arc::clone(&observed_max);
        // Two principals share the sixteen threads.
        let principal = format!("p{}", tid % 2);
        handles.push(std::thread::spawn(move || {
            for _ in 0..ITERS {
                if let Ok(guard) = reg.try_acquire(&principal, CAP_GLOBAL, CAP_PER_PRINCIPAL) {
                    let (live, _) = reg.snapshot();
                    observed_max.fetch_max(live, Ordering::SeqCst);
                    std::thread::yield_now();
                    drop(guard);
                }
            }
        }));
    }
    for h in handles {
        h.join().unwrap();
    }
    let (global_after, per_principal_after) = reg.snapshot();
    assert_eq!(global_after, 0, "global counter leaked");
    assert!(
        per_principal_after.is_empty(),
        "per-principal map leaked: {per_principal_after:?}"
    );
    assert!(
        observed_max.load(Ordering::SeqCst) <= CAP_GLOBAL,
        "global cap was breached: observed {} > {}",
        observed_max.load(Ordering::SeqCst),
        CAP_GLOBAL
    );
}
#[test]
fn assess_resumability_accepts_plain_select() {
assert!(assess_resumability("SELECT id, name FROM t"));
assert!(assess_resumability("select * from t where id > 5"));
assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid"));
assert!(assess_resumability("SELECT a, b FROM t ORDER BY rid ASC"));
assert!(assess_resumability(
"SELECT a FROM t ORDER BY rid ASC LIMIT 10"
));
}
#[test]
fn assess_resumability_rejects_aggregates_and_unordered() {
assert!(!assess_resumability("SELECT COUNT(*) FROM t"));
assert!(!assess_resumability("SELECT SUM(x) FROM t"));
assert!(!assess_resumability("SELECT a, COUNT(b) FROM t GROUP BY a"));
assert!(!assess_resumability("SELECT DISTINCT a FROM t"));
assert!(!assess_resumability("SELECT a FROM t ORDER BY name"));
assert!(!assess_resumability("SELECT a FROM t ORDER BY rid DESC"));
assert!(!assess_resumability("SELECT a FROM t ORDER BY a, b"));
assert!(!assess_resumability("INSERT INTO t (a) VALUES (1)"));
assert!(!assess_resumability(
"SELECT a FROM t JOIN u ON t.id = u.id"
));
}
#[test]
fn lease_registry_records_and_expires_against_ttl() {
let reg = LeaseRegistry::new();
reg.record(42, 1_000, 5_000);
assert_eq!(reg.lookup(42, 1_000), LeaseLookup::Live);
assert_eq!(reg.lookup(42, 5_999), LeaseLookup::Live);
assert_eq!(reg.lookup(42, 6_000), LeaseLookup::Expired);
assert_eq!(reg.lookup(99, 1_000), LeaseLookup::Unknown);
}
/// Feeding the same two rows in the same order yields the same hex digest;
/// swapping their order yields a different one; the digest is 64 hex chars.
#[test]
fn prefix_hasher_is_order_sensitive_and_deterministic() {
    const ROW_1: &[u8] = b"{\"row\":{\"id\":1}}";
    const ROW_2: &[u8] = b"{\"row\":{\"id\":2}}";

    // Hash a two-row sequence with a fresh hasher and return its hex digest.
    let digest = |rows: [&[u8]; 2]| {
        let mut hasher = PrefixHasher::new();
        for row in rows {
            hasher.update(row);
        }
        hasher.finalize_hex()
    };

    let forward = digest([ROW_1, ROW_2]);
    assert_eq!(forward, digest([ROW_1, ROW_2]));
    assert_ne!(forward, digest([ROW_2, ROW_1]));
    assert_eq!(forward.len(), 64);
}
/// Two streams opened back-to-back get hex lease handles of
/// `LEASE_HANDLE_BYTES * 2` characters that differ from each other, and the
/// second stream's `id` is greater than the first's.
#[test]
fn lease_handle_is_128_bit_hex_and_unique_per_open() {
    let clock = FakeClock::new(0);
    let open = || open_stream(StreamConfig::DEFAULT, 1, false, &clock).unwrap();
    let first = open();
    let second = open();

    let handle = &first.lease_handle;
    let expected_len = LEASE_HANDLE_BYTES * 2;
    assert_eq!(
        handle.len(),
        expected_len,
        "handle must be 128 bits hex-encoded: {}",
        handle
    );

    let all_hex = handle.chars().all(|ch| ch.is_ascii_hexdigit());
    assert!(all_hex, "handle must be hex: {}", handle);

    assert_ne!(
        first.lease_handle, second.lease_handle,
        "handles must differ"
    );
    assert!(second.id > first.id);
}
/// 1024 consecutively generated lease handles must all be distinct.
#[test]
fn generate_lease_handle_produces_high_entropy_distinct_values() {
    let mut seen = std::collections::HashSet::new();
    (0..1024).for_each(|_| {
        let handle = generate_lease_handle();
        // `insert` returns false when the value was already present.
        assert!(seen.insert(handle), "duplicate handle in CSPRNG sequence");
    });
}
/// A JWT whose payload is `{"exp":1700000000}` (seconds) must parse to the
/// same instant expressed in milliseconds.
#[test]
fn parse_jwt_exp_ms_extracts_seconds_to_ms() {
    // header = {"alg":"HS256"}, payload = {"exp":1700000000}, dummy signature.
    const TOKEN: &str = "eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MDAwMDAwMDB9.sig";
    let exp_ms = parse_jwt_exp_ms(TOKEN);
    assert_eq!(exp_ms, Some(1_700_000_000_000));
}
/// Tokens that are not a JWT carrying a usable `exp` claim -- no dots, too
/// few segments, or three segments of junk -- must yield `None`.
#[test]
fn parse_jwt_exp_ms_returns_none_for_opaque_tokens() {
    let opaque_tokens = ["not-a-jwt", "only.two", "a.b.c"];
    for token in opaque_tokens {
        assert_eq!(parse_jwt_exp_ms(token), None, "token: {token}");
    }
}
/// Registers a cursor on `reg` with fixed test values, opened at `now_ms`,
/// and returns the token handed back by `CursorRegistry::register`.
///
/// NOTE(review): the local names below reflect how the surrounding tests use
/// these values with `resolve`/`cancel`; confirm the positional meaning
/// against `CursorRegistry::register`.
fn register_default_cursor(reg: &CursorRegistry, now_ms: u64) -> String {
    let snapshot_lsn = 42;
    let tenant = "acme";
    let principal = "bearer:abc";
    let query = "SELECT id FROM users ORDER BY rid";
    let ttl_ms = 1_000;
    // The two `None`s are optional arguments these tests leave unset.
    reg.register(
        snapshot_lsn,
        tenant,
        principal,
        query,
        None,
        None,
        now_ms,
        ttl_ms,
    )
}
/// Two registrations on the same registry yield distinct hex tokens of
/// `CURSOR_TOKEN_BYTES * 2` characters, and both are retained by the registry.
#[test]
fn cursor_token_is_192_bit_hex_and_unique_per_register() {
    let reg = CursorRegistry::default();
    let a = register_default_cursor(&reg, 0);
    let b = register_default_cursor(&reg, 0);
    assert_eq!(
        a.len(),
        CURSOR_TOKEN_BYTES * 2,
        "token must be 192 bits hex-encoded: {a}"
    );
    assert!(
        a.chars().all(|c| c.is_ascii_hexdigit()),
        "token must be hex: {a}"
    );
    assert_ne!(a, b, "tokens must differ across registrations");
    // Both cursors stay registered; the second does not replace the first.
    assert_eq!(reg.len(), 2);
}
/// A cursor registered at 1_000 ms resolves for its own tenant/principal at
/// 1_500 ms and reports the snapshot, query, and expiry it was registered with.
#[test]
fn cursor_resolves_for_owner_within_ttl() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 1_000);
    let resume = reg
        .resolve(&token, "acme", "bearer:abc", 1_500)
        .expect("live cursor resolves for its owner");
    assert_eq!(resume.snapshot_lsn, 42, "resume re-pins the same snapshot");
    assert_eq!(resume.query, "SELECT id FROM users ORDER BY rid");
    // Registered at 1_000 with the helper's 1_000 ms TTL.
    assert_eq!(resume.expires_at_ms, 2_000);
}
/// A cursor registered at 0 ms with the helper's 1_000 ms TTL is rejected as
/// `Expired` for its owner exactly at 1_000 ms and at any later time.
#[test]
fn cursor_rejects_after_ttl_for_owner() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    assert_eq!(
        reg.resolve(&token, "acme", "bearer:abc", 1_000),
        Err(CursorReject::Expired),
        "TTL boundary is inclusive — cursor is dead at opened_at + ttl"
    );
    assert_eq!(
        reg.resolve(&token, "acme", "bearer:abc", 5_000),
        Err(CursorReject::Expired)
    );
}
/// Resolving a live cursor under a different tenant reports `NotFound`, and
/// the attempt does not stop the real owner from resolving it afterwards.
#[test]
fn cursor_cross_tenant_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    assert_eq!(
        reg.resolve(&token, "evil-corp", "bearer:abc", 100),
        Err(CursorReject::NotFound),
        "cross-tenant resume must mask existence as NotFound"
    );
    // The foreign probe must leave the owner's cursor usable.
    assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
}
/// Resolving a live cursor under the right tenant but a different principal
/// reports `NotFound`, and the real owner can still resolve it afterwards.
#[test]
fn cursor_cross_principal_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    assert_eq!(
        reg.resolve(&token, "acme", "bearer:other", 100),
        Err(CursorReject::NotFound),
        "cross-principal resume must mask existence as NotFound"
    );
    // The foreign probe must leave the owner's cursor usable.
    assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
}
/// Resolving a token against an empty registry reports `NotFound`.
#[test]
fn cursor_unknown_token_is_not_found() {
    let registry = CursorRegistry::default();
    assert!(registry.is_empty());

    let outcome = registry.resolve("deadbeef", "acme", "bearer:abc", 0);
    assert_eq!(outcome, Err(CursorReject::NotFound));
}
/// A cursor that is both past its TTL and probed by the wrong tenant must
/// report `NotFound` rather than `Expired`.
#[test]
fn cursor_scope_is_checked_before_expiry() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    // 10_000 ms is well past the helper's 1_000 ms TTL.
    assert_eq!(
        reg.resolve(&token, "evil-corp", "bearer:abc", 10_000),
        Err(CursorReject::NotFound),
        "expired + wrong scope must mask as NotFound, not Expired"
    );
}
/// Cancelling a cursor as its owner returns a cancelled token, and a token
/// obtained earlier via `cancel_token_for` observes the same cancellation.
#[test]
fn cancel_tombstones_cursor_and_raises_token() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    // Grab the token before cancelling, as a handler holding a clone would.
    let live = reg
        .cancel_token_for(&token)
        .expect("freshly-minted cursor exposes its token");
    assert!(!live.is_cancelled(), "token starts un-cancelled");
    let returned = reg
        .cancel(&token, "acme", "bearer:abc")
        .expect("owner cancels its own cursor");
    assert!(returned.is_cancelled(), "cancel raises the returned token");
    assert!(
        live.is_cancelled(),
        "the handler-held clone observes the cancel"
    );
}
/// After the owner cancels a cursor, the owner's attempt to resolve it is
/// rejected with `Cancelled`.
#[test]
fn cancelled_cursor_rejects_resume_with_cancelled_reason() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    reg.cancel(&token, "acme", "bearer:abc")
        .expect("owner cancels");
    // 100 ms is inside the TTL, so the rejection is due to the cancel alone.
    assert_eq!(
        reg.resolve(&token, "acme", "bearer:abc", 100),
        Err(CursorReject::Cancelled),
        "owner resuming a cancelled cursor sees Cancelled"
    );
}
/// Cancelling the same cursor twice as its owner succeeds both times, and the
/// token returned by the second call is cancelled.
#[test]
fn cancel_is_idempotent() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    assert!(reg.cancel(&token, "acme", "bearer:abc").is_ok());
    let second = reg
        .cancel(&token, "acme", "bearer:abc")
        .expect("re-cancel is a no-op success");
    assert!(second.is_cancelled());
}
/// Cancelling a cursor under a different tenant reports `NotFound` and leaves
/// the cursor resolvable by its real owner.
#[test]
fn cancel_cross_tenant_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    // `matches!` rather than `assert_eq!`: only the error variant is compared.
    assert!(
        matches!(
            reg.cancel(&token, "evil-corp", "bearer:abc"),
            Err(CursorReject::NotFound)
        ),
        "cross-tenant cancel must mask existence as NotFound"
    );
    // The rejected cancel must not have cancelled the owner's cursor.
    assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
}
/// Cancelling a cursor under the right tenant but a different principal
/// reports `NotFound` and leaves the cursor resolvable by its real owner.
#[test]
fn cancel_cross_principal_is_masked_as_not_found() {
    let reg = CursorRegistry::default();
    let token = register_default_cursor(&reg, 0);
    // `matches!` rather than `assert_eq!`: only the error variant is compared.
    assert!(
        matches!(
            reg.cancel(&token, "acme", "bearer:other"),
            Err(CursorReject::NotFound)
        ),
        "cross-principal cancel must mask existence as NotFound"
    );
    // The rejected cancel must not have cancelled the owner's cursor.
    assert!(reg.resolve(&token, "acme", "bearer:abc", 100).is_ok());
}
/// Cancelling a token that was never registered reports `NotFound`.
#[test]
fn cancel_unknown_token_is_not_found() {
    let registry = CursorRegistry::default();
    let outcome = registry.cancel("deadbeef", "acme", "bearer:abc");
    assert!(matches!(outcome, Err(CursorReject::NotFound)));
}
/// 1024 consecutively generated cursor tokens must all be distinct.
#[test]
fn generate_cursor_token_produces_high_entropy_distinct_values() {
    let mut seen = std::collections::HashSet::new();
    (0..1024).for_each(|_| {
        let token = generate_cursor_token();
        // `insert` returns false when the value was already present.
        assert!(
            seen.insert(token),
            "duplicate cursor token in CSPRNG sequence"
        );
    });
}
/// Pins the string form returned by `as_str` for each `CloseReason` variant
/// listed here.
#[test]
fn close_reason_as_str_covers_every_state_transition() {
    // One assertion per variant so a failure points at the exact line.
    assert_eq!(CloseReason::Ok.as_str(), "ok");
    assert_eq!(CloseReason::Cancelled.as_str(), "cancelled");
    assert_eq!(CloseReason::Error.as_str(), "error");
    assert_eq!(CloseReason::SnapshotExpired.as_str(), "snapshot_expired");
    assert_eq!(CloseReason::CapacityRefused.as_str(), "capacity_refused");
    assert_eq!(CloseReason::IntegrityFailed.as_str(), "integrity_failed");
}
/// The built-in default configuration caps concurrent streams at 256 globally
/// and 32 per principal.
#[test]
fn stream_config_defaults_carry_s2_caps() {
    let StreamConfig {
        max_global_streams,
        max_per_principal_streams,
        ..
    } = StreamConfig::DEFAULT;
    assert_eq!(max_global_streams, 256);
    assert_eq!(max_per_principal_streams, 32);
}
/// Test sink that remembers only the byte length of every chunk flushed into
/// it, so large streams can be checked without retaining their payload.
struct SizeSink {
    /// Length in bytes of each flushed chunk, in flush order.
    sizes: std::cell::RefCell<Vec<usize>>,
}

impl SizeSink {
    /// Creates a sink with no recorded flushes.
    fn new() -> Self {
        SizeSink {
            sizes: Default::default(),
        }
    }

    /// Number of chunks flushed so far.
    fn flushes(&self) -> usize {
        let recorded = self.sizes.borrow();
        recorded.len()
    }

    /// Size of the largest chunk flushed so far, or 0 if none were flushed.
    fn max_chunk(&self) -> usize {
        self.sizes
            .borrow()
            .iter()
            .fold(0, |largest, &len| largest.max(len))
    }
}

/// Builds a flush callback that appends each chunk's length to `sink` and
/// always reports success.
fn size_capture<'a>(sink: &'a SizeSink) -> impl FnMut(&[u8]) -> std::io::Result<()> + 'a {
    move |chunk: &[u8]| {
        let mut recorded = sink.sizes.borrow_mut();
        recorded.push(chunk.len());
        Ok(())
    }
}
/// Drives one million rows through a producer pinned to a one-page chunk and
/// checks that every row is consumed, that output is spread over more than
/// 1000 flushes, and that no flushed chunk exceeds the production buffer plus
/// one maximal line.
#[test]
fn drive_lines_streams_large_source_with_bounded_working_set() {
    const N: u64 = 1_000_000;

    let clock = FakeClock::new(0);
    // Row and latency limits are set far out of reach so they never fire.
    let cfg = StreamConfig {
        chunk_default_pages: 1,
        chunk_min_pages: 1,
        chunk_max_pages: 1,
        chunk_max_rows: 1_000_000,
        chunk_max_latency_ms: 1_000_000,
        ..StreamConfig::DEFAULT
    };
    let sink = SizeSink::new();
    let mut flush = size_capture(&sink);
    let mut producer = ChunkProducer::new(&cfg, &clock);

    let consumed = producer
        .drive_lines(
            0..N,
            |i: &u64| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes(),
            &mut flush,
        )
        .unwrap();
    producer.finish(&mut flush).unwrap();

    assert_eq!(consumed, N);
    assert_eq!(producer.total_rows(), N);

    let flush_count = sink.flushes();
    assert!(
        flush_count > 1000,
        "expected the source to stream across many chunks, saw {}",
        flush_count
    );

    // The widest line is the one carrying the largest id.
    let max_line = format!("{{\"row\":{{\"id\":{}}}}}\n", N - 1).len();
    let ceiling = cfg.production_buffer_bytes() + max_line;
    let largest = sink.max_chunk();
    assert!(
        largest <= ceiling,
        "chunk {} exceeded bounded working set {}",
        largest,
        ceiling
    );
}
/// With a 64-page buffer and a 50 ms latency cap, pushing rows 20 ms apart
/// must trigger a `Latency` flush within the first four rows.
#[test]
fn drive_lines_first_chunk_flushes_on_latency_before_source_drains() {
    let clock = FakeClock::new(0);
    let cfg = StreamConfig {
        chunk_default_pages: 64,
        chunk_min_pages: 1,
        chunk_max_pages: 64,
        chunk_max_rows: 1_000_000,
        chunk_max_latency_ms: 50,
        ..StreamConfig::DEFAULT
    };
    let sink = SizeSink::new();
    let mut flush = size_capture(&sink);
    let mut producer = ChunkProducer::new(&cfg, &clock);

    // Count of rows pushed up to and including the one that caused a flush.
    let mut first_flush_after = None;
    for row in 0..1_000_000u64 {
        // Each row "arrives" 20 ms after the previous one.
        clock.advance(20);
        let line = format!("{{\"row\":{{\"id\":{row}}}}}\n");
        if producer.push_line(line.as_bytes(), &mut flush).unwrap() {
            first_flush_after = Some(row + 1);
            break;
        }
    }

    assert_eq!(producer.last_flush_reason(), Some(FlushReason::Latency));
    let rows_before_flush = first_flush_after.expect("a latency flush must occur");
    assert!(
        rows_before_flush <= 4,
        "first chunk flushed only after {rows_before_flush} rows; latency bound not honoured"
    );
    assert!(rows_before_flush < 1_000_000);
}
/// Feeding the same 50 lines through `drive_lines` and through a manual
/// `push_line` loop must produce identical captured chunks.
#[test]
fn drive_lines_parity_with_manual_push_line() {
    let clock = FakeClock::new(0);
    let cfg = StreamConfig::DEFAULT;
    let lines: Vec<Vec<u8>> = (0..50)
        .map(|i| format!("{{\"row\":{{\"id\":{i}}}}}\n").into_bytes())
        .collect();

    // Reference run: push every line by hand.
    let manual = CapturingSink::new();
    {
        let mut producer = ChunkProducer::new(&cfg, &clock);
        let mut flush = capture(&manual);
        for line in lines.iter() {
            producer.push_line(line, &mut flush).unwrap();
        }
        producer.finish(&mut flush).unwrap();
    }

    // Run under test: let the producer pull from the iterator itself.
    let driven = CapturingSink::new();
    {
        let mut producer = ChunkProducer::new(&cfg, &clock);
        let mut flush = capture(&driven);
        producer
            .drive_lines(
                lines.iter().cloned(),
                |bytes: &Vec<u8>| bytes.to_vec(),
                &mut flush,
            )
            .unwrap();
        producer.finish(&mut flush).unwrap();
    }

    assert_eq!(*driven.chunks.borrow(), *manual.chunks.borrow());
}
}