#![allow(dead_code)]
pub use crate::incremental::aggregate_operator::{
AggregateEvalState, AggregateFunction, AggregateState,
};
pub use crate::incremental::filter_operator::{FilterOperator, FilterPredicate};
pub use crate::incremental::input_operator::InputOperator;
pub use crate::incremental::join_operator::{JoinEvalState, JoinOperator, JoinType};
pub use crate::incremental::project_operator::{ProjectColumn, ProjectOperator};
use crate::incremental::dbsp::{Delta, DeltaPair};
#[cfg(test)]
use crate::numeric::Numeric;
use crate::schema::{Index, IndexColumn};
use crate::storage::btree::BTreeCursor;
use crate::sync::Arc;
use crate::sync::Mutex;
use crate::types::IOResult;
use crate::Result;
use std::fmt::Debug;
/// Pair of B-tree cursors handed to operators: one over a table B-tree and
/// one over an index B-tree.
pub struct DbspStateCursors {
    /// Cursor over the table B-tree.
    pub table_cursor: BTreeCursor,
    /// Cursor over the index B-tree.
    pub index_cursor: BTreeCursor,
}

impl DbspStateCursors {
    /// Bundles an already-constructed table cursor and index cursor.
    pub fn new(table_cursor: BTreeCursor, index_cursor: BTreeCursor) -> Self {
        DbspStateCursors {
            table_cursor,
            index_cursor,
        }
    }
}
/// Builds the [`Index`] definition named `dbsp_state_pk` on table `dbsp_state`.
///
/// The index is unique, non-ephemeral, has a rowid, and is keyed on
/// `(operator_id, zset_id, element_id)` — table positions 0, 1 and 2 — each in
/// ascending order with no collation, default, or expression.
///
/// `root_page` is stored verbatim as the index root page.
pub fn create_dbsp_state_index(root_page: i64) -> Index {
    // All key columns share the same shape; only name and table position vary.
    let key_column = |name: &str, pos_in_table| IndexColumn {
        name: name.to_string(),
        order: turso_parser::ast::SortOrder::Asc,
        collation: None,
        pos_in_table,
        default: None,
        expr: None,
    };

    let columns = vec![
        key_column("operator_id", 0),
        key_column("zset_id", 1),
        key_column("element_id", 2),
    ];

    Index {
        name: "dbsp_state_pk".to_string(),
        table_name: "dbsp_state".to_string(),
        root_page,
        columns,
        unique: true,
        ephemeral: false,
        has_rowid: true,
        where_clause: None,
        index_method: None,
        on_conflict: None,
    }
}
/// Packs an operator id, a column index and an operation type into one `i64`.
///
/// Bit layout, from least significant: 2 bits of `op_type`, 14 bits of
/// `column_index`, then `operator_id` shifted left by 16.
///
/// # Panics
///
/// Panics if `op_type > 3` or `column_index >= 16384`.
pub fn generate_storage_id(operator_id: i64, column_index: usize, op_type: u8) -> i64 {
    const OP_TYPE_BITS: u32 = 2;
    const COLUMN_BITS: u32 = 14;

    assert!(op_type < (1u8 << OP_TYPE_BITS), "Invalid operation type");
    assert!(
        column_index < (1usize << COLUMN_BITS),
        "Column index too large"
    );

    let operator_part = operator_id << (COLUMN_BITS + OP_TYPE_BITS);
    let column_part = (column_index as i64) << OP_TYPE_BITS;
    let type_part = i64::from(op_type);
    operator_part | column_part | type_part
}
/// State value passed by mutable reference to `IncrementalOperator::eval`.
#[derive(Debug)]
pub enum EvalState {
/// No input is held. `extract_delta` leaves the state here after taking the delta.
Uninitialized,
/// Input deltas have been supplied and not yet taken out.
Init { deltas: DeltaPair },
/// Boxed aggregate evaluation state.
/// NOTE(review): presumably an in-progress aggregate evaluation — confirm in `aggregate_operator`.
Aggregate(Box<AggregateEvalState>),
/// Boxed join evaluation state.
/// NOTE(review): presumably an in-progress join evaluation — confirm in `join_operator`.
Join(Box<JoinEvalState>),
/// NOTE(review): presumably marks a finished evaluation; nothing visible here sets it — confirm.
Done,
}
impl From<Delta> for EvalState {
fn from(delta: Delta) -> Self {
EvalState::Init {
deltas: delta.into(),
}
}
}
impl From<DeltaPair> for EvalState {
fn from(deltas: DeltaPair) -> Self {
EvalState::Init { deltas }
}
}
impl EvalState {
    /// Builds an `Init` state from a single delta; same result as `EvalState::from(delta)`.
    pub fn from_delta(delta: Delta) -> Self {
        Self::from(delta)
    }

    /// Borrows the left delta of the pair held in `Init`.
    ///
    /// # Panics
    ///
    /// Panics when the state is anything other than `Init`.
    fn delta_ref(&self) -> &Delta {
        if let EvalState::Init { deltas } = self {
            &deltas.left
        } else {
            panic!("delta_ref() can only be called when in Init state",)
        }
    }

    /// Takes the left delta out of an `Init` state and resets `self` to
    /// `Uninitialized`.
    ///
    /// # Panics
    ///
    /// Panics when the state is anything other than `Init`; the state is left
    /// untouched in that case.
    pub fn extract_delta(&mut self) -> Delta {
        if let EvalState::Init { deltas } = self {
            let taken = std::mem::take(&mut deltas.left);
            *self = EvalState::Uninitialized;
            return taken;
        }
        panic!("extract_delta() can only be called when in Init state")
    }
}
/// Plain counters recording how much work operators performed.
#[derive(Clone, Debug, Default)]
pub struct ComputationTracker {
    /// Number of `record_filter` calls.
    pub filter_evaluations: usize,
    /// Number of `record_project` calls.
    pub project_operations: usize,
    /// Number of `record_join_lookup` calls.
    pub join_lookups: usize,
    /// Number of `record_aggregation` calls.
    pub aggregation_updates: usize,
    /// Number of `record_full_scan` calls.
    pub full_scans: usize,
}

impl ComputationTracker {
    /// Creates a tracker with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Counts one filter evaluation.
    pub fn record_filter(&mut self) {
        self.filter_evaluations += 1;
    }

    /// Counts one projection.
    pub fn record_project(&mut self) {
        self.project_operations += 1;
    }

    /// Counts one join lookup.
    pub fn record_join_lookup(&mut self) {
        self.join_lookups += 1;
    }

    /// Counts one aggregation update.
    pub fn record_aggregation(&mut self) {
        self.aggregation_updates += 1;
    }

    /// Counts one full scan.
    pub fn record_full_scan(&mut self) {
        self.full_scans += 1;
    }

    /// Sum of the filter, project, join and aggregation counters.
    ///
    /// `full_scans` is not part of this total.
    pub fn total_computations(&self) -> usize {
        let filter_and_project = self.filter_evaluations + self.project_operations;
        let join_and_aggregate = filter_and_project + self.join_lookups;
        join_and_aggregate + self.aggregation_updates
    }
}
/// Description of a single operator in a query plan.
///
/// NOTE(review): the `input` / `left_input` / `right_input` fields are `usize`
/// values, presumably indices of upstream operators in some plan list — confirm
/// against the code that builds these.
#[derive(Debug, Clone)]
pub enum QueryOperator {
/// Scan of a named table with the listed column names.
TableScan {
table_name: String,
column_names: Vec<String>,
},
/// Filter applying `predicate` to the operator referenced by `input`.
Filter {
predicate: FilterPredicate,
input: usize, },
/// Projection producing `columns` from the operator referenced by `input`.
Project {
columns: Vec<ProjectColumn>,
input: usize,
},
/// Join of two inputs on a single named column.
Join {
join_type: JoinType,
on_column: String,
left_input: usize,
right_input: usize,
},
/// Aggregation with grouping column names and a list of aggregate functions.
Aggregate {
group_by: Vec<String>,
aggregates: Vec<AggregateFunction>,
input: usize,
},
}
/// Interface implemented by incremental operators.
///
/// Both `eval` and `commit` return `Result<IOResult<Delta>>`, so either call may
/// report pending I/O instead of a finished delta.
///
/// NOTE(review): the tests in this file call `eval` with not-yet-committed input
/// and `commit` with input that should be folded into persisted state — confirm
/// the exact contract against the implementors.
pub trait IncrementalOperator: Debug + Send {
/// Evaluates the operator against the input carried in `state`, using `cursors`
/// for state access, and yields the resulting output delta.
fn eval(
&mut self,
state: &mut EvalState,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>>;
/// Applies `deltas` through `cursors` and yields the resulting output delta.
fn commit(
&mut self,
deltas: DeltaPair,
cursors: &mut DbspStateCursors,
) -> Result<IOResult<Delta>>;
/// Installs a shared [`ComputationTracker`] for the operator to record work into.
fn set_tracker(&mut self, tracker: Arc<Mutex<ComputationTracker>>);
}
#[cfg(test)]
mod tests {
use rustc_hash::FxHashSet as HashSet;
use super::*;
use crate::incremental::aggregate_operator::{AggregateOperator, AGG_TYPE_REGULAR};
use crate::incremental::dbsp::HashableRow;
use crate::storage::btree::CursorTrait;
use crate::storage::pager::CreateBTreeFlags;
use crate::sync::Arc;
use crate::sync::Mutex;
use crate::types::Text;
use crate::util::IOExt;
use crate::Value;
use crate::{Database, MemoryIO, IO};
/// Opens an in-memory database and returns its pager together with the root
/// page ids of a freshly created table B-tree and index B-tree.
fn create_test_pager() -> (crate::sync::Arc<crate::Pager>, i64, i64) {
    let io: Arc<dyn IO> = Arc::new(MemoryIO::new());
    let database = Database::open_file(io.clone(), ":memory:").unwrap();
    let connection = database.connect().unwrap();
    let pager = connection.pager.load().clone();

    // The outcome of allocating page 1 is deliberately ignored.
    let _ = pager.io.block(|| pager.allocate_page1());

    let table_root = pager
        .io
        .block(|| pager.btree_create(&CreateBTreeFlags::new_table()))
        .expect("Failed to create BTree for aggregate state table");
    let index_root = pager
        .io
        .block(|| pager.btree_create(&CreateBTreeFlags::new_index()))
        .expect("Failed to create BTree for aggregate state index");

    (pager, table_root as i64, index_root as i64)
}
/// Reads the aggregate state persisted for `agg` out of the table B-tree and
/// returns it as a consolidated [`Delta`] with one weight-1 row per group.
///
/// Rows whose first value is not the integer storage id
/// `generate_storage_id(agg.operator_id, 0, AGG_TYPE_REGULAR)` are skipped, as
/// are rows whose fourth value is not a blob. Blobs that fail to deserialize
/// are reported on stderr and skipped.
fn get_current_state_from_btree(
    agg: &AggregateOperator,
    pager: &crate::sync::Arc<crate::Pager>,
    cursors: &mut DbspStateCursors,
) -> Delta {
    let mut result = Delta::new();
    pager.io.block(|| cursors.table_cursor.rewind()).unwrap();

    while !cursors.table_cursor.is_empty() {
        // Drive pending I/O until the record under the cursor is available.
        let record = loop {
            match cursors.table_cursor.record().unwrap() {
                IOResult::Done(r) => break r,
                IOResult::IO(io) => io.wait(&*pager.io).unwrap(),
            }
        }
        .unwrap()
        .to_owned();
        let values: Vec<Value> = record.get_values_owned().unwrap();

        // Only rows tagged with this operator's regular-aggregate storage id count.
        let belongs_to_agg = match values.first() {
            Some(Value::Numeric(Numeric::Integer(op_id))) => {
                *op_id == generate_storage_id(agg.operator_id, 0, AGG_TYPE_REGULAR)
            }
            _ => false,
        };

        if belongs_to_agg {
            if let Some(Value::Blob(blob)) = values.get(3) {
                match AggregateState::from_blob(blob) {
                    Ok((state, group_key)) => {
                        assert!(state.count != 0);
                        // Output row = group key columns followed by aggregate values.
                        let mut output_values = group_key.clone();
                        output_values.extend(state.to_values(&agg.aggregates));
                        let group_key_str = AggregateOperator::group_key_to_string(&group_key);
                        let rowid = agg.generate_group_rowid(&group_key_str);
                        result
                            .changes
                            .push((HashableRow::new(rowid, output_values), 1));
                    }
                    Err(e) => {
                        eprintln!("Failed to deserialize aggregate state: {e}");
                    }
                }
            }
        }

        pager.io.block(|| cursors.table_cursor.next()).unwrap();
    }

    result.consolidate();
    result
}
/// Asserts that the work recorded in `tracker` looks incremental: at most
/// `expected_ops` computations, strictly fewer than `data_size`, and no full scans.
fn assert_incremental(tracker: &ComputationTracker, expected_ops: usize, data_size: usize) {
    let total = tracker.total_computations();
    assert!(
        total <= expected_ops,
        "Expected <= {} operations for incremental update, got {}",
        expected_ops,
        total
    );
    assert!(
        total < data_size,
        "Computation count {} suggests full recomputation (data size: {})",
        total,
        data_size
    );
    assert_eq!(
        tracker.full_scans, 0,
        "Incremental computation should not perform full scans"
    );
}
/// With no GROUP BY, committing an extra row must output a retraction of the
/// old SUM followed by an insertion of the new SUM, both under the same rowid.
#[test]
fn test_aggregate_incremental_update_emits_retraction() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // SUM over column 2 ("age") with an empty group-by list.
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Sum(2)],
        ["id", "name", "age"].map(String::from).to_vec(),
    )
    .unwrap();

    let mut initial_delta = Delta::new();
    for (id, name, age) in [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)] {
        initial_delta.insert(
            id,
            vec![
                Value::from_i64(id),
                Value::Text(name.to_string().into()),
                Value::from_i64(age),
            ],
        );
    }
    pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    let persisted = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(persisted.changes.len(), 1, "Should have one aggregate row");
    let (row, weight) = &persisted.changes[0];
    assert_eq!(*weight, 1, "Aggregate row should have weight 1");
    assert_eq!(row.values[0], Value::from_f64(90.0), "SUM should be 90");

    // One more row: the aggregate changes from 90 to 130.
    let mut update_delta = Delta::new();
    update_delta.insert(
        4,
        vec![
            Value::from_i64(4),
            Value::Text("David".to_string().into()),
            Value::from_i64(40),
        ],
    );
    let output_delta = pager
        .io
        .block(|| agg.commit((&update_delta).into(), &mut cursors))
        .unwrap();

    assert_eq!(
        output_delta.changes.len(),
        2,
        "Expected 2 changes (retraction + insertion), got {}: {:?}",
        output_delta.changes.len(),
        output_delta.changes
    );

    let (retraction_row, retraction_weight) = &output_delta.changes[0];
    assert_eq!(
        *retraction_weight, -1,
        "First change should be a retraction"
    );
    assert_eq!(
        retraction_row.values[0],
        Value::from_f64(90.0),
        "Retracted value should be the old sum (90)"
    );

    let (insertion_row, insertion_weight) = &output_delta.changes[1];
    assert_eq!(*insertion_weight, 1, "Second change should be an insertion");
    assert_eq!(
        insertion_row.values[0],
        Value::from_f64(130.0),
        "Inserted value should be the new sum (130)"
    );

    assert_eq!(
        retraction_row.rowid, insertion_row.rowid,
        "Retraction and insertion should have the same row ID"
    );
}
/// With GROUP BY, a new row must produce a retraction/insertion pair for its
/// own group only.
#[test]
fn test_aggregate_with_group_by_emits_retractions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Group by column 1 ("team"), SUM over column 3 ("score").
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Sum(3)],
        ["id", "team", "player", "score"]
            .map(String::from)
            .to_vec(),
    )
    .unwrap();

    let mut initial_delta = Delta::new();
    for (id, team, player, score) in [
        (1, "red", "Alice", 10),
        (2, "blue", "Bob", 15),
        (3, "red", "Charlie", 20),
    ] {
        initial_delta.insert(
            id,
            vec![
                Value::from_i64(id),
                Value::Text(team.to_string().into()),
                Value::Text(player.to_string().into()),
                Value::from_i64(score),
            ],
        );
    }
    pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    let persisted = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(persisted.changes.len(), 2, "Should have two groups");

    let mut red_sum = None;
    let mut blue_sum = None;
    for (row, weight) in &persisted.changes {
        assert_eq!(*weight, 1);
        if let Value::Text(team) = &row.values[0] {
            match team.as_str() {
                "red" => red_sum = Some(&row.values[1]),
                "blue" => blue_sum = Some(&row.values[1]),
                _ => {}
            }
        }
    }
    assert_eq!(
        red_sum,
        Some(&Value::from_f64(30.0)),
        "Red team sum should be 30"
    );
    assert_eq!(
        blue_sum,
        Some(&Value::from_f64(15.0)),
        "Blue team sum should be 15"
    );

    // Another red-team score: only the red group may change.
    let mut update_delta = Delta::new();
    update_delta.insert(
        4,
        vec![
            Value::from_i64(4),
            Value::Text("red".to_string().into()),
            Value::Text("David".to_string().into()),
            Value::from_i64(25),
        ],
    );
    let output_delta = pager
        .io
        .block(|| agg.commit((&update_delta).into(), &mut cursors))
        .unwrap();

    assert_eq!(
        output_delta.changes.len(),
        2,
        "Expected 2 changes for red team only, got {}: {:?}",
        output_delta.changes.len(),
        output_delta.changes
    );

    let mut found_retraction = false;
    let mut found_insertion = false;
    for (row, weight) in &output_delta.changes {
        if let Value::Text(team) = &row.values[0] {
            assert_eq!(team.as_str(), "red", "Only red team should have changes");
            match *weight {
                -1 => {
                    assert_eq!(
                        row.values[1],
                        Value::from_f64(30.0),
                        "Should retract old sum of 30"
                    );
                    found_retraction = true;
                }
                1 => {
                    assert_eq!(
                        row.values[1],
                        Value::from_f64(55.0),
                        "Should insert new sum of 55"
                    );
                    found_insertion = true;
                }
                _ => {}
            }
        }
    }
    assert!(found_retraction, "Should have found retraction");
    assert!(found_insertion, "Should have found insertion");
}
/// After 100 rows are committed, adding one more row must cost a single
/// aggregation update rather than a recount.
#[test]
fn test_count_increments_not_recounts() {
    let tracker = Arc::new(Mutex::new(ComputationTracker::new()));

    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // COUNT grouped by column 1 ("category").
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Count],
        ["item_id", "category", "price"]
            .map(String::from)
            .to_vec(),
    )
    .unwrap();
    agg.set_tracker(tracker.clone());

    // 100 items spread over categories cat_0 .. cat_9, ten per category.
    let mut initial = Delta::new();
    for item_id in 0..100 {
        let category = format!("cat_{}", item_id / 10);
        initial.insert(
            item_id,
            vec![
                Value::from_i64(item_id),
                Value::Text(Text::new(category)),
                Value::from_i64(item_id * 10),
            ],
        );
    }
    pager
        .io
        .block(|| agg.commit((&initial).into(), &mut cursors))
        .unwrap();

    // Only the work done by the incremental step below should be counted.
    tracker.lock().aggregation_updates = 0;

    let mut delta = Delta::new();
    delta.insert(
        100,
        vec![
            Value::from_i64(100),
            Value::Text(Text::new("cat_0")),
            Value::from_i64(1000),
        ],
    );
    pager
        .io
        .block(|| agg.commit((&delta).into(), &mut cursors))
        .unwrap();

    assert_eq!(tracker.lock().aggregation_updates, 1);

    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let cat_0_key = Value::Text(Text::new("cat_0"));
    let cat_0 = final_state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == cat_0_key)
        .unwrap();
    assert_eq!(cat_0.0.values[1], Value::from_i64(11));

    assert_incremental(&tracker.lock(), 2, 101);
}
/// A grouped SUM must be updated with a single aggregation update when one
/// more row arrives for an existing group.
#[test]
fn test_sum_updates_incrementally() {
    let tracker = Arc::new(Mutex::new(ComputationTracker::new()));

    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // SUM over column 2 ("amount") grouped by column 1 ("product").
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Sum(2)],
        ["sale_id", "product", "amount"]
            .map(String::from)
            .to_vec(),
    )
    .unwrap();
    agg.set_tracker(tracker.clone());

    let mut initial = Delta::new();
    for (sale_id, product, amount) in [(1, "Widget", 100), (2, "Gadget", 200), (3, "Widget", 150)] {
        initial.insert(
            sale_id,
            vec![
                Value::from_i64(sale_id),
                Value::Text(Text::new(product)),
                Value::from_i64(amount),
            ],
        );
    }
    pager
        .io
        .block(|| agg.commit((&initial).into(), &mut cursors))
        .unwrap();

    let widget_key = Value::Text(Text::new("Widget"));

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let widget_before = &state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == widget_key)
        .unwrap()
        .0;
    assert_eq!(widget_before.values[1], Value::from_i64(250));

    // Count only the work of the incremental step.
    tracker.lock().aggregation_updates = 0;

    let mut delta = Delta::new();
    delta.insert(
        4,
        vec![
            Value::from_i64(4),
            Value::Text(Text::new("Widget")),
            Value::from_i64(50),
        ],
    );
    pager
        .io
        .block(|| agg.commit((&delta).into(), &mut cursors))
        .unwrap();

    assert_eq!(tracker.lock().aggregation_updates, 1);

    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let widget_after = final_state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == widget_key)
        .unwrap();
    assert_eq!(widget_after.0.values[1], Value::from_i64(300));
}
/// COUNT and SUM maintained side by side for the same groups.
#[test]
fn test_count_and_sum_together() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Group by column 1 ("user_id"); COUNT plus SUM over column 2 ("amount").
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Count, AggregateFunction::Sum(2)],
        ["order_id", "user_id", "amount"]
            .map(String::from)
            .to_vec(),
    )
    .unwrap();

    let mut initial = Delta::new();
    for (order_id, user_id, amount) in [(1, 1, 100), (2, 1, 200), (3, 2, 150)] {
        initial.insert(
            order_id,
            vec![
                Value::from_i64(order_id),
                Value::from_i64(user_id),
                Value::from_i64(amount),
            ],
        );
    }
    pager
        .io
        .block(|| agg.commit((&initial).into(), &mut cursors))
        .unwrap();

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(state.changes.len(), 2);

    let user1 = &state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::from_i64(1))
        .unwrap()
        .0;
    assert_eq!(user1.values[1], Value::from_i64(2));
    assert_eq!(user1.values[2], Value::from_i64(300));

    let user2 = &state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::from_i64(2))
        .unwrap()
        .0;
    assert_eq!(user2.values[1], Value::from_i64(1));
    assert_eq!(user2.values[2], Value::from_i64(150));

    // One more order for user 1.
    let mut delta = Delta::new();
    delta.insert(
        4,
        vec![Value::from_i64(4), Value::from_i64(1), Value::from_i64(50)],
    );
    pager
        .io
        .block(|| agg.commit((&delta).into(), &mut cursors))
        .unwrap();

    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let user1_after = final_state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::from_i64(1))
        .unwrap();
    assert_eq!(user1_after.0.values[1], Value::from_i64(3));
    assert_eq!(user1_after.0.values[2], Value::from_i64(350));
}
/// AVG per group must track newly inserted rows.
#[test]
fn test_avg_maintains_sum_and_count() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // AVG over column 2 ("value") grouped by column 1 ("category").
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Avg(2)],
        ["id", "category", "value"].map(String::from).to_vec(),
    )
    .unwrap();

    let mut initial = Delta::new();
    for (id, category, value) in [(1, "A", 10), (2, "A", 20), (3, "B", 30)] {
        initial.insert(
            id,
            vec![
                Value::from_i64(id),
                Value::Text(Text::new(category)),
                Value::from_i64(value),
            ],
        );
    }
    pager
        .io
        .block(|| agg.commit((&initial).into(), &mut cursors))
        .unwrap();

    let key_a = Value::Text(Text::new("A"));
    let key_b = Value::Text(Text::new("B"));

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let cat_a = &state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == key_a)
        .unwrap()
        .0;
    assert_eq!(cat_a.values[1], Value::from_f64(15.0));

    let cat_b = &state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == key_b)
        .unwrap()
        .0;
    assert_eq!(cat_b.values[1], Value::from_f64(30.0));

    // A third value for category A: (10 + 20 + 30) / 3 = 20.
    let mut delta = Delta::new();
    delta.insert(
        4,
        vec![
            Value::from_i64(4),
            Value::Text(Text::new("A")),
            Value::from_i64(30),
        ],
    );
    pager
        .io
        .block(|| agg.commit((&delta).into(), &mut cursors))
        .unwrap();

    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let cat_a_after = final_state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == key_a)
        .unwrap();
    assert_eq!(cat_a_after.0.values[1], Value::from_f64(20.0));
}
/// Deleting a row must lower both COUNT and SUM of its group.
#[test]
fn test_delete_updates_aggregates() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Group by column 1 ("category"); COUNT plus SUM over column 2 ("value").
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Count, AggregateFunction::Sum(2)],
        ["id", "category", "value"].map(String::from).to_vec(),
    )
    .unwrap();

    let mut initial = Delta::new();
    for (id, category, value) in [(1, "A", 100), (2, "A", 200)] {
        initial.insert(
            id,
            vec![
                Value::from_i64(id),
                Value::Text(Text::new(category)),
                Value::from_i64(value),
            ],
        );
    }
    pager
        .io
        .block(|| agg.commit((&initial).into(), &mut cursors))
        .unwrap();

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert!(!state.changes.is_empty());
    let (row, _weight) = &state.changes[0];
    assert_eq!(row.values[1], Value::from_i64(2));
    assert_eq!(row.values[2], Value::from_i64(300));

    // Remove the first row again.
    let mut delta = Delta::new();
    delta.delete(
        1,
        vec![
            Value::from_i64(1),
            Value::Text(Text::new("A")),
            Value::from_i64(100),
        ],
    );
    pager
        .io
        .block(|| agg.commit((&delta).into(), &mut cursors))
        .unwrap();

    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let key_a = Value::Text(Text::new("A"));
    let cat_a = final_state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == key_a)
        .unwrap();
    assert_eq!(cat_a.0.values[1], Value::from_i64(1));
    assert_eq!(cat_a.0.values[2], Value::from_i64(200));
}
/// COUNT must shrink on deletion, and a group whose last row is deleted must
/// disappear from the persisted state.
#[test]
fn test_count_aggregation_with_deletions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // COUNT grouped by column 0 ("category").
    let mut agg = AggregateOperator::new(
        1,
        vec![0],
        vec![AggregateFunction::Count],
        ["category", "value"].map(String::from).to_vec(),
    )
    .unwrap();

    let mut init_data = Delta::new();
    for (rowid, category, value) in [(1, "A", 10), (2, "A", 20), (3, "B", 30)] {
        init_data.insert(
            rowid,
            vec![Value::Text(category.into()), Value::from_i64(value)],
        );
    }
    pager
        .io
        .block(|| agg.commit((&init_data).into(), &mut cursors))
        .unwrap();

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(state.changes.len(), 2);

    let group_a = state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("A".into()))
        .unwrap();
    let group_b = state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("B".into()))
        .unwrap();
    assert_eq!(group_a.0.values[1], Value::from_i64(2));
    assert_eq!(group_b.0.values[1], Value::from_i64(1));

    // Delete one of the two rows in group A.
    let mut delete_delta = Delta::new();
    delete_delta.delete(1, vec![Value::Text("A".into()), Value::from_i64(10)]);
    let output = pager
        .io
        .block(|| agg.commit((&delete_delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(output.changes.len(), 2);

    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let group_a_final = final_state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("A".into()))
        .unwrap();
    assert_eq!(group_a_final.0.values[1], Value::from_i64(1));

    // Delete the only row of group B: a lone retraction is expected.
    let mut delete_all_b = Delta::new();
    delete_all_b.delete(3, vec![Value::Text("B".into()), Value::from_i64(30)]);
    let output_b = pager
        .io
        .block(|| agg.commit((&delete_all_b).into(), &mut cursors))
        .unwrap();
    assert_eq!(output_b.changes.len(), 1);
    assert_eq!(output_b.changes[0].1, -1);

    let final_state2 = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(final_state2.changes.len(), 1);
    assert_eq!(
        final_state2.changes[0].0.values[0],
        Value::Text("A".into())
    );
}
/// SUM must shrink on deletion, and a group with all rows deleted must vanish
/// from the persisted state.
#[test]
fn test_sum_aggregation_with_deletions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // SUM over column 1 ("value") grouped by column 0 ("category").
    let mut agg = AggregateOperator::new(
        1,
        vec![0],
        vec![AggregateFunction::Sum(1)],
        ["category", "value"].map(String::from).to_vec(),
    )
    .unwrap();

    let mut init_data = Delta::new();
    for (rowid, category, value) in [(1, "A", 10), (2, "A", 20), (3, "B", 30), (4, "B", 15)] {
        init_data.insert(
            rowid,
            vec![Value::Text(category.into()), Value::from_i64(value)],
        );
    }
    pager
        .io
        .block(|| agg.commit((&init_data).into(), &mut cursors))
        .unwrap();

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let group_a = state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("A".into()))
        .unwrap();
    let group_b = state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("B".into()))
        .unwrap();
    assert_eq!(group_a.0.values[1], Value::from_i64(30));
    assert_eq!(group_b.0.values[1], Value::from_i64(45));

    // Remove the 20 from group A, leaving 10.
    let mut delete_delta = Delta::new();
    delete_delta.delete(2, vec![Value::Text("A".into()), Value::from_i64(20)]);
    pager
        .io
        .block(|| agg.commit((&delete_delta).into(), &mut cursors))
        .unwrap();

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let group_a = state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("A".into()))
        .unwrap();
    assert_eq!(group_a.0.values[1], Value::from_i64(10));

    // Remove every row of group B.
    let mut delete_all_b = Delta::new();
    for (rowid, value) in [(3, 30), (4, 15)] {
        delete_all_b.delete(
            rowid,
            vec![Value::Text("B".into()), Value::from_i64(value)],
        );
    }
    pager
        .io
        .block(|| agg.commit((&delete_all_b).into(), &mut cursors))
        .unwrap();

    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(final_state.changes.len(), 1);
    assert_eq!(
        final_state.changes[0].0.values[0],
        Value::Text("A".into())
    );
}
/// AVG must be recomputed correctly as rows are deleted one at a time.
#[test]
fn test_avg_aggregation_with_deletions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // AVG over column 1 ("value") grouped by column 0 ("category").
    let mut agg = AggregateOperator::new(
        1,
        vec![0],
        vec![AggregateFunction::Avg(1)],
        ["category", "value"].map(String::from).to_vec(),
    )
    .unwrap();

    let mut init_data = Delta::new();
    for (rowid, value) in [(1, 10), (2, 20), (3, 30)] {
        init_data.insert(
            rowid,
            vec![Value::Text("A".into()), Value::from_i64(value)],
        );
    }
    pager
        .io
        .block(|| agg.commit((&init_data).into(), &mut cursors))
        .unwrap();

    // (10 + 20 + 30) / 3 = 20
    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(state.changes.len(), 1);
    assert_eq!(state.changes[0].0.values[1], Value::from_f64(20.0));

    // Without the 20: (10 + 30) / 2 = 20
    let mut delete_delta = Delta::new();
    delete_delta.delete(2, vec![Value::Text("A".into()), Value::from_i64(20)]);
    pager
        .io
        .block(|| agg.commit((&delete_delta).into(), &mut cursors))
        .unwrap();
    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(state.changes[0].0.values[1], Value::from_f64(20.0));

    // Without the 30 as well: only 10 remains.
    let mut delete_another = Delta::new();
    delete_another.delete(3, vec![Value::Text("A".into()), Value::from_i64(30)]);
    pager
        .io
        .block(|| agg.commit((&delete_another).into(), &mut cursors))
        .unwrap();
    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(state.changes[0].0.values[1], Value::from_f64(10.0));
}
/// COUNT, SUM and AVG over the same column must stay consistent across a
/// delete followed by an insert of a float value.
#[test]
fn test_multiple_aggregations_with_deletions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Grouped by column 0 ("category"); all aggregates read column 1 ("value").
    let mut agg = AggregateOperator::new(
        1,
        vec![0],
        vec![
            AggregateFunction::Count,
            AggregateFunction::Sum(1),
            AggregateFunction::Avg(1),
        ],
        ["category", "value"].map(String::from).to_vec(),
    )
    .unwrap();

    let mut init_data = Delta::new();
    for (rowid, category, value) in [(1, "A", 100), (2, "A", 200), (3, "B", 50)] {
        init_data.insert(
            rowid,
            vec![Value::Text(category.into()), Value::from_i64(value)],
        );
    }
    pager
        .io
        .block(|| agg.commit((&init_data).into(), &mut cursors))
        .unwrap();

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let group_a = state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("A".into()))
        .unwrap();
    assert_eq!(group_a.0.values[1], Value::from_i64(2));
    assert_eq!(group_a.0.values[2], Value::from_i64(300));
    assert_eq!(group_a.0.values[3], Value::from_f64(150.0));

    // Delete the 100: one row with value 200 remains in group A.
    let mut delete_delta = Delta::new();
    delete_delta.delete(1, vec![Value::Text("A".into()), Value::from_i64(100)]);
    pager
        .io
        .block(|| agg.commit((&delete_delta).into(), &mut cursors))
        .unwrap();

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let group_a = state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("A".into()))
        .unwrap();
    assert_eq!(group_a.0.values[1], Value::from_i64(1));
    assert_eq!(group_a.0.values[2], Value::from_i64(200));
    assert_eq!(group_a.0.values[3], Value::from_f64(200.0));

    // Insert a float value: the sum becomes 250.5 and the average 125.25.
    let mut insert_delta = Delta::new();
    insert_delta.insert(4, vec![Value::Text("A".into()), Value::from_f64(50.5)]);
    pager
        .io
        .block(|| agg.commit((&insert_delta).into(), &mut cursors))
        .unwrap();

    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let group_a = state
        .changes
        .iter()
        .find(|entry| entry.0.values[0] == Value::Text("A".into()))
        .unwrap();
    assert_eq!(group_a.0.values[1], Value::from_i64(2));
    assert_eq!(group_a.0.values[2], Value::from_f64(250.5));
    assert_eq!(group_a.0.values[3], Value::from_f64(125.25));
}
/// A rowid change expressed as delete + insert must pass through the filter
/// as a retraction followed by an insertion.
#[test]
fn test_filter_operator_rowid_update() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Keep rows whose column 1 is greater than 2.
    let predicate = FilterPredicate::GreaterThan {
        column_idx: 1,
        value: Value::from_i64(2),
    };
    let mut filter = FilterOperator::new(predicate);

    let mut init_data = Delta::new();
    init_data.insert(3, vec![Value::from_i64(3), Value::from_i64(3)]);
    let committed = pager
        .io
        .block(|| filter.commit((&init_data).into(), &mut cursors))
        .unwrap();
    assert_eq!(committed.changes.len(), 1);
    assert_eq!(committed.changes[0].0.rowid, 3);
    assert_eq!(
        committed.changes[0].0.values,
        vec![Value::from_i64(3), Value::from_i64(3)]
    );

    // Same filtered value, rowid moves from 3 to 1.
    let mut update_delta = Delta::new();
    update_delta.delete(3, vec![Value::from_i64(3), Value::from_i64(3)]);
    update_delta.insert(1, vec![Value::from_i64(1), Value::from_i64(3)]);
    let output = pager
        .io
        .block(|| filter.commit((&update_delta).into(), &mut cursors))
        .unwrap();

    assert_eq!(output.changes.len(), 2);
    assert_eq!(output.changes[0].1, -1);
    assert_eq!(output.changes[1].1, 1);
}
/// `eval` on uncommitted rows and a later `commit` of the same rows must both
/// output exactly the rows that pass the filter.
#[test]
fn test_filter_eval_with_uncommitted() {
    let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
    let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
    let index_def = create_dbsp_state_index(index_root_page_id);
    let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
    let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);

    // Keep rows whose column 2 is greater than 25.
    let mut filter = FilterOperator::new(FilterPredicate::GreaterThan {
        column_idx: 2,
        value: Value::from_i64(25),
    });

    let mut init_data = Delta::new();
    init_data.insert(
        1,
        vec![
            Value::from_i64(1),
            Value::Text("Alice".into()),
            Value::from_i64(30),
        ],
    );
    init_data.insert(
        2,
        vec![
            Value::from_i64(2),
            Value::Text("Bob".into()),
            Value::from_i64(20),
        ],
    );
    let initial_output = pager
        .io
        .block(|| filter.commit((&init_data).into(), &mut cursors))
        .unwrap();

    // Only Alice (30) passes; Bob (20) is dropped.
    assert_eq!(initial_output.changes.len(), 1);
    assert_eq!(initial_output.changes[0].0.rowid, 1);

    let mut uncommitted = Delta::new();
    uncommitted.insert(
        3,
        vec![
            Value::from_i64(3),
            Value::Text("Charlie".into()),
            Value::from_i64(35),
        ],
    );
    uncommitted.insert(
        4,
        vec![
            Value::from_i64(4),
            Value::Text("David".into()),
            Value::from_i64(15),
        ],
    );

    // Evaluate without committing.
    let mut eval_state = uncommitted.clone().into();
    let result = pager
        .io
        .block(|| filter.eval(&mut eval_state, &mut cursors))
        .unwrap();
    assert_eq!(
        result.changes.len(),
        1,
        "Only Charlie (35) should pass filter"
    );
    assert_eq!(result.changes[0].0.rowid, 3);

    // Committing the same delta returns that delta's passing rows only — it
    // does not replay previously committed rows such as Alice.
    let commit_output = pager
        .io
        .block(|| filter.commit((&uncommitted).into(), &mut cursors))
        .unwrap();
    assert_eq!(
        commit_output.changes.len(),
        1,
        "Commit output should contain only Charlie"
    );
    assert_eq!(
        commit_output.changes[0].0.rowid, 3,
        "Committed row should be Charlie (rowid 3)"
    );
}
/// Evaluating uncommitted rows must leave the persisted aggregate state
/// untouched; only a subsequent `commit` may change what is stored.
#[test]
fn test_aggregate_eval_with_uncommitted_preserves_state() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Grouped on column 1 (category); Count plus Sum over column 2 (amount).
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Count, AggregateFunction::Sum(2)],
        Vec::from(["id", "category", "amount"].map(String::from)),
    )
    .unwrap();

    // (id, category, amount) row builder.
    let sale = |id: i64, category: &'static str, amount: i64| {
        vec![
            Value::from_i64(id),
            Value::Text(category.into()),
            Value::from_i64(amount),
        ]
    };

    // Committed baseline: A = {100, 200}, B = {150}.
    let mut init_data = Delta::new();
    init_data.insert(1, sale(1, "A", 100));
    init_data.insert(2, sale(2, "A", 200));
    init_data.insert(3, sale(3, "B", 150));
    pager
        .io
        .block(|| agg.commit((&init_data).into(), &mut cursors))
        .unwrap();

    let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(initial_state.changes.len(), 2);
    let (initial_a, _) = initial_state
        .changes
        .iter()
        .find(|change| change.0.values[0] == Value::Text("A".into()))
        .unwrap();
    assert_eq!(initial_a.values[1], Value::from_i64(2));
    assert_eq!(initial_a.values[2], Value::from_f64(300.0));

    // Pending rows: one more for A and a brand-new group C.
    let mut uncommitted = Delta::new();
    uncommitted.insert(4, sale(4, "A", 50));
    uncommitted.insert(5, sale(5, "C", 75));

    let mut eval_state = uncommitted.clone().into();
    let result = pager
        .io
        .block(|| agg.eval(&mut eval_state, &mut cursors))
        .unwrap();
    assert!(!result.changes.is_empty(), "Should have aggregate changes");

    // The stored state must be exactly what it was before the eval.
    let state_after_eval = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(
        state_after_eval.changes.len(),
        2,
        "State should still have only A and B"
    );
    let (a_after_eval, _) = state_after_eval
        .changes
        .iter()
        .find(|change| change.0.values[0] == Value::Text("A".into()))
        .unwrap();
    assert_eq!(
        a_after_eval.values[1],
        Value::from_i64(2),
        "A count should still be 2"
    );
    assert_eq!(
        a_after_eval.values[2],
        Value::from_f64(300.0),
        "A sum should still be 300"
    );

    // Committing the pending rows is what finally updates the stored state.
    pager
        .io
        .block(|| agg.commit((&uncommitted).into(), &mut cursors))
        .unwrap();
    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(final_state.changes.len(), 3, "Should now have A, B, and C");

    let (a_final, _) = final_state
        .changes
        .iter()
        .find(|change| change.0.values[0] == Value::Text("A".into()))
        .unwrap();
    assert_eq!(
        a_final.values[1],
        Value::from_i64(3),
        "A count should now be 3"
    );
    assert_eq!(
        a_final.values[2],
        Value::from_f64(350.0),
        "A sum should now be 350"
    );

    let (c_final, _) = final_state
        .changes
        .iter()
        .find(|change| change.0.values[0] == Value::Text("C".into()))
        .unwrap();
    assert_eq!(
        c_final.values[1],
        Value::from_i64(1),
        "C count should be 1"
    );
    assert_eq!(
        c_final.values[2],
        Value::from_f64(75.0),
        "C sum should be 75"
    );
}
/// Repeated `eval` calls with different uncommitted deltas (inserts and a
/// delete) must each leave the persisted aggregate state unchanged.
#[test]
fn test_aggregate_eval_multiple_times_without_commit() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped aggregate: Count plus Sum over column 1 (value).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Count, AggregateFunction::Sum(1)],
        Vec::from(["id", "value"].map(String::from)),
    )
    .unwrap();

    // (id, value) row builder.
    let entry = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];

    // Committed baseline: two rows, count 2, sum 300.
    let mut init_data = Delta::new();
    init_data.insert(1, entry(1, 100));
    init_data.insert(2, entry(2, 200));
    pager
        .io
        .block(|| agg.commit((&init_data).into(), &mut cursors))
        .unwrap();
    let initial_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(initial_state.changes.len(), 1);
    assert_eq!(initial_state.changes[0].0.values[0], Value::from_i64(2));
    assert_eq!(initial_state.changes[0].0.values[1], Value::from_f64(300.0));

    // Three independent pending deltas, evaluated one after another.
    let mut single_insert = Delta::new();
    single_insert.insert(3, entry(3, 50));

    let mut double_insert = Delta::new();
    double_insert.insert(4, entry(4, 75));
    double_insert.insert(5, entry(5, 25));

    let mut single_delete = Delta::new();
    single_delete.delete(1, entry(1, 100));

    for pending in [single_insert, double_insert, single_delete] {
        let mut eval_state = pending.into();
        let _ = pager
            .io
            .block(|| agg.eval(&mut eval_state, &mut cursors))
            .unwrap();

        // The stored state still reflects only the committed baseline.
        let persisted = get_current_state_from_btree(&agg, &pager, &mut cursors);
        assert_eq!(persisted.changes[0].0.values[0], Value::from_i64(2));
        assert_eq!(persisted.changes[0].0.values[1], Value::from_f64(300.0));
    }
}
/// Evaluates a merged delta (rows that will be committed plus rows that will
/// not), checks the emitted retraction/insertion pairs per group, and then
/// verifies that only the committed part ends up in the stored state.
#[test]
fn test_aggregate_eval_with_mixed_committed_and_uncommitted() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Count grouped on column 1 (type).
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Count],
        Vec::from(["id", "type"].map(String::from)),
    )
    .unwrap();

    // (id, type) row builder.
    let typed = |id: i64, kind: &'static str| vec![Value::from_i64(id), Value::Text(kind.into())];

    // Committed baseline: one X and one Y.
    let mut init_data = Delta::new();
    init_data.insert(1, typed(1, "X"));
    init_data.insert(2, typed(2, "Y"));
    pager
        .io
        .block(|| agg.commit((&init_data).into(), &mut cursors))
        .unwrap();

    // One row destined for commit and two that stay uncommitted.
    let mut committed_delta = Delta::new();
    committed_delta.insert(3, typed(3, "X"));
    let mut uncommitted = Delta::new();
    uncommitted.insert(4, typed(4, "Y"));
    uncommitted.insert(5, typed(5, "Z"));

    let mut combined = committed_delta.clone();
    combined.merge(&uncommitted);

    let mut eval_state = combined.into();
    let result = pager
        .io
        .block(|| agg.eval(&mut eval_state, &mut cursors))
        .unwrap();
    assert!(!result.changes.is_empty(), "Result should not be empty");
    assert_eq!(
        result.changes.len(),
        5,
        "Should have 5 changes (2 retractions + 3 insertions)"
    );

    // Order by group, then by weight so each retraction precedes its insertion.
    let mut sorted_changes: Vec<_> = result.changes.iter().collect();
    sorted_changes.sort_by(|a, b| {
        a.0.values[0]
            .partial_cmp(&b.0.values[0])
            .unwrap()
            .then_with(|| a.1.cmp(&b.1))
    });

    // (group, count, weight) expected at each sorted position.
    let expected = [
        ("X", 1, -1),
        ("X", 2, 1),
        ("Y", 1, -1),
        ("Y", 2, 1),
        ("Z", 1, 1),
    ];
    for (change, (group, count, weight)) in sorted_changes.iter().zip(expected) {
        assert_eq!(change.0.values[0], Value::Text(group.into()));
        assert_eq!(change.0.values[1], Value::from_i64(count));
        assert_eq!(change.1, weight);
    }

    // The eval must not have written anything to the stored state.
    let state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    assert_eq!(state.changes.len(), 2, "Should still have only X and Y");

    // Committing only the committed part bumps X to 2.
    pager
        .io
        .block(|| agg.commit((&committed_delta).into(), &mut cursors))
        .unwrap();
    let final_state = get_current_state_from_btree(&agg, &pager, &mut cursors);
    let (x_row, _) = final_state
        .changes
        .iter()
        .find(|change| change.0.values[0] == Value::Text("X".into()))
        .unwrap();
    assert_eq!(x_row.values[1], Value::from_i64(2));
}
/// Min and Max over the price column of a single committed batch.
#[test]
fn test_min_max_basic() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 2 (price).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(2), AggregateFunction::Max(2)],
        Vec::from(["id", "name", "price"].map(String::from)),
    )
    .unwrap();

    // (id, name, price) row builder.
    let product = |id: i64, name: &'static str, price: f64| {
        vec![
            Value::from_i64(id),
            Value::Text(name.into()),
            Value::from_f64(price),
        ]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, product(1, "Apple", 1.50));
    initial_delta.insert(2, product(2, "Banana", 0.75));
    initial_delta.insert(3, product(3, "Orange", 2.00));
    initial_delta.insert(4, product(4, "Grape", 3.50));

    let result = pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // One output row: [min, max].
    assert_eq!(result.changes.len(), 1);
    let (aggregates, weight) = &result.changes[0];
    assert_eq!(*weight, 1);
    assert_eq!(aggregates.values[0], Value::from_f64(0.75));
    assert_eq!(aggregates.values[1], Value::from_f64(3.50));
}
/// Deleting the row that currently holds the minimum must retract the old
/// (min, max) pair and emit one with the next-smallest price.
#[test]
fn test_min_max_deletion_updates_min() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 2 (price).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(2), AggregateFunction::Max(2)],
        Vec::from(["id", "name", "price"].map(String::from)),
    )
    .unwrap();

    // (id, name, price) row builder.
    let product = |id: i64, name: &'static str, price: f64| {
        vec![
            Value::from_i64(id),
            Value::Text(name.into()),
            Value::from_f64(price),
        ]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, product(1, "Apple", 1.50));
    initial_delta.insert(2, product(2, "Banana", 0.75));
    initial_delta.insert(3, product(3, "Orange", 2.00));
    initial_delta.insert(4, product(4, "Grape", 3.50));
    pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // Remove Banana, the current minimum.
    let mut delete_delta = Delta::new();
    delete_delta.delete(2, product(2, "Banana", 0.75));
    let result = pager
        .io
        .block(|| agg.commit((&delete_delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result.changes.len(), 2);

    // Old aggregate row goes out with weight -1 ...
    let (old_row, _) = result.changes.iter().find(|change| change.1 == -1).unwrap();
    assert_eq!(old_row.values[0], Value::from_f64(0.75));
    assert_eq!(old_row.values[1], Value::from_f64(3.50));

    // ... and the replacement comes in with weight +1 and the new minimum.
    let (new_row, _) = result.changes.iter().find(|change| change.1 == 1).unwrap();
    assert_eq!(new_row.values[0], Value::from_f64(1.50));
    assert_eq!(new_row.values[1], Value::from_f64(3.50));
}
/// Deleting the row that currently holds the maximum must retract the old
/// (min, max) pair and emit one with the next-largest price.
#[test]
fn test_min_max_deletion_updates_max() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 2 (price).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(2), AggregateFunction::Max(2)],
        Vec::from(["id", "name", "price"].map(String::from)),
    )
    .unwrap();

    // (id, name, price) row builder.
    let product = |id: i64, name: &'static str, price: f64| {
        vec![
            Value::from_i64(id),
            Value::Text(name.into()),
            Value::from_f64(price),
        ]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, product(1, "Apple", 1.50));
    initial_delta.insert(2, product(2, "Banana", 0.75));
    initial_delta.insert(3, product(3, "Orange", 2.00));
    initial_delta.insert(4, product(4, "Grape", 3.50));
    pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // Remove Grape, the current maximum.
    let mut delete_delta = Delta::new();
    delete_delta.delete(4, product(4, "Grape", 3.50));
    let result = pager
        .io
        .block(|| agg.commit((&delete_delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result.changes.len(), 2);

    // Retraction of the previous aggregate row.
    let (old_row, _) = result.changes.iter().find(|change| change.1 == -1).unwrap();
    assert_eq!(old_row.values[0], Value::from_f64(0.75));
    assert_eq!(old_row.values[1], Value::from_f64(3.50));

    // Replacement row: same minimum, maximum drops to Orange's price.
    let (new_row, _) = result.changes.iter().find(|change| change.1 == 1).unwrap();
    assert_eq!(new_row.values[0], Value::from_f64(0.75));
    assert_eq!(new_row.values[1], Value::from_f64(2.00));
}
/// Inserting a row cheaper than every committed row must lower the minimum
/// while leaving the maximum as it was.
#[test]
fn test_min_max_insertion_updates_min() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 2 (price).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(2), AggregateFunction::Max(2)],
        Vec::from(["id", "name", "price"].map(String::from)),
    )
    .unwrap();

    // (id, name, price) row builder.
    let product = |id: i64, name: &'static str, price: f64| {
        vec![
            Value::from_i64(id),
            Value::Text(name.into()),
            Value::from_f64(price),
        ]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, product(1, "Apple", 1.50));
    initial_delta.insert(2, product(2, "Orange", 2.00));
    initial_delta.insert(3, product(3, "Grape", 3.50));
    pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // Lemon undercuts the current minimum.
    let mut insert_delta = Delta::new();
    insert_delta.insert(4, product(4, "Lemon", 0.50));
    let result = pager
        .io
        .block(|| agg.commit((&insert_delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result.changes.len(), 2);

    // Retraction of the previous aggregate row.
    let (old_row, _) = result.changes.iter().find(|change| change.1 == -1).unwrap();
    assert_eq!(old_row.values[0], Value::from_f64(1.50));
    assert_eq!(old_row.values[1], Value::from_f64(3.50));

    // Replacement row with the new minimum.
    let (new_row, _) = result.changes.iter().find(|change| change.1 == 1).unwrap();
    assert_eq!(new_row.values[0], Value::from_f64(0.50));
    assert_eq!(new_row.values[1], Value::from_f64(3.50));
}
/// Inserting a row pricier than every committed row must raise the maximum
/// while leaving the minimum as it was.
#[test]
fn test_min_max_insertion_updates_max() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 2 (price).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(2), AggregateFunction::Max(2)],
        Vec::from(["id", "name", "price"].map(String::from)),
    )
    .unwrap();

    // (id, name, price) row builder.
    let product = |id: i64, name: &'static str, price: f64| {
        vec![
            Value::from_i64(id),
            Value::Text(name.into()),
            Value::from_f64(price),
        ]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, product(1, "Apple", 1.50));
    initial_delta.insert(2, product(2, "Orange", 2.00));
    initial_delta.insert(3, product(3, "Grape", 3.50));
    pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // Melon exceeds the current maximum.
    let mut insert_delta = Delta::new();
    insert_delta.insert(4, product(4, "Melon", 5.00));
    let result = pager
        .io
        .block(|| agg.commit((&insert_delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result.changes.len(), 2);

    // Retraction of the previous aggregate row.
    let (old_row, _) = result.changes.iter().find(|change| change.1 == -1).unwrap();
    assert_eq!(old_row.values[0], Value::from_f64(1.50));
    assert_eq!(old_row.values[1], Value::from_f64(3.50));

    // Replacement row with the new maximum.
    let (new_row, _) = result.changes.iter().find(|change| change.1 == 1).unwrap();
    assert_eq!(new_row.values[0], Value::from_f64(1.50));
    assert_eq!(new_row.values[1], Value::from_f64(5.00));
}
/// An in-place update (delete + insert under the same rowid) that lowers a
/// row's price below the current minimum must produce a new minimum.
#[test]
fn test_min_max_update_changes_min() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 2 (price).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(2), AggregateFunction::Max(2)],
        Vec::from(["id", "name", "price"].map(String::from)),
    )
    .unwrap();

    // (id, name, price) row builder.
    let product = |id: i64, name: &'static str, price: f64| {
        vec![
            Value::from_i64(id),
            Value::Text(name.into()),
            Value::from_f64(price),
        ]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, product(1, "Apple", 1.50));
    initial_delta.insert(2, product(2, "Orange", 2.00));
    initial_delta.insert(3, product(3, "Grape", 3.50));
    pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // Reprice Orange from 2.00 to 0.25, expressed as delete + insert.
    let mut update_delta = Delta::new();
    update_delta.delete(2, product(2, "Orange", 2.00));
    update_delta.insert(2, product(2, "Orange", 0.25));
    let result = pager
        .io
        .block(|| agg.commit((&update_delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result.changes.len(), 2);

    // Retraction of the previous aggregate row.
    let (old_row, _) = result.changes.iter().find(|change| change.1 == -1).unwrap();
    assert_eq!(old_row.values[0], Value::from_f64(1.50));
    assert_eq!(old_row.values[1], Value::from_f64(3.50));

    // Replacement row: Orange's new price is now the minimum.
    let (new_row, _) = result.changes.iter().find(|change| change.1 == 1).unwrap();
    assert_eq!(new_row.values[0], Value::from_f64(0.25));
    assert_eq!(new_row.values[1], Value::from_f64(3.50));
}
/// Min and Max are tracked independently per group when grouping on the
/// category column.
#[test]
fn test_min_max_with_group_by() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Grouped on column 1 (category); both aggregates read column 3 (price).
    let mut agg = AggregateOperator::new(
        1,
        vec![1],
        vec![AggregateFunction::Min(3), AggregateFunction::Max(3)],
        Vec::from(["id", "category", "name", "price"].map(String::from)),
    )
    .unwrap();

    // (id, category, name, price) row builder.
    let product = |id: i64, category: &'static str, name: &'static str, price: f64| {
        vec![
            Value::from_i64(id),
            Value::Text(category.into()),
            Value::Text(name.into()),
            Value::from_f64(price),
        ]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, product(1, "fruit", "Apple", 1.50));
    initial_delta.insert(2, product(2, "fruit", "Banana", 0.75));
    initial_delta.insert(3, product(3, "fruit", "Orange", 2.00));
    initial_delta.insert(4, product(4, "veggie", "Carrot", 0.50));
    initial_delta.insert(5, product(5, "veggie", "Lettuce", 1.25));

    let result = pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result.changes.len(), 2);

    // Each group yields one row: [category, min, max] with weight +1.
    for (category, min_price, max_price) in [("fruit", 0.75, 2.00), ("veggie", 0.50, 1.25)] {
        let (group_row, weight) = result
            .changes
            .iter()
            .find(|change| change.0.values[0] == Value::Text(category.into()))
            .unwrap();
        assert_eq!(*weight, 1);
        assert_eq!(group_row.values[1], Value::from_f64(min_price));
        assert_eq!(group_row.values[2], Value::from_f64(max_price));
    }
}
/// Rows whose price is NULL must not influence the reported Min or Max.
#[test]
fn test_min_max_with_nulls() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 2 (price).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(2), AggregateFunction::Max(2)],
        Vec::from(["id", "name", "price"].map(String::from)),
    )
    .unwrap();

    // (id, name, price) row builder; price is passed as a `Value` so NULL fits.
    let product = |id: i64, name: &'static str, price: Value| {
        vec![Value::from_i64(id), Value::Text(name.into()), price]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, product(1, "Apple", Value::from_f64(1.50)));
    initial_delta.insert(2, product(2, "Unknown1", Value::Null));
    initial_delta.insert(3, product(3, "Orange", Value::from_f64(2.00)));
    initial_delta.insert(4, product(4, "Unknown2", Value::Null));
    initial_delta.insert(5, product(5, "Grape", Value::from_f64(3.50)));

    let result = pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // Only the three non-NULL prices count.
    assert_eq!(result.changes.len(), 1);
    let (aggregates, weight) = &result.changes[0];
    assert_eq!(*weight, 1);
    assert_eq!(aggregates.values[0], Value::from_f64(1.50));
    assert_eq!(aggregates.values[1], Value::from_f64(3.50));
}
/// Min and Max over an integer column.
#[test]
fn test_min_max_integer_values() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 2 (score).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(2), AggregateFunction::Max(2)],
        Vec::from(["id", "name", "score"].map(String::from)),
    )
    .unwrap();

    // (id, name, score) row builder.
    let student = |id: i64, name: &'static str, score: i64| {
        vec![
            Value::from_i64(id),
            Value::Text(name.into()),
            Value::from_i64(score),
        ]
    };

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, student(1, "Alice", 85));
    initial_delta.insert(2, student(2, "Bob", 92));
    initial_delta.insert(3, student(3, "Carol", 78));
    initial_delta.insert(4, student(4, "Dave", 95));

    let result = pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // One output row: [min score, max score].
    assert_eq!(result.changes.len(), 1);
    let (aggregates, weight) = &result.changes[0];
    assert_eq!(*weight, 1);
    assert_eq!(aggregates.values[0], Value::from_i64(78));
    assert_eq!(aggregates.values[1], Value::from_i64(95));
}
/// Min and Max over a text column.
#[test]
fn test_min_max_text_values() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; both aggregates read column 1 (name).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::Min(1), AggregateFunction::Max(1)],
        Vec::from(["id", "name"].map(String::from)),
    )
    .unwrap();

    // (id, name) row builder.
    let named = |id: i64, name: &'static str| vec![Value::from_i64(id), Value::Text(name.into())];

    let mut initial_delta = Delta::new();
    initial_delta.insert(1, named(1, "Charlie"));
    initial_delta.insert(2, named(2, "Alice"));
    initial_delta.insert(3, named(3, "Bob"));
    initial_delta.insert(4, named(4, "David"));

    let result = pager
        .io
        .block(|| agg.commit((&initial_delta).into(), &mut cursors))
        .unwrap();

    // One output row: [smallest name, largest name].
    assert_eq!(result.changes.len(), 1);
    let (aggregates, weight) = &result.changes[0];
    assert_eq!(*weight, 1);
    assert_eq!(aggregates.values[0], Value::Text("Alice".into()));
    assert_eq!(aggregates.values[1], Value::Text("David".into()));
}
/// Min/Max combined with Count, Sum and Avg on the same column, checked after
/// the initial load and after deleting first the minimum and then the maximum.
#[test]
fn test_min_max_with_other_aggregates() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; output columns follow this order: Count, Sum, Min, Max, Avg.
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![
            AggregateFunction::Count,
            AggregateFunction::Sum(1),
            AggregateFunction::Min(1),
            AggregateFunction::Max(1),
            AggregateFunction::Avg(1),
        ],
        Vec::from(["id", "value"].map(String::from)),
    )
    .unwrap();

    // (id, value) row builder.
    let entry = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];

    // Load values 10, 5, 15, 20.
    let mut delta = Delta::new();
    delta.insert(1, entry(1, 10));
    delta.insert(2, entry(2, 5));
    delta.insert(3, entry(3, 15));
    delta.insert(4, entry(4, 20));
    let result = pager
        .io
        .block(|| agg.commit((&delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result.changes.len(), 1);
    let (row, weight) = &result.changes[0];
    assert_eq!(*weight, 1);
    let after_load = [
        Value::from_i64(4),
        Value::from_i64(50),
        Value::from_i64(5),
        Value::from_i64(20),
        Value::from_f64(12.5),
    ];
    for (idx, want) in after_load.iter().enumerate() {
        assert_eq!(row.values[idx], *want);
    }

    // Delete the minimum (5): old row retracted, new row emitted.
    let mut delta2 = Delta::new();
    delta2.delete(2, entry(2, 5));
    let result2 = pager
        .io
        .block(|| agg.commit((&delta2).into(), &mut cursors))
        .unwrap();
    assert_eq!(result2.changes.len(), 2);
    let (row_del, weight_del) = &result2.changes[0];
    assert_eq!(*weight_del, -1);
    for (idx, want) in after_load.iter().enumerate() {
        assert_eq!(row_del.values[idx], *want);
    }
    let (row_ins, weight_ins) = &result2.changes[1];
    assert_eq!(*weight_ins, 1);
    let after_min_delete = [
        Value::from_i64(3),
        Value::from_i64(45),
        Value::from_i64(10),
        Value::from_i64(20),
        Value::from_f64(15.0),
    ];
    for (idx, want) in after_min_delete.iter().enumerate() {
        assert_eq!(row_ins.values[idx], *want);
    }

    // Delete the maximum (20): only the old max is checked on the retraction.
    let mut delta3 = Delta::new();
    delta3.delete(4, entry(4, 20));
    let result3 = pager
        .io
        .block(|| agg.commit((&delta3).into(), &mut cursors))
        .unwrap();
    assert_eq!(result3.changes.len(), 2);
    let (row_del2, weight_del2) = &result3.changes[0];
    assert_eq!(*weight_del2, -1);
    assert_eq!(row_del2.values[3], Value::from_i64(20));
    let (row_ins2, weight_ins2) = &result3.changes[1];
    assert_eq!(*weight_ins2, 1);
    let after_max_delete = [
        Value::from_i64(2),
        Value::from_i64(25),
        Value::from_i64(10),
        Value::from_i64(15),
        Value::from_f64(12.5),
    ];
    for (idx, want) in after_max_delete.iter().enumerate() {
        assert_eq!(row_ins2.values[idx], *want);
    }
}
/// Min and Max aggregates that each target a different input column, checked
/// after the initial load and after deleting one row.
#[test]
fn test_min_max_multiple_columns() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Ungrouped; output order: Min(col1), Max(col2), Min(col3).
    let mut agg = AggregateOperator::new(
        1,
        vec![],
        vec![
            AggregateFunction::Min(0),
            AggregateFunction::Max(1),
            AggregateFunction::Min(2),
        ],
        Vec::from(["col1", "col2", "col3"].map(String::from)),
    )
    .unwrap();

    // Three-integer row builder.
    let triple = |a: i64, b: i64, c: i64| {
        vec![Value::from_i64(a), Value::from_i64(b), Value::from_i64(c)]
    };

    let mut delta = Delta::new();
    delta.insert(1, triple(10, 100, 1000));
    delta.insert(2, triple(5, 200, 2000));
    delta.insert(3, triple(15, 150, 500));
    let result = pager
        .io
        .block(|| agg.commit((&delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result.changes.len(), 1);
    let (row, weight) = &result.changes[0];
    assert_eq!(*weight, 1);
    let after_load = triple(5, 200, 500);
    for (idx, want) in after_load.iter().enumerate() {
        assert_eq!(row.values[idx], *want);
    }

    // Row 2 held both Min(col1) and Max(col2); deleting it changes both.
    let mut delta2 = Delta::new();
    delta2.delete(2, triple(5, 200, 2000));
    let result2 = pager
        .io
        .block(|| agg.commit((&delta2).into(), &mut cursors))
        .unwrap();
    assert_eq!(result2.changes.len(), 2);

    let (row_del, weight_del) = &result2.changes[0];
    assert_eq!(*weight_del, -1);
    for (idx, want) in after_load.iter().enumerate() {
        assert_eq!(row_del.values[idx], *want);
    }

    let (row_ins, weight_ins) = &result2.changes[1];
    assert_eq!(*weight_ins, 1);
    let after_delete = triple(10, 150, 500);
    for (idx, want) in after_delete.iter().enumerate() {
        assert_eq!(row_ins.values[idx], *want);
    }
}
#[test]
/// Inner join exercised over two commits.
///
/// The first commit supplies both sides at once and expects only the rows
/// whose first column matches on both sides. The second commit supplies one
/// new row per side and expects each of them to pair with a row that was only
/// provided during the first commit.
fn test_join_operator_inner() {
    let (pager, table_page_id, index_page_id) = create_test_pager();
    let table_cursor = BTreeCursor::new_table(pager.clone(), table_page_id, 10);
    let index_def = create_dbsp_state_index(index_page_id);
    let index_cursor = BTreeCursor::new_index(pager.clone(), index_page_id, &index_def, 10);
    let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);

    let mut join = JoinOperator::new(
        1,
        JoinType::Inner,
        vec![0],
        vec![0],
        vec!["customer_id".to_string(), "amount".to_string()],
        vec!["id".to_string(), "name".to_string()],
    )
    .unwrap();

    // First commit: left keys {1, 2, 3}, right keys {1, 2, 4}.
    let mut left_delta = Delta::new();
    left_delta.insert(1, vec![Value::from_i64(1), Value::from_f64(100.0)]);
    left_delta.insert(2, vec![Value::from_i64(2), Value::from_f64(200.0)]);
    left_delta.insert(3, vec![Value::from_i64(3), Value::from_f64(300.0)]);
    let mut right_delta = Delta::new();
    right_delta.insert(1, vec![Value::from_i64(1), Value::Text("Alice".into())]);
    right_delta.insert(2, vec![Value::from_i64(2), Value::Text("Bob".into())]);
    right_delta.insert(4, vec![Value::from_i64(4), Value::Text("David".into())]);
    let delta_pair = DeltaPair::new(left_delta, right_delta);
    let result = pager
        .io
        .block(|| join.commit(delta_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        result.changes.len(),
        2,
        "First commit should produce 2 matches"
    );

    // Sort by the left key so the positional checks below are order-independent.
    let mut results: Vec<_> = result.changes;
    results.sort_by_key(|r| r.0.values[0].clone());
    assert_eq!(results[0].0.values[0], Value::from_i64(1));
    assert_eq!(results[0].0.values[3], Value::Text("Alice".into()));
    assert_eq!(results[1].0.values[0], Value::from_i64(2));
    assert_eq!(results[1].0.values[3], Value::Text("Bob".into()));

    // Second commit: a left row with key 4 and a right row with key 3. Their
    // partners (David, customer 3) were only supplied in the first commit.
    let mut left_delta2 = Delta::new();
    left_delta2.insert(5, vec![Value::from_i64(4), Value::from_f64(400.0)]);
    let mut right_delta2 = Delta::new();
    right_delta2.insert(6, vec![Value::from_i64(3), Value::Text("Charlie".into())]);
    let delta_pair2 = DeltaPair::new(left_delta2, right_delta2);
    let result2 = pager
        .io
        .block(|| join.commit(delta_pair2.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        result2.changes.len(),
        2,
        "Second commit should produce 2 new matches from incremental join. Got: {:?}",
        result2.changes
    );

    // Rows are looked up by left key, so no ordering of the output is required.
    let charlie_match = result2
        .changes
        .iter()
        .find(|(row, _)| row.values[0] == Value::from_i64(3))
        .expect("Should find customer 3 joined with new Charlie");
    assert_eq!(charlie_match.0.values[2], Value::from_i64(3));
    assert_eq!(charlie_match.0.values[3], Value::Text("Charlie".into()));

    let david_match = result2
        .changes
        .iter()
        .find(|(row, _)| row.values[0] == Value::from_i64(4))
        .expect("Should find new customer 4 joined with existing David");
    // Check the right-side id column: column 0 is already fixed by the `find`
    // predicate above, so asserting on it would be vacuous.
    assert_eq!(david_match.0.values[2], Value::from_i64(4));
    assert_eq!(david_match.0.values[3], Value::Text("David".into()));
}
#[test]
/// Seeds an inner join with three matching rows per side, then deletes one
/// row from the left input and one from the right input in separate commits.
/// Each deletion must surface as a single output change with weight -1.
fn test_join_operator_with_deletions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 10),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 10),
    );

    let left_columns = vec!["customer_id".to_string(), "amount".to_string()];
    let right_columns = vec!["id".to_string(), "name".to_string()];
    let mut join = JoinOperator::new(
        1,
        JoinType::Inner,
        vec![0],
        vec![0],
        left_columns,
        right_columns,
    )
    .unwrap();

    // Initial load: keys 1..=3 exist on both sides (rowid equals the key).
    let mut seed_left = Delta::new();
    for (key, amount) in [(1, 100.0), (2, 200.0), (3, 300.0)] {
        seed_left.insert(key, vec![Value::from_i64(key), Value::from_f64(amount)]);
    }
    let mut seed_right = Delta::new();
    for (key, name) in [(1, "Alice"), (2, "Bob"), (3, "Charlie")] {
        seed_right.insert(key, vec![Value::from_i64(key), Value::Text(name.into())]);
    }
    let seed = DeltaPair::new(seed_left, seed_right);
    let initial = pager
        .io
        .block(|| join.commit(seed.clone(), &mut cursors))
        .unwrap();
    assert_eq!(initial.changes.len(), 3, "Should have 3 initial joins");

    // Delete customer 2 from the left side only.
    let mut left_removal = Delta::new();
    left_removal.delete(2, vec![Value::from_i64(2), Value::from_f64(200.0)]);
    let left_removal_pair = DeltaPair::new(left_removal, Delta::new());
    let after_left_removal = pager
        .io
        .block(|| join.commit(left_removal_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        after_left_removal.changes.len(),
        1,
        "Should produce 1 retraction for deleted customer 2"
    );
    let (retracted, weight) = &after_left_removal.changes[0];
    assert_eq!(*weight, -1, "Should have weight -1 for deletion");
    assert_eq!(retracted.values[0], Value::from_i64(2));
    assert_eq!(retracted.values[3], Value::Text("Bob".into()));

    // Delete Charlie (key 3) from the right side only.
    let mut right_removal = Delta::new();
    right_removal.delete(3, vec![Value::from_i64(3), Value::Text("Charlie".into())]);
    let right_removal_pair = DeltaPair::new(Delta::new(), right_removal);
    let after_right_removal = pager
        .io
        .block(|| join.commit(right_removal_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        after_right_removal.changes.len(),
        1,
        "Should produce 1 retraction for deleted customer 3"
    );
    let (retracted, weight) = &after_right_removal.changes[0];
    assert_eq!(*weight, -1, "Should have weight -1 for deletion");
    assert_eq!(retracted.values[0], Value::from_i64(3));
    assert_eq!(retracted.values[2], Value::from_i64(3));
}
#[test]
/// One right-side customer row joined against several left-side order rows.
///
/// The customer arrives first (no output expected), the three orders arrive in
/// a later commit (three outputs expected), and finally one order is deleted
/// (one retraction expected).
fn test_join_operator_one_to_many() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 10),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 10),
    );

    let order_columns = vec![
        "customer_id".to_string(),
        "order_id".to_string(),
        "amount".to_string(),
    ];
    let customer_columns = vec!["id".to_string(), "name".to_string()];
    let mut join = JoinOperator::new(
        1,
        JoinType::Inner,
        vec![0],
        vec![0],
        order_columns,
        customer_columns,
    )
    .unwrap();

    // Step 1: only the customer exists, so there is nothing to join with yet.
    let mut customers = Delta::new();
    customers.insert(1, vec![Value::from_i64(100), Value::Text("Alice".into())]);
    let customer_only = DeltaPair::new(Delta::new(), customers);
    let no_orders_yet = pager
        .io
        .block(|| join.commit(customer_only.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        no_orders_yet.changes.len(),
        0,
        "Should have no joins with customer but no orders"
    );

    // Step 2: three orders for customer 100 arrive on the left side.
    let mut orders = Delta::new();
    for (rowid, order_no, amount) in [(1, 1001, 50.0), (2, 1002, 75.0), (3, 1003, 100.0)] {
        orders.insert(
            rowid,
            vec![
                Value::from_i64(100),
                Value::from_i64(order_no),
                Value::from_f64(amount),
            ],
        );
    }
    let orders_only = DeltaPair::new(orders, Delta::new());
    let joined = pager
        .io
        .block(|| join.commit(orders_only.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        joined.changes.len(),
        3,
        "Should produce 3 joins for 3 orders with same customer. Got: {:?}",
        joined.changes
    );
    for (row, weight) in &joined.changes {
        assert_eq!(*weight, 1, "Weight should be 1 for insertion");
        assert_eq!(
            row.values[0],
            Value::from_i64(100),
            "Customer ID should be 100"
        );
        assert_eq!(
            row.values[4],
            Value::Text("Alice".into()),
            "Customer name should be Alice"
        );
        let order_id = if let Value::Numeric(Numeric::Integer(id)) = &row.values[1] {
            *id
        } else {
            panic!("Expected integer order ID")
        };
        assert!(
            (1001..=1003).contains(&order_id),
            "Order ID {order_id} should be between 1001 and 1003"
        );
    }

    // Step 3: deleting order 1002 retracts exactly its joined row.
    let mut cancelled = Delta::new();
    cancelled.delete(
        2,
        vec![
            Value::from_i64(100),
            Value::from_i64(1002),
            Value::from_f64(75.0),
        ],
    );
    let cancellation = DeltaPair::new(cancelled, Delta::new());
    let retraction = pager
        .io
        .block(|| join.commit(cancellation.clone(), &mut cursors))
        .unwrap();
    assert_eq!(retraction.changes.len(), 1, "Should produce 1 retraction");
    let (retracted_row, retracted_weight) = &retraction.changes[0];
    assert_eq!(*retracted_weight, -1, "Should be a deletion");
    assert_eq!(
        retracted_row.values[1],
        Value::from_i64(1002),
        "Should delete order 1002"
    );
}
#[test]
/// Several rows on each side share the same join key, so the join output is
/// the cross product of the matching rows: 3 products x 2 category records.
/// A fourth product added later must pair with both category records.
fn test_join_operator_many_to_many() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 10),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 10),
    );

    let product_columns = vec![
        "category_id".to_string(),
        "product_name".to_string(),
        "price".to_string(),
    ];
    let category_columns = vec!["id".to_string(), "category_name".to_string()];
    let mut join = JoinOperator::new(
        1,
        JoinType::Inner,
        vec![0],
        vec![0],
        product_columns,
        category_columns,
    )
    .unwrap();

    // Every row on both sides carries key 10.
    let mut products = Delta::new();
    for (rowid, product, price) in [
        (1, "Laptop", 1000.0),
        (2, "Mouse", 50.0),
        (3, "Keyboard", 100.0),
    ] {
        products.insert(
            rowid,
            vec![
                Value::from_i64(10),
                Value::Text(product.into()),
                Value::from_f64(price),
            ],
        );
    }
    let mut categories = Delta::new();
    for (rowid, category) in [(1, "Electronics"), (2, "Computers")] {
        categories.insert(
            rowid,
            vec![Value::from_i64(10), Value::Text(category.into())],
        );
    }
    let initial_pair = DeltaPair::new(products, categories);
    let initial = pager
        .io
        .block(|| join.commit(initial_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        initial.changes.len(),
        6,
        "Should produce 6 joins (3 products × 2 category records). Got: {:?}",
        initial.changes
    );

    // Each (product, category) pairing must appear, and appear only once.
    let mut found_combinations = HashSet::default();
    for (row, weight) in &initial.changes {
        assert_eq!(*weight, 1);
        found_combinations.insert((row.values[1].to_string(), row.values[4].to_string()));
    }
    assert_eq!(
        found_combinations.len(),
        6,
        "Should have 6 unique combinations"
    );

    // A new product with the same key joins against both stored categories.
    let mut new_product = Delta::new();
    new_product.insert(
        4,
        vec![
            Value::from_i64(10),
            Value::Text("Monitor".into()),
            Value::from_f64(500.0),
        ],
    );
    let new_product_pair = DeltaPair::new(new_product, Delta::new());
    let incremental = pager
        .io
        .block(|| join.commit(new_product_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        incremental.changes.len(),
        2,
        "New product should join with 2 existing category records"
    );
    for (row, _) in &incremental.changes {
        assert_eq!(row.values[1], Value::Text("Monitor".into()));
    }
}
#[test]
/// Models an UPDATE of the single right-side row as delete + insert under the
/// same rowid. With three left rows attached, the join must retract the three
/// old results and emit three new ones carrying the updated name.
fn test_join_operator_update_in_one_to_many() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 10),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 10),
    );

    let order_columns = vec![
        "customer_id".to_string(),
        "order_id".to_string(),
        "amount".to_string(),
    ];
    let customer_columns = vec!["id".to_string(), "name".to_string()];
    let mut join = JoinOperator::new(
        1,
        JoinType::Inner,
        vec![0],
        vec![0],
        order_columns,
        customer_columns,
    )
    .unwrap();

    // Initial state: customer 100 ("Alice") with three orders.
    let mut orders = Delta::new();
    for (rowid, order_no, amount) in [(1, 1001, 50.0), (2, 1002, 75.0), (3, 1003, 100.0)] {
        orders.insert(
            rowid,
            vec![
                Value::from_i64(100),
                Value::from_i64(order_no),
                Value::from_f64(amount),
            ],
        );
    }
    let mut customers = Delta::new();
    customers.insert(1, vec![Value::from_i64(100), Value::Text("Alice".into())]);
    let initial_pair = DeltaPair::new(orders, customers);
    let initial = pager
        .io
        .block(|| join.commit(initial_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(initial.changes.len(), 3, "Should have 3 initial joins");

    // Rename the customer: retract the old row, insert the new one.
    let mut rename = Delta::new();
    rename.delete(1, vec![Value::from_i64(100), Value::Text("Alice".into())]);
    rename.insert(
        1,
        vec![Value::from_i64(100), Value::Text("Alice Smith".into())],
    );
    let rename_pair = DeltaPair::new(Delta::new(), rename);
    let updated = pager
        .io
        .block(|| join.commit(rename_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(updated.changes.len(), 6,
        "Should produce 6 changes (3 deletions + 3 insertions) when updating customer with 3 orders");

    let deletions: Vec<_> = updated
        .changes
        .iter()
        .filter(|(_, weight)| *weight == -1)
        .collect();
    let insertions: Vec<_> = updated
        .changes
        .iter()
        .filter(|(_, weight)| *weight == 1)
        .collect();
    assert_eq!(deletions.len(), 3, "Should have 3 deletions");
    assert_eq!(insertions.len(), 3, "Should have 3 insertions");

    for (row, _) in &deletions {
        assert_eq!(
            row.values[4],
            Value::Text("Alice".into()),
            "Deletions should have old name"
        );
    }
    for (row, _) in &insertions {
        assert_eq!(
            row.values[4],
            Value::Text("Alice Smith".into()),
            "Insertions should have new name"
        );
    }

    // Non-integer order ids are skipped here; the length check below would
    // then fail, so they cannot slip through unnoticed.
    let order_ids: HashSet<i64> = insertions
        .iter()
        .filter_map(|(row, _)| match &row.values[1] {
            Value::Numeric(Numeric::Integer(order_id)) => Some(*order_id),
            _ => None,
        })
        .collect();
    assert_eq!(
        order_ids.len(),
        3,
        "Should still have all 3 order IDs after update"
    );
    for expected_order in [1001, 1002, 1003] {
        assert!(order_ids.contains(&expected_order));
    }
}
#[test]
/// Rows with identical values but distinct rowids on both sides of the join.
/// Three identical left rows and two identical right rows must yield six
/// results; deleting one of the left rows must retract exactly two of them.
fn test_join_operator_weight_accumulation_complex() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 10),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 10),
    );

    let left_columns = vec!["key".to_string(), "val_left".to_string()];
    let right_columns = vec!["key".to_string(), "val_right".to_string()];
    let mut join = JoinOperator::new(
        1,
        JoinType::Inner,
        vec![0],
        vec![0],
        left_columns,
        right_columns,
    )
    .unwrap();

    // Left rowids 1..=3 all hold (10, "A"); right rowids 4..=5 all hold (10, "B").
    let mut left_rows = Delta::new();
    for rowid in 1..=3 {
        left_rows.insert(rowid, vec![Value::from_i64(10), Value::Text("A".into())]);
    }
    let mut right_rows = Delta::new();
    for rowid in 4..=5 {
        right_rows.insert(rowid, vec![Value::from_i64(10), Value::Text("B".into())]);
    }
    let initial_pair = DeltaPair::new(left_rows, right_rows);
    let initial = pager
        .io
        .block(|| join.commit(initial_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        initial.changes.len(),
        6,
        "Should produce 6 joins (3 left rows × 2 right rows)"
    );
    initial
        .changes
        .iter()
        .for_each(|(_, weight)| assert_eq!(*weight, 1));

    // Remove one of the three identical left rows.
    let mut left_removal = Delta::new();
    left_removal.delete(2, vec![Value::from_i64(10), Value::Text("A".into())]);
    let removal_pair = DeltaPair::new(left_removal, Delta::new());
    let after_removal = pager
        .io
        .block(|| join.commit(removal_pair.clone(), &mut cursors))
        .unwrap();
    assert_eq!(
        after_removal.changes.len(),
        2,
        "Should produce 2 retractions when deleting 1 of 3 identical left rows"
    );
    after_removal
        .changes
        .iter()
        .for_each(|(_, weight)| assert_eq!(*weight, -1, "Should be retractions"));
}
#[test]
/// Drives `JoinOperator::eval` directly from an `EvalState::Init` and checks
/// that every expected (name, quantity) pairing is produced exactly once, with
/// weight 1 and with a rowid that no other output row shares.
fn test_join_produces_all_expected_results() {
    let mut join = JoinOperator::new(
        0,
        JoinType::Inner,
        vec![0],
        vec![0],
        vec!["id".to_string(), "name".to_string()],
        vec![
            "user_id".to_string(),
            "product_id".to_string(),
            "quantity".to_string(),
        ],
    )
    .unwrap();

    // Left: (id, name), with the rowid equal to the id.
    let left_delta = Delta {
        changes: vec![(1, "Alice"), (2, "Bob")]
            .into_iter()
            .map(|(id, name)| {
                let values = vec![Value::from_i64(id), Value::Text(Text::from(name))];
                (HashableRow::new(id, values), 1)
            })
            .collect(),
    };
    // Right: (user_id, product_id, quantity); user 1 has two rows, user 2 has one.
    let right_delta = Delta {
        changes: vec![(1, 1, 100, 5), (2, 1, 101, 3), (3, 2, 100, 7)]
            .into_iter()
            .map(|(rowid, user_id, product_id, quantity)| {
                let values = vec![
                    Value::from_i64(user_id),
                    Value::from_i64(product_id),
                    Value::from_i64(quantity),
                ];
                (HashableRow::new(rowid, values), 1)
            })
            .collect(),
    };
    let mut state = EvalState::Init {
        deltas: DeltaPair::new(left_delta, right_delta),
    };

    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );
    let result = pager
        .io
        .block(|| join.eval(&mut state, &mut cursors))
        .unwrap();
    assert_eq!(
        result.changes.len(),
        3,
        "Should produce 3 joined rows (Alice×2 + Bob×1)"
    );

    let mut expected_results = HashSet::default();
    for (name, quantity) in [("Alice", 5), ("Alice", 3), ("Bob", 7)] {
        expected_results.insert((name.to_string(), quantity));
    }

    // Output layout: left columns first, so name is column 1 and quantity column 4.
    let mut actual_results = HashSet::default();
    for (row, weight) in &result.changes {
        assert_eq!(*weight, 1, "All results should have weight 1");
        let name = if let Value::Text(t) = &row.values[1] {
            t.as_str().to_string()
        } else {
            panic!("Expected text value for name")
        };
        let quantity = if let Value::Numeric(Numeric::Integer(q)) = &row.values[4] {
            *q
        } else {
            panic!("Expected integer value for quantity")
        };
        actual_results.insert((name, quantity));
    }
    assert_eq!(
        expected_results, actual_results,
        "Join should produce all expected results. Expected: {expected_results:?}, Got: {actual_results:?}",
    );

    // `insert` returns false when the rowid was already present.
    let mut seen_rowids = HashSet::default();
    for (row, _) in &result.changes {
        assert!(seen_rowids.insert(row.rowid), "Duplicate rowid found: {}. This would cause rows to overwrite each other in btree storage!", row.rowid);
    }
}
use crate::incremental::merge_operator::{MergeOperator, UnionMode};
#[test]
/// UNION ALL merge of two disjoint single-column deltas: all four input rows
/// must be present in the merged output.
fn test_merge_operator_basic() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let table_cursor = BTreeCursor::new_table(pager.clone(), table_root, 5);
    let index_cursor = BTreeCursor::new_index(pager, index_root, &index_def, 4);
    let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);

    let mode = UnionMode::All {
        left_table: "table1".to_string(),
        right_table: "table2".to_string(),
    };
    let mut merge_op = MergeOperator::new(1, mode);

    // Left carries values 1 and 2, right carries values 3 and 4 (rowid == value).
    let mut left_delta = Delta::new();
    for n in 1..=2 {
        left_delta.insert(n, vec![Value::from_i64(n)]);
    }
    let mut right_delta = Delta::new();
    for n in 3..=4 {
        right_delta.insert(n, vec![Value::from_i64(n)]);
    }

    let outcome = merge_op
        .commit(DeltaPair::new(left_delta, right_delta), &mut cursors)
        .unwrap();
    let merged = match outcome {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(merged.len(), 4);

    // Keep the first column of every positively-weighted row that holds an integer.
    let values: Vec<i64> = merged
        .changes
        .iter()
        .filter_map(|(row, weight)| match row.values.first() {
            Some(Value::Numeric(Numeric::Integer(n))) if *weight > 0 => Some(*n),
            _ => None,
        })
        .collect();
    for expected in 1..=4 {
        assert!(values.contains(&expected));
    }
}
#[test]
/// UNION (distinct) merge across two commits. Rows with equal values must
/// share an assigned rowid, a value seen in an earlier commit must get its
/// earlier rowid back, and a never-seen value must get a fresh one.
fn test_merge_operator_stateful_distinct() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let table_cursor = BTreeCursor::new_table(pager.clone(), table_root, 5);
    let index_cursor = BTreeCursor::new_index(pager, index_root, &index_def, 4);
    let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);
    let mut merge_op = MergeOperator::new(7, UnionMode::Distinct);

    // Commit 1: left values {1, 2, 3}, right values {2, 3, 4}, so four distinct values.
    let mut first_left = Delta::new();
    for (rowid, value) in [(1, 1), (2, 2), (3, 3)] {
        first_left.insert(rowid, vec![Value::from_i64(value)]);
    }
    let mut first_right = Delta::new();
    for (rowid, value) in [(4, 2), (5, 3), (6, 4)] {
        first_right.insert(rowid, vec![Value::from_i64(value)]);
    }
    let first_outcome = merge_op
        .commit(DeltaPair::new(first_left, first_right), &mut cursors)
        .unwrap();
    let first_merged = match first_outcome {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(first_merged.len(), 6);
    let unique_rowids: HashSet<i64> = first_merged
        .changes
        .iter()
        .map(|(row, _)| row.rowid)
        .collect();
    assert_eq!(
        unique_rowids.len(),
        4,
        "Should have 4 unique rowids for 4 unique values"
    );

    // Commit 2: value 2 was seen before, value 5 is new.
    let mut second_left = Delta::new();
    second_left.insert(7, vec![Value::from_i64(2)]);
    let mut second_right = Delta::new();
    second_right.insert(8, vec![Value::from_i64(5)]);
    let second_outcome = merge_op
        .commit(DeltaPair::new(second_left, second_right), &mut cursors)
        .unwrap();
    let second_merged = match second_outcome {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(second_merged.len(), 2, "Should have 2 entries in delta");

    let has_existing_rowid = second_merged
        .changes
        .iter()
        .any(|(row, _)| row.rowid <= 4 && row.values == vec![Value::from_i64(2)]);
    assert!(has_existing_rowid, "Value 2 should reuse existing rowid");

    let has_new_rowid = second_merged
        .changes
        .iter()
        .any(|(row, _)| row.rowid > 4 && row.values == vec![Value::from_i64(5)]);
    assert!(has_new_rowid, "Value 5 should get a new rowid");
}
#[test]
/// UNION ALL merge fed from only one side per commit.
///
/// The same source rowid (100) is used on both sides. The test checks that
/// left and right rows never receive the same assigned rowid, and that a
/// given (side, source rowid) pair gets the same assigned rowid every time it
/// reappears.
fn test_merge_operator_single_sided_inputs_union_all() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let table_cursor = BTreeCursor::new_table(pager.clone(), table_root, 5);
    let index_cursor = BTreeCursor::new_index(pager, index_root, &index_def, 4);
    let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);

    let mode = UnionMode::All {
        left_table: "orders".to_string(),
        right_table: "archived_orders".to_string(),
    };
    let mut merge_op = MergeOperator::new(10, mode);

    // Commit 1: left only, source rowids 100 and 101.
    let mut left_only = Delta::new();
    left_only.insert(100, vec![Value::from_i64(1001)]);
    left_only.insert(101, vec![Value::from_i64(1002)]);
    let outcome1 = merge_op
        .commit(DeltaPair::new(left_only, Delta::new()), &mut cursors)
        .unwrap();
    let merged1 = match outcome1 {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(merged1.len(), 2, "Should have 2 entries from left only");
    let first_rowids: Vec<_> = merged1.changes.iter().map(|(row, _)| row.rowid).collect();

    // Commit 2: right only; source rowid 100 collides with the left side's.
    let mut right_only = Delta::new();
    right_only.insert(100, vec![Value::from_i64(2001)]);
    right_only.insert(102, vec![Value::from_i64(2002)]);
    let outcome2 = merge_op
        .commit(DeltaPair::new(Delta::new(), right_only), &mut cursors)
        .unwrap();
    let merged2 = match outcome2 {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(merged2.len(), 2, "Should have 2 entries from right only");
    for (row, _) in &merged2.changes {
        assert!(
            !first_rowids.contains(&row.rowid),
            "Right side rowids should be different from left side rowids"
        );
    }
    let second_result_rowid_100 = merged2
        .changes
        .iter()
        .find(|(row, _)| row.values == vec![Value::from_i64(2001)])
        .map(|(row, _)| row.rowid)
        .unwrap();

    // Commit 3: left again with the same source rowids as commit 1.
    let mut left_again = Delta::new();
    left_again.insert(100, vec![Value::from_i64(1003)]);
    left_again.insert(101, vec![Value::from_i64(1004)]);
    let outcome3 = merge_op
        .commit(DeltaPair::new(left_again, Delta::new()), &mut cursors)
        .unwrap();
    let merged3 = match outcome3 {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(merged3.len(), 2, "Should have 2 entries from left");
    let third_rowids: Vec<i64> = merged3.changes.iter().map(|(row, _)| row.rowid).collect();
    assert_eq!(
        first_rowids, third_rowids,
        "Same (table, rowid) pairs should get same assigned rowids"
    );

    // Commit 4: right again with source rowid 100.
    let mut right_again = Delta::new();
    right_again.insert(100, vec![Value::from_i64(2003)]);
    let outcome4 = merge_op
        .commit(DeltaPair::new(Delta::new(), right_again), &mut cursors)
        .unwrap();
    let merged4 = match outcome4 {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(merged4.len(), 1, "Should have 1 entry from right");
    assert_eq!(
        merged4.changes[0].0.rowid, second_result_rowid_100,
        "archived_orders rowid 100 should consistently map to same assigned rowid"
    );
}
#[test]
/// A commit in which both inputs are empty must produce an empty output and
/// must not disturb the rowid assigned to a previously seen left-side row.
fn test_merge_operator_both_sides_empty() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let table_cursor = BTreeCursor::new_table(pager.clone(), table_root, 5);
    let index_cursor = BTreeCursor::new_index(pager, index_root, &index_def, 4);
    let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);

    let mode = UnionMode::All {
        left_table: "t1".to_string(),
        right_table: "t2".to_string(),
    };
    let mut merge_op = MergeOperator::new(12, mode);

    // Commit 1: one row per side, both under source rowid 1.
    let mut seed_left = Delta::new();
    seed_left.insert(1, vec![Value::from_i64(100)]);
    let mut seed_right = Delta::new();
    seed_right.insert(1, vec![Value::from_i64(200)]);
    let seeded = match merge_op
        .commit(DeltaPair::new(seed_left, seed_right), &mut cursors)
        .unwrap()
    {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(seeded.len(), 2, "Should have 2 entries initially");
    let original_t1_rowid = seeded
        .changes
        .iter()
        .find(|(row, _)| row.values == vec![Value::from_i64(100)])
        .map(|(row, _)| row.rowid)
        .unwrap();

    // Commit 2: nothing on either side.
    let idle = match merge_op
        .commit(DeltaPair::new(Delta::new(), Delta::new()), &mut cursors)
        .unwrap()
    {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(
        idle.len(),
        0,
        "Both empty sides should produce empty output"
    );

    // Commit 3: left source rowid 1 shows up again with a different value.
    let mut left_again = Delta::new();
    left_again.insert(1, vec![Value::from_i64(101)]);
    let resumed = match merge_op
        .commit(DeltaPair::new(left_again, Delta::new()), &mut cursors)
        .unwrap()
    {
        IOResult::Done(delta) => delta,
        _ => panic!("Expected Done result"),
    };
    assert_eq!(resumed.len(), 1, "Should have 1 entry");
    assert_eq!(
        resumed.changes[0].0.rowid, original_t1_rowid,
        "Should maintain consistent rowid mapping after empty operation"
    );
}
#[test]
/// Two aggregate operators share operator id 1 and the same cursors but apply
/// SUM and MIN to swapped columns. The second operator's output is expected to
/// reflect only the row it was given itself, not rows committed by the first.
fn test_aggregate_serialization_with_different_column_indices() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    let column_names = vec![
        "group".to_string(),
        "val1".to_string(),
        "val2".to_string(),
        "val3".to_string(),
    ];

    // First operator: GROUP BY column 0, SUM(column 1), MIN(column 3).
    let mut agg1 = AggregateOperator::new(
        1,
        vec![0],
        vec![AggregateFunction::Sum(1), AggregateFunction::Min(3)],
        column_names.clone(),
    )
    .unwrap();
    let mut delta = Delta::new();
    for (rowid, val1, val2, val3) in [(1, 10, 100, 5), (2, 15, 200, 3)] {
        delta.insert(
            rowid,
            vec![
                Value::Text("A".into()),
                Value::from_i64(val1),
                Value::from_i64(val2),
                Value::from_i64(val3),
            ],
        );
    }
    let result1 = pager
        .io
        .block(|| agg1.commit((&delta).into(), &mut cursors))
        .unwrap();
    assert_eq!(result1.changes.len(), 1);
    let (row1, _) = &result1.changes[0];
    assert_eq!(row1.values[0], Value::Text("A".into()));
    // 10 + 15 = 25 and min(5, 3) = 3.
    assert_eq!(row1.values[1], Value::from_i64(25));
    assert_eq!(row1.values[2], Value::from_i64(3));

    // Second operator: same id and grouping, but SUM(column 3), MIN(column 1).
    let mut agg2 = AggregateOperator::new(
        1,
        vec![0],
        vec![AggregateFunction::Sum(3), AggregateFunction::Min(1)],
        column_names,
    )
    .unwrap();
    let mut delta2 = Delta::new();
    delta2.insert(
        3,
        vec![
            Value::Text("A".into()),
            Value::from_i64(20),
            Value::from_i64(300),
            Value::from_i64(4),
        ],
    );
    let result2 = pager
        .io
        .block(|| agg2.commit((&delta2).into(), &mut cursors))
        .unwrap();
    let (row2, _) = result2
        .changes
        .iter()
        .find(|(row, weight)| *weight > 0 && row.values[0] == Value::Text("A".into()))
        .expect("Should have a positive weight row for group A");
    assert_eq!(
        row2.values[1],
        Value::from_i64(4),
        "SUM(col3) should be 4 (new data only)"
    );
    assert_eq!(
        row2.values[2],
        Value::from_i64(20),
        "MIN(col1) should be 20 (new data only)"
    );
}
#[test]
/// DISTINCT expressed as an aggregate that groups by the only column and has
/// no aggregate functions. Six input rows holding three different values must
/// produce exactly three output rows, each with weight 1.
fn test_distinct_removes_duplicates() {
    let (pager, table_root_page_id, index_root_page_id) = create_test_pager();
    let table_cursor = BTreeCursor::new_table(pager.clone(), table_root_page_id, 5);
    let index_def = create_dbsp_state_index(index_root_page_id);
    let index_cursor = BTreeCursor::new_index(pager.clone(), index_root_page_id, &index_def, 4);
    let mut cursors = DbspStateCursors::new(table_cursor, index_cursor);

    let mut operator =
        AggregateOperator::new(0, vec![0], vec![], vec!["value".to_string()]).unwrap();

    // 100 appears three times, 200 twice, 300 once.
    let mut input = Delta::new();
    input.insert(1, vec![Value::from_i64(100)]);
    input.insert(2, vec![Value::from_i64(200)]);
    input.insert(3, vec![Value::from_i64(100)]);
    input.insert(4, vec![Value::from_i64(300)]);
    input.insert(5, vec![Value::from_i64(200)]);
    input.insert(6, vec![Value::from_i64(100)]);

    let result = pager
        .io
        .block(|| operator.commit((&input).into(), &mut cursors))
        .unwrap();

    // Count the raw output rows before collapsing them into a set: the set
    // below deduplicates on its own, so without this check a duplicated
    // output row would go unnoticed.
    assert_eq!(
        result.changes.len(),
        3,
        "DISTINCT should emit exactly one row per distinct value"
    );

    let distinct_values: HashSet<i64> = result
        .changes
        .iter()
        .map(|(row, _weight)| match &row.values[0] {
            Value::Numeric(Numeric::Integer(i)) => *i,
            _ => panic!("Expected integer value"),
        })
        .collect();
    assert_eq!(
        distinct_values.len(),
        3,
        "Should have exactly 3 distinct values"
    );
    assert!(distinct_values.contains(&100));
    assert!(distinct_values.contains(&200));
    assert!(distinct_values.contains(&300));

    for (_row, weight) in &result.changes {
        assert_eq!(*weight, 1, "DISTINCT should output weight 1 for all groups");
    }
}
#[test]
/// DISTINCT over two columns across three commits. A group is reported with
/// weight +1 when it first appears and with weight -1 only once its last
/// remaining copy has been deleted.
fn test_distinct_incremental_updates() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    let columns = vec!["category".to_string(), "value".to_string()];
    let mut operator = AggregateOperator::new(0, vec![0, 1], vec![], columns).unwrap();

    // Commit 1: ("A", 100) twice and ("B", 200) once, so two groups.
    let mut first = Delta::new();
    for (rowid, category, value) in [(1, "A", 100), (2, "B", 200), (3, "A", 100)] {
        first.insert(
            rowid,
            vec![Value::Text(category.into()), Value::from_i64(value)],
        );
    }
    let first_out = pager
        .io
        .block(|| operator.commit((&first).into(), &mut cursors))
        .unwrap();
    assert_eq!(
        first_out.changes.len(),
        2,
        "First commit should output 2 distinct groups"
    );
    first_out
        .changes
        .iter()
        .for_each(|(_row, weight)| assert_eq!(*weight, 1, "New groups should have weight +1"));

    // Commit 2: drop one of the two ("A", 100) copies and add ("C", 300).
    let mut second = Delta::new();
    second.delete(1, vec![Value::Text("A".into()), Value::from_i64(100)]);
    second.insert(4, vec![Value::Text("C".into()), Value::from_i64(300)]);
    let second_out = pager
        .io
        .block(|| operator.commit((&second).into(), &mut cursors))
        .unwrap();
    assert_eq!(
        second_out.changes.len(),
        1,
        "Second commit should only output new group"
    );
    let (appeared, appeared_weight) = &second_out.changes[0];
    assert_eq!(*appeared_weight, 1);
    assert_eq!(appeared.values[0], Value::Text("C".into()));
    assert_eq!(appeared.values[1], Value::from_i64(300));

    // Commit 3: drop the remaining ("A", 100) copy.
    let mut third = Delta::new();
    third.delete(3, vec![Value::Text("A".into()), Value::from_i64(100)]);
    let third_out = pager
        .io
        .block(|| operator.commit((&third).into(), &mut cursors))
        .unwrap();
    assert_eq!(
        third_out.changes.len(),
        1,
        "Third commit should output disappeared group"
    );
    let (vanished, vanished_weight) = &third_out.changes[0];
    assert_eq!(
        *vanished_weight, -1,
        "Disappeared group should have weight -1"
    );
    assert_eq!(vanished.values[0], Value::Text("A".into()));
    assert_eq!(vanished.values[1], Value::from_i64(100));
}
#[test]
/// Follows a single DISTINCT group through appear -> shrink -> disappear ->
/// reappear. Output is expected only when the group crosses between absent
/// and present, not when its copy count merely changes.
fn test_distinct_state_transitions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );
    let mut operator =
        AggregateOperator::new(0, vec![0], vec![], vec!["value".to_string()]).unwrap();

    // Three copies of 100 arrive: the group appears.
    let mut appear = Delta::new();
    for rowid in 1..=3 {
        appear.insert(rowid, vec![Value::from_i64(100)]);
    }
    let appeared = pager
        .io
        .block(|| operator.commit((&appear).into(), &mut cursors))
        .unwrap();
    assert_eq!(appeared.changes.len(), 1);
    assert_eq!(
        appeared.changes[0].1, 1,
        "First appearance should output +1"
    );

    // Two of the three copies go away: the group is still present.
    let mut shrink = Delta::new();
    for rowid in 1..=2 {
        shrink.delete(rowid, vec![Value::from_i64(100)]);
    }
    let shrunk = pager
        .io
        .block(|| operator.commit((&shrink).into(), &mut cursors))
        .unwrap();
    assert_eq!(shrunk.changes.len(), 0, "No transition, no output");

    // The last copy goes away: the group disappears.
    let mut vanish = Delta::new();
    vanish.delete(3, vec![Value::from_i64(100)]);
    let vanished = pager
        .io
        .block(|| operator.commit((&vanish).into(), &mut cursors))
        .unwrap();
    assert_eq!(vanished.changes.len(), 1);
    assert_eq!(
        vanished.changes[0].1, -1,
        "Disappearance should output -1"
    );

    // A new copy arrives under a new rowid: the group reappears.
    let mut reappear = Delta::new();
    reappear.insert(4, vec![Value::from_i64(100)]);
    let reappeared = pager
        .io
        .block(|| operator.commit((&reappear).into(), &mut cursors))
        .unwrap();
    assert_eq!(reappeared.changes.len(), 1);
    assert_eq!(
        reappeared.changes[0].1, 1,
        "Reappearance should output +1"
    );
}
/// Commits through one operator, then builds a second operator (same id,
/// fresh cursors over the same root pages) and checks that its output is
/// consistent with the rows committed by the first one.
#[test]
fn test_distinct_persistence() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);

    let mut first_cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );
    let mut first_operator =
        AggregateOperator::new(0, vec![0], vec![], vec![String::from("value")]).unwrap();

    // Seed: value 100 twice (rows 1 and 2) and value 200 once (row 3).
    let mut seed = Delta::new();
    for (rowid, value) in [(1, 100), (2, 100), (3, 200)] {
        seed.insert(rowid, vec![Value::from_i64(value)]);
    }
    let seeded = pager
        .io
        .block(|| first_operator.commit((&seed).into(), &mut first_cursors))
        .unwrap();
    assert_eq!(seeded.changes.len(), 2, "Should output 2 distinct values");

    // Second operator: identical construction arguments, new cursors on the
    // same root pages.
    let mut second_cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );
    let mut second_operator =
        AggregateOperator::new(0, vec![0], vec![], vec![String::from("value")]).unwrap();

    // Add a brand-new value (300) and drop one of the two rows holding 100.
    // Only the new value is expected in the output.
    let mut mixed = Delta::new();
    mixed.insert(4, vec![Value::from_i64(300)]);
    mixed.delete(1, vec![Value::from_i64(100)]);
    let after_mixed = pager
        .io
        .block(|| second_operator.commit((&mixed).into(), &mut second_cursors))
        .unwrap();
    assert_eq!(after_mixed.changes.len(), 1, "Should only output new value");
    let (inserted_row, inserted_weight) = &after_mixed.changes[0];
    assert_eq!(*inserted_weight, 1, "Should be insertion");
    assert_eq!(inserted_row.values[0], Value::from_i64(300));

    // Drop the other row holding 100 -> the value is expected to be retracted.
    let mut last_delete = Delta::new();
    last_delete.delete(2, vec![Value::from_i64(100)]);
    let after_delete = pager
        .io
        .block(|| second_operator.commit((&last_delete).into(), &mut second_cursors))
        .unwrap();
    assert_eq!(after_delete.changes.len(), 1, "Should output deletion");
    let (removed_row, removed_weight) = &after_delete.changes[0];
    assert_eq!(*removed_weight, -1, "Should be deletion");
    assert_eq!(removed_row.values[0], Value::from_i64(100));
}
/// Feeds one batch of eleven rows containing many duplicates and checks that
/// exactly the five distinct `(category, value)` pairs come out, each with
/// weight +1.
#[test]
fn test_distinct_batch_with_multiple_groups() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    let column_names = vec![String::from("category"), String::from("value")];
    let mut operator = AggregateOperator::new(0, vec![0, 1], vec![], column_names).unwrap();

    // (row id, category, value) - duplicates are deliberate.
    let batch = [
        (1, "A", 100),
        (2, "A", 100),
        (3, "A", 100),
        (4, "B", 200),
        (5, "B", 200),
        (6, "A", 200),
        (7, "C", 100),
        (8, "C", 100),
        (9, "A", 100),
        (10, "A", 100),
        (11, "B", 100),
    ];
    let mut delta = Delta::new();
    for (rowid, category, value) in batch {
        delta.insert(
            rowid,
            vec![Value::Text(category.into()), Value::from_i64(value)],
        );
    }

    let result = pager
        .io
        .block(|| operator.commit((&delta).into(), &mut cursors))
        .unwrap();

    assert_eq!(
        result.changes.len(),
        5,
        "Should have exactly 5 distinct groups"
    );
    result
        .changes
        .iter()
        .for_each(|(_, weight)| assert_eq!(*weight, 1, "All groups should have weight +1"));

    // Collect the emitted pairs into a set so the checks below do not depend
    // on output order.
    let mut groups: HashSet<(String, i64)> = HashSet::new();
    for (row, _) in &result.changes {
        let category = match &row.values[0] {
            Value::Text(s) => s.value.clone().into_owned(),
            _ => panic!("Expected text for category"),
        };
        let value = match &row.values[1] {
            Value::Numeric(Numeric::Integer(i)) => *i,
            _ => panic!("Expected integer for value"),
        };
        groups.insert((category, value));
    }

    assert!(groups.contains(&("A".to_string(), 100)));
    assert!(groups.contains(&("A".to_string(), 200)));
    assert!(groups.contains(&("B".to_string(), 100)));
    assert!(groups.contains(&("B".to_string(), 200)));
    assert!(groups.contains(&("C".to_string(), 100)));
}
/// Runs COUNT/SUM/AVG DISTINCT over the same column in one operator and
/// checks all three results for an input whose distinct values are 10, 20, 30.
#[test]
fn test_multiple_distinct_aggregates_same_column() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    // Empty second argument, three distinct aggregates that all read column 0.
    let aggregates = vec![
        AggregateFunction::CountDistinct(0),
        AggregateFunction::SumDistinct(0),
        AggregateFunction::AvgDistinct(0),
    ];
    let mut operator =
        AggregateOperator::new(0, vec![], aggregates, vec![String::from("value")]).unwrap();

    // (row id, value): 10 appears three times, 20 twice, 30 once.
    let mut input = Delta::new();
    for (rowid, value) in [(1, 10), (2, 10), (3, 20), (4, 20), (5, 30), (6, 10)] {
        input.insert(rowid, vec![Value::from_i64(value)]);
    }

    let output = pager
        .io
        .block(|| operator.commit((&input).into(), &mut cursors))
        .unwrap();

    assert_eq!(output.changes.len(), 1);
    let (result_row, result_weight) = &output.changes[0];
    assert_eq!(*result_weight, 1);

    let aggregates_out = &result_row.values;
    assert_eq!(aggregates_out.len(), 3);
    // count = |{10, 20, 30}|, sum = 10 + 20 + 30, avg = 60 / 3.
    assert_eq!(aggregates_out[0], Value::from_i64(3));
    assert_eq!(aggregates_out[1], Value::from_i64(60));
    assert_eq!(aggregates_out[2], Value::from_f64(20.0));
}
/// Checks that `CountDistinct` over column 1 drops from 3 to 2 once the only
/// row holding one of the values is deleted.
#[test]
fn test_count_distinct_with_deletions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    let column_names = vec![String::from("id"), String::from("value")];
    let mut operator = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::CountDistinct(1)],
        column_names,
    )
    .unwrap();

    // (row id, id column, value column): three different values.
    let mut seed = Delta::new();
    for (rowid, id, value) in [(1, 1, 100), (2, 2, 200), (3, 3, 300)] {
        seed.insert(rowid, vec![Value::from_i64(id), Value::from_i64(value)]);
    }
    let seeded = pager
        .io
        .block(|| operator.commit((&seed).into(), &mut cursors))
        .unwrap();
    assert_eq!(seeded.changes.len(), 1);
    let (seed_row, seed_weight) = &seeded.changes[0];
    assert_eq!(*seed_weight, 1);
    assert_eq!(seed_row.values[0], Value::from_i64(3));

    // Delete the only row whose value is 200.
    let mut removal = Delta::new();
    removal.delete(2, vec![Value::from_i64(2), Value::from_i64(200)]);
    let updated = pager
        .io
        .block(|| operator.commit((&removal).into(), &mut cursors))
        .unwrap();

    // Two changes are expected; only the +1 one is inspected here
    // (presumably the other is the retraction of the previous result row).
    assert_eq!(updated.changes.len(), 2);
    let (current_row, _) = updated
        .changes
        .iter()
        .find(|change| change.1 == 1)
        .unwrap();
    assert_eq!(current_row.values[0], Value::from_i64(2));
}
/// Checks that `SumDistinct` over column 1 counts a duplicated value once
/// (100 + 200 + 300 = 600) and falls to 400 after the 200 row is deleted.
#[test]
fn test_sum_distinct_with_deletions() {
    let (pager, table_root, index_root) = create_test_pager();
    let index_def = create_dbsp_state_index(index_root);
    let mut cursors = DbspStateCursors::new(
        BTreeCursor::new_table(pager.clone(), table_root, 5),
        BTreeCursor::new_index(pager.clone(), index_root, &index_def, 4),
    );

    let column_names = vec![String::from("id"), String::from("value")];
    let mut operator = AggregateOperator::new(
        1,
        vec![],
        vec![AggregateFunction::SumDistinct(1)],
        column_names,
    )
    .unwrap();

    // (row id, id column, value column): value 100 appears on rows 1 and 3.
    let mut seed = Delta::new();
    for (rowid, id, value) in [(1, 1, 100), (2, 2, 200), (3, 3, 100), (4, 4, 300)] {
        seed.insert(rowid, vec![Value::from_i64(id), Value::from_i64(value)]);
    }
    let seeded = pager
        .io
        .block(|| operator.commit((&seed).into(), &mut cursors))
        .unwrap();
    assert_eq!(seeded.changes.len(), 1);
    let (seed_row, seed_weight) = &seeded.changes[0];
    assert_eq!(*seed_weight, 1);
    assert_eq!(seed_row.values[0], Value::from_f64(600.0));

    // Delete the only row whose value is 200.
    let mut removal = Delta::new();
    removal.delete(2, vec![Value::from_i64(2), Value::from_i64(200)]);
    let updated = pager
        .io
        .block(|| operator.commit((&removal).into(), &mut cursors))
        .unwrap();

    // Two changes are expected; only the +1 one is inspected here
    // (presumably the other is the retraction of the previous result row).
    assert_eq!(updated.changes.len(), 2);
    let (current_row, _) = updated
        .changes
        .iter()
        .find(|change| change.1 == 1)
        .unwrap();
    assert_eq!(current_row.values[0], Value::from_f64(400.0));
}
}