use crate::common::CompactArc;
use crate::core::{Row, Value};
use crate::storage::mvcc::arena::ArenaReadGuard;
/// Locator for one visible row: the row's identifier plus where its values live.
///
/// `arena_idx` values below the arena length address the arena data directly;
/// values at or above it address the fallback data at `arena_idx - arena_len`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VisibleRowInfo {
    /// Identifier reported for this row (see `StreamingResult::row_id`).
    pub row_id: i64,
    /// Index used to look the row's values up in the arena or fallback data.
    pub arena_idx: usize,
}
/// Cursor that steps through the rows listed in `visible_indices`.
///
/// Each entry's `arena_idx` is looked up in the arena data exposed by
/// `arena_guard` when it is below `arena_len`, and in `fallback_data`
/// (at `arena_idx - arena_len`) otherwise.
pub struct StreamingResult<'a> {
/// Read guard over the row arena; rows are read through its `data()`.
arena_guard: ArenaReadGuard<'a>,
/// Entries to visit, in iteration order.
visible_indices: Vec<VisibleRowInfo>,
/// Rows addressed by entries whose `arena_idx` is at or beyond `arena_len`.
fallback_data: Vec<CompactArc<[Value]>>,
/// `arena_guard.len()` captured at construction; splits arena from fallback indices.
arena_len: usize,
/// Index into `visible_indices` of the entry the next `next()` call will consume.
current_pos: usize,
/// Column names returned by `columns()`.
columns: Vec<String>,
/// Row returned by `row()`; `next()` rebuilds it from the resolved row's values.
current_row: Row,
/// Shared values of the current row, or `None` when no row is currently resolved.
current_arc: Option<CompactArc<[Value]>>,
}
impl<'a> StreamingResult<'a> {
    /// Creates a cursor with no fallback rows.
    ///
    /// Same as [`StreamingResult::new_with_fallback`] with an empty
    /// `fallback_data` vector.
    pub fn new(
        arena_guard: ArenaReadGuard<'a>,
        visible_indices: Vec<VisibleRowInfo>,
        columns: Vec<String>,
    ) -> Self {
        Self::new_with_fallback(arena_guard, visible_indices, Vec::new(), columns)
    }

    /// Creates a cursor over `visible_indices`.
    ///
    /// Entries whose `arena_idx` is below `arena_guard.len()` are read from the
    /// arena; the remaining entries are read from `fallback_data` at
    /// `arena_idx - arena_guard.len()`. The cursor starts before the first
    /// entry, so `next()` must be called before reading a row.
    pub fn new_with_fallback(
        arena_guard: ArenaReadGuard<'a>,
        visible_indices: Vec<VisibleRowInfo>,
        fallback_data: Vec<CompactArc<[Value]>>,
        columns: Vec<String>,
    ) -> Self {
        // Captured once so every lookup uses the same arena/fallback split point.
        let arena_len = arena_guard.len();
        Self {
            arena_guard,
            visible_indices,
            fallback_data,
            arena_len,
            current_pos: 0,
            columns,
            current_row: Row::new(),
            current_arc: None,
        }
    }

    /// Returns the column names supplied at construction.
    #[inline]
    pub fn columns(&self) -> &[String] {
        &self.columns
    }

    /// Advances to the next visible entry.
    ///
    /// Returns `true` when an entry was consumed and `false` once every entry
    /// has been visited (the cursor state is left untouched in that case).
    ///
    /// If the consumed entry's `arena_idx` does not resolve to a row, the call
    /// still returns `true`; `row_arc_slice()` and `get()` then yield `None`
    /// and `row()` is reset to `Row::new()` so it never exposes the values of
    /// a previously visited row.
    // Deliberately not `Iterator::next`: the method returns a flag and the row
    // is read through the accessor methods instead of being yielded.
    #[inline]
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> bool {
        let arena_idx = match self.visible_indices.get(self.current_pos) {
            Some(info) => info.arena_idx,
            None => return false,
        };

        let resolved = if arena_idx < self.arena_len {
            self.arena_guard.data().get(arena_idx)
        } else {
            // Indices at or past the arena length address the fallback rows.
            self.fallback_data.get(arena_idx - self.arena_len)
        };

        match resolved {
            Some(arc) => {
                self.current_row = Row::from_arc(CompactArc::clone(arc));
                self.current_arc = Some(CompactArc::clone(arc));
            }
            None => {
                // Keep `row()` consistent with `get()` / `row_arc_slice()`.
                self.current_row = Row::new();
                self.current_arc = None;
            }
        }

        self.current_pos += 1;
        true
    }

    /// Returns the shared values of the current row, or `None` when no row is
    /// currently resolved (before the first `next()`, after `reset()`, or when
    /// the current entry's index did not resolve).
    #[inline]
    pub fn row_arc_slice(&self) -> Option<&CompactArc<[Value]>> {
        self.current_arc.as_ref()
    }

    /// Returns the `row_id` of the entry consumed by the most recent successful
    /// `next()` call.
    ///
    /// Returns `0` when no entry has been consumed yet (before the first
    /// `next()` or right after `reset()`).
    #[inline]
    pub fn row_id(&self) -> i64 {
        match self.current_pos {
            0 => 0,
            pos => self
                .visible_indices
                .get(pos - 1)
                .map_or(0, |info| info.row_id),
        }
    }

    /// Returns the current row as a [`Row`].
    #[inline]
    pub fn row(&self) -> &Row {
        &self.current_row
    }

    /// Returns the value in column `col` of the current row, or `None` when no
    /// row is resolved or `col` is out of range.
    #[inline]
    pub fn get(&self, col: usize) -> Option<&Value> {
        self.current_arc.as_ref()?.get(col)
    }

    /// Returns how many entries `next()` has not consumed yet.
    #[inline]
    pub fn remaining(&self) -> usize {
        self.visible_indices.len().saturating_sub(self.current_pos)
    }

    /// Rewinds the cursor to before the first entry and clears the current row.
    pub fn reset(&mut self) {
        self.current_pos = 0;
        self.current_row = Row::new();
        self.current_arc = None;
    }

    /// Returns an [`AggregationScanner`] borrowing this cursor's arena data,
    /// visible entries and fallback rows. The cursor position is not involved.
    pub fn as_aggregation_scanner(&self) -> AggregationScanner<'_> {
        AggregationScanner::new(
            self.arena_guard.data(),
            &self.visible_indices,
            &self.fallback_data,
            self.arena_len,
        )
    }
}
/// Borrowed view used to compute aggregates over a set of visible rows.
///
/// An entry's `arena_idx` selects `arena_data[arena_idx]` when it is below
/// `arena_len`, and `fallback_data[arena_idx - arena_len]` otherwise.
pub struct AggregationScanner<'a> {
/// Rows addressed by entries whose `arena_idx` is below `arena_len`.
arena_data: &'a [CompactArc<[Value]>],
/// Entries that take part in every aggregate.
visible_indices: &'a [VisibleRowInfo],
/// Rows addressed by entries whose `arena_idx` is at or beyond `arena_len`.
fallback_data: &'a [CompactArc<[Value]>],
/// Split point between arena indices and fallback indices.
arena_len: usize,
}
impl<'a> AggregationScanner<'a> {
    /// Creates a scanner over `visible_indices`.
    ///
    /// `arena_len` is the split point: entries with `arena_idx < arena_len`
    /// are read from `arena_data`, all others from `fallback_data` at
    /// `arena_idx - arena_len`.
    pub fn new(
        arena_data: &'a [CompactArc<[Value]>],
        visible_indices: &'a [VisibleRowInfo],
        fallback_data: &'a [CompactArc<[Value]>],
        arena_len: usize,
    ) -> Self {
        Self {
            arena_data,
            visible_indices,
            fallback_data,
            arena_len,
        }
    }

    /// Resolves an entry to its row values, or `None` when the index is out of
    /// range for the slice it addresses.
    #[inline(always)]
    fn get_row(&self, info: &VisibleRowInfo) -> Option<&CompactArc<[Value]>> {
        if info.arena_idx < self.arena_len {
            self.arena_data.get(info.arena_idx)
        } else {
            self.fallback_data.get(info.arena_idx - self.arena_len)
        }
    }

    /// Shared implementation of `min_column` / `max_column`.
    ///
    /// Keeps the first non-null value seen and replaces it whenever a later
    /// value compares as `wanted` against it. Comparisons that fail are
    /// skipped. The winner is cloned once, at the end.
    fn extreme_column(&self, col_idx: usize, wanted: std::cmp::Ordering) -> Option<Value> {
        let mut best: Option<&Value> = None;
        let candidates = self
            .visible_indices
            .iter()
            .filter_map(|info| self.get_row(info))
            .filter_map(|row| row.get(col_idx))
            .filter(|val| !val.is_null());
        for val in candidates {
            match best {
                None => best = Some(val),
                Some(current) => {
                    if let Ok(ord) = val.compare(current) {
                        if ord == wanted {
                            best = Some(val);
                        }
                    }
                }
            }
        }
        best.cloned()
    }

    /// Sums column `col_idx` over all visible rows as `f64`.
    ///
    /// Only `Value::Integer` and `Value::Float` contribute; every other value,
    /// unresolved rows and out-of-range columns are ignored. Returns `0.0`
    /// when nothing contributes.
    #[inline]
    pub fn sum_column(&self, col_idx: usize) -> f64 {
        self.visible_indices
            .iter()
            .filter_map(|info| self.get_row(info))
            .filter_map(|row| row.get(col_idx))
            // Explicit fold from 0.0 keeps the accumulation order and the
            // empty-input result identical to a plain running total.
            .fold(0.0f64, |acc, val| match val {
                Value::Integer(i) => acc + *i as f64,
                Value::Float(f) => acc + *f,
                _ => acc,
            })
    }

    /// Returns the number of visible entries, whether or not they resolve.
    #[inline]
    pub fn count(&self) -> usize {
        self.visible_indices.len()
    }

    /// Counts the visible rows whose column `col_idx` exists and is not null.
    #[inline]
    pub fn count_column(&self, col_idx: usize) -> usize {
        self.visible_indices
            .iter()
            .filter_map(|info| self.get_row(info))
            .filter_map(|row| row.get(col_idx))
            .filter(|val| !val.is_null())
            .count()
    }

    /// Returns the smallest non-null value of column `col_idx`, or `None` when
    /// the column has no non-null value among the visible rows.
    pub fn min_column(&self, col_idx: usize) -> Option<Value> {
        self.extreme_column(col_idx, std::cmp::Ordering::Less)
    }

    /// Returns the largest non-null value of column `col_idx`, or `None` when
    /// the column has no non-null value among the visible rows.
    pub fn max_column(&self, col_idx: usize) -> Option<Value> {
        self.extreme_column(col_idx, std::cmp::Ordering::Greater)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::types::DataType;
    use crate::storage::mvcc::arena::RowArena;

    /// Builds a visibility list from `(row_id, arena_idx)` pairs.
    fn visible_rows(pairs: &[(i64, usize)]) -> Vec<VisibleRowInfo> {
        pairs
            .iter()
            .map(|&(row_id, arena_idx)| VisibleRowInfo { row_id, arena_idx })
            .collect()
    }

    #[test]
    fn test_aggregation_scanner() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(1), Value::Float(10.0)]);
        arena.insert(2, 1, &[Value::Integer(2), Value::Float(20.0)]);
        arena.insert(3, 1, &[Value::Integer(3), Value::Float(30.0)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.count(), 3);
        assert_eq!(scanner.sum_column(0), 6.0);
        assert_eq!(scanner.sum_column(1), 60.0);
    }

    #[test]
    fn test_streaming_result_empty() {
        let arena = RowArena::new();
        let guard = arena.read_guard();
        let visible: Vec<VisibleRowInfo> = Vec::new();
        let columns = vec!["id".to_string(), "value".to_string()];
        let mut cursor = StreamingResult::new(guard, visible, columns);

        assert!(!cursor.next());
        assert_eq!(cursor.remaining(), 0);
        assert!(cursor.row_arc_slice().is_none());
        assert_eq!(cursor.row_id(), 0);
        assert!(cursor.get(0).is_none());
    }

    #[test]
    fn test_streaming_result_iteration() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(100), Value::Text("a".into())]);
        arena.insert(2, 1, &[Value::Integer(200), Value::Text("b".into())]);
        arena.insert(3, 1, &[Value::Integer(300), Value::Text("c".into())]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2)]);
        let columns = vec!["id".to_string(), "name".to_string()];
        let mut cursor = StreamingResult::new(guard, visible, columns);

        assert_eq!(cursor.remaining(), 3);
        assert_eq!(cursor.columns(), &["id", "name"]);

        // First row.
        assert!(cursor.next());
        assert_eq!(cursor.remaining(), 2);
        assert_eq!(cursor.row_id(), 1);
        assert_eq!(cursor.get(0), Some(&Value::Integer(100)));
        assert_eq!(cursor.get(1), Some(&Value::Text("a".into())));
        assert!(cursor.get(2).is_none());

        // Second row.
        assert!(cursor.next());
        assert_eq!(cursor.remaining(), 1);
        assert_eq!(cursor.row_id(), 2);
        assert_eq!(cursor.get(0), Some(&Value::Integer(200)));

        // Third row.
        assert!(cursor.next());
        assert_eq!(cursor.remaining(), 0);
        assert_eq!(cursor.row_id(), 3);
        assert_eq!(cursor.get(0), Some(&Value::Integer(300)));

        // Exhausted.
        assert!(!cursor.next());
        assert_eq!(cursor.remaining(), 0);
    }

    #[test]
    fn test_streaming_result_reset() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(1)]);
        arena.insert(2, 1, &[Value::Integer(2)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1)]);
        let columns = vec!["id".to_string()];
        let mut cursor = StreamingResult::new(guard, visible, columns);

        assert!(cursor.next());
        assert!(cursor.next());
        assert!(!cursor.next());
        assert_eq!(cursor.remaining(), 0);

        cursor.reset();
        assert_eq!(cursor.remaining(), 2);
        assert!(cursor.current_arc.is_none());

        assert!(cursor.next());
        assert_eq!(cursor.row_id(), 1);
    }

    #[test]
    fn test_streaming_result_row_id_edge_cases() {
        let arena = RowArena::new();
        arena.insert(100, 1, &[Value::Integer(1)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(100, 0)]);
        let columns = vec!["id".to_string()];
        let mut cursor = StreamingResult::new(guard, visible, columns);

        // Nothing consumed yet.
        assert_eq!(cursor.row_id(), 0);

        assert!(cursor.next());
        assert_eq!(cursor.row_id(), 100);

        // Running off the end keeps reporting the last consumed entry.
        assert!(!cursor.next());
        assert_eq!(cursor.row_id(), 100);
    }

    #[test]
    fn test_streaming_result_row_id_with_many_rows() {
        let arena = RowArena::new();
        arena.insert(10, 1, &[Value::Integer(1)]);
        arena.insert(20, 1, &[Value::Integer(2)]);
        arena.insert(30, 1, &[Value::Integer(3)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(10, 0), (20, 1), (30, 2)]);
        let columns = vec!["id".to_string()];
        let mut cursor = StreamingResult::new(guard, visible, columns);

        assert_eq!(cursor.row_id(), 0);
        assert!(cursor.next());
        assert_eq!(cursor.row_id(), 10);
        assert!(cursor.next());
        assert_eq!(cursor.row_id(), 20);
        assert!(cursor.next());
        assert_eq!(cursor.row_id(), 30);
        assert!(!cursor.next());
        assert_eq!(cursor.row_id(), 30);
    }

    #[test]
    fn test_streaming_result_row_slice_and_row() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(42), Value::Float(3.5)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0)]);
        let columns = vec!["num".to_string(), "val".to_string()];
        let mut cursor = StreamingResult::new(guard, visible, columns);

        assert!(cursor.row_arc_slice().is_none());
        assert!(cursor.next());

        let slice = cursor.row_arc_slice().unwrap();
        assert_eq!(slice.len(), 2);
        assert_eq!(slice[0], Value::Integer(42));
        assert_eq!(slice[1], Value::Float(3.5));

        let row = cursor.row();
        assert_eq!(row.len(), 2);
    }

    #[test]
    fn test_streaming_result_invalid_arena_index() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(1)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 999)]);
        let columns = vec!["id".to_string()];
        let mut cursor = StreamingResult::new(guard, visible, columns);

        assert!(cursor.next());
        assert!(cursor.current_arc.is_none());
        assert!(cursor.row_arc_slice().is_none());
    }

    #[test]
    fn test_streaming_result_as_aggregation_scanner() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(10)]);
        arena.insert(2, 1, &[Value::Integer(20)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1)]);
        let columns = vec!["value".to_string()];
        let cursor = StreamingResult::new(guard, visible, columns);

        let scanner = cursor.as_aggregation_scanner();
        assert_eq!(scanner.count(), 2);
        assert_eq!(scanner.sum_column(0), 30.0);
    }

    #[test]
    fn test_aggregation_scanner_empty() {
        let arena = RowArena::new();
        let guard = arena.read_guard();
        let visible: Vec<VisibleRowInfo> = Vec::new();
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.count(), 0);
        assert_eq!(scanner.sum_column(0), 0.0);
        assert_eq!(scanner.count_column(0), 0);
        assert!(scanner.min_column(0).is_none());
        assert!(scanner.max_column(0).is_none());
    }

    #[test]
    fn test_aggregation_scanner_with_nulls() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(10), Value::Null(DataType::Integer)]);
        arena.insert(2, 1, &[Value::Null(DataType::Integer), Value::Integer(20)]);
        arena.insert(3, 1, &[Value::Integer(30), Value::Integer(30)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.count(), 3);
        assert_eq!(scanner.count_column(0), 2);
        assert_eq!(scanner.count_column(1), 2);
        assert_eq!(scanner.sum_column(0), 40.0);
        assert_eq!(scanner.sum_column(1), 50.0);
    }

    #[test]
    fn test_aggregation_scanner_all_nulls() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Null(DataType::Integer)]);
        arena.insert(2, 1, &[Value::Null(DataType::Integer)]);
        arena.insert(3, 1, &[Value::Null(DataType::Integer)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.count(), 3);
        assert_eq!(scanner.count_column(0), 0);
        assert_eq!(scanner.sum_column(0), 0.0);
        assert!(scanner.min_column(0).is_none());
        assert!(scanner.max_column(0).is_none());
    }

    #[test]
    fn test_aggregation_scanner_min_max() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(50)]);
        arena.insert(2, 1, &[Value::Integer(10)]);
        arena.insert(3, 1, &[Value::Integer(30)]);
        arena.insert(4, 1, &[Value::Null(DataType::Integer)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2), (4, 3)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.min_column(0), Some(Value::Integer(10)));
        assert_eq!(scanner.max_column(0), Some(Value::Integer(50)));
    }

    #[test]
    fn test_aggregation_scanner_min_max_floats() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Float(3.15)]);
        arena.insert(2, 1, &[Value::Float(2.72)]);
        arena.insert(3, 1, &[Value::Float(1.42)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.min_column(0), Some(Value::Float(1.42)));
        assert_eq!(scanner.max_column(0), Some(Value::Float(3.15)));
    }

    #[test]
    fn test_aggregation_scanner_min_max_text() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Text("banana".into())]);
        arena.insert(2, 1, &[Value::Text("apple".into())]);
        arena.insert(3, 1, &[Value::Text("cherry".into())]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.min_column(0), Some(Value::Text("apple".into())));
        assert_eq!(scanner.max_column(0), Some(Value::Text("cherry".into())));
    }

    #[test]
    fn test_aggregation_scanner_sum_non_numeric() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Text("hello".into())]);
        arena.insert(2, 1, &[Value::Boolean(true)]);
        arena.insert(3, 1, &[Value::Integer(10)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.sum_column(0), 10.0);
    }

    #[test]
    fn test_aggregation_scanner_mixed_numeric() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(10)]);
        arena.insert(2, 1, &[Value::Float(20.5)]);
        arena.insert(3, 1, &[Value::Integer(30)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0), (2, 1), (3, 2)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.sum_column(0), 60.5);
    }

    #[test]
    fn test_aggregation_scanner_invalid_column() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(10)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.sum_column(99), 0.0);
        assert_eq!(scanner.count_column(99), 0);
        assert!(scanner.min_column(99).is_none());
        assert!(scanner.max_column(99).is_none());
    }

    #[test]
    fn test_aggregation_scanner_invalid_arena_index() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(10)]);
        let guard = arena.read_guard();
        // The second entry points past both the arena and the (empty) fallback data.
        let visible = visible_rows(&[(1, 0), (2, 999)]);
        let scanner = AggregationScanner::new(guard.data(), &visible, &[], guard.len());

        assert_eq!(scanner.count(), 2);
        assert_eq!(scanner.sum_column(0), 10.0);
        assert_eq!(scanner.count_column(0), 1);
    }

    #[test]
    fn test_visible_row_info_clone_copy() {
        let info = VisibleRowInfo {
            row_id: 42,
            arena_idx: 5,
        };

        let copied = info;
        assert_eq!(copied.row_id, 42);
        assert_eq!(copied.arena_idx, 5);

        let cloned = Clone::clone(&info);
        assert_eq!(cloned.row_id, 42);
        assert_eq!(cloned.arena_idx, 5);
    }

    #[test]
    fn test_streaming_result_single_row() {
        let arena = RowArena::new();
        arena.insert(1, 1, &[Value::Integer(42)]);
        let guard = arena.read_guard();
        let visible = visible_rows(&[(1, 0)]);
        let columns = vec!["value".to_string()];
        let mut cursor = StreamingResult::new(guard, visible, columns);

        assert_eq!(cursor.remaining(), 1);
        assert!(cursor.next());
        assert_eq!(cursor.remaining(), 0);
        assert_eq!(cursor.get(0), Some(&Value::Integer(42)));
        assert!(!cursor.next());
    }
}