use crate::{Condition, RelationalEngine, Result, Row};
pub struct StreamingCursor<'a> {
engine: &'a RelationalEngine,
table: String,
condition: Condition,
batch_size: usize,
current_offset: usize,
current_batch: Vec<Row>,
batch_index: usize,
exhausted: bool,
max_rows: Option<usize>,
rows_yielded: usize,
}
impl<'a> StreamingCursor<'a> {
pub(crate) fn new(
engine: &'a RelationalEngine,
table: impl Into<String>,
condition: Condition,
) -> Self {
Self {
engine,
table: table.into(),
condition,
batch_size: 1000,
current_offset: 0,
current_batch: Vec::new(),
batch_index: 0,
exhausted: false,
max_rows: None,
rows_yielded: 0,
}
}
#[must_use]
pub const fn with_batch_size(mut self, size: usize) -> Self {
self.batch_size = if size == 0 { 1000 } else { size };
self
}
#[must_use]
pub const fn with_max_rows(mut self, max: usize) -> Self {
self.max_rows = Some(max);
self
}
#[must_use]
pub const fn rows_yielded(&self) -> usize {
self.rows_yielded
}
#[must_use]
pub const fn is_exhausted(&self) -> bool {
self.exhausted
}
fn fetch_next_batch(&mut self) -> Result<()> {
if self.exhausted {
return Ok(());
}
let fetch_limit = if let Some(max) = self.max_rows {
let remaining = max.saturating_sub(self.rows_yielded);
if remaining == 0 {
self.exhausted = true;
return Ok(());
}
remaining.min(self.batch_size)
} else {
self.batch_size
};
let batch = self.engine.select_with_limit(
&self.table,
self.condition.clone(),
fetch_limit,
self.current_offset,
)?;
if batch.is_empty() {
self.exhausted = true;
self.current_batch = Vec::new();
} else {
self.current_offset += batch.len();
self.exhausted = batch.len() < fetch_limit;
self.current_batch = batch;
}
self.batch_index = 0;
Ok(())
}
}
impl Iterator for StreamingCursor<'_> {
type Item = Result<Row>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(max) = self.max_rows {
if self.rows_yielded >= max {
return None;
}
}
if self.batch_index >= self.current_batch.len() {
if self.exhausted {
return None;
}
if let Err(e) = self.fetch_next_batch() {
return Some(Err(e));
}
if self.current_batch.is_empty() {
return None;
}
}
if self.batch_index < self.current_batch.len() {
let row = self.current_batch[self.batch_index].clone();
self.batch_index += 1;
self.rows_yielded += 1;
Some(Ok(row))
} else {
None
}
}
}
impl std::fmt::Debug for StreamingCursor<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StreamingCursor")
.field("table", &self.table)
.field("batch_size", &self.batch_size)
.field("current_offset", &self.current_offset)
.field("rows_yielded", &self.rows_yielded)
.field("exhausted", &self.exhausted)
.finish()
}
}
pub struct CursorBuilder<'a> {
engine: &'a RelationalEngine,
table: String,
condition: Condition,
batch_size: usize,
max_rows: Option<usize>,
}
impl<'a> CursorBuilder<'a> {
pub(crate) fn new(
engine: &'a RelationalEngine,
table: impl Into<String>,
condition: Condition,
) -> Self {
Self {
engine,
table: table.into(),
condition,
batch_size: 1000,
max_rows: None,
}
}
#[must_use]
pub const fn batch_size(mut self, size: usize) -> Self {
self.batch_size = if size == 0 { 1000 } else { size };
self
}
#[must_use]
pub const fn max_rows(mut self, max: usize) -> Self {
self.max_rows = Some(max);
self
}
#[must_use]
pub fn build(self) -> StreamingCursor<'a> {
let mut cursor = StreamingCursor::new(self.engine, self.table, self.condition);
cursor.batch_size = self.batch_size;
cursor.max_rows = self.max_rows;
cursor
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Column, ColumnType, Schema, Value};
use std::collections::HashMap;
fn create_test_engine() -> RelationalEngine {
let engine = RelationalEngine::new();
let schema = Schema::new(vec![
Column::new("id", ColumnType::Int),
Column::new("name", ColumnType::String),
]);
engine.create_table("test", schema).unwrap();
for i in 0..100 {
let mut values = HashMap::new();
values.insert("id".to_string(), Value::Int(i));
values.insert("name".to_string(), Value::String(format!("user_{i}")));
engine.insert("test", values).unwrap();
}
engine
}
#[test]
fn test_streaming_cursor_all_rows() {
let engine = create_test_engine();
let cursor = StreamingCursor::new(&engine, "test", Condition::True).with_batch_size(10);
let rows: Vec<_> = cursor.collect();
assert_eq!(rows.len(), 100);
assert!(rows.iter().all(Result::is_ok));
}
#[test]
fn test_streaming_cursor_with_max_rows() {
let engine = create_test_engine();
let cursor = StreamingCursor::new(&engine, "test", Condition::True)
.with_batch_size(10)
.with_max_rows(25);
let rows: Vec<_> = cursor.collect();
assert_eq!(rows.len(), 25);
}
#[test]
fn test_streaming_cursor_small_batch() {
let engine = create_test_engine();
let cursor = StreamingCursor::new(&engine, "test", Condition::True).with_batch_size(5);
let mut count = 0;
for result in cursor {
assert!(result.is_ok());
count += 1;
}
assert_eq!(count, 100);
}
#[test]
fn test_streaming_cursor_with_condition() {
let engine = create_test_engine();
let cursor = StreamingCursor::new(
&engine,
"test",
Condition::Lt("id".to_string(), Value::Int(10)),
)
.with_batch_size(5);
let rows: Vec<_> = cursor.collect();
assert_eq!(rows.len(), 10);
}
#[test]
fn test_cursor_builder() {
let engine = create_test_engine();
let cursor = CursorBuilder::new(&engine, "test", Condition::True)
.batch_size(20)
.max_rows(50)
.build();
let rows: Vec<_> = cursor.collect();
assert_eq!(rows.len(), 50);
}
#[test]
fn test_streaming_cursor_rows_yielded() {
let engine = create_test_engine();
let mut cursor = StreamingCursor::new(&engine, "test", Condition::True).with_batch_size(10);
assert_eq!(cursor.rows_yielded(), 0);
for _ in 0..5 {
let _ = cursor.next();
}
assert_eq!(cursor.rows_yielded(), 5);
}
#[test]
fn test_streaming_cursor_is_exhausted() {
let engine = create_test_engine();
let mut cursor = StreamingCursor::new(&engine, "test", Condition::True).with_max_rows(5);
assert!(!cursor.is_exhausted());
while cursor.next().is_some() {}
}
#[test]
fn test_streaming_cursor_debug() {
let engine = create_test_engine();
let cursor = StreamingCursor::new(&engine, "test", Condition::True).with_batch_size(100);
let debug_str = format!("{cursor:?}");
assert!(debug_str.contains("StreamingCursor"));
assert!(debug_str.contains("test"));
}
#[test]
fn test_cursor_empty_table() {
let engine = RelationalEngine::new();
let schema = Schema::new(vec![Column::new("id", ColumnType::Int)]);
engine.create_table("empty", schema).unwrap();
let cursor = StreamingCursor::new(&engine, "empty", Condition::True);
let rows: Vec<_> = cursor.collect();
assert!(rows.is_empty());
}
}