use crate::{
incremental::{
compiler::{DeltaSet, ExecuteState},
dbsp::{Delta, HashableRow, RowKeyZSet},
view::{IncrementalView, ViewTransactionState},
},
return_if_io,
storage::btree::BTreeCursor,
types::{IOResult, SeekKey, SeekOp, SeekResult, Value},
LimboError, Pager, Result,
};
use std::sync::{Arc, Mutex};
#[derive(Debug)]
enum SeekState {
Init,
Seek {
target: i64,
},
Done,
}
pub struct MaterializedViewCursor {
btree_cursor: Box<BTreeCursor>,
view: Arc<Mutex<IncrementalView>>,
pager: Arc<Pager>,
uncommitted: RowKeyZSet,
tx_state: Arc<ViewTransactionState>,
last_tx_state_len: usize,
current_row: Option<(i64, Vec<Value>)>,
execute_state: ExecuteState,
seek_state: SeekState,
}
impl MaterializedViewCursor {
pub fn new(
btree_cursor: Box<BTreeCursor>,
view: Arc<Mutex<IncrementalView>>,
pager: Arc<Pager>,
tx_state: Arc<ViewTransactionState>,
) -> Result<Self> {
Ok(Self {
btree_cursor,
view,
pager,
uncommitted: RowKeyZSet::new(),
tx_state,
last_tx_state_len: 0,
current_row: None,
execute_state: ExecuteState::Uninitialized,
seek_state: SeekState::Init,
})
}
fn ensure_tx_changes_computed(&mut self) -> Result<IOResult<()>> {
let current_len = self.tx_state.len();
if current_len == self.last_tx_state_len {
return Ok(IOResult::Done(()));
}
let mut view_guard = self.view.lock().unwrap();
let table_deltas = self.tx_state.get_table_deltas();
let mut uncommitted = DeltaSet::new();
for (table_name, delta) in table_deltas {
uncommitted.insert(table_name, delta);
}
let processed_delta = return_if_io!(view_guard.execute_with_uncommitted(
uncommitted,
self.pager.clone(),
&mut self.execute_state
));
self.uncommitted = RowKeyZSet::from_delta(&processed_delta);
self.last_tx_state_len = current_len;
Ok(IOResult::Done(()))
}
fn read_btree_delta_entry(&mut self) -> Result<IOResult<Vec<(HashableRow, isize)>>> {
let btree_rowid = return_if_io!(self.btree_cursor.rowid());
let rowid = match btree_rowid {
None => return Ok(IOResult::Done(Vec::new())),
Some(rowid) => rowid,
};
let btree_record = return_if_io!(self.btree_cursor.record());
let btree_ref_values = btree_record
.ok_or_else(|| {
crate::LimboError::InternalError(
"Invalid data in materialized view: found a rowid, but not the row!"
.to_string(),
)
})?
.get_values();
let mut btree_values: Vec<Value> =
btree_ref_values.iter().map(|rv| rv.to_owned()).collect();
let weight_value = btree_values.pop().ok_or_else(|| {
crate::LimboError::InternalError(
"Invalid data in materialized view: no weight column found".to_string(),
)
})?;
let weight = match weight_value {
Value::Integer(w) => w as isize,
_ => {
return Err(crate::LimboError::InternalError(format!(
"Invalid data in materialized view: expected integer weight, found {weight_value:?}"
)))
}
};
if weight <= 0 {
return Err(crate::LimboError::InternalError(format!(
"Invalid data in materialized view: expected a positive weight, found {weight}"
)));
}
Ok(IOResult::Done(vec![(
HashableRow::new(rowid, btree_values),
weight,
)]))
}
fn do_seek(&mut self, target_rowid: i64, op: SeekOp) -> Result<IOResult<SeekResult>> {
loop {
match &mut self.seek_state {
SeekState::Init => {
self.current_row = None;
self.seek_state = SeekState::Seek {
target: target_rowid,
};
}
SeekState::Seek { target } => {
let target = *target;
let btree_result =
return_if_io!(self.btree_cursor.seek(SeekKey::TableRowId(target), op));
let changes = if btree_result == SeekResult::Found {
return_if_io!(self.read_btree_delta_entry())
} else {
Vec::new()
};
let mut btree_entries = Delta { changes };
let changes = self.uncommitted.seek(target, op);
let uncommitted_entries = Delta { changes };
btree_entries.merge(&uncommitted_entries);
if btree_entries.is_empty() {
self.seek_state = SeekState::Done;
return Ok(IOResult::Done(SeekResult::NotFound));
}
let min_seen = btree_entries
.changes
.first()
.expect("cannot be empty, we just tested for it")
.0
.rowid;
let max_seen = btree_entries
.changes
.last()
.expect("cannot be empty, we just tested for it")
.0
.rowid;
let zset = RowKeyZSet::from_delta(&btree_entries);
let ret = zset.seek(target_rowid, op);
if !ret.is_empty() {
let (row, _) = &ret[0];
self.current_row = Some((row.rowid, row.values.clone()));
self.seek_state = SeekState::Done;
return Ok(IOResult::Done(SeekResult::Found));
}
let new_target = match op {
SeekOp::GT => Some(max_seen),
SeekOp::GE { eq_only: false } => Some(max_seen + 1),
SeekOp::LT => Some(min_seen),
SeekOp::LE { eq_only: false } => Some(min_seen - 1),
SeekOp::LE { eq_only: true } | SeekOp::GE { eq_only: true } => None,
};
if let Some(target) = new_target {
self.seek_state = SeekState::Seek { target };
} else {
self.seek_state = SeekState::Done;
return Ok(IOResult::Done(SeekResult::NotFound));
}
}
SeekState::Done => {
self.seek_state = SeekState::Init;
}
}
}
}
pub fn seek(&mut self, key: SeekKey, op: SeekOp) -> Result<IOResult<SeekResult>> {
return_if_io!(self.ensure_tx_changes_computed());
let target_rowid = match &key {
SeekKey::TableRowId(rowid) => *rowid,
SeekKey::IndexKey(_) => {
return Err(LimboError::ParseError(
"Cannot search a materialized view with an index key".to_string(),
));
}
};
self.do_seek(target_rowid, op)
}
pub fn next(&mut self) -> Result<IOResult<bool>> {
let Some((current_rowid, _)) = &self.current_row else {
return Ok(IOResult::Done(false));
};
let result = return_if_io!(self.do_seek(*current_rowid, SeekOp::GT));
Ok(IOResult::Done(result == SeekResult::Found))
}
pub fn column(&mut self, col: usize) -> Result<IOResult<Value>> {
if let Some((_, ref values)) = self.current_row {
Ok(IOResult::Done(
values.get(col).cloned().unwrap_or(Value::Null),
))
} else {
Ok(IOResult::Done(Value::Null))
}
}
pub fn rowid(&self) -> Result<IOResult<Option<i64>>> {
Ok(IOResult::Done(self.current_row.as_ref().map(|(id, _)| *id)))
}
pub fn rewind(&mut self) -> Result<IOResult<()>> {
return_if_io!(self.ensure_tx_changes_computed());
let _result = return_if_io!(self.do_seek(i64::MIN, SeekOp::GT));
Ok(IOResult::Done(()))
}
pub fn is_valid(&self) -> Result<bool> {
Ok(self.current_row.is_some())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::util::IOExt;
use crate::{Connection, Database, OpenFlags};
use std::sync::Arc;
fn create_test_connection() -> Result<Arc<Connection>> {
let io = Arc::new(crate::io::MemoryIO::new());
let db = Database::open_file_with_flags(
io,
":memory:",
OpenFlags::default(),
crate::DatabaseOpts {
enable_mvcc: false,
enable_indexes: false,
enable_views: true,
enable_strict: false,
enable_load_extension: false,
enable_encryption: false,
},
None,
)?;
let conn = db.connect()?;
conn.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY, value INTEGER)")?;
conn.execute("CREATE MATERIALIZED VIEW test_view AS SELECT id, value FROM test_table")?;
Ok(conn)
}
fn create_test_cursor(
conn: &Arc<Connection>,
) -> Result<(
MaterializedViewCursor,
Arc<ViewTransactionState>,
Arc<Pager>,
)> {
let view_mutex = conn
.schema
.read()
.get_materialized_view("test_view")
.ok_or(crate::LimboError::InternalError(
"View not found".to_string(),
))?;
let view = view_mutex.lock().unwrap();
let root_page = view.get_root_page();
if root_page == 0 {
return Err(crate::LimboError::InternalError(
"View not materialized".to_string(),
));
}
let num_columns = view.column_schema.columns.len();
drop(view);
let pager = conn.get_pager();
let btree_cursor = Box::new(BTreeCursor::new(
None, pager.clone(),
root_page,
num_columns,
));
let tx_state = conn.view_transaction_states.get_or_create("test_view");
let cursor = MaterializedViewCursor::new(
btree_cursor,
view_mutex.clone(),
pager.clone(),
tx_state.clone(),
)?;
Ok((cursor, tx_state, pager))
}
fn populate_test_table(conn: &Arc<Connection>, rows: Vec<(i64, i64)>) -> Result<()> {
for (id, value) in rows {
let sql = format!("INSERT INTO test_table (id, value) VALUES ({id}, {value})");
conn.execute(&sql)?;
}
Ok(())
}
fn apply_changes_to_tx_state(
tx_state: &ViewTransactionState,
changes: Vec<(i64, Vec<Value>, isize)>,
) {
for (rowid, values, weight) in changes {
if weight > 0 {
tx_state.insert("test_table", rowid, values);
} else if weight < 0 {
tx_state.delete("test_table", rowid, values);
}
}
}
#[test]
fn test_seek_key_exists_in_btree() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50), (7, 70)])?;
let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(3), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(4), SeekOp::GE { eq_only: false }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(3), SeekOp::GT))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(4), SeekOp::LE { eq_only: false }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(5), SeekOp::LT))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
Ok(())
}
#[test]
fn test_seek_key_exists_only_uncommitted() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (5, 50), (7, 70)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(3, vec![Value::Integer(3), Value::Integer(30)], 1), (6, vec![Value::Integer(6), Value::Integer(60)], 1), ],
);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(3), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(30));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(2), SeekOp::GE { eq_only: false }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(5), SeekOp::GT))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(6));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(60));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(6), SeekOp::LE { eq_only: false }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(6));
Ok(())
}
#[test]
fn test_seek_key_deleted_by_uncommitted() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50), (7, 70)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(3, vec![Value::Integer(3), Value::Integer(30)], -1), (5, vec![Value::Integer(5), Value::Integer(50)], -1), ],
);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(3), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::NotFound);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(2), SeekOp::GE { eq_only: false }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(7));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(1), SeekOp::GT))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(7));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(5), SeekOp::LE { eq_only: false }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
Ok(())
}
#[test]
fn test_seek_with_updates() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(3, vec![Value::Integer(3), Value::Integer(30)], -1), (3, vec![Value::Integer(3), Value::Integer(35)], 1), ],
);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(3), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(35));
Ok(())
}
#[test]
fn test_seek_boundary_conditions() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(5, 50), (10, 100)])?;
let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(1), SeekOp::LT))?;
assert_eq!(result, SeekResult::NotFound);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(15), SeekOp::GT))?;
assert_eq!(result, SeekResult::NotFound);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(7), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::NotFound);
Ok(())
}
#[test]
fn test_seek_complex_uncommitted_weights() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(5, vec![Value::Integer(5), Value::Integer(50)], -1), (5, vec![Value::Integer(5), Value::Integer(51)], 1), (5, vec![Value::Integer(5), Value::Integer(51)], -1), (5, vec![Value::Integer(5), Value::Integer(52)], 1), ],
);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(5), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(52));
Ok(())
}
#[test]
fn test_seek_affected_by_transaction_state_changes() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(2), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::NotFound);
tx_state.insert("test_table", 2, vec![Value::Integer(2), Value::Integer(20)]);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(2), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(20));
Ok(())
}
#[test]
fn test_rewind_btree_first_uncommitted_later() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(8, vec![Value::Integer(8), Value::Integer(80)], 1),
(10, vec![Value::Integer(10), Value::Integer(100)], 1),
],
);
assert!(!cursor.is_valid()?);
pager.io.block(|| cursor.rewind())?;
assert!(cursor.is_valid()?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
Ok(())
}
#[test]
fn test_rewind_with_uncommitted_first() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(5, 50), (7, 70)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![(2, vec![Value::Integer(2), Value::Integer(20)], 1)],
);
pager.io.block(|| cursor.rewind())?;
assert!(cursor.is_valid()?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(20));
Ok(())
}
#[test]
fn test_rewind_skip_deleted_first() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![(1, vec![Value::Integer(1), Value::Integer(10)], -1)],
);
pager.io.block(|| cursor.rewind())?;
assert!(cursor.is_valid()?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
Ok(())
}
#[test]
fn test_rewind_empty_btree_with_uncommitted() -> Result<()> {
let conn = create_test_connection()?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(3, vec![Value::Integer(3), Value::Integer(30)], 1),
(7, vec![Value::Integer(7), Value::Integer(70)], 1),
],
);
pager.io.block(|| cursor.rewind())?;
assert!(cursor.is_valid()?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(30));
Ok(())
}
#[test]
fn test_rewind_all_deleted() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(2, 20), (4, 40)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(2, vec![Value::Integer(2), Value::Integer(20)], -1),
(4, vec![Value::Integer(4), Value::Integer(40)], -1),
],
);
pager.io.block(|| cursor.rewind())?;
assert!(!cursor.is_valid()?);
assert_eq!(pager.io.block(|| cursor.rowid())?, None);
Ok(())
}
#[test]
fn test_rewind_with_updates() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(1, vec![Value::Integer(1), Value::Integer(10)], -1),
(1, vec![Value::Integer(1), Value::Integer(15)], 1),
],
);
pager.io.block(|| cursor.rewind())?;
assert!(cursor.is_valid()?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(15));
Ok(())
}
#[test]
fn test_next_btree_only_sequential() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50), (7, 70)])?;
let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(7));
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
Ok(())
}
#[test]
fn test_next_uncommitted_only() -> Result<()> {
let conn = create_test_connection()?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(2, vec![Value::Integer(2), Value::Integer(20)], 1),
(4, vec![Value::Integer(4), Value::Integer(40)], 1),
(6, vec![Value::Integer(6), Value::Integer(60)], 1),
],
);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(4));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(6));
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
Ok(())
}
#[test]
fn test_next_mixed_btree_uncommitted() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (5, 50), (9, 90)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(3, vec![Value::Integer(3), Value::Integer(30)], 1),
(7, vec![Value::Integer(7), Value::Integer(70)], 1),
],
);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(7));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(9));
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
Ok(())
}
#[test]
fn test_next_skip_deleted_rows() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(2, vec![Value::Integer(2), Value::Integer(20)], -1),
(4, vec![Value::Integer(4), Value::Integer(40)], -1),
],
);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
Ok(())
}
#[test]
fn test_next_with_updates() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(3, vec![Value::Integer(3), Value::Integer(30)], -1),
(3, vec![Value::Integer(3), Value::Integer(35)], 1),
],
);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(35));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(!pager.io.block(|| cursor.next())?);
Ok(())
}
#[test]
fn test_next_from_uninitialized() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(2, 20), (4, 40)])?;
let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;
assert!(!cursor.is_valid()?);
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(4));
assert!(!pager.io.block(|| cursor.next())?);
Ok(())
}
#[test]
fn test_next_empty_table() -> Result<()> {
let conn = create_test_connection()?;
let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
Ok(())
}
#[test]
fn test_next_all_deleted() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (2, 20), (3, 30)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(1, vec![Value::Integer(1), Value::Integer(10)], -1),
(2, vec![Value::Integer(2), Value::Integer(20)], -1),
(3, vec![Value::Integer(3), Value::Integer(30)], -1),
],
);
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
Ok(())
}
#[test]
fn test_next_complex_interleaving() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(2, 20), (4, 40), (6, 60), (8, 80)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(1, vec![Value::Integer(1), Value::Integer(10)], 1), (2, vec![Value::Integer(2), Value::Integer(20)], -1), (3, vec![Value::Integer(3), Value::Integer(30)], 1), (4, vec![Value::Integer(4), Value::Integer(40)], -1), (4, vec![Value::Integer(4), Value::Integer(45)], 1), (5, vec![Value::Integer(5), Value::Integer(50)], 1), (6, vec![Value::Integer(6), Value::Integer(60)], -1), (7, vec![Value::Integer(7), Value::Integer(70)], 1), (9, vec![Value::Integer(9), Value::Integer(90)], 1), ],
);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(4));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(45));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(7));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(8));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(9));
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
Ok(())
}
#[test]
fn test_next_after_seek() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50), (7, 70), (9, 90)])?;
let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(5), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(7));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(9));
assert!(!pager.io.block(|| cursor.next())?);
Ok(())
}
#[test]
fn test_next_multiple_weights_same_row() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(1, vec![Value::Integer(1), Value::Integer(10)], -1), (1, vec![Value::Integer(1), Value::Integer(11)], 1), (1, vec![Value::Integer(1), Value::Integer(11)], -1), (1, vec![Value::Integer(1), Value::Integer(12)], 1), (1, vec![Value::Integer(1), Value::Integer(12)], -1), ],
);
assert!(!pager.io.block(|| cursor.next())?);
assert!(!cursor.is_valid()?);
Ok(())
}
#[test]
fn test_next_only_uncommitted_large_gaps() -> Result<()> {
let conn = create_test_connection()?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(100, vec![Value::Integer(100), Value::Integer(1000)], 1),
(500, vec![Value::Integer(500), Value::Integer(5000)], 1),
(999, vec![Value::Integer(999), Value::Integer(9990)], 1),
],
);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(100));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(500));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(999));
assert!(!pager.io.block(|| cursor.next())?);
Ok(())
}
#[test]
fn test_multiple_updates_same_row_single_transaction() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (2, 20), (3, 30)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(2, vec![Value::Integer(2), Value::Integer(20)], -1), (2, vec![Value::Integer(2), Value::Integer(25)], 1), (2, vec![Value::Integer(2), Value::Integer(25)], -1), (2, vec![Value::Integer(2), Value::Integer(28)], 1), (2, vec![Value::Integer(2), Value::Integer(28)], -1), (2, vec![Value::Integer(2), Value::Integer(32)], 1), ],
);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(2), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(32));
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(10));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(32));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(30));
assert!(!pager.io.block(|| cursor.next())?);
Ok(())
}
#[test]
fn test_empty_materialized_view_with_uncommitted() -> Result<()> {
let conn = create_test_connection()?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(5, vec![Value::Integer(5), Value::Integer(50)], 1),
(10, vec![Value::Integer(10), Value::Integer(100)], 1),
(15, vec![Value::Integer(15), Value::Integer(150)], 1),
],
);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(10), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(10));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(7), SeekOp::GT))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(10));
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(10));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(15));
assert!(!pager.io.block(|| cursor.next())?);
Ok(())
}
#[test]
fn test_exact_match_btree_uncommitted_same_rowid_different_values() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(3, vec![Value::Integer(3), Value::Integer(35)], 1), ],
);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(3), SeekOp::GE { eq_only: true }))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
Ok(())
}
#[test]
fn test_boundary_value_seeks() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(100, 1000), (200, 2000)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(
i64::MIN + 1,
vec![Value::Integer(i64::MIN + 1), Value::Integer(-999)],
1,
),
(
i64::MAX - 1,
vec![Value::Integer(i64::MAX - 1), Value::Integer(999)],
1,
),
],
);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(i64::MAX), SeekOp::GT))?;
assert_eq!(result, SeekResult::NotFound);
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(i64::MIN), SeekOp::LT))?;
assert_eq!(result, SeekResult::NotFound);
let result = pager.io.block(|| {
cursor.seek(
SeekKey::TableRowId(i64::MAX - 1),
SeekOp::GE { eq_only: false },
)
})?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(i64::MAX - 1));
let result = pager.io.block(|| {
cursor.seek(
SeekKey::TableRowId(i64::MIN + 1),
SeekOp::LE { eq_only: false },
)
})?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(i64::MIN + 1));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(i64::MIN), SeekOp::GT))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(i64::MIN + 1));
let result = pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(i64::MAX), SeekOp::LT))?;
assert_eq!(result, SeekResult::Found);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(i64::MAX - 1));
Ok(())
}
#[test]
fn test_next_concurrent_btree_uncommitted_advance() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![
(2, vec![Value::Integer(2), Value::Integer(20)], -1), (2, vec![Value::Integer(2), Value::Integer(25)], 1), (4, vec![Value::Integer(4), Value::Integer(40)], -1), ],
);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert_eq!(pager.io.block(|| cursor.column(1))?, Value::Integer(25));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(!pager.io.block(|| cursor.next())?);
Ok(())
}
#[test]
fn test_transaction_state_changes_mid_iteration() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
apply_changes_to_tx_state(
&tx_state,
vec![
(2, vec![Value::Integer(2), Value::Integer(20)], 1), (4, vec![Value::Integer(4), Value::Integer(40)], 1), (6, vec![Value::Integer(6), Value::Integer(60)], 1), ],
);
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(!pager.io.block(|| cursor.next())?);
pager.io.block(|| cursor.rewind())?;
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(4));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(6));
assert!(!pager.io.block(|| cursor.next())?);
Ok(())
}
#[test]
fn test_rewind_after_failed_seek() -> Result<()> {
let conn = create_test_connection()?;
populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
apply_changes_to_tx_state(
&tx_state,
vec![(2, vec![Value::Integer(2), Value::Integer(20)], 1)],
);
assert_eq!(
pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(4), SeekOp::GE { eq_only: true }))?,
SeekResult::NotFound
);
assert!(!cursor.is_valid()?);
pager.io.block(|| cursor.rewind())?;
assert!(cursor.is_valid()?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
assert!(pager.io.block(|| cursor.next())?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
assert!(!pager.io.block(|| cursor.next())?);
assert_eq!(
pager
.io
.block(|| cursor.seek(SeekKey::TableRowId(5), SeekOp::GT))?,
SeekResult::NotFound
);
assert!(!cursor.is_valid()?);
pager.io.block(|| cursor.rewind())?;
assert!(cursor.is_valid()?);
assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
Ok(())
}
}