use crate::numeric::Numeric;
use crate::sync::Arc;
use crate::sync::Mutex;
use crate::{
incremental::{
compiler::{DeltaSet, ExecuteState},
dbsp::{Delta, HashableRow, RowKeyZSet},
view::{IncrementalView, ViewTransactionState},
},
return_if_io,
storage::btree::CursorTrait,
types::{IOResult, SeekKey, SeekOp, SeekResult, Value},
LimboError, Pager, Result,
};
/// Resumable state of the merged (btree + uncommitted) seek driven by `do_seek`.
///
/// The state is kept on the cursor so that a seek which returns early on
/// pending I/O can be continued by calling `do_seek` again.
#[derive(Debug)]
enum SeekState {
/// No seek in progress. The next `do_seek` call clears `current_row` and
/// starts seeking at the `target_rowid` it was given.
Init,
/// The btree cursor has to be seeked to `target` and the result merged
/// with the uncommitted rows.
Seek {
target: i64,
},
/// The btree seek reported `SeekResult::TryAdvance`: the btree cursor must
/// be stepped once (forward for GT/GE, backward for LT/LE) before reading.
Advancing {
target: i64,
op: SeekOp,
},
/// The last seek finished; `current_row` tells whether a row was found.
Done,
}
/// Cursor over a materialized view that combines the rows stored in the
/// view's btree with the changes of the current, not yet committed transaction.
pub struct MaterializedViewCursor {
// Cursor over the rows stored for the view; each record ends with a weight column.
btree_cursor: Box<dyn CursorTrait>,
// The view whose `execute_with_uncommitted` turns table deltas into view deltas.
view: Arc<Mutex<IncrementalView>>,
// Pager handed to the view when executing the uncommitted deltas.
pager: Arc<Pager>,
// View-level changes of the current transaction, rebuilt by `ensure_tx_changes_computed`.
uncommitted: RowKeyZSet,
// Source of the per-table deltas of the current transaction.
tx_state: Arc<ViewTransactionState>,
// Value of `tx_state.len()` when `uncommitted` was last rebuilt (0 = never).
last_tx_state_len: usize,
// (rowid, column values) of the row the cursor is positioned on, if any.
current_row: Option<(i64, Vec<Value>)>,
// Resumable state passed to `execute_with_uncommitted`.
execute_state: ExecuteState,
// Resumable state of the seek state machine in `do_seek`.
seek_state: SeekState,
}
impl MaterializedViewCursor {
/// Creates an unpositioned cursor.
///
/// The cursor starts with an empty uncommitted set, no current row, and the
/// seek state machine in its initial state. Construction itself never fails;
/// the `Result` is part of the signature only.
pub fn new(
    btree_cursor: Box<dyn CursorTrait>,
    view: Arc<Mutex<IncrementalView>>,
    pager: Arc<Pager>,
    tx_state: Arc<ViewTransactionState>,
) -> Result<Self> {
    let cursor = Self {
        // Caller-provided collaborators.
        btree_cursor,
        view,
        pager,
        tx_state,
        // Fresh bookkeeping: nothing computed, nothing positioned yet.
        uncommitted: RowKeyZSet::new(),
        last_tx_state_len: 0,
        current_row: None,
        execute_state: ExecuteState::Uninitialized,
        seek_state: SeekState::Init,
    };
    Ok(cursor)
}
/// Rebuilds `self.uncommitted` when the transaction state has changed.
///
/// The length of the transaction state is used as a change marker: if it
/// equals the length seen at the last rebuild, nothing is done. Otherwise
/// every per-table delta is pushed through the view and the resulting view
/// delta replaces `self.uncommitted`.
///
/// Returns early with the pending I/O if executing the view has to wait;
/// in that case the marker is not advanced, so a later call tries again.
fn ensure_tx_changes_computed(&mut self) -> Result<IOResult<()>> {
    let observed_len = self.tx_state.len();
    if observed_len != self.last_tx_state_len {
        let mut view = self.view.lock();

        // Collect the per-table deltas of this transaction.
        let mut pending = DeltaSet::new();
        for (table_name, delta) in self.tx_state.get_table_deltas() {
            pending.insert(table_name, delta);
        }

        let view_delta = return_if_io!(view.execute_with_uncommitted(
            pending,
            self.pager.clone(),
            &mut self.execute_state
        ));

        self.uncommitted = RowKeyZSet::from_delta(&view_delta);
        self.last_tx_state_len = observed_len;
    }
    Ok(IOResult::Done(()))
}
/// Reads the row under the btree cursor as a single-entry delta.
///
/// Returns an empty vector when the btree cursor has no rowid (it is not on
/// a row). Otherwise the record's last column is popped off and interpreted
/// as the row's weight; the remaining columns become the row values.
///
/// # Errors
///
/// Returns `LimboError::InternalError` when the cursor has a rowid but no
/// record, when the record has no columns, when the weight column is not an
/// integer, when the weight does not fit in `isize`, or when it is not
/// strictly positive.
fn read_btree_delta_entry(&mut self) -> Result<IOResult<Vec<(HashableRow, isize)>>> {
    let btree_rowid = return_if_io!(self.btree_cursor.rowid());
    let Some(rowid) = btree_rowid else {
        return Ok(IOResult::Done(Vec::new()));
    };

    let btree_record = return_if_io!(self.btree_cursor.record()).ok_or_else(|| {
        crate::LimboError::InternalError(
            "Invalid data in materialized view: found a rowid, but not the row!".to_string(),
        )
    })?;
    let mut btree_values = btree_record.get_values_owned()?;

    // The weight is stored as the last column of the record.
    let weight_value = btree_values.pop().ok_or_else(|| {
        crate::LimboError::InternalError(
            "Invalid data in materialized view: no weight column found".to_string(),
        )
    })?;

    let weight = match weight_value {
        // Checked conversion: a plain `as` cast would silently truncate the
        // stored 64-bit weight on targets where `isize` is narrower, which
        // could turn corrupt data into a plausible-looking weight.
        Value::Numeric(Numeric::Integer(w)) => isize::try_from(w).map_err(|_| {
            crate::LimboError::InternalError(format!(
                "Invalid data in materialized view: weight {w} does not fit in isize"
            ))
        })?,
        _ => {
            return Err(crate::LimboError::InternalError(format!(
                "Invalid data in materialized view: expected integer weight, found {weight_value:?}"
            )))
        }
    };

    if weight <= 0 {
        return Err(crate::LimboError::InternalError(format!(
            "Invalid data in materialized view: expected a positive weight, found {weight}"
        )));
    }

    Ok(IOResult::Done(vec![(
        HashableRow::new(rowid, btree_values),
        weight,
    )]))
}
/// Merges the entries read from the btree with the matching uncommitted
/// entries and advances the seek state machine.
///
/// * `target` - key the btree was seeked to in this step; also used to
///   look up the uncommitted entries.
/// * `target_rowid` - key used to pick the resulting row out of the merged set.
/// * `op` - seek operator of the seek.
/// * `changes` - entries read from the btree in this step (may be empty).
///
/// Outcomes:
/// * nothing in either source: the seek is `Done` without a row;
/// * the merged set holds a row satisfying `op`: it becomes `current_row`
///   and the seek is `Done`;
/// * otherwise (the entries seen did not yield a row): for range operators
///   the seek continues past the keys already seen, for exact-match
///   operators it is `Done` without a row.
fn process_btree_changes(
    &mut self,
    target: i64,
    target_rowid: i64,
    op: SeekOp,
    changes: Vec<(HashableRow, isize)>,
) -> Result<IOResult<()>> {
    let mut btree_entries = Delta { changes };
    let changes = self.uncommitted.seek(target, op);
    let uncommitted_entries = Delta { changes };
    btree_entries.merge(&uncommitted_entries);

    if btree_entries.is_empty() {
        self.seek_state = SeekState::Done;
        return Ok(IOResult::Done(()));
    }

    // Remember the key range covered by this step so a follow-up seek can
    // start beyond it.
    let min_seen = btree_entries
        .changes
        .first()
        .expect("cannot be empty, we just tested for it")
        .0
        .rowid;
    let max_seen = btree_entries
        .changes
        .last()
        .expect("cannot be empty, we just tested for it")
        .0
        .rowid;

    let zset = RowKeyZSet::from_delta(&btree_entries);
    let ret = zset.seek(target_rowid, op);
    if let Some((row, _)) = ret.first() {
        self.current_row = Some((row.rowid, row.values.clone()));
        self.seek_state = SeekState::Done;
        return Ok(IOResult::Done(()));
    }

    // No row in the merged set: move on. The inclusive operators step one
    // key past what was seen; checked arithmetic is used because stepping
    // past i64::MAX / i64::MIN would overflow (panic in debug builds,
    // wrap to the other end of the key space in release builds). When no
    // key exists beyond the seen range there is nothing left to find.
    let new_target = match op {
        SeekOp::GT => Some(max_seen),
        SeekOp::GE { eq_only: false } => max_seen.checked_add(1),
        SeekOp::LT => Some(min_seen),
        SeekOp::LE { eq_only: false } => min_seen.checked_sub(1),
        SeekOp::LE { eq_only: true } | SeekOp::GE { eq_only: true } => None,
    };
    self.seek_state = match new_target {
        Some(target) => SeekState::Seek { target },
        None => SeekState::Done,
    };
    Ok(IOResult::Done(()))
}
/// Runs the seek state machine until it finishes or has to wait for I/O.
///
/// A call that finds the machine in `Done` or `Init` starts a new seek for
/// `target_rowid`; a call that finds it in `Seek` or `Advancing` continues
/// the seek that was interrupted. Returns `SeekResult::Found` when the seek
/// ends with a current row and `SeekResult::NotFound` otherwise.
fn do_seek(&mut self, target_rowid: i64, op: SeekOp) -> Result<IOResult<SeekResult>> {
    loop {
        match self.seek_state {
            SeekState::Done => {
                // A previous seek completed; start over.
                self.seek_state = SeekState::Init;
                continue;
            }
            SeekState::Init => {
                self.current_row = None;
                self.seek_state = SeekState::Seek {
                    target: target_rowid,
                };
                continue;
            }
            SeekState::Seek { target } => {
                let btree_result =
                    return_if_io!(self.btree_cursor.seek(SeekKey::TableRowId(target), op));
                let entries = match btree_result {
                    SeekResult::TryAdvance => {
                        // The btree cursor has to be stepped before it can be read.
                        self.seek_state = SeekState::Advancing { target, op };
                        continue;
                    }
                    SeekResult::Found => return_if_io!(self.read_btree_delta_entry()),
                    SeekResult::NotFound => Vec::new(),
                };
                return_if_io!(self.process_btree_changes(target, target_rowid, op, entries));
            }
            SeekState::Advancing {
                target,
                op: step_op,
            } => {
                // Step in the direction implied by the operator stored in the state.
                match step_op {
                    SeekOp::GT | SeekOp::GE { .. } => return_if_io!(self.btree_cursor.next()),
                    SeekOp::LT | SeekOp::LE { .. } => return_if_io!(self.btree_cursor.prev()),
                };
                let entries = return_if_io!(self.read_btree_delta_entry());
                return_if_io!(self.process_btree_changes(
                    target,
                    target_rowid,
                    step_op,
                    entries
                ));
            }
        }

        // Only the Seek / Advancing arms fall through to here.
        if matches!(self.seek_state, SeekState::Done) {
            let outcome = if self.current_row.is_some() {
                SeekResult::Found
            } else {
                SeekResult::NotFound
            };
            return Ok(IOResult::Done(outcome));
        }
    }
}
/// Positions the cursor on the row selected by `key` and `op`, taking the
/// uncommitted changes of the current transaction into account.
///
/// # Errors
///
/// Returns `LimboError::ParseError` for `SeekKey::IndexKey`; only rowid
/// keys are supported.
pub fn seek(&mut self, key: SeekKey, op: SeekOp) -> Result<IOResult<SeekResult>> {
    // Refresh the uncommitted rows first so the seek sees the latest changes.
    return_if_io!(self.ensure_tx_changes_computed());
    match key {
        SeekKey::TableRowId(rowid) => self.do_seek(rowid, op),
        SeekKey::IndexKey(_) => Err(LimboError::ParseError(
            "Cannot search a materialized view with an index key".to_string(),
        )),
    }
}
/// Moves the cursor to the next row after the current one.
///
/// Returns `true` when a row was found. Returns `false` when the cursor is
/// not positioned on a row or when there is no further row; in the latter
/// case the cursor is left without a current row.
pub fn next(&mut self) -> Result<IOResult<bool>> {
    // A seek that was interrupted by pending I/O has already cleared
    // `current_row`, so it has to be resumed rather than restarted. The
    // in-flight target is passed as the filter key: every candidate of a
    // forward seek lies beyond it, whereas a fixed value such as 0 would
    // wrongly reject rows whose rowid is not greater than that value.
    let in_flight_target = match self.seek_state {
        SeekState::Seek { target } | SeekState::Advancing { target, .. } => Some(target),
        SeekState::Init | SeekState::Done => None,
    };
    if let Some(target) = in_flight_target {
        let result = return_if_io!(self.do_seek(target, SeekOp::GT));
        return Ok(IOResult::Done(result == SeekResult::Found));
    }

    // Like `seek` and `rewind`, pick up changes made to the transaction
    // state since the cursor last looked at it.
    return_if_io!(self.ensure_tx_changes_computed());

    let Some((current_rowid, _)) = &self.current_row else {
        return Ok(IOResult::Done(false));
    };
    let current_rowid = *current_rowid;
    let result = return_if_io!(self.do_seek(current_rowid, SeekOp::GT));
    Ok(IOResult::Done(result == SeekResult::Found))
}
/// Returns the value of column `col` of the current row.
///
/// Yields `Value::Null` when the cursor is not on a row or when `col` is
/// past the last column.
pub fn column(&mut self, col: usize) -> Result<IOResult<Value>> {
    let value = self
        .current_row
        .as_ref()
        .and_then(|(_, values)| values.get(col))
        .cloned()
        .unwrap_or(Value::Null);
    Ok(IOResult::Done(value))
}
/// Returns the rowid of the current row, or `None` when the cursor is not on a row.
pub fn rowid(&self) -> Result<IOResult<Option<i64>>> {
    let current = match &self.current_row {
        Some((id, _)) => Some(*id),
        None => None,
    };
    Ok(IOResult::Done(current))
}
/// Positions the cursor on the first row, if there is one.
///
/// Implemented as a "greater than `i64::MIN`" seek after refreshing the
/// uncommitted rows; whether a row was found is visible through `is_valid`.
///
/// NOTE(review): a strict greater-than seek from `i64::MIN` cannot land on
/// a row whose rowid is exactly `i64::MIN` - confirm that this is acceptable.
pub fn rewind(&mut self) -> Result<IOResult<()>> {
    return_if_io!(self.ensure_tx_changes_computed());
    // The seek outcome is intentionally not reported to the caller.
    return_if_io!(self.do_seek(i64::MIN, SeekOp::GT));
    Ok(IOResult::Done(()))
}
/// Reports whether the cursor is currently positioned on a row.
pub fn is_valid(&self) -> Result<bool> {
    let positioned = self.current_row.is_some();
    Ok(positioned)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::btree::BTreeCursor;
use crate::sync::Arc;
use crate::util::IOExt;
use crate::{Connection, Database, OpenFlags};
/// Opens an in-memory database with views enabled and creates `test_table`
/// plus the materialized view `test_view` selecting both of its columns.
fn create_test_connection() -> Result<Arc<Connection>> {
    // Only views are switched on; every other optional feature stays off.
    let opts = crate::DatabaseOpts {
        enable_views: true,
        enable_custom_types: false,
        enable_load_extension: false,
        enable_encryption: false,
        enable_index_method: false,
        enable_autovacuum: false,
        enable_vacuum: false,
        enable_attach: false,
        enable_generated_columns: false,
        enable_multiprocess_wal: false,
        enable_without_rowid: false,
        unsafe_testing: false,
    };
    let io = Arc::new(crate::io::MemoryIO::new());
    let db = Database::open_file_with_flags(io, ":memory:", OpenFlags::default(), opts, None)?;
    let conn = db.connect()?;
    for sql in [
        "CREATE TABLE test_table (id INTEGER PRIMARY KEY, value INTEGER)",
        "CREATE MATERIALIZED VIEW test_view AS SELECT id, value FROM test_table",
    ] {
        conn.execute(sql)?;
    }
    Ok(conn)
}
/// Builds a `MaterializedViewCursor` over `test_view` for `conn`.
///
/// Returns the cursor together with the view's transaction state (so tests
/// can inject uncommitted changes) and the pager (so tests can drive I/O).
/// Fails when the view does not exist or has no root page.
fn create_test_cursor(
    conn: &Arc<Connection>,
) -> Result<(
    MaterializedViewCursor,
    Arc<ViewTransactionState>,
    Arc<Pager>,
)> {
    let view_mutex = conn
        .schema
        .read()
        .get_materialized_view("test_view")
        .ok_or_else(|| crate::LimboError::InternalError("View not found".to_string()))?;

    // Read what is needed from the view inside a short scope so the lock is
    // released before the cursor is built.
    let (root_page, num_columns) = {
        let guard = view_mutex.lock();
        let root = guard.get_root_page();
        let columns = guard.column_schema.columns.len();
        (root, columns)
    };
    if root_page == 0 {
        return Err(crate::LimboError::InternalError(
            "View not materialized".to_string(),
        ));
    }

    let pager = conn.get_pager();
    let tx_state = conn.view_transaction_states.get_or_create("test_view");
    let btree_cursor = Box::new(BTreeCursor::new(pager.clone(), root_page, num_columns));
    let cursor = MaterializedViewCursor::new(
        btree_cursor,
        view_mutex.clone(),
        pager.clone(),
        tx_state.clone(),
    )?;
    Ok((cursor, tx_state, pager))
}
/// Inserts every `(id, value)` pair into `test_table`, one statement per row.
fn populate_test_table(conn: &Arc<Connection>, rows: Vec<(i64, i64)>) -> Result<()> {
    for row in rows {
        let (id, value) = row;
        let statement = format!("INSERT INTO test_table (id, value) VALUES ({id}, {value})");
        conn.execute(&statement)?;
    }
    Ok(())
}
/// Records `(rowid, values, weight)` changes against `test_table` in the
/// transaction state: a positive weight is an insert, a negative weight a
/// delete, and a zero weight is ignored.
fn apply_changes_to_tx_state(
    tx_state: &ViewTransactionState,
    changes: Vec<(i64, Vec<Value>, isize)>,
) {
    for (rowid, values, weight) in changes {
        match weight {
            w if w > 0 => tx_state.insert("test_table", rowid, values),
            w if w < 0 => tx_state.delete("test_table", rowid, values),
            _ => {}
        }
    }
}
/// Every seek operator finds the expected committed row when there are no
/// uncommitted changes.
#[test]
fn test_seek_key_exists_in_btree() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50), (7, 70)])?;
    let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;

    // (seek key, operator, rowid the cursor must land on)
    let cases = [
        (3, SeekOp::GE { eq_only: true }, 3),
        (4, SeekOp::GE { eq_only: false }, 5),
        (3, SeekOp::GT, 5),
        (4, SeekOp::LE { eq_only: false }, 3),
        (5, SeekOp::LT, 3),
    ];
    for (key, op, expected_rowid) in cases {
        let outcome = pager
            .io
            .block(|| cursor.seek(SeekKey::TableRowId(key), op))?;
        assert_eq!(outcome, SeekResult::Found);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    Ok(())
}
/// Rows that exist only as uncommitted inserts are found by seeks.
#[test]
fn test_seek_key_exists_only_uncommitted() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (5, 50), (7, 70)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(3, row(3, 30), 1), (6, row(6, 60), 1)]);

    // (seek key, operator, expected rowid, expected column 1 when checked)
    let cases = [
        (3, SeekOp::GE { eq_only: true }, 3, Some(30)),
        (2, SeekOp::GE { eq_only: false }, 3, None),
        (5, SeekOp::GT, 6, Some(60)),
        (6, SeekOp::LE { eq_only: false }, 6, None),
    ];
    for (key, op, expected_rowid, expected_value) in cases {
        let outcome = pager
            .io
            .block(|| cursor.seek(SeekKey::TableRowId(key), op))?;
        assert_eq!(outcome, SeekResult::Found);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
        if let Some(value) = expected_value {
            assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(value));
        }
    }
    Ok(())
}
/// Committed rows deleted in the current transaction are skipped by seeks.
#[test]
fn test_seek_key_deleted_by_uncommitted() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50), (7, 70)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(3, row(3, 30), -1), (5, row(5, 50), -1)]);

    // An exact match on a deleted row finds nothing.
    let exact = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(3), SeekOp::GE { eq_only: true }))?;
    assert_eq!(exact, SeekResult::NotFound);

    // Range seeks jump over the deleted rows.
    let cases = [
        (2, SeekOp::GE { eq_only: false }, 7),
        (1, SeekOp::GT, 7),
        (5, SeekOp::LE { eq_only: false }, 1),
    ];
    for (key, op, expected_rowid) in cases {
        let outcome = pager
            .io
            .block(|| cursor.seek(SeekKey::TableRowId(key), op))?;
        assert_eq!(outcome, SeekResult::Found);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    Ok(())
}
/// An uncommitted update (delete + insert of the same rowid) is visible
/// through a seek.
#[test]
fn test_seek_with_updates() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    let update = vec![(3, row(3, 30), -1), (3, row(3, 35), 1)];
    apply_changes_to_tx_state(&tx_state, update);

    let exact = SeekOp::GE { eq_only: true };
    let outcome = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(3), exact))?;
    assert_eq!(outcome, SeekResult::Found);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(35));
    Ok(())
}
/// Seeks outside the stored key range, or for a missing exact key, find nothing.
#[test]
fn test_seek_boundary_conditions() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(5, 50), (10, 100)])?;
    let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;

    // (seek key, operator) pairs that must all miss
    let misses = [
        (1, SeekOp::LT),
        (15, SeekOp::GT),
        (7, SeekOp::GE { eq_only: true }),
    ];
    for (key, op) in misses {
        let outcome = pager
            .io
            .block(|| cursor.seek(SeekKey::TableRowId(key), op))?;
        assert_eq!(outcome, SeekResult::NotFound);
    }
    Ok(())
}
/// Two chained uncommitted updates of one row leave only the final value visible.
#[test]
fn test_seek_complex_uncommitted_weights() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    let updates = vec![
        (5, row(5, 50), -1),
        (5, row(5, 51), 1),
        (5, row(5, 51), -1),
        (5, row(5, 52), 1),
    ];
    apply_changes_to_tx_state(&tx_state, updates);

    let exact = SeekOp::GE { eq_only: true };
    let outcome = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(5), exact))?;
    assert_eq!(outcome, SeekResult::Found);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(52));
    Ok(())
}
/// A row inserted into the transaction state between two seeks is found by
/// the second seek.
#[test]
fn test_seek_affected_by_transaction_state_changes() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;
    let exact = SeekOp::GE { eq_only: true };

    // Before the insert the key does not exist.
    let before = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(2), exact))?;
    assert_eq!(before, SeekResult::NotFound);

    let inserted = vec![Value::from_i64(2), Value::from_i64(20)];
    tx_state.insert("test_table", 2, inserted);

    // After the insert the same seek succeeds.
    let after = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(2), exact))?;
    assert_eq!(after, SeekResult::Found);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(20));
    Ok(())
}
/// Rewind lands on the first committed row when all uncommitted rows sort after it.
#[test]
fn test_rewind_btree_first_uncommitted_later() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(8, row(8, 80), 1), (10, row(10, 100), 1)]);

    // A fresh cursor is not positioned until it is rewound.
    let valid_before = cursor.is_valid()?;
    assert!(!valid_before);
    pager.io.block(|| cursor.rewind())?;
    let valid_after = cursor.is_valid()?;
    assert!(valid_after);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    Ok(())
}
/// Rewind lands on an uncommitted row when it sorts before every committed row.
#[test]
fn test_rewind_with_uncommitted_first() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(5, 50), (7, 70)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let first_row = vec![Value::from_i64(2), Value::from_i64(20)];
    apply_changes_to_tx_state(&tx_state, vec![(2, first_row, 1)]);

    pager.io.block(|| cursor.rewind())?;
    let positioned = cursor.is_valid()?;
    assert!(positioned);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(20));
    Ok(())
}
/// Rewind skips the first committed row when the transaction deleted it.
#[test]
fn test_rewind_skip_deleted_first() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let deleted_row = vec![Value::from_i64(1), Value::from_i64(10)];
    apply_changes_to_tx_state(&tx_state, vec![(1, deleted_row, -1)]);

    pager.io.block(|| cursor.rewind())?;
    let positioned = cursor.is_valid()?;
    assert!(positioned);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
    Ok(())
}
/// With nothing committed, rewind lands on the smallest uncommitted row.
#[test]
fn test_rewind_empty_btree_with_uncommitted() -> Result<()> {
    let conn = create_test_connection()?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(3, row(3, 30), 1), (7, row(7, 70), 1)]);

    pager.io.block(|| cursor.rewind())?;
    let positioned = cursor.is_valid()?;
    assert!(positioned);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(30));
    Ok(())
}
/// Rewind leaves the cursor unpositioned when every committed row was deleted.
#[test]
fn test_rewind_all_deleted() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(2, 20), (4, 40)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(2, row(2, 20), -1), (4, row(4, 40), -1)]);

    pager.io.block(|| cursor.rewind())?;
    let positioned = cursor.is_valid()?;
    assert!(!positioned);
    assert_eq!(pager.io.block(|| cursor.rowid())?, None);
    Ok(())
}
/// Rewind shows the updated value when the first row was updated in the transaction.
#[test]
fn test_rewind_with_updates() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(1, row(1, 10), -1), (1, row(1, 15), 1)]);

    pager.io.block(|| cursor.rewind())?;
    let positioned = cursor.is_valid()?;
    assert!(positioned);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(15));
    Ok(())
}
/// Iterating with `next` visits all committed rows in rowid order and then stops.
#[test]
fn test_next_btree_only_sequential() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50), (7, 70)])?;
    let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    for expected_rowid in [3, 5, 7] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }

    // Past the last row: no more rows and the cursor is no longer positioned.
    assert!(!pager.io.block(|| cursor.next())?);
    assert!(!cursor.is_valid()?);
    Ok(())
}
/// Iteration works when every row comes from the uncommitted transaction state.
#[test]
fn test_next_uncommitted_only() -> Result<()> {
    let conn = create_test_connection()?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(
        &tx_state,
        vec![(2, row(2, 20), 1), (4, row(4, 40), 1), (6, row(6, 60), 1)],
    );

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
    for expected_rowid in [4, 6] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    assert!(!pager.io.block(|| cursor.next())?);
    assert!(!cursor.is_valid()?);
    Ok(())
}
/// Committed and uncommitted rows are interleaved in rowid order during iteration.
#[test]
fn test_next_mixed_btree_uncommitted() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (5, 50), (9, 90)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(3, row(3, 30), 1), (7, row(7, 70), 1)]);

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    for expected_rowid in [3, 5, 7, 9] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    assert!(!pager.io.block(|| cursor.next())?);
    assert!(!cursor.is_valid()?);
    Ok(())
}
/// Rows deleted in the transaction are skipped while iterating.
#[test]
fn test_next_skip_deleted_rows() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(2, row(2, 20), -1), (4, row(4, 40), -1)]);

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    for expected_rowid in [3, 5] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    assert!(!pager.io.block(|| cursor.next())?);
    assert!(!cursor.is_valid()?);
    Ok(())
}
/// A row updated in the transaction is visited once, with its new value.
#[test]
fn test_next_with_updates() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(&tx_state, vec![(3, row(3, 30), -1), (3, row(3, 35), 1)]);

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));

    // (expected rowid, expected column 1 when checked)
    let steps = [(3, Some(35)), (5, None)];
    for (expected_rowid, expected_value) in steps {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
        if let Some(value) = expected_value {
            assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(value));
        }
    }
    assert!(!pager.io.block(|| cursor.next())?);
    Ok(())
}
/// `next` on a cursor that was never positioned returns false; after a
/// rewind, iteration proceeds normally.
#[test]
fn test_next_from_uninitialized() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(2, 20), (4, 40)])?;
    let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;

    // Unpositioned cursor: next neither succeeds nor positions it.
    assert!(!cursor.is_valid()?);
    let advanced = pager.io.block(|| cursor.next())?;
    assert!(!advanced);
    assert!(!cursor.is_valid()?);

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
    let advanced = pager.io.block(|| cursor.next())?;
    assert!(advanced);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(4));
    let advanced = pager.io.block(|| cursor.next())?;
    assert!(!advanced);
    Ok(())
}
/// `next` on an empty view returns false and leaves the cursor unpositioned.
#[test]
fn test_next_empty_table() -> Result<()> {
    let conn = create_test_connection()?;
    let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;

    let advanced = pager.io.block(|| cursor.next())?;
    assert!(!advanced);
    let positioned = cursor.is_valid()?;
    assert!(!positioned);
    Ok(())
}
/// `next` without a prior rewind returns false when all rows were deleted
/// in the transaction.
#[test]
fn test_next_all_deleted() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (2, 20), (3, 30)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(
        &tx_state,
        vec![(1, row(1, 10), -1), (2, row(2, 20), -1), (3, row(3, 30), -1)],
    );

    let advanced = pager.io.block(|| cursor.next())?;
    assert!(!advanced);
    let positioned = cursor.is_valid()?;
    assert!(!positioned);
    Ok(())
}
/// Inserts, deletes and an update interleaved with committed rows are all
/// reflected, in rowid order, during iteration.
#[test]
fn test_next_complex_interleaving() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(2, 20), (4, 40), (6, 60), (8, 80)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(
        &tx_state,
        vec![
            (1, row(1, 10), 1),
            (2, row(2, 20), -1),
            (3, row(3, 30), 1),
            (4, row(4, 40), -1),
            (4, row(4, 45), 1),
            (5, row(5, 50), 1),
            (6, row(6, 60), -1),
            (7, row(7, 70), 1),
            (9, row(9, 90), 1),
        ],
    );

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));

    // (expected rowid, expected column 1 when checked)
    let steps = [
        (3, None),
        (4, Some(45)),
        (5, None),
        (7, None),
        (8, None),
        (9, None),
    ];
    for (expected_rowid, expected_value) in steps {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
        if let Some(value) = expected_value {
            assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(value));
        }
    }
    assert!(!pager.io.block(|| cursor.next())?);
    assert!(!cursor.is_valid()?);
    Ok(())
}
/// Iteration can continue with `next` from a position established by `seek`.
#[test]
fn test_next_after_seek() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50), (7, 70), (9, 90)])?;
    let (mut cursor, _tx_state, pager) = create_test_cursor(&conn)?;

    let exact = SeekOp::GE { eq_only: true };
    let outcome = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(5), exact))?;
    assert_eq!(outcome, SeekResult::Found);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));

    for expected_rowid in [7, 9] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    assert!(!pager.io.block(|| cursor.next())?);
    Ok(())
}
/// `next` without a prior rewind returns false after a chain of updates
/// that ends with the row being deleted.
#[test]
fn test_next_multiple_weights_same_row() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(
        &tx_state,
        vec![
            (1, row(1, 10), -1),
            (1, row(1, 11), 1),
            (1, row(1, 11), -1),
            (1, row(1, 12), 1),
            (1, row(1, 12), -1),
        ],
    );

    let advanced = pager.io.block(|| cursor.next())?;
    assert!(!advanced);
    let positioned = cursor.is_valid()?;
    assert!(!positioned);
    Ok(())
}
/// Iteration over uncommitted rows works across large gaps between rowids.
#[test]
fn test_next_only_uncommitted_large_gaps() -> Result<()> {
    let conn = create_test_connection()?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(
        &tx_state,
        vec![
            (100, row(100, 1000), 1),
            (500, row(500, 5000), 1),
            (999, row(999, 9990), 1),
        ],
    );

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(100));
    for expected_rowid in [500, 999] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    assert!(!pager.io.block(|| cursor.next())?);
    Ok(())
}
/// Three successive updates of one row in a single transaction: both seek
/// and iteration see only the final value.
#[test]
fn test_multiple_updates_same_row_single_transaction() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (2, 20), (3, 30)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(
        &tx_state,
        vec![
            (2, row(2, 20), -1),
            (2, row(2, 25), 1),
            (2, row(2, 25), -1),
            (2, row(2, 28), 1),
            (2, row(2, 28), -1),
            (2, row(2, 32), 1),
        ],
    );

    // Direct seek to the updated row.
    let exact = SeekOp::GE { eq_only: true };
    let outcome = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(2), exact))?;
    assert_eq!(outcome, SeekResult::Found);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(32));

    // Full scan from the start.
    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(10));
    for (expected_rowid, expected_value) in [(2, 32), (3, 30)] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
        assert_eq!(
            pager.io.block(|| cursor.column(1))?,
            Value::from_i64(expected_value)
        );
    }
    assert!(!pager.io.block(|| cursor.next())?);
    Ok(())
}
/// Seek, rewind and iteration all work when the view has no committed rows
/// and every row is uncommitted.
#[test]
fn test_empty_materialized_view_with_uncommitted() -> Result<()> {
    let conn = create_test_connection()?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(
        &tx_state,
        vec![
            (5, row(5, 50), 1),
            (10, row(10, 100), 1),
            (15, row(15, 150), 1),
        ],
    );

    // Both an exact seek and a range seek end up on rowid 10.
    let seeks = [(10, SeekOp::GE { eq_only: true }), (7, SeekOp::GT)];
    for (key, op) in seeks {
        let outcome = pager
            .io
            .block(|| cursor.seek(SeekKey::TableRowId(key), op))?;
        assert_eq!(outcome, SeekResult::Found);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(10));
    }

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
    for expected_rowid in [10, 15] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    assert!(!pager.io.block(|| cursor.next())?);
    Ok(())
}
/// An exact seek still finds the rowid when an uncommitted insert adds a
/// second, different value for a rowid that is already committed.
#[test]
fn test_exact_match_btree_uncommitted_same_rowid_different_values() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let conflicting_row = vec![Value::from_i64(3), Value::from_i64(35)];
    apply_changes_to_tx_state(&tx_state, vec![(3, conflicting_row, 1)]);

    let exact = SeekOp::GE { eq_only: true };
    let outcome = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(3), exact))?;
    assert_eq!(outcome, SeekResult::Found);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));
    Ok(())
}
/// Seeks at and next to the extremes of the i64 rowid range behave correctly.
#[test]
fn test_boundary_value_seeks() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(100, 1000), (200, 2000)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let lowest = i64::MIN + 1;
    let highest = i64::MAX - 1;
    let row = |id: i64, value: i64| vec![Value::from_i64(id), Value::from_i64(value)];
    apply_changes_to_tx_state(
        &tx_state,
        vec![(lowest, row(lowest, -999), 1), (highest, row(highest, 999), 1)],
    );

    // Nothing lies beyond the extremes.
    let misses = [(i64::MAX, SeekOp::GT), (i64::MIN, SeekOp::LT)];
    for (key, op) in misses {
        let outcome = pager
            .io
            .block(|| cursor.seek(SeekKey::TableRowId(key), op))?;
        assert_eq!(outcome, SeekResult::NotFound);
    }

    // (seek key, operator, rowid the cursor must land on)
    let hits = [
        (highest, SeekOp::GE { eq_only: false }, highest),
        (lowest, SeekOp::LE { eq_only: false }, lowest),
        (i64::MIN, SeekOp::GT, lowest),
        (i64::MAX, SeekOp::LT, highest),
    ];
    for (key, op, expected_rowid) in hits {
        let outcome = pager
            .io
            .block(|| cursor.seek(SeekKey::TableRowId(key), op))?;
        assert_eq!(outcome, SeekResult::Found);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    Ok(())
}
/// Forward iteration when the transaction state touches rowids that are also
/// in the populated table.
///
/// Rowid 2 is retracted with value 20 and re-added with value 25; rowid 4 is
/// retracted. Iteration must visit 1, 2 (with column 1 equal to 25), 3 and 5,
/// then report the end.
#[test]
fn test_next_concurrent_btree_uncommitted_advance() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let retract_old_row_2 = (2, vec![Value::from_i64(2), Value::from_i64(20)], -1);
    let insert_new_row_2 = (2, vec![Value::from_i64(2), Value::from_i64(25)], 1);
    let retract_row_4 = (4, vec![Value::from_i64(4), Value::from_i64(40)], -1);
    apply_changes_to_tx_state(
        &tx_state,
        vec![retract_old_row_2, insert_new_row_2, retract_row_4],
    );

    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));

    // Rowid 2 must expose the replacement value.
    assert!(pager.io.block(|| cursor.next())?);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(2));
    assert_eq!(pager.io.block(|| cursor.column(1))?, Value::from_i64(25));

    // Rowid 4 is skipped.
    for expected_rowid in [3, 5] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }

    assert!(!pager.io.block(|| cursor.next())?);
    Ok(())
}
/// Changes added to the transaction state while a scan is in progress.
///
/// The first pass starts before rowids 2, 4 and 6 are added and is asserted
/// to continue as 1, 3, 5 and then end. After a `rewind`, the second pass is
/// asserted to visit every rowid from 1 through 6.
#[test]
fn test_transaction_state_changes_mid_iteration() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    // First pass: begin iterating over the rows present at the start.
    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    assert!(pager.io.block(|| cursor.next())?);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(3));

    // Add rows 2, 4 and 6 (value = rowid * 10, weight +1) mid-scan.
    let pending_rows: Vec<_> = [2i64, 4, 6]
        .iter()
        .map(|&rowid| {
            (
                rowid,
                vec![Value::from_i64(rowid), Value::from_i64(rowid * 10)],
                1,
            )
        })
        .collect();
    apply_changes_to_tx_state(&tx_state, pending_rows);

    // The scan already in flight finishes with 5 and then ends.
    assert!(pager.io.block(|| cursor.next())?);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(5));
    assert!(!pager.io.block(|| cursor.next())?);

    // Second pass: after a rewind every rowid 1..=6 is visited in order.
    pager.io.block(|| cursor.rewind())?;
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    for expected_rowid in 2..=6 {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    assert!(!pager.io.block(|| cursor.next())?);
    Ok(())
}
/// A seek that finds nothing must leave the cursor invalid, and a following
/// `rewind` must make it valid again at the first row.
///
/// This is checked twice: after an exact-match seek for the absent rowid 4,
/// and after a `GT` seek past rowid 5, the last rowid the scan visits.
#[test]
fn test_rewind_after_failed_seek() -> Result<()> {
    let conn = create_test_connection()?;
    populate_test_table(&conn, vec![(1, 10), (3, 30), (5, 50)])?;
    let (mut cursor, tx_state, pager) = create_test_cursor(&conn)?;

    let pending_row = vec![Value::from_i64(2), Value::from_i64(20)];
    apply_changes_to_tx_state(&tx_state, vec![(2, pending_row, 1)]);

    // Exact match on a rowid that is in neither the table nor the tx state.
    let exact_miss = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(4), SeekOp::GE { eq_only: true }))?;
    assert_eq!(exact_miss, SeekResult::NotFound);
    assert!(!cursor.is_valid()?);

    // Rewinding recovers the cursor and a full scan sees 1, 2, 3, 5.
    pager.io.block(|| cursor.rewind())?;
    assert!(cursor.is_valid()?);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    for expected_rowid in [2, 3, 5] {
        assert!(pager.io.block(|| cursor.next())?);
        assert_eq!(pager.io.block(|| cursor.rowid())?, Some(expected_rowid));
    }
    assert!(!pager.io.block(|| cursor.next())?);

    // Seeking strictly past the last row fails as well.
    let past_the_end = pager
        .io
        .block(|| cursor.seek(SeekKey::TableRowId(5), SeekOp::GT))?;
    assert_eq!(past_the_end, SeekResult::NotFound);
    assert!(!cursor.is_valid()?);

    pager.io.block(|| cursor.rewind())?;
    assert!(cursor.is_valid()?);
    assert_eq!(pager.io.block(|| cursor.rowid())?, Some(1));
    Ok(())
}
mod io_resumption_tests {
use super::*;
use crate::io::Completion;
use crate::storage::btree::{BTreeKey, CursorTrait};
use crate::types::{IOCompletions, ImmutableRecord, IndexInfo};
use crate::Register;
use std::sync::atomic::{AtomicUsize, Ordering};
struct MockBTreeCursor {
seek_count: AtomicUsize,
next_count: AtomicUsize,
prev_count: AtomicUsize,
current_rowid: Option<i64>,
record: ImmutableRecord,
index_info: Arc<IndexInfo>,
}
impl MockBTreeCursor {
fn new() -> Self {
let record = Self::create_test_record(1, 10, 1);
Self {
seek_count: AtomicUsize::new(0),
next_count: AtomicUsize::new(0),
prev_count: AtomicUsize::new(0),
current_rowid: Some(1),
record,
index_info: Arc::new(IndexInfo::default()),
}
}
fn create_test_record(rowid: i64, value: i64, weight: i64) -> ImmutableRecord {
let mut payload = vec![
4u8, 6u8, 6u8, 6u8, ];
payload.extend_from_slice(&rowid.to_be_bytes());
payload.extend_from_slice(&value.to_be_bytes());
payload.extend_from_slice(&weight.to_be_bytes());
ImmutableRecord::from_bin_record(payload)
}
fn get_seek_count(&self) -> usize {
self.seek_count.load(Ordering::SeqCst)
}
fn get_prev_count(&self) -> usize {
self.prev_count.load(Ordering::SeqCst)
}
}
impl CursorTrait for MockBTreeCursor {
fn seek(&mut self, _key: SeekKey<'_>, _op: SeekOp) -> Result<IOResult<SeekResult>> {
let count = self.seek_count.fetch_add(1, Ordering::SeqCst);
if count == 0 {
Ok(IOResult::Done(SeekResult::TryAdvance))
} else {
Ok(IOResult::Done(SeekResult::Found))
}
}
fn seek_unpacked(
&mut self,
_registers: &[Register],
_op: SeekOp,
) -> Result<IOResult<SeekResult>> {
Ok(IOResult::Done(SeekResult::NotFound))
}
fn next(&mut self) -> Result<IOResult<()>> {
let count = self.next_count.fetch_add(1, Ordering::SeqCst);
if count == 0 {
let completion = Completion::new_yield();
Ok(IOResult::IO(IOCompletions::Single(completion)))
} else {
Ok(IOResult::Done(()))
}
}
fn prev(&mut self) -> Result<IOResult<()>> {
let count = self.prev_count.fetch_add(1, Ordering::SeqCst);
if count == 0 {
let completion = Completion::new_yield();
Ok(IOResult::IO(IOCompletions::Single(completion)))
} else {
Ok(IOResult::Done(()))
}
}
fn rowid(&mut self) -> Result<IOResult<Option<i64>>> {
Ok(IOResult::Done(self.current_rowid))
}
fn record(&mut self) -> Result<IOResult<Option<&ImmutableRecord>>> {
Ok(IOResult::Done(Some(&self.record)))
}
fn last(&mut self) -> Result<IOResult<()>> {
Ok(IOResult::Done(()))
}
fn insert(&mut self, _key: &BTreeKey) -> Result<IOResult<()>> {
Ok(IOResult::Done(()))
}
fn delete(&mut self) -> Result<IOResult<()>> {
Ok(IOResult::Done(()))
}
fn set_null_flag(&mut self, _flag: bool) {}
fn get_null_flag(&self) -> bool {
false
}
fn exists(&mut self, _key: &Value) -> Result<IOResult<bool>> {
Ok(IOResult::Done(false))
}
fn clear_btree(&mut self) -> Result<IOResult<Option<usize>>> {
Ok(IOResult::Done(None))
}
fn btree_destroy(&mut self) -> Result<IOResult<Option<usize>>> {
Ok(IOResult::Done(None))
}
fn count(&mut self) -> Result<IOResult<usize>> {
Ok(IOResult::Done(0))
}
fn is_empty(&self) -> bool {
false
}
fn root_page(&self) -> i64 {
1
}
fn rewind(&mut self) -> Result<IOResult<()>> {
Ok(IOResult::Done(()))
}
fn has_record(&self) -> bool {
true
}
fn set_has_record(&mut self, _has_record: bool) {}
fn get_index_info(&self) -> &Arc<IndexInfo> {
&self.index_info
}
fn seek_end(&mut self) -> Result<IOResult<()>> {
Ok(IOResult::Done(()))
}
fn seek_to_last(&mut self, _always_seek: bool) -> Result<IOResult<()>> {
Ok(IOResult::Done(()))
}
fn invalidate_record(&mut self) {}
fn has_rowid(&self) -> bool {
true
}
fn get_pager(&self) -> Arc<Pager> {
panic!("MockBTreeCursor::get_pager should not be called")
}
fn get_skip_advance(&self) -> bool {
false
}
}
#[test]
fn test_seek_not_repeated_after_io_during_try_advance() -> Result<()> {
let conn = create_test_connection()?;
let view_mutex = conn
.schema
.read()
.get_materialized_view("test_view")
.ok_or_else(|| crate::LimboError::InternalError("View not found".to_string()))?;
let pager = conn.get_pager();
let tx_state = conn.view_transaction_states.get_or_create("test_view");
let mock_cursor = MockBTreeCursor::new();
let mock_cursor_box: Box<dyn CursorTrait> = Box::new(mock_cursor);
let mock_ptr = mock_cursor_box.as_ref() as *const dyn CursorTrait;
let mut cursor =
MaterializedViewCursor::new(mock_cursor_box, view_mutex, pager, tx_state)?;
let seek_op = SeekOp::LE { eq_only: false };
let result = cursor.do_seek(5, seek_op);
assert!(
matches!(result, Ok(IOResult::IO(_))),
"Expected IO result, got {result:?}"
);
let mock_ref: &MockBTreeCursor = unsafe { &*(mock_ptr as *const MockBTreeCursor) };
assert_eq!(
mock_ref.get_seek_count(),
1,
"seek should be called exactly once before IO"
);
assert_eq!(
mock_ref.get_prev_count(),
1,
"prev should be called once (returned IO)"
);
let result = cursor.do_seek(5, seek_op);
assert!(
matches!(result, Ok(IOResult::Done(_))),
"Expected Done result on resume, got {result:?}"
);
let final_seek_count = mock_ref.get_seek_count();
assert_eq!(
final_seek_count, 1,
"seek should only be called once, but was called {final_seek_count} times (redundant seek after IO during TryAdvance)"
);
Ok(())
}
}
}