use super::*;
use std::cell::RefCell;
use std::rc::Rc;
/// Baseline chunk size, in records.
///
/// Within this file the constant is only used to derive
/// `RowBufferArena::MAX_BUFFER_CAPACITY` (four times this value).
/// NOTE(review): presumably callers also pass it as the default
/// `high_water_mark` when building a `RowStream` — confirm against call sites.
pub(crate) const DEFAULT_HIGH_WATER_MARK: usize = 1024;
/// Small free-list of reusable `Vec<UnifiedRecord>` buffers.
///
/// A buffer is taken out with `lease` and handed back with `recycle`, so its
/// heap allocation can be reused by a later chunk instead of being reallocated.
#[derive(Debug, Default)]
pub(crate) struct RowBufferArena {
// Idle buffers waiting to be leased; every entry is stored already cleared.
free: Vec<Vec<UnifiedRecord>>,
}
impl RowBufferArena {
    /// Upper bound on the number of idle buffers kept on the free-list.
    const MAX_BUFFERS: usize = 4;
    /// Buffers whose capacity exceeds this many records are dropped rather
    /// than pooled, so one oversized chunk cannot pin memory indefinitely.
    const MAX_BUFFER_CAPACITY: usize = DEFAULT_HIGH_WATER_MARK * 4;

    /// Builds an arena that holds no buffers yet.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Hands out an empty buffer.
    ///
    /// A pooled buffer is preferred (its allocation is reused); when the
    /// free-list is empty a brand-new, unallocated `Vec` is returned instead.
    /// Either way the returned buffer contains no records.
    pub(crate) fn lease(&mut self) -> Vec<UnifiedRecord> {
        let mut leased = self.free.pop().unwrap_or_default();
        // Pooled buffers are cleared on the way in as well; clearing again
        // here guarantees no rows from an earlier chunk can ever leak out.
        leased.clear();
        leased
    }

    /// Returns `buf` to the pool so a later `lease` can reuse its allocation.
    ///
    /// The buffer is silently dropped when the pool already holds
    /// `MAX_BUFFERS` entries or when its capacity is larger than
    /// `MAX_BUFFER_CAPACITY`. Retained buffers are emptied first.
    pub(crate) fn recycle(&mut self, mut buf: Vec<UnifiedRecord>) {
        let pool_has_room = self.free.len() < Self::MAX_BUFFERS;
        let small_enough = buf.capacity() <= Self::MAX_BUFFER_CAPACITY;
        if pool_has_room && small_enough {
            buf.clear();
            self.free.push(buf);
        }
    }

    /// Drops every pooled buffer, releasing their allocations.
    pub(crate) fn reset(&mut self) {
        self.free.clear();
    }

    /// Number of idle buffers currently pooled (test-only introspection).
    #[cfg(test)]
    pub(crate) fn pooled(&self) -> usize {
        self.free.len()
    }
}
/// One batch of records produced by a single `RowStream::next_chunk` call.
#[derive(Debug, Default)]
pub(crate) struct RowChunk {
// The rows in this batch; `next_chunk` never returns a chunk with no rows.
pub(crate) records: Vec<UnifiedRecord>,
}
/// Final state of a `RowStream`, recorded once its source is exhausted or fails.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum StreamTerminal {
// The source ran dry; `row_count` is the total number of rows delivered.
End { row_count: u64 },
// The source yielded an error; `code` comes from `terminal_error_code` and
// `message` is the error's `to_string()` rendering.
Error { code: String, message: String },
}
/// Maps a source error to the short, stable code stored in
/// `StreamTerminal::Error`.
///
/// Variants without a dedicated code are reported as `"internal_error"`.
fn terminal_error_code(err: &RedDBError) -> &'static str {
    match err {
        RedDBError::MaterializationLimitExceeded { .. } => "materialization_limit_exceeded",
        RedDBError::Query(..) => "query_error",
        RedDBError::NotFound(..) => "not_found",
        // Every other variant is deliberately collapsed into one generic code.
        _ => "internal_error",
    }
}
pub(crate) struct RowStream {
columns: Vec<String>,
stats: crate::storage::query::unified::QueryStats,
pre_serialized_json: Option<String>,
high_water_mark: usize,
source: Box<dyn Iterator<Item = RedDBResult<UnifiedRecord>>>,
row_count: u64,
#[cfg_attr(not(test), allow(dead_code))]
peak_buffered: usize,
terminal: Option<StreamTerminal>,
arena: Option<Rc<RefCell<RowBufferArena>>>,
}
impl RowStream {
pub(crate) fn from_lazy(
columns: Vec<String>,
stats: crate::storage::query::unified::QueryStats,
high_water_mark: usize,
source: Box<dyn Iterator<Item = RedDBResult<UnifiedRecord>>>,
) -> Self {
Self {
columns,
stats,
pre_serialized_json: None,
high_water_mark: high_water_mark.max(1),
source,
row_count: 0,
peak_buffered: 0,
terminal: None,
arena: None,
}
}
pub(crate) fn from_unified(result: UnifiedResult, high_water_mark: usize) -> Self {
let UnifiedResult {
columns,
records,
stats,
pre_serialized_json,
} = result;
Self {
columns,
stats,
pre_serialized_json,
high_water_mark: high_water_mark.max(1),
source: Box::new(records.into_iter().map(Ok)),
row_count: 0,
peak_buffered: 0,
terminal: None,
arena: None,
}
}
pub(crate) fn with_arena(mut self, arena: Rc<RefCell<RowBufferArena>>) -> Self {
self.arena = Some(arena);
self
}
#[cfg(test)]
pub(crate) fn peak_buffered(&self) -> usize {
self.peak_buffered
}
#[cfg(test)]
pub(crate) fn terminal(&self) -> Option<&StreamTerminal> {
self.terminal.as_ref()
}
pub(crate) fn next_chunk(&mut self) -> Option<RowChunk> {
if self.terminal.is_some() {
return None;
}
let mut records: Vec<UnifiedRecord> = match &self.arena {
Some(arena) => arena.borrow_mut().lease(),
None => Vec::new(),
};
while records.len() < self.high_water_mark {
match self.source.next() {
Some(Ok(record)) => records.push(record),
Some(Err(err)) => {
self.terminal = Some(StreamTerminal::Error {
code: terminal_error_code(&err).to_string(),
message: err.to_string(),
});
break;
}
None => {
self.terminal = Some(StreamTerminal::End {
row_count: self.row_count + records.len() as u64,
});
break;
}
}
}
self.peak_buffered = self.peak_buffered.max(records.len());
self.row_count += records.len() as u64;
if records.is_empty() {
if let Some(arena) = &self.arena {
arena.borrow_mut().recycle(records);
}
if self.terminal.is_none() {
self.terminal = Some(StreamTerminal::End {
row_count: self.row_count,
});
}
return None;
}
Some(RowChunk { records })
}
pub(crate) fn collect_unified(mut self) -> RedDBResult<UnifiedResult> {
let mut records: Vec<UnifiedRecord> = Vec::new();
while let Some(chunk) = self.next_chunk() {
let mut buf = chunk.records;
records.append(&mut buf);
if let Some(arena) = &self.arena {
arena.borrow_mut().recycle(buf);
}
}
if let Some(StreamTerminal::Error { message, .. }) = self.terminal {
return Err(RedDBError::Query(message));
}
Ok(UnifiedResult {
columns: self.columns,
records,
stats: self.stats,
pre_serialized_json: self.pre_serialized_json,
})
}
pub(crate) fn collect_records(mut self) -> RedDBResult<Vec<UnifiedRecord>> {
let mut records: Vec<UnifiedRecord> = Vec::new();
while let Some(chunk) = self.next_chunk() {
let mut buf = chunk.records;
records.append(&mut buf);
if let Some(arena) = &self.arena {
arena.borrow_mut().recycle(buf);
}
}
if let Some(StreamTerminal::Error { message, .. }) = self.terminal {
return Err(RedDBError::Query(message));
}
Ok(records)
}
}
// Unit tests for `RowBufferArena` and `RowStream`, plus one end-to-end scan
// against an in-memory runtime.
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::schema::Value;
// Builds a record whose only field, "n", holds `i` as an unsigned integer.
fn row(i: u64) -> UnifiedRecord {
let mut r = UnifiedRecord::new();
r.set("n", Value::UnsignedInteger(i));
r
}
// Streams 10_000 rows with a chunk size of 128 and checks that no chunk is
// empty or larger than the high-water mark, that every row arrives, and that
// the stream finishes with an `End` terminal carrying the full row count.
#[test]
fn bounded_memory_peak_never_exceeds_high_water_mark() {
const N: u64 = 10_000;
const HWM: usize = 128;
let source = (0..N).map(|i| Ok(row(i)));
let mut stream =
RowStream::from_lazy(vec!["n".into()], Default::default(), HWM, Box::new(source));
let mut total = 0u64;
let mut chunks = 0u64;
while let Some(chunk) = stream.next_chunk() {
assert!(chunk.records.len() <= HWM, "chunk exceeded high-water mark");
assert!(
!chunk.records.is_empty(),
"next_chunk yields only non-empty chunks"
);
total += chunk.records.len() as u64;
chunks += 1;
}
assert_eq!(total, N);
assert!(
chunks >= N / HWM as u64,
"source must be split into many chunks"
);
assert_eq!(
stream.peak_buffered(),
HWM,
"peak resident set is exactly one chunk"
);
assert_eq!(
stream.terminal(),
Some(&StreamTerminal::End { row_count: N })
);
}
// The source yields three rows and then fails. With a chunk size of 2 the
// three rows must still be delivered, and the failure must be recorded as an
// `Error` terminal with the matching code and message.
#[test]
fn mid_stream_error_surfaces_as_terminal_frame_after_delivering_rows() {
let mut yielded = 0;
let source = std::iter::from_fn(move || {
yielded += 1;
match yielded {
1..=3 => Some(Ok(row(yielded))),
4 => Some(Err(RedDBError::Query("boom".into()))),
_ => None,
}
});
let mut stream =
RowStream::from_lazy(vec!["n".into()], Default::default(), 2, Box::new(source));
let mut delivered = 0u64;
while let Some(chunk) = stream.next_chunk() {
delivered += chunk.records.len() as u64;
}
assert_eq!(
delivered, 3,
"rows before the error are delivered, not dropped"
);
match stream.terminal() {
Some(StreamTerminal::Error { code, message }) => {
assert_eq!(code, "query_error");
assert_eq!(message, "query error: boom");
}
other => panic!("expected error terminal, got {other:?}"),
}
}
// Wrapping a materialised result and collecting it again must preserve the
// columns, the number of records and the pre-serialized JSON payload.
#[test]
fn collect_unified_round_trips_a_materialised_result_verbatim() {
let original = UnifiedResult {
columns: vec!["a".into(), "b".into()],
records: vec![row(1), row(2), row(3)],
stats: Default::default(),
pre_serialized_json: Some("{\"fast\":true}".into()),
};
let stream = RowStream::from_unified(original.clone(), 2);
let collected = stream.collect_unified().expect("clean stream collects ok");
assert_eq!(collected.columns, original.columns);
assert_eq!(collected.records.len(), original.records.len());
assert_eq!(collected.pre_serialized_json, original.pre_serialized_json);
}
// A source that fails after one row must make `collect_unified` return `Err`.
#[test]
fn collect_unified_propagates_a_source_error() {
let source = std::iter::from_fn({
let mut n = 0;
move || {
n += 1;
match n {
1 => Some(Ok(row(1))),
2 => Some(Err(RedDBError::NotFound("t".into()))),
_ => None,
}
}
});
let stream =
RowStream::from_lazy(vec!["n".into()], Default::default(), 8, Box::new(source));
assert!(stream.collect_unified().is_err());
}
// An empty source produces no chunk and ends with a zero-row `End` terminal.
#[test]
fn empty_source_closes_with_zero_row_end() {
let source = std::iter::empty::<RedDBResult<UnifiedRecord>>();
let mut stream = RowStream::from_lazy(Vec::new(), Default::default(), 16, Box::new(source));
assert!(stream.next_chunk().is_none());
assert_eq!(
stream.terminal(),
Some(&StreamTerminal::End { row_count: 0 })
);
}
// A buffer that held rows, once recycled and leased again, must come back
// empty, and leasing must remove it from the free-list.
#[test]
fn arena_lease_never_bleeds_prior_rows() {
let mut arena = RowBufferArena::new();
let mut buf = arena.lease();
assert!(buf.is_empty(), "fresh lease is empty");
buf.push(row(1));
buf.push(row(2));
arena.recycle(buf);
assert_eq!(arena.pooled(), 1, "recycled buffer is retained for reuse");
let reused = arena.lease();
assert!(
reused.is_empty(),
"a reused buffer carries no rows from the prior chunk"
);
assert_eq!(arena.pooled(), 0, "leasing drains the free-list slot");
}
// `reset` must leave the free-list empty after buffers have been pooled.
#[test]
fn arena_reset_drops_retained_buffers() {
let mut arena = RowBufferArena::new();
let buf = arena.lease();
arena.recycle(buf);
let buf2 = arena.lease();
arena.recycle(buf2);
assert!(arena.pooled() >= 1);
arena.reset();
assert_eq!(arena.pooled(), 0, "reset clears the free-list");
}
// Collects the same 5_000 rows with and without an arena and checks that the
// results match row for row, and that exactly one buffer is left pooled.
#[test]
fn arena_backed_stream_reuses_buffer_and_matches_baseline() {
const N: u64 = 5_000;
const HWM: usize = 256;
let baseline = RowStream::from_lazy(
vec!["n".into()],
Default::default(),
HWM,
Box::new((0..N).map(|i| Ok(row(i)))),
)
.collect_records()
.expect("baseline collects");
let arena = Rc::new(RefCell::new(RowBufferArena::new()));
let arena_backed = RowStream::from_lazy(
vec!["n".into()],
Default::default(),
HWM,
Box::new((0..N).map(|i| Ok(row(i)))),
)
.with_arena(Rc::clone(&arena))
.collect_records()
.expect("arena-backed collects");
assert_eq!(
arena_backed.len(),
baseline.len(),
"row count identical to the per-request-allocation baseline"
);
for (a, b) in arena_backed.iter().zip(baseline.iter()) {
assert_eq!(
a.get("n"),
b.get("n"),
"each row is byte-identical to the baseline"
);
}
assert_eq!(
arena.borrow().pooled(),
1,
"one chunk buffer is recycled and reused across all chunk-fetches"
);
}
// Same round-trip as the materialised-result test, with an arena attached.
#[test]
fn arena_backed_from_unified_round_trips_verbatim() {
let original = UnifiedResult {
columns: vec!["a".into(), "b".into()],
records: vec![row(1), row(2), row(3)],
stats: Default::default(),
pre_serialized_json: Some("{\"fast\":true}".into()),
};
let arena = Rc::new(RefCell::new(RowBufferArena::new()));
let collected = RowStream::from_unified(original.clone(), 2)
.with_arena(arena)
.collect_unified()
.expect("arena-backed stream collects ok");
assert_eq!(collected.columns, original.columns);
assert_eq!(collected.records.len(), original.records.len());
assert_eq!(collected.pre_serialized_json, original.pre_serialized_json);
}
// End-to-end: boots an in-memory runtime, creates table `t`, inserts 500 rows
// and streams them through `stream_runtime_table_source_scan` with a chunk
// size of 64, checking chunk bounds, the total and the `End` terminal.
#[test]
fn real_table_scan_streams_with_bounded_resident_set() {
const N: usize = 500;
const HWM: usize = 64;
let rt = crate::RedDBRuntime::with_options(crate::RedDBOptions::in_memory())
.expect("runtime boots");
rt.execute_query("CREATE TABLE t (id INT, name TEXT)")
.expect("create table");
let values = (0..N)
.map(|i| format!("({i}, 'row{i}')"))
.collect::<Vec<_>>()
.join(", ");
rt.execute_query(&format!("INSERT INTO t (id, name) VALUES {values}"))
.expect("insert rows");
let db = rt.db();
let mut stream =
crate::runtime::record_search::stream_runtime_table_source_scan(db.as_ref(), "t", HWM)
.expect("stream scan builds");
let mut total = 0usize;
while let Some(chunk) = stream.next_chunk() {
assert!(chunk.records.len() <= HWM, "chunk exceeded high-water mark");
total += chunk.records.len();
}
assert_eq!(total, N, "every visible row is streamed");
assert!(
stream.peak_buffered() <= HWM,
"resident record set stayed bounded by the chunk size, not N"
);
assert_eq!(
stream.terminal(),
Some(&StreamTerminal::End {
row_count: N as u64
})
);
}
}