use crate::common::CompactArc;
use crate::core::{Result, Row, RowVec, Schema};
use crate::storage::expression::Expression;
use crate::storage::traits::Scanner;
pub struct MVCCScanner {
rows: RowVec,
current_index: isize,
column_indices: Vec<usize>,
#[allow(dead_code)]
schema: CompactArc<Schema>,
#[allow(dead_code)]
filter: Option<Box<dyn Expression>>,
error: Option<crate::core::Error>,
#[allow(dead_code)]
projected_row: Row,
closed: bool,
}
impl MVCCScanner {
pub fn new(
rows: RowVec,
schema: CompactArc<Schema>,
column_indices: Vec<usize>,
filter: Option<Box<dyn Expression>>,
) -> Self {
let filtered_rows: RowVec = if let Some(ref expr) = filter {
rows.into_iter()
.filter(|(_, row)| expr.evaluate(row).unwrap_or_default())
.collect()
} else {
rows
};
Self::from_rows(filtered_rows, schema, column_indices)
}
#[inline]
pub fn from_rows(rows: RowVec, schema: CompactArc<Schema>, column_indices: Vec<usize>) -> Self {
let num_schema_cols = schema.columns.len();
let needs_projection = !column_indices.is_empty()
&& column_indices.len() < num_schema_cols
&& !column_indices.iter().enumerate().all(|(i, &idx)| i == idx);
let projected_rows: RowVec = if needs_projection {
rows.into_iter()
.map(|(id, row)| {
let projected_values: Vec<crate::core::Value> = column_indices
.iter()
.map(|&idx| {
row.get(idx)
.cloned()
.unwrap_or_else(crate::core::Value::null_unknown)
})
.collect();
(id, Row::from_values(projected_values))
})
.collect()
} else {
rows
};
Self {
rows: projected_rows,
current_index: -1,
column_indices: if needs_projection {
vec![]
} else {
column_indices
}, schema,
filter: None,
error: None,
projected_row: Row::default(),
closed: false,
}
}
#[inline]
pub fn empty(schema: CompactArc<Schema>, column_indices: Vec<usize>) -> Self {
Self {
rows: RowVec::new(),
current_index: -1,
column_indices,
schema,
filter: None,
error: None,
projected_row: Row::default(),
closed: false,
}
}
pub fn single(row: Row, schema: CompactArc<Schema>, column_indices: Vec<usize>) -> Self {
let mut rows = RowVec::new();
rows.push((0, row));
Self {
rows,
current_index: -1,
column_indices: column_indices.clone(),
schema,
filter: None,
error: None,
projected_row: Row::from_values(vec![
crate::core::Value::null_unknown();
column_indices.len()
]),
closed: false,
}
}
pub fn len(&self) -> usize {
self.rows.len()
}
pub fn is_empty(&self) -> bool {
self.rows.is_empty()
}
}
impl Scanner for MVCCScanner {
fn next(&mut self) -> bool {
if self.closed || self.error.is_some() {
return false;
}
self.current_index += 1;
(self.current_index as usize) < self.rows.len()
}
fn row(&self) -> &Row {
if self.current_index < 0 || (self.current_index as usize) >= self.rows.len() {
static EMPTY_ROW: std::sync::OnceLock<Row> = std::sync::OnceLock::new();
return EMPTY_ROW.get_or_init(|| Row::from_values(vec![]));
}
let (_, ref source_row) = self.rows[self.current_index as usize];
if self.column_indices.is_empty() {
return source_row;
}
if self.column_indices.len() == source_row.len() {
let all_match = self
.column_indices
.iter()
.enumerate()
.all(|(i, &idx)| i == idx);
if all_match {
return source_row;
}
}
source_row
}
fn err(&self) -> Option<&crate::core::Error> {
self.error.as_ref()
}
fn close(&mut self) -> Result<()> {
self.closed = true;
self.rows.clear();
Ok(())
}
fn take_row(&mut self) -> Row {
if self.current_index < 0 || (self.current_index as usize) >= self.rows.len() {
return Row::new();
}
let idx = self.current_index as usize;
std::mem::take(&mut self.rows[idx].1)
}
fn estimated_count(&self) -> Option<usize> {
Some(self.rows.len())
}
}
pub struct RangeScanner {
txn_id: i64,
#[allow(dead_code)]
current_id: i64,
#[allow(dead_code)]
end_id: i64,
#[allow(dead_code)]
inclusive: bool,
#[allow(dead_code)]
column_indices: Vec<usize>,
#[allow(dead_code)]
schema: CompactArc<Schema>,
error: Option<crate::core::Error>,
#[allow(dead_code)]
projected_row: Row,
rows: RowVec,
row_index: isize,
}
impl RangeScanner {
pub fn new(
start_id: i64,
end_id: i64,
inclusive: bool,
txn_id: i64,
schema: CompactArc<Schema>,
column_indices: Vec<usize>,
rows: RowVec,
) -> Self {
let actual_end = if inclusive { end_id } else { end_id - 1 };
let filtered_rows: RowVec = rows
.into_iter()
.filter(|(id, _)| *id >= start_id && *id <= actual_end)
.collect();
Self {
txn_id,
current_id: start_id,
end_id,
inclusive,
column_indices: column_indices.clone(),
schema,
error: None,
projected_row: Row::from_values(vec![
crate::core::Value::null_unknown();
column_indices.len()
]),
rows: filtered_rows,
row_index: -1,
}
}
pub fn txn_id(&self) -> i64 {
self.txn_id
}
}
impl Scanner for RangeScanner {
fn next(&mut self) -> bool {
if self.error.is_some() {
return false;
}
self.row_index += 1;
(self.row_index as usize) < self.rows.len()
}
fn row(&self) -> &Row {
static EMPTY_ROW: std::sync::OnceLock<Row> = std::sync::OnceLock::new();
if self.row_index >= 0 && (self.row_index as usize) < self.rows.len() {
let (_, ref row) = self.rows[self.row_index as usize];
row
} else {
EMPTY_ROW.get_or_init(|| Row::from_values(vec![]))
}
}
fn err(&self) -> Option<&crate::core::Error> {
self.error.as_ref()
}
fn close(&mut self) -> Result<()> {
self.rows.clear();
Ok(())
}
fn take_row(&mut self) -> Row {
if self.row_index >= 0 && (self.row_index as usize) < self.rows.len() {
let idx = self.row_index as usize;
std::mem::take(&mut self.rows[idx].1)
} else {
Row::new()
}
}
}
pub struct EmptyScanner;
impl EmptyScanner {
pub fn new() -> Self {
Self
}
}
impl Default for EmptyScanner {
fn default() -> Self {
Self::new()
}
}
impl Scanner for EmptyScanner {
fn next(&mut self) -> bool {
false
}
fn row(&self) -> &Row {
static EMPTY_ROW: std::sync::OnceLock<Row> = std::sync::OnceLock::new();
EMPTY_ROW.get_or_init(|| Row::from_values(vec![]))
}
fn err(&self) -> Option<&crate::core::Error> {
None
}
fn close(&mut self) -> Result<()> {
Ok(())
}
}
pub struct SingleRowScanner {
row: Row,
#[allow(dead_code)]
column_indices: Vec<usize>,
done: bool,
}
impl SingleRowScanner {
pub fn new(row: Row, column_indices: Vec<usize>) -> Self {
Self {
row,
column_indices,
done: false,
}
}
}
impl Scanner for SingleRowScanner {
fn next(&mut self) -> bool {
if self.done {
false
} else {
self.done = true;
true
}
}
fn row(&self) -> &Row {
if self.done {
&self.row
} else {
static EMPTY_ROW: std::sync::OnceLock<Row> = std::sync::OnceLock::new();
EMPTY_ROW.get_or_init(|| Row::from_values(vec![]))
}
}
fn err(&self) -> Option<&crate::core::Error> {
None
}
fn close(&mut self) -> Result<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::{DataType, SchemaBuilder, Value};
fn test_schema() -> CompactArc<Schema> {
CompactArc::new(
SchemaBuilder::new("test")
.column("id", DataType::Integer, false, false)
.build(),
)
}
#[test]
fn test_mvcc_scanner_empty() {
let schema = test_schema();
let mut scanner = MVCCScanner::empty(schema, vec![0]);
assert!(!scanner.next());
assert!(scanner.is_empty());
}
#[test]
fn test_mvcc_scanner_single() {
let schema = test_schema();
let row = Row::from_values(vec![Value::Integer(42)]);
let mut scanner = MVCCScanner::single(row, schema, vec![0]);
assert_eq!(scanner.len(), 1);
assert!(!scanner.is_empty());
assert!(scanner.next());
assert_eq!(scanner.row().get(0), Some(&Value::Integer(42)));
assert!(!scanner.next());
}
#[test]
fn test_mvcc_scanner_multiple_rows() {
let schema = test_schema();
let mut rows = RowVec::new();
rows.push((1, Row::from_values(vec![Value::Integer(1)])));
rows.push((2, Row::from_values(vec![Value::Integer(2)])));
rows.push((3, Row::from_values(vec![Value::Integer(3)])));
let mut scanner = MVCCScanner::from_rows(rows, schema, vec![0]);
assert_eq!(scanner.len(), 3);
assert!(scanner.next());
assert_eq!(scanner.row().get(0), Some(&Value::Integer(1)));
assert!(scanner.next());
assert_eq!(scanner.row().get(0), Some(&Value::Integer(2)));
assert!(scanner.next());
assert_eq!(scanner.row().get(0), Some(&Value::Integer(3)));
assert!(!scanner.next());
}
#[test]
fn test_mvcc_scanner_close() {
let schema = test_schema();
let mut rows = RowVec::new();
rows.push((1, Row::from_values(vec![Value::Integer(1)])));
let mut scanner = MVCCScanner::from_rows(rows, schema, vec![0]);
assert!(scanner.next());
assert!(scanner.close().is_ok());
assert!(!scanner.next());
}
#[test]
fn test_empty_scanner() {
let mut scanner = EmptyScanner::new();
assert!(!scanner.next());
assert!(scanner.err().is_none());
assert!(scanner.close().is_ok());
}
#[test]
fn test_single_row_scanner() {
let row = Row::from_values(vec![Value::Integer(42), Value::text("test")]);
let mut scanner = SingleRowScanner::new(row, vec![0, 1]);
assert!(scanner.next());
assert_eq!(scanner.row().get(0), Some(&Value::Integer(42)));
assert_eq!(scanner.row().get(1), Some(&Value::text("test")));
assert!(!scanner.next());
}
#[test]
fn test_range_scanner() {
let schema = test_schema();
let mut rows = RowVec::new();
rows.push((1, Row::from_values(vec![Value::Integer(1)])));
rows.push((2, Row::from_values(vec![Value::Integer(2)])));
rows.push((3, Row::from_values(vec![Value::Integer(3)])));
rows.push((5, Row::from_values(vec![Value::Integer(5)])));
let mut scanner = RangeScanner::new(1, 3, true, 1, schema, vec![0], rows);
assert!(scanner.next());
assert_eq!(scanner.row().get(0), Some(&Value::Integer(1)));
assert!(scanner.next());
assert_eq!(scanner.row().get(0), Some(&Value::Integer(2)));
assert!(scanner.next());
assert_eq!(scanner.row().get(0), Some(&Value::Integer(3)));
assert!(!scanner.next()); }
}