use std::collections::HashSet;
use std::sync::atomic::Ordering;
use fsqlite_types::sync_primitives::RwLock;
use fsqlite_types::{CommitSeq, PageData, PageNumber, Snapshot, TxnToken};
use tracing::{debug, info, trace, warn};
use crate::cell_mvcc_boundary::{BtreeOp, MvccOpClass, PageMetadata, classify_btree_op};
use crate::cell_visibility::{CellDelta, CellKey, CellVisibilityLog};
use crate::materialize::{MaterializationTrigger, materialize_page, should_materialize_eagerly};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CellMvccMode {
On,
Off,
#[default]
Auto,
}
impl CellMvccMode {
fn as_str(self) -> &'static str {
match self {
Self::On => "on",
Self::Off => "off",
Self::Auto => "auto",
}
}
}
impl std::str::FromStr for CellMvccMode {
type Err = ();
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"on" | "true" | "1" => Ok(Self::On),
"off" | "false" | "0" => Ok(Self::Off),
"auto" => Ok(Self::Auto),
_ => Err(()),
}
}
}
impl std::fmt::Display for CellMvccMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingReason {
CellMvccDisabled,
InteriorPage,
StructuralOperation,
PageTrackedByOtherTxn,
MaterializationThresholdExceeded,
MemoryBudgetExceeded,
IndexPageAutoMode,
CellLevelEligible,
}
impl RoutingReason {
#[allow(dead_code)] fn as_str(self) -> &'static str {
match self {
Self::CellMvccDisabled => "cell_mvcc_disabled",
Self::InteriorPage => "interior_page",
Self::StructuralOperation => "structural_operation",
Self::PageTrackedByOtherTxn => "page_tracked_by_other_txn",
Self::MaterializationThresholdExceeded => "materialization_threshold",
Self::MemoryBudgetExceeded => "memory_budget_exceeded",
Self::IndexPageAutoMode => "index_page_auto_mode",
Self::CellLevelEligible => "cell_level_eligible",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RoutingDecision {
pub use_cell_level: bool,
pub reason: RoutingReason,
}
impl RoutingDecision {
#[must_use]
pub const fn cell_level(reason: RoutingReason) -> Self {
Self {
use_cell_level: true,
reason,
}
}
#[must_use]
pub const fn page_level(reason: RoutingReason) -> Self {
Self {
use_cell_level: false,
reason,
}
}
}
#[derive(Debug, Clone)]
pub struct EscalationResult {
pub materialized_page: PageData,
pub deltas_applied: usize,
pub discarded_cells: Vec<CellKey>,
}
#[derive(Debug, Default)]
pub struct TxnEscalationTracker {
escalated_pages: HashSet<PageNumber>,
cell_level_pages: HashSet<PageNumber>,
}
impl TxnEscalationTracker {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn add_cell_level(&mut self, page: PageNumber) {
if !self.escalated_pages.contains(&page) {
self.cell_level_pages.insert(page);
}
}
#[must_use]
pub fn is_cell_level(&self, page: PageNumber) -> bool {
self.cell_level_pages.contains(&page) && !self.escalated_pages.contains(&page)
}
pub fn escalate(&mut self, page: PageNumber) {
self.cell_level_pages.remove(&page);
self.escalated_pages.insert(page);
debug!(pgno = page.get(), "page_escalated_to_page_level");
}
#[must_use]
pub fn is_escalated(&self, page: PageNumber) -> bool {
self.escalated_pages.contains(&page)
}
pub fn cell_level_pages(&self) -> impl Iterator<Item = PageNumber> + '_ {
self.cell_level_pages.iter().copied()
}
pub fn escalated_pages(&self) -> impl Iterator<Item = PageNumber> + '_ {
self.escalated_pages.iter().copied()
}
pub fn clear(&mut self) {
self.cell_level_pages.clear();
self.escalated_pages.clear();
}
}
#[derive(Debug, Default)]
pub struct PageTrackingState {
page_level_txns: RwLock<std::collections::HashMap<PageNumber, HashSet<TxnToken>>>,
}
impl PageTrackingState {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn register_page_level(&self, page: PageNumber, txn: TxnToken) {
let mut map = self.page_level_txns.write();
map.entry(page).or_default().insert(txn);
trace!(
pgno = page.get(),
txn_id = txn.id.get(),
"registered_page_level_tracking"
);
}
pub fn unregister_page_level(&self, page: PageNumber, txn: TxnToken) {
let mut map = self.page_level_txns.write();
if let Some(txns) = map.get_mut(&page) {
txns.remove(&txn);
if txns.is_empty() {
map.remove(&page);
}
}
}
pub fn unregister_all(&self, txn: TxnToken) {
let mut map = self.page_level_txns.write();
map.retain(|_, txns| {
txns.remove(&txn);
!txns.is_empty()
});
}
#[must_use]
pub fn has_other_page_level_txn(&self, page: PageNumber, current_txn: TxnToken) -> bool {
let map = self.page_level_txns.read();
map.get(&page)
.is_some_and(|txns| txns.iter().any(|t| *t != current_txn))
}
#[must_use]
pub fn page_level_txn_count(&self, page: PageNumber) -> usize {
let map = self.page_level_txns.read();
map.get(&page).map_or(0, HashSet::len)
}
}
#[derive(Debug)]
pub struct RoutingContext<'a> {
pub mode: CellMvccMode,
pub cell_log: &'a CellVisibilityLog,
pub page_tracking: &'a PageTrackingState,
pub current_txn: TxnToken,
pub materialization_threshold: usize,
}
#[must_use]
pub fn should_use_cell_path(
ctx: &RoutingContext<'_>,
page: &PageMetadata,
op: &BtreeOp,
) -> RoutingDecision {
match ctx.mode {
CellMvccMode::Off => {
trace!(
pgno = page.page_no.get(),
mode = "off",
"routing_cell_mvcc_disabled"
);
return RoutingDecision::page_level(RoutingReason::CellMvccDisabled);
}
CellMvccMode::Auto if !page.is_table => {
trace!(
pgno = page.page_no.get(),
mode = "auto",
is_index = true,
"routing_index_page_auto_mode"
);
return RoutingDecision::page_level(RoutingReason::IndexPageAutoMode);
}
_ => {}
}
if !page.is_leaf {
trace!(
pgno = page.page_no.get(),
is_leaf = false,
"routing_interior_page"
);
return RoutingDecision::page_level(RoutingReason::InteriorPage);
}
let op_class = classify_btree_op(op, page);
if op_class == MvccOpClass::Structural {
trace!(
pgno = page.page_no.get(),
op = ?op,
"routing_structural_operation"
);
return RoutingDecision::page_level(RoutingReason::StructuralOperation);
}
if ctx
.page_tracking
.has_other_page_level_txn(page.page_no, ctx.current_txn)
{
debug!(
pgno = page.page_no.get(),
txn_id = ctx.current_txn.id.get(),
"routing_page_tracked_by_other_txn"
);
return RoutingDecision::page_level(RoutingReason::PageTrackedByOtherTxn);
}
let delta_count = ctx.cell_log.page_delta_count(page.page_no);
if should_materialize_eagerly(delta_count, ctx.materialization_threshold) {
debug!(
pgno = page.page_no.get(),
delta_count,
threshold = ctx.materialization_threshold,
"routing_threshold_exceeded"
);
return RoutingDecision::page_level(RoutingReason::MaterializationThresholdExceeded);
}
let txn_bytes = ctx.cell_log.txn_bytes(ctx.current_txn);
let per_txn_budget = ctx.cell_log.per_txn_budget_bytes();
if txn_bytes >= per_txn_budget {
debug!(
pgno = page.page_no.get(),
txn_bytes,
budget = per_txn_budget,
"routing_memory_budget_exceeded"
);
return RoutingDecision::page_level(RoutingReason::MemoryBudgetExceeded);
}
trace!(
pgno = page.page_no.get(),
mode = ctx.mode.as_str(),
delta_count,
"routing_cell_level_eligible"
);
RoutingDecision::cell_level(RoutingReason::CellLevelEligible)
}
pub fn escalate_to_page_level(
base_page: &PageData,
page_number: PageNumber,
committed_deltas: &[CellDelta],
uncommitted_deltas: &[CellDelta],
snapshot: &Snapshot,
usable_size: u32,
escalation_tracker: &mut TxnEscalationTracker,
) -> fsqlite_error::Result<EscalationResult> {
warn!(
pgno = page_number.get(),
committed_delta_count = committed_deltas.len(),
uncommitted_delta_count = uncommitted_deltas.len(),
"escalating_to_page_level"
);
let committed_result = materialize_page(
base_page,
page_number,
committed_deltas,
snapshot,
usable_size,
MaterializationTrigger::Structural,
)?;
let uncommitted_snapshot = Snapshot {
high: CommitSeq::new(u64::MAX),
schema_epoch: snapshot.schema_epoch,
};
let committed_deltas_applied = committed_result.deltas_applied;
let final_result = if uncommitted_deltas.is_empty() {
committed_result
} else {
let visible_uncommitted = uncommitted_deltas
.iter()
.enumerate()
.map(|(idx, delta)| {
let mut visible_delta = delta.clone();
visible_delta.commit_seq =
CommitSeq::new(u64::try_from(idx + 1).unwrap_or(u64::MAX));
visible_delta
})
.collect::<Vec<_>>();
materialize_page(
&committed_result.page,
page_number,
&visible_uncommitted,
&uncommitted_snapshot,
usable_size,
MaterializationTrigger::Structural,
)?
};
let discarded_cells: Vec<CellKey> = uncommitted_deltas.iter().map(|d| d.cell_key).collect();
escalation_tracker.escalate(page_number);
let total_deltas_applied = committed_deltas_applied + final_result.deltas_applied;
info!(
pgno = page_number.get(),
total_deltas_applied,
discarded_cells = discarded_cells.len(),
"escalation_complete"
);
Ok(EscalationResult {
materialized_page: final_result.page,
deltas_applied: total_deltas_applied,
discarded_cells,
})
}
static GLOBAL_CELL_MVCC_MODE: AtomicCellMvccMode = AtomicCellMvccMode::new(CellMvccMode::Auto);
struct AtomicCellMvccMode(std::sync::atomic::AtomicU8);
impl AtomicCellMvccMode {
const fn new(mode: CellMvccMode) -> Self {
Self(std::sync::atomic::AtomicU8::new(mode as u8))
}
fn load(&self, ordering: Ordering) -> CellMvccMode {
match self.0.load(ordering) {
0 => CellMvccMode::On,
1 => CellMvccMode::Off,
_ => CellMvccMode::Auto,
}
}
fn store(&self, mode: CellMvccMode, ordering: Ordering) {
self.0.store(mode as u8, ordering);
}
}
#[must_use]
pub fn get_cell_mvcc_mode() -> CellMvccMode {
GLOBAL_CELL_MVCC_MODE.load(Ordering::Acquire)
}
pub fn set_cell_mvcc_mode(mode: CellMvccMode) {
let old_mode = GLOBAL_CELL_MVCC_MODE.load(Ordering::Acquire);
GLOBAL_CELL_MVCC_MODE.store(mode, Ordering::Release);
info!(
old_mode = old_mode.as_str(),
new_mode = mode.as_str(),
"cell_mvcc_mode_changed"
);
}
#[cfg(test)]
mod tests {
use super::*;
use fsqlite_types::{BtreeRef, TxnEpoch, TxnId};
fn test_txn(id: u64) -> TxnToken {
TxnToken::new(TxnId::new(id).unwrap(), TxnEpoch::new(0))
}
fn leaf_table_page(page_no: PageNumber, cell_count: u16, available: u32) -> PageMetadata {
PageMetadata {
page_no,
is_leaf: true,
is_table: true,
cell_count,
content_offset: available + 8 + (u32::from(cell_count) + 1) * 2,
usable_size: 4096,
header_offset: 0,
header_size: 8,
}
}
fn interior_table_page(page_no: PageNumber) -> PageMetadata {
PageMetadata {
page_no,
is_leaf: false,
is_table: true,
cell_count: 5,
content_offset: 4000,
usable_size: 4096,
header_offset: 0,
header_size: 12,
}
}
fn index_leaf_page(page_no: PageNumber) -> PageMetadata {
PageMetadata {
page_no,
is_leaf: true,
is_table: false, cell_count: 10,
content_offset: 3000,
usable_size: 4096,
header_offset: 0,
header_size: 8,
}
}
#[test]
fn test_route_logical_insert_to_cell_path() {
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let page = leaf_table_page(PageNumber::new(2).unwrap(), 10, 1000);
let ctx = RoutingContext {
mode: CellMvccMode::On,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 50,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(decision.use_cell_level);
assert_eq!(decision.reason, RoutingReason::CellLevelEligible);
}
#[test]
fn test_route_structural_split_to_page_path() {
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let page = leaf_table_page(PageNumber::new(2).unwrap(), 100, 30);
let ctx = RoutingContext {
mode: CellMvccMode::On,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 100,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(!decision.use_cell_level);
assert_eq!(decision.reason, RoutingReason::StructuralOperation);
}
#[test]
fn test_route_overflowing_insert_to_page_path_even_when_empty_page_has_space() {
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let page = leaf_table_page(PageNumber::new(2).unwrap(), 0, 4086);
let ctx = RoutingContext {
mode: CellMvccMode::On,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 4062,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(!decision.use_cell_level);
assert_eq!(decision.reason, RoutingReason::StructuralOperation);
}
#[test]
fn test_route_interior_page_always_page_path() {
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let page = interior_table_page(PageNumber::new(2).unwrap());
let ctx = RoutingContext {
mode: CellMvccMode::On,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 10,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(!decision.use_cell_level);
assert_eq!(decision.reason, RoutingReason::InteriorPage);
}
#[test]
fn test_route_pragma_off_forces_page_path() {
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let page = leaf_table_page(PageNumber::new(2).unwrap(), 10, 1000);
let ctx = RoutingContext {
mode: CellMvccMode::Off, cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 50,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(!decision.use_cell_level);
assert_eq!(decision.reason, RoutingReason::CellMvccDisabled);
}
#[test]
fn test_route_pragma_auto_tables_cell_indexes_page() {
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let ctx = RoutingContext {
mode: CellMvccMode::Auto,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let table_page = leaf_table_page(PageNumber::new(2).unwrap(), 10, 1000);
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 50,
};
let table_decision = should_use_cell_path(&ctx, &table_page, &op);
assert!(table_decision.use_cell_level);
let index_page = index_leaf_page(PageNumber::new(3).unwrap());
let index_op = BtreeOp::IndexInsert { key_size: 50 };
let index_decision = should_use_cell_path(&ctx, &index_page, &index_op);
assert!(!index_decision.use_cell_level);
assert_eq!(index_decision.reason, RoutingReason::IndexPageAutoMode);
}
#[test]
fn test_route_page_tracked_by_other_txn_forces_page() {
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn1 = test_txn(1);
let txn2 = test_txn(2);
let page = leaf_table_page(PageNumber::new(2).unwrap(), 10, 1000);
page_tracking.register_page_level(page.page_no, txn1);
let ctx = RoutingContext {
mode: CellMvccMode::On,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn2,
materialization_threshold: 32,
};
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 50,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(!decision.use_cell_level);
assert_eq!(decision.reason, RoutingReason::PageTrackedByOtherTxn);
}
#[test]
fn test_route_same_txn_page_level_allows_cell() {
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let page = leaf_table_page(PageNumber::new(2).unwrap(), 10, 1000);
page_tracking.register_page_level(page.page_no, txn);
let ctx = RoutingContext {
mode: CellMvccMode::On,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn, materialization_threshold: 32,
};
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 50,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(decision.use_cell_level);
}
#[test]
fn test_route_threshold_exceeded_forces_page() {
use fsqlite_types::TableId;
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let page = leaf_table_page(PageNumber::new(2).unwrap(), 10, 1000);
let ctx = RoutingContext {
mode: CellMvccMode::On,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let btree = BtreeRef::Table(TableId::new(1));
for i in 0i64..35 {
let cell_key = CellKey::table_row(btree, i);
let delta_txn = test_txn(100 + i as u64);
let idx = cell_log
.record_insert(cell_key, page.page_no, vec![i as u8; 20], delta_txn)
.expect("insert should succeed");
cell_log.commit_delta(idx, CommitSeq::new((i + 1) as u64));
}
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 50,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(!decision.use_cell_level);
assert_eq!(
decision.reason,
RoutingReason::MaterializationThresholdExceeded
);
}
#[test]
fn test_escalation_is_one_way() {
let mut tracker = TxnEscalationTracker::new();
let page = PageNumber::new(2).unwrap();
tracker.add_cell_level(page);
assert!(tracker.is_cell_level(page));
assert!(!tracker.is_escalated(page));
tracker.escalate(page);
assert!(!tracker.is_cell_level(page));
assert!(tracker.is_escalated(page));
tracker.add_cell_level(page);
assert!(!tracker.is_cell_level(page));
assert!(tracker.is_escalated(page));
}
#[test]
fn test_escalation_only_affects_target_page() {
let mut tracker = TxnEscalationTracker::new();
let page47 = PageNumber::new(47).unwrap();
let page48 = PageNumber::new(48).unwrap();
tracker.add_cell_level(page47);
tracker.add_cell_level(page48);
tracker.escalate(page47);
assert!(!tracker.is_cell_level(page47));
assert!(tracker.is_escalated(page47));
assert!(tracker.is_cell_level(page48));
assert!(!tracker.is_escalated(page48));
}
#[test]
fn test_cell_mvcc_mode_parsing() {
assert_eq!("on".parse::<CellMvccMode>(), Ok(CellMvccMode::On));
assert_eq!("ON".parse::<CellMvccMode>(), Ok(CellMvccMode::On));
assert_eq!("true".parse::<CellMvccMode>(), Ok(CellMvccMode::On));
assert_eq!("1".parse::<CellMvccMode>(), Ok(CellMvccMode::On));
assert_eq!("off".parse::<CellMvccMode>(), Ok(CellMvccMode::Off));
assert_eq!("OFF".parse::<CellMvccMode>(), Ok(CellMvccMode::Off));
assert_eq!("false".parse::<CellMvccMode>(), Ok(CellMvccMode::Off));
assert_eq!("0".parse::<CellMvccMode>(), Ok(CellMvccMode::Off));
assert_eq!("auto".parse::<CellMvccMode>(), Ok(CellMvccMode::Auto));
assert_eq!("AUTO".parse::<CellMvccMode>(), Ok(CellMvccMode::Auto));
assert!("invalid".parse::<CellMvccMode>().is_err());
}
#[test]
fn test_page_tracking_state() {
let state = PageTrackingState::new();
let page = PageNumber::new(2).unwrap();
let txn1 = test_txn(1);
let txn2 = test_txn(2);
assert_eq!(state.page_level_txn_count(page), 0);
assert!(!state.has_other_page_level_txn(page, txn1));
state.register_page_level(page, txn1);
assert_eq!(state.page_level_txn_count(page), 1);
assert!(!state.has_other_page_level_txn(page, txn1)); assert!(state.has_other_page_level_txn(page, txn2));
state.register_page_level(page, txn2);
assert_eq!(state.page_level_txn_count(page), 2);
assert!(state.has_other_page_level_txn(page, txn1));
state.unregister_page_level(page, txn1);
assert_eq!(state.page_level_txn_count(page), 1);
assert!(!state.has_other_page_level_txn(page, txn2));
state.unregister_all(txn2);
assert_eq!(state.page_level_txn_count(page), 0);
}
#[test]
fn test_global_mode_control() {
let saved = get_cell_mvcc_mode();
set_cell_mvcc_mode(CellMvccMode::Off);
assert_eq!(get_cell_mvcc_mode(), CellMvccMode::Off);
set_cell_mvcc_mode(CellMvccMode::On);
assert_eq!(get_cell_mvcc_mode(), CellMvccMode::On);
set_cell_mvcc_mode(CellMvccMode::Auto);
assert_eq!(get_cell_mvcc_mode(), CellMvccMode::Auto);
set_cell_mvcc_mode(saved);
}
#[test]
fn test_route_memory_budget_exceeded_forces_page() {
use fsqlite_types::TableId;
let btree = BtreeRef::Table(TableId::new(1));
let txn = test_txn(1);
let measure_log = CellVisibilityLog::with_per_txn_budget(1024 * 1024, 10_000);
let cell_key = CellKey::table_row(btree, 0);
let _ =
measure_log.record_insert(cell_key, PageNumber::new(99).unwrap(), vec![0u8; 10], txn);
let delta_size = measure_log.txn_bytes(txn);
let cell_log = CellVisibilityLog::with_per_txn_budget(1024 * 1024, delta_size);
let page_tracking = PageTrackingState::new();
let page = leaf_table_page(PageNumber::new(2).unwrap(), 10, 1000);
let cell_key1 = CellKey::table_row(btree, 1);
let result =
cell_log.record_insert(cell_key1, PageNumber::new(99).unwrap(), vec![0u8; 10], txn);
assert!(result.is_some(), "First insert should succeed");
let tracked_bytes = cell_log.txn_bytes(txn);
assert_eq!(
tracked_bytes, delta_size,
"txn_bytes should equal budget after one insert"
);
let ctx = RoutingContext {
mode: CellMvccMode::On,
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 50,
};
let decision = should_use_cell_path(&ctx, &page, &op);
assert!(!decision.use_cell_level);
assert_eq!(decision.reason, RoutingReason::MemoryBudgetExceeded);
}
#[test]
fn test_escalation_preserves_uncommitted_deltas() {
use crate::cell_visibility::CellDeltaKind;
use fsqlite_types::{PageSize, SchemaEpoch, TableId};
let mut base_page = PageData::zeroed(PageSize::new(4096).unwrap());
let page_bytes = base_page.as_bytes_mut();
page_bytes[0] = 0x0D; page_bytes[3] = 0; page_bytes[4] = 0; page_bytes[5] = 0x10; page_bytes[6] = 0x00;
let page_number = PageNumber::new(5).unwrap();
let btree = BtreeRef::Table(TableId::new(1));
let txn = test_txn(42);
let snapshot = Snapshot {
high: CommitSeq::new(0),
schema_epoch: SchemaEpoch::new(0),
};
let uncommitted_deltas = vec![
CellDelta {
cell_key: CellKey::table_row(btree, 1),
commit_seq: CommitSeq::new(0), created_by: txn,
kind: CellDeltaKind::Insert,
page_number,
cell_data: vec![1, 2, 3, 4, 5],
prev_idx: None,
},
CellDelta {
cell_key: CellKey::table_row(btree, 2),
commit_seq: CommitSeq::new(0), created_by: txn,
kind: CellDeltaKind::Insert,
page_number,
cell_data: vec![6, 7, 8, 9, 10],
prev_idx: None,
},
];
let mut tracker = TxnEscalationTracker::new();
tracker.add_cell_level(page_number);
let result = escalate_to_page_level(
&base_page,
page_number,
&[], &uncommitted_deltas, &snapshot,
4096,
&mut tracker,
)
.expect("escalation should succeed");
assert_eq!(result.deltas_applied, 2);
assert_eq!(result.discarded_cells.len(), 2);
assert!(tracker.is_escalated(page_number));
}
#[test]
#[ignore = "requires B-tree split integration (C-TRANSITION integration)"]
fn test_escalation_insert_triggers_split() {
}
#[test]
#[ignore = "requires full SQL execution stack (C-TRANSITION integration)"]
fn test_cell_path_result_identical_to_page_path() {
}
#[test]
#[ignore = "requires concurrent transaction infrastructure (C-TRANSITION integration)"]
fn test_mixed_concurrent_txns() {
}
#[test]
#[ignore = "requires threading and contention testing (C-TRANSITION integration)"]
fn test_escalation_under_contention() {
}
#[test]
fn test_pragma_toggle_mid_session() {
let saved = get_cell_mvcc_mode();
let cell_log = CellVisibilityLog::new(1024 * 1024);
let page_tracking = PageTrackingState::new();
let txn = test_txn(1);
let page = leaf_table_page(PageNumber::new(2).unwrap(), 10, 1000);
let op = BtreeOp::TableInsert {
rowid: 100,
payload_size: 50,
};
set_cell_mvcc_mode(CellMvccMode::On);
let ctx_on = RoutingContext {
mode: get_cell_mvcc_mode(),
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let decision_on = should_use_cell_path(&ctx_on, &page, &op);
assert!(decision_on.use_cell_level);
set_cell_mvcc_mode(CellMvccMode::Off);
let ctx_off = RoutingContext {
mode: get_cell_mvcc_mode(),
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let decision_off = should_use_cell_path(&ctx_off, &page, &op);
assert!(!decision_off.use_cell_level);
assert_eq!(decision_off.reason, RoutingReason::CellMvccDisabled);
set_cell_mvcc_mode(CellMvccMode::Auto);
let ctx_auto = RoutingContext {
mode: get_cell_mvcc_mode(),
cell_log: &cell_log,
page_tracking: &page_tracking,
current_txn: txn,
materialization_threshold: 32,
};
let decision_auto = should_use_cell_path(&ctx_auto, &page, &op);
assert!(decision_auto.use_cell_level);
set_cell_mvcc_mode(saved);
}
#[test]
#[ignore = "requires proptest infrastructure (C-TRANSITION stress)"]
fn test_random_routing_1000_ops() {
}
#[test]
#[ignore = "requires repeated fill/split cycle (C-TRANSITION stress)"]
fn test_rapid_escalation_cycle() {
}
}