use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use fsqlite_types::sync_primitives::{Mutex, RwLock};
use fsqlite_types::{BtreeRef, CommitSeq, PageNumber, SemanticKeyKind, SemanticKeyRef, TxnToken};
use smallvec::SmallVec;
use tracing::{debug, trace};
use crate::cache_aligned::CacheAligned;
/// Identity of a B-tree cell as tracked by the visibility log: the owning
/// B-tree, the kind of semantic key, and a 16-byte key digest.
///
/// The digest is copied verbatim out of a [`SemanticKeyRef`]; how it is
/// derived from the raw key bytes is up to `SemanticKeyRef::new`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CellKey {
    pub btree: BtreeRef,
    pub kind: SemanticKeyKind,
    pub key_digest: [u8; 16],
}

impl CellKey {
    /// Build a `CellKey` by copying the B-tree, key kind and digest out of `skr`.
    #[must_use]
    pub fn from_semantic_ref(skr: &SemanticKeyRef) -> Self {
        let btree = skr.btree;
        let kind = skr.kind;
        let key_digest = skr.key_digest;
        Self {
            btree,
            kind,
            key_digest,
        }
    }

    /// Key for a table row.
    ///
    /// The rowid is varint-encoded into at most 10 bytes. Those bytes are
    /// passed to `SemanticKeyRef::new` as a [`SemanticKeyKind::TableRow`] key.
    #[must_use]
    pub fn table_row(btree: BtreeRef, rowid: i64) -> Self {
        let mut varint = [0u8; 10];
        let used = encode_varint_i64(rowid, &mut varint);
        let skr = SemanticKeyRef::new(btree, SemanticKeyKind::TableRow, &varint[..used]);
        Self::from_semantic_ref(&skr)
    }

    /// Key for an index entry whose encoded key bytes are `index_key_bytes`.
    #[must_use]
    pub fn index_entry(btree: BtreeRef, index_key_bytes: &[u8]) -> Self {
        let skr = SemanticKeyRef::new(btree, SemanticKeyKind::IndexEntry, index_key_bytes);
        Self::from_semantic_ref(&skr)
    }
}
/// Kind of change a [`CellDelta`] records.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CellDeltaKind {
    Insert,
    Delete,
    Update,
}

/// One recorded version of one cell.
///
/// Deltas for the same cell are linked newest-to-oldest through `prev_idx`.
#[derive(Debug, Clone)]
pub struct CellDelta {
    /// Commit sequence that publishes this delta; `0` means "not committed yet".
    pub commit_seq: CommitSeq,
    /// Transaction that recorded the delta.
    pub created_by: TxnToken,
    pub cell_key: CellKey,
    pub kind: CellDeltaKind,
    pub page_number: PageNumber,
    /// Cell bytes carried by this version (may be empty).
    pub cell_data: Vec<u8>,
    /// Next-older delta for the same cell, if any.
    pub prev_idx: Option<CellDeltaIdx>,
}

impl CellDelta {
    /// Approximate footprint: the inline struct size plus the length (not the
    /// capacity) of `cell_data`.
    #[must_use]
    pub fn memory_size(&self) -> usize {
        let payload = self.cell_data.len();
        payload + std::mem::size_of::<Self>()
    }

    /// `true` when the delta is committed (`commit_seq != 0`) and its commit
    /// sequence does not exceed `snapshot_high`.
    #[must_use]
    pub fn is_visible_to(&self, snapshot_high: CommitSeq) -> bool {
        if self.commit_seq.get() == 0 {
            // Uncommitted deltas are never visible to any snapshot.
            return false;
        }
        self.commit_seq <= snapshot_high
    }
}
/// Handle to a slot in a `CellDeltaArena`.
///
/// `chunk` and `offset` locate the slot. `generation` must match the slot's
/// current generation for the handle to resolve, so a handle to a freed or
/// reused slot is rejected.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CellDeltaIdx {
    chunk: u32,
    offset: u32,
    generation: u32,
}

impl CellDeltaIdx {
    /// Assemble a handle from its three coordinates (arena-internal).
    #[inline]
    const fn new(chunk: u32, offset: u32, generation: u32) -> Self {
        Self {
            generation,
            offset,
            chunk,
        }
    }

    /// Index of the arena chunk holding the slot.
    #[inline]
    #[must_use]
    pub fn chunk(&self) -> u32 {
        let Self { chunk, .. } = *self;
        chunk
    }

    /// Position of the slot inside its chunk.
    #[inline]
    #[must_use]
    pub fn offset(&self) -> u32 {
        let Self { offset, .. } = *self;
        offset
    }

    /// Slot generation this handle was issued for.
    #[inline]
    #[must_use]
    pub fn generation(&self) -> u32 {
        let Self { generation, .. } = *self;
        generation
    }
}
/// Number of slots per arena chunk. `CellDeltaArena::alloc` opens a new chunk
/// once the last chunk already holds this many slots.
const DELTA_ARENA_CHUNK: usize = 4096;
/// One arena slot. `generation` is bumped every time the slot is freed, so
/// handles issued for an earlier occupant stop resolving.
struct DeltaSlot {
generation: u32,
delta: Option<CellDelta>,
}
/// Chunked slab of `CellDelta`s addressed by generation-checked `CellDeltaIdx`
/// handles; freed slots are recycled through `free_list`.
pub struct CellDeltaArena {
// Slot storage, grown one `DELTA_ARENA_CHUNK`-sized chunk at a time.
chunks: Vec<Vec<DeltaSlot>>,
// Freed slots available for reuse; `alloc` pops from the end (LIFO).
free_list: Vec<CellDeltaIdx>,
// Sum of `cell_data.len()` over live deltas (added on alloc, subtracted on free).
cell_data_bytes: AtomicU64,
// Number of slots ever created; not decremented when slots are freed.
high_water: u64,
}
impl CellDeltaArena {
    /// Create an arena with one pre-sized, empty chunk.
    #[must_use]
    pub fn new() -> Self {
        let first_chunk = Vec::with_capacity(DELTA_ARENA_CHUNK);
        Self {
            chunks: vec![first_chunk],
            free_list: Vec::new(),
            cell_data_bytes: AtomicU64::new(0),
            high_water: 0,
        }
    }

    /// Store `delta` and return a handle to it.
    ///
    /// If there is a freed slot, it is reused and keeps its already-bumped
    /// generation. Otherwise a new slot is appended (generation 0), opening a
    /// new chunk when the last one is full.
    pub fn alloc(&mut self, delta: CellDelta) -> CellDeltaIdx {
        self.cell_data_bytes
            .fetch_add(delta.cell_data.len() as u64, Ordering::Relaxed);

        if let Some(recycled) = self.free_list.pop() {
            let slot = &mut self.chunks[recycled.chunk as usize][recycled.offset as usize];
            slot.delta = Some(delta);
            return CellDeltaIdx::new(recycled.chunk, recycled.offset, slot.generation);
        }

        let tail_full = self.chunks[self.chunks.len() - 1].len() >= DELTA_ARENA_CHUNK;
        if tail_full {
            self.chunks.push(Vec::with_capacity(DELTA_ARENA_CHUNK));
        }
        let chunk_idx = self.chunks.len() - 1;
        let tail = &mut self.chunks[chunk_idx];
        let offset = tail.len();
        tail.push(DeltaSlot {
            generation: 0,
            delta: Some(delta),
        });
        self.high_water += 1;

        // Saturate rather than panic if the index space is ever exhausted.
        CellDeltaIdx::new(
            u32::try_from(chunk_idx).unwrap_or(u32::MAX),
            u32::try_from(offset).unwrap_or(u32::MAX),
            0,
        )
    }

    /// Remove and return the delta behind `idx`.
    ///
    /// Returns `None` for out-of-range or stale handles. On success the slot's
    /// generation is advanced (skipping 0 on wrap-around) and the slot is
    /// queued for reuse.
    pub fn free(&mut self, idx: CellDeltaIdx) -> Option<CellDelta> {
        let slot = self
            .chunks
            .get_mut(idx.chunk as usize)
            .and_then(|chunk| chunk.get_mut(idx.offset as usize))?;
        if slot.generation != idx.generation {
            return None;
        }
        let delta = slot.delta.take()?;
        self.cell_data_bytes
            .fetch_sub(delta.cell_data.len() as u64, Ordering::Relaxed);
        slot.generation = match slot.generation.wrapping_add(1) {
            0 => 1,
            bumped => bumped,
        };
        self.free_list.push(idx);
        Some(delta)
    }

    /// Borrow the delta behind `idx`, or `None` if the handle is out of range,
    /// stale, or the slot is empty.
    #[must_use]
    pub fn get(&self, idx: CellDeltaIdx) -> Option<&CellDelta> {
        self.chunks
            .get(idx.chunk as usize)?
            .get(idx.offset as usize)
            .filter(|slot| slot.generation == idx.generation)?
            .delta
            .as_ref()
    }

    /// Total `cell_data` bytes held by live deltas.
    #[must_use]
    pub fn cell_data_bytes(&self) -> u64 {
        self.cell_data_bytes.load(Ordering::Relaxed)
    }

    /// Number of slots ever created (freed slots are still counted).
    #[must_use]
    pub fn high_water(&self) -> u64 {
        self.high_water
    }
}

impl Default for CellDeltaArena {
    /// Same as [`CellDeltaArena::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Number of shards in a `CellVisibilityLog`. `shard_index` selects a shard by
/// masking the page number with `CELL_LOG_SHARDS - 1`, so this must remain a
/// power of two.
pub const CELL_LOG_SHARDS: usize = 64;
/// Entry in a shard's head map: the full key of a tracked cell plus the
/// handle of the newest delta in its chain.
#[derive(Debug, Clone, Copy)]
pub(crate) struct CellHeadEntry {
pub(crate) cell_key: CellKey,
pub(crate) head_idx: CellDeltaIdx,
}
/// One shard of the visibility log, holding the cells of the pages that map
/// to it.
pub(crate) struct CellLogShard {
// Chain heads keyed by (page, key digest).
pub(crate) heads: RwLock<HashMap<(PageNumber, [u8; 16]), CellHeadEntry>>,
// Per-page count of recorded deltas; entries are removed when they reach zero.
page_delta_counts: RwLock<HashMap<PageNumber, usize>>,
}
impl CellLogShard {
    /// Create an empty shard.
    fn new() -> Self {
        let heads = RwLock::new(HashMap::new());
        let page_delta_counts = RwLock::new(HashMap::new());
        Self {
            heads,
            page_delta_counts,
        }
    }

    /// Add one to the delta counter of `page`, creating it at zero if absent.
    fn increment_page_count(&self, page: PageNumber) {
        *self.page_delta_counts.write().entry(page).or_default() += 1;
    }

    /// Subtract one (saturating) from the delta counter of `page`, dropping the
    /// entry once it reaches zero. Unknown pages are ignored.
    fn decrement_page_count(&self, page: PageNumber) {
        let mut counts = self.page_delta_counts.write();
        let Some(count) = counts.get_mut(&page) else {
            return;
        };
        *count = count.saturating_sub(1);
        if *count == 0 {
            counts.remove(&page);
        }
    }

    /// `true` if `page` has a non-zero delta counter.
    fn page_has_deltas(&self, page: PageNumber) -> bool {
        matches!(self.page_delta_counts.read().get(&page), Some(&c) if c > 0)
    }
}
/// Per-transaction bookkeeping for the visibility log.
///
/// Tracks which arena slots each transaction allocated (in allocation order)
/// and how many bytes have been charged to it against the per-transaction
/// budget.
struct TxnDeltaTracker {
    txn_deltas: HashMap<TxnToken, SmallVec<[CellDeltaIdx; 16]>>,
    txn_bytes: HashMap<TxnToken, u64>,
}

impl TxnDeltaTracker {
    /// Create an empty tracker.
    fn new() -> Self {
        Self {
            txn_deltas: HashMap::new(),
            txn_bytes: HashMap::new(),
        }
    }

    /// Append `idx` to `txn`'s delta list and charge `bytes` to `txn`.
    fn record(&mut self, txn: TxnToken, idx: CellDeltaIdx, bytes: u64) {
        self.txn_deltas.entry(txn).or_default().push(idx);
        let charged = self.txn_bytes.entry(txn).or_default();
        // Saturate so a pathological total can never wrap back under budget.
        *charged = charged.saturating_add(bytes);
    }

    /// Deltas recorded by `txn`, in allocation order.
    ///
    /// No `#[allow(dead_code)]` here: this is used by `check_conflict`.
    fn get_deltas(&self, txn: TxnToken) -> Option<&SmallVec<[CellDeltaIdx; 16]>> {
        self.txn_deltas.get(&txn)
    }

    /// Forget `txn` entirely, returning its delta list (if it had one).
    ///
    /// No `#[allow(dead_code)]` here: this is used by `commit_txn` and `rollback_txn`.
    fn remove_txn(&mut self, txn: TxnToken) -> Option<SmallVec<[CellDeltaIdx; 16]>> {
        self.txn_bytes.remove(&txn);
        self.txn_deltas.remove(&txn)
    }

    /// Bytes currently charged to `txn` (0 if unknown).
    fn txn_bytes(&self, txn: TxnToken) -> u64 {
        self.txn_bytes.get(&txn).copied().unwrap_or(0)
    }
}
/// Result of `CellVisibilityLog::check_conflict`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CellConflict {
// The two transactions touched no common (page, key digest) cell.
None,
// Both transactions recorded deltas for at least one common cell.
Conflict { with_txn: TxnToken },
}
/// Counters reported by `CellVisibilityLog::gc`.
#[derive(Debug, Clone, Default)]
pub struct CellGcStats {
// Chain positions visited while scanning.
pub examined: u64,
// Deltas actually freed.
pub reclaimed: u64,
// Sum of `CellDelta::memory_size` over freed deltas.
pub bytes_freed: u64,
}
/// Callback run when the log's `cell_data` bytes exceed its global budget. It
/// receives a page number and the cell keys tracked for that page.
pub type MaterializationCallback = Box<dyn Fn(PageNumber, &[CellKey]) + Send + Sync>;
/// Cell-level visibility log: per-cell delta chains stored in a shared arena
/// and indexed by page-sharded head maps.
pub struct CellVisibilityLog {
// Head maps and per-page counters, selected by `shard_index(page)`.
pub(crate) shards: Box<[CacheAligned<CellLogShard>; CELL_LOG_SHARDS]>,
// Storage for every delta across all shards.
pub(crate) arena: Mutex<CellDeltaArena>,
// Which deltas each in-flight transaction recorded, and their byte charge.
txn_tracker: Mutex<TxnDeltaTracker>,
// Global budget, compared against the arena's `cell_data_bytes`.
budget_bytes: u64,
// Per-transaction budget, compared against summed `CellDelta::memory_size`.
per_txn_budget_bytes: u64,
// Number of live deltas (incremented on record, decremented on free paths).
delta_count: AtomicU64,
// Optional hook invoked when the global budget is exceeded.
materialization_cb: Option<MaterializationCallback>,
}
/// Per-transaction budget (4 MiB) used by `CellVisibilityLog::new`.
const DEFAULT_PER_TXN_BUDGET: u64 = 4 * 1024 * 1024;
impl CellVisibilityLog {
#[must_use]
pub fn new(budget_bytes: u64) -> Self {
Self::with_per_txn_budget(budget_bytes, DEFAULT_PER_TXN_BUDGET)
}
#[must_use]
pub fn with_per_txn_budget(budget_bytes: u64, per_txn_budget_bytes: u64) -> Self {
trace!(
budget_bytes,
per_txn_budget_bytes, "cell_visibility_log_created"
);
Self {
shards: Box::new(std::array::from_fn(|_| {
CacheAligned::new(CellLogShard::new())
})),
arena: Mutex::new(CellDeltaArena::new()),
txn_tracker: Mutex::new(TxnDeltaTracker::new()),
budget_bytes,
per_txn_budget_bytes,
delta_count: AtomicU64::new(0),
materialization_cb: None,
}
}
pub fn set_materialization_callback(&mut self, cb: MaterializationCallback) {
self.materialization_cb = Some(cb);
}
#[inline]
fn shard_index(pgno: PageNumber) -> usize {
(pgno.get() as usize) & (CELL_LOG_SHARDS - 1)
}
#[must_use]
pub fn page_has_deltas(&self, page_number: PageNumber) -> bool {
let shard_idx = Self::shard_index(page_number);
self.shards[shard_idx].page_has_deltas(page_number)
}
/// Record an insert of `cell_data` for `cell_key` by `created_by`.
///
/// Returns `None` if the transaction's per-transaction budget would be exceeded.
pub fn record_insert(
    &self,
    cell_key: CellKey,
    page_number: PageNumber,
    cell_data: Vec<u8>,
    created_by: TxnToken,
) -> Option<CellDeltaIdx> {
    let kind = CellDeltaKind::Insert;
    self.record_delta(cell_key, page_number, kind, cell_data, created_by)
}

/// Record a delete of `cell_key` by `created_by` (stored with empty cell data).
///
/// Returns `None` if the transaction's per-transaction budget would be exceeded.
pub fn record_delete(
    &self,
    cell_key: CellKey,
    page_number: PageNumber,
    created_by: TxnToken,
) -> Option<CellDeltaIdx> {
    let kind = CellDeltaKind::Delete;
    self.record_delta(cell_key, page_number, kind, Vec::new(), created_by)
}

/// Record an update of `cell_key` to `new_cell_data` by `created_by`.
///
/// Returns `None` if the transaction's per-transaction budget would be exceeded.
pub fn record_update(
    &self,
    cell_key: CellKey,
    page_number: PageNumber,
    new_cell_data: Vec<u8>,
    created_by: TxnToken,
) -> Option<CellDeltaIdx> {
    let kind = CellDeltaKind::Update;
    self.record_delta(cell_key, page_number, kind, new_cell_data, created_by)
}
/// Append a new, uncommitted delta for `cell_key` on `page_number` and make it
/// the head of that cell's chain.
///
/// Returns `None` (recording nothing) if the delta would push `created_by`
/// over its per-transaction byte budget.
///
/// The predecessor lookup, the arena allocation and the head replacement are
/// done in one critical section. Locks are taken in the order txn tracker →
/// arena → shard heads, the same order `rollback_txn` uses. Previously the
/// predecessor was read (under a read lock) before allocation, and the head
/// was written after the locks were released. Two concurrent writers of the
/// same cell could then both link to the same predecessor, leaving one delta
/// orphaned from the chain.
///
/// The materialization callback, if any, runs after every lock is released.
fn record_delta(
    &self,
    cell_key: CellKey,
    page_number: PageNumber,
    kind: CellDeltaKind,
    cell_data: Vec<u8>,
    created_by: TxnToken,
) -> Option<CellDeltaIdx> {
    let shard_idx = Self::shard_index(page_number);
    let shard = &self.shards[shard_idx];
    let lookup_key = (page_number, cell_key.key_digest);
    let mut delta = CellDelta {
        commit_seq: CommitSeq::new(0), // 0 = not yet committed
        created_by,
        cell_key,
        kind: kind.clone(),
        page_number,
        cell_data,
        prev_idx: None, // linked below while the heads lock is held
    };
    // `memory_size` does not depend on `prev_idx`, so it can be computed up front.
    let delta_memory = delta.memory_size() as u64;
    let new_idx = {
        let mut tracker = self.txn_tracker.lock();
        let current_txn_bytes = tracker.txn_bytes(created_by);
        if current_txn_bytes.saturating_add(delta_memory) > self.per_txn_budget_bytes {
            debug!(
                txn_id = created_by.id.get(),
                current_bytes = current_txn_bytes,
                requested_bytes = delta_memory,
                budget = self.per_txn_budget_bytes,
                "cell_delta_txn_budget_exceeded"
            );
            return None;
        }
        let idx = {
            let mut arena = self.arena.lock();
            let mut heads = shard.heads.write();
            delta.prev_idx = heads.get(&lookup_key).map(|entry| entry.head_idx);
            let idx = arena.alloc(delta);
            heads.insert(
                lookup_key,
                CellHeadEntry {
                    cell_key,
                    head_idx: idx,
                },
            );
            idx
        };
        tracker.record(created_by, idx, delta_memory);
        idx
    };
    shard.increment_page_count(page_number);
    self.delta_count.fetch_add(1, Ordering::Relaxed);
    trace!(
        pgno = page_number.get(),
        key_digest = ?&cell_key.key_digest[..4],
        txn_id = created_by.id.get(),
        op = ?kind,
        delta_idx_chunk = new_idx.chunk(),
        delta_idx_offset = new_idx.offset(),
        "cell_delta_recorded"
    );
    self.maybe_trigger_materialization();
    Some(new_idx)
}
/// If the arena's `cell_data` bytes exceed the global budget and a callback
/// is installed, pass up to 10 of the highest-delta pages to the callback.
///
/// The arena lock is released before any callback runs.
fn maybe_trigger_materialization(&self) {
    let over_budget = self.arena.lock().cell_data_bytes() > self.budget_bytes;
    if !over_budget {
        return;
    }
    let Some(cb) = self.materialization_cb.as_ref() else {
        return;
    };
    for (pgno, cell_keys) in self.find_high_delta_pages(10) {
        debug!(
            pgno = pgno.get(),
            cell_count = cell_keys.len(),
            "cell_budget_exceeded_materializing"
        );
        cb(pgno, &cell_keys);
    }
}
/// Pick up to `max_pages` pages that have chain heads, ordered by delta count
/// (descending) and then by number of tracked cells (descending).
///
/// The delta count comes from the shard page counters. It falls back to the
/// number of tracked cells when a page has no counter. Each returned page
/// carries the cell keys of all its chain heads.
fn find_high_delta_pages(&self, max_pages: usize) -> Vec<(PageNumber, Vec<CellKey>)> {
    let mut counts_by_page: HashMap<PageNumber, usize> = HashMap::new();
    let mut keys_by_page: HashMap<PageNumber, Vec<CellKey>> = HashMap::new();
    for shard in self.shards.iter() {
        // The counter guard is a temporary released at the end of this statement,
        // before the heads lock is taken.
        counts_by_page.extend(
            shard
                .page_delta_counts
                .read()
                .iter()
                .filter(|&(_, &count)| count > 0)
                .map(|(&pgno, &count)| (pgno, count)),
        );
        let heads = shard.heads.read();
        for ((pgno, _), entry) in heads.iter() {
            keys_by_page.entry(*pgno).or_default().push(entry.cell_key);
        }
    }

    let mut ranked: Vec<(PageNumber, usize, Vec<CellKey>)> = keys_by_page
        .into_iter()
        .map(|(pgno, keys)| {
            let weight = counts_by_page.get(&pgno).copied().unwrap_or(keys.len());
            (pgno, weight, keys)
        })
        .collect();
    // Lexicographic (count, cell count) comparison, reversed for descending order.
    ranked.sort_by(|a, b| (b.1, b.2.len()).cmp(&(a.1, a.2.len())));
    ranked
        .into_iter()
        .take(max_pages)
        .map(|(pgno, _weight, keys)| (pgno, keys))
        .collect()
}
pub fn commit_delta(&self, idx: CellDeltaIdx, commit_seq: CommitSeq) {
let mut arena = self.arena.lock();
if let Some(slot) = arena
.chunks
.get_mut(idx.chunk as usize)
.and_then(|c| c.get_mut(idx.offset as usize))
{
if slot.generation == idx.generation {
if let Some(ref mut delta) = slot.delta {
delta.commit_seq = commit_seq;
}
}
}
}
/// Bytes of `cell_key` on `page_number` as seen by a snapshot whose high
/// commit sequence is `snapshot_high`.
///
/// Walks the cell's chain from the newest recorded delta and stops at the
/// first delta visible to the snapshot:
/// - a visible `Insert`/`Update` yields `Some(cell_data.clone())`;
/// - a visible `Delete` yields `None`.
///
/// `None` is also returned when the cell is not tracked, or when no reachable
/// delta is visible. These outcomes differ only in the emitted trace.
///
/// The shard's heads lock is released before the arena lock is taken.
#[must_use]
pub fn resolve(
    &self,
    page_number: PageNumber,
    cell_key: &CellKey,
    snapshot_high: CommitSeq,
) -> Option<Vec<u8>> {
    let shard = &self.shards[Self::shard_index(page_number)];
    let lookup_key = (page_number, cell_key.key_digest);
    // The read guard is a temporary and is dropped at the end of this statement.
    let tracked_head = shard.heads.read().get(&lookup_key).map(|entry| entry.head_idx);
    let Some(head_idx) = tracked_head else {
        trace!(
            pgno = page_number.get(),
            key_digest = ?&cell_key.key_digest[..4],
            snapshot_high = snapshot_high.get(),
            result = "not_tracked",
            "cell_resolved"
        );
        return None;
    };

    let arena = self.arena.lock();
    let mut cursor = Some(head_idx);
    while let Some(idx) = cursor {
        let Some(delta) = arena.get(idx) else {
            // Stale link: treat as end of chain.
            break;
        };
        if !delta.is_visible_to(snapshot_high) {
            cursor = delta.prev_idx;
            continue;
        }
        return match delta.kind {
            CellDeltaKind::Delete => {
                trace!(
                    pgno = page_number.get(),
                    key_digest = ?&cell_key.key_digest[..4],
                    snapshot_high = snapshot_high.get(),
                    result = "deleted",
                    "cell_resolved"
                );
                None
            }
            CellDeltaKind::Insert | CellDeltaKind::Update => {
                trace!(
                    pgno = page_number.get(),
                    key_digest = ?&cell_key.key_digest[..4],
                    snapshot_high = snapshot_high.get(),
                    result = "visible",
                    data_len = delta.cell_data.len(),
                    "cell_resolved"
                );
                Some(delta.cell_data.clone())
            }
        };
    }

    trace!(
        pgno = page_number.get(),
        key_digest = ?&cell_key.key_digest[..4],
        snapshot_high = snapshot_high.get(),
        result = "no_visible_version",
        "cell_resolved"
    );
    None
}
/// `true` when live `cell_data` bytes exceed the global budget.
#[must_use]
pub fn is_over_budget(&self) -> bool {
    self.cell_data_bytes() > self.budget_bytes
}

/// Number of live deltas, as tracked by the log's counter.
#[must_use]
pub fn delta_count(&self) -> u64 {
    self.delta_count.load(Ordering::Relaxed)
}

/// Live `cell_data` bytes held in the arena.
#[must_use]
pub fn cell_data_bytes(&self) -> u64 {
    self.arena.lock().cell_data_bytes()
}

/// Configured global byte budget.
#[must_use]
pub fn budget_bytes(&self) -> u64 {
    self.budget_bytes
}

/// Configured per-transaction byte budget.
#[must_use]
pub fn per_txn_budget_bytes(&self) -> u64 {
    self.per_txn_budget_bytes
}
pub fn commit_txn(&self, txn: TxnToken, commit_seq: CommitSeq) {
let mut tracker = self.txn_tracker.lock();
let delta_indices = match tracker.get_deltas(txn) {
Some(indices) => indices.clone(),
None => {
trace!(txn_id = txn.id.get(), "cell_txn_commit_no_deltas");
return;
}
};
let mut arena = self.arena.lock();
let mut committed_count = 0u64;
for idx in &delta_indices {
if let Some(slot) = arena
.chunks
.get_mut(idx.chunk as usize)
.and_then(|c| c.get_mut(idx.offset as usize))
{
if slot.generation == idx.generation {
if let Some(ref mut delta) = slot.delta {
delta.commit_seq = commit_seq;
committed_count += 1;
}
}
}
}
drop(arena);
tracker.remove_txn(txn);
debug!(
txn_id = txn.id.get(),
commit_seq = commit_seq.get(),
delta_count = committed_count,
"cell_txn_committed"
);
}
pub fn rollback_txn(&self, txn: TxnToken) -> u64 {
let mut tracker = self.txn_tracker.lock();
let delta_indices = match tracker.remove_txn(txn) {
Some(indices) => indices,
None => {
trace!(txn_id = txn.id.get(), "cell_txn_rollback_no_deltas");
return 0;
}
};
let mut arena = self.arena.lock();
let mut removed_count = 0u64;
let mut bytes_freed = 0u64;
for idx in delta_indices.iter().rev() {
if let Some(delta) = arena.free(*idx) {
bytes_freed += delta.memory_size() as u64;
removed_count += 1;
let shard_idx = Self::shard_index(delta.page_number);
let shard = &self.shards[shard_idx];
let mut heads = shard.heads.write();
let lookup_key = (delta.page_number, delta.cell_key.key_digest);
let next_head = delta.prev_idx.filter(|prev| arena.get(*prev).is_some());
let should_remove = heads
.get(&lookup_key)
.is_some_and(|entry| entry.head_idx == *idx)
&& next_head.is_none();
if let Some(entry) = heads.get_mut(&lookup_key) {
if entry.head_idx == *idx {
if let Some(prev) = next_head {
entry.head_idx = prev;
}
}
}
if should_remove {
heads.remove(&lookup_key);
}
drop(heads); shard.decrement_page_count(delta.page_number);
}
}
self.delta_count.fetch_sub(removed_count, Ordering::Relaxed);
debug!(
txn_id = txn.id.get(),
delta_count = removed_count,
bytes_freed,
"cell_txn_rolled_back"
);
removed_count
}
/// Report whether `txn` and `other_txn` have live deltas on a common
/// (page, key digest) cell.
///
/// A transaction never conflicts with itself. If either transaction has no
/// tracked deltas, the result is `CellConflict::None`.
#[must_use]
pub fn check_conflict(&self, txn: TxnToken, other_txn: TxnToken) -> CellConflict {
    if txn == other_txn {
        return CellConflict::None;
    }
    let tracker = self.txn_tracker.lock();
    let (Some(ours), Some(theirs)) = (tracker.get_deltas(txn), tracker.get_deltas(other_txn))
    else {
        return CellConflict::None;
    };
    let arena = self.arena.lock();
    let touched: std::collections::HashSet<(PageNumber, [u8; 16])> = ours
        .iter()
        .filter_map(|idx| arena.get(*idx))
        .map(|delta| (delta.page_number, delta.cell_key.key_digest))
        .collect();
    let overlap = theirs
        .iter()
        .filter_map(|idx| arena.get(*idx))
        .find(|delta| touched.contains(&(delta.page_number, delta.cell_key.key_digest)));
    let Some(delta) = overlap else {
        return CellConflict::None;
    };
    debug!(
        txn_id = txn.id.get(),
        other_txn_id = other_txn.id.get(),
        pgno = delta.page_number.get(),
        "cell_conflict_detected"
    );
    CellConflict::Conflict {
        with_txn: other_txn,
    }
}
/// Free superseded committed deltas at or below `gc_horizon`.
///
/// In each chain (walked newest-first), the first committed delta with
/// `commit_seq <= gc_horizon` is kept. Every later committed delta at or
/// below the horizon is freed. Uncommitted deltas and deltas above the
/// horizon are never freed. Freed deltas are not unlinked: the surviving
/// neighbour's `prev_idx` becomes stale, and chain walks treat a stale link
/// as end-of-chain.
///
/// Scanning runs under the arena lock plus each shard's heads read lock.
/// Freeing re-acquires the arena lock, and `free` rejects any handle that
/// went stale in between.
pub fn gc(&self, gc_horizon: CommitSeq) -> CellGcStats {
    let mut stats = CellGcStats::default();
    let victims: Vec<CellDeltaIdx> = {
        let arena = self.arena.lock();
        let mut victims = Vec::new();
        for shard in self.shards.iter() {
            let heads = shard.heads.read();
            for entry in heads.values() {
                let mut kept_one = false;
                let mut cursor = Some(entry.head_idx);
                while let Some(idx) = cursor {
                    stats.examined += 1;
                    let Some(delta) = arena.get(idx) else {
                        break;
                    };
                    let durable = delta.commit_seq.get() != 0 && delta.commit_seq <= gc_horizon;
                    if durable {
                        if kept_one {
                            victims.push(idx);
                        } else {
                            kept_one = true;
                        }
                    }
                    cursor = delta.prev_idx;
                }
            }
        }
        victims
    };

    if !victims.is_empty() {
        let mut arena = self.arena.lock();
        for idx in victims {
            let Some(delta) = arena.free(idx) else {
                continue;
            };
            stats.reclaimed += 1;
            stats.bytes_freed += delta.memory_size() as u64;
            self.shards[Self::shard_index(delta.page_number)]
                .decrement_page_count(delta.page_number);
        }
    }

    self.delta_count
        .fetch_sub(stats.reclaimed, Ordering::Relaxed);
    debug!(
        gc_horizon = gc_horizon.get(),
        examined = stats.examined,
        reclaimed = stats.reclaimed,
        bytes_freed = stats.bytes_freed,
        "cell_gc_completed"
    );
    stats
}
/// Bytes currently charged to `txn` against the per-transaction budget.
#[must_use]
pub fn txn_bytes(&self, txn: TxnToken) -> u64 {
    self.txn_tracker.lock().txn_bytes(txn)
}

/// Number of transactions that currently have tracked deltas.
#[must_use]
pub fn active_txn_count(&self) -> usize {
    self.txn_tracker.lock().txn_deltas.len()
}
/// Clone every reachable delta on `page_number` that is visible to
/// `snapshot_high`.
///
/// The result is sorted by `commit_seq`. The sort is stable, so ties keep
/// their discovery order.
#[must_use]
pub fn collect_visible_deltas(
    &self,
    page_number: PageNumber,
    snapshot_high: CommitSeq,
) -> Vec<CellDelta> {
    let shard = &self.shards[Self::shard_index(page_number)];
    let mut deltas: Vec<CellDelta> = Vec::new();
    let arena = self.arena.lock();
    let heads = shard.heads.read();
    let page_heads = heads
        .iter()
        .filter(|(key, _)| key.0 == page_number)
        .map(|(_, entry)| entry.head_idx);
    for head in page_heads {
        let mut cursor = Some(head);
        while let Some(idx) = cursor {
            let Some(delta) = arena.get(idx) else {
                break;
            };
            if delta.is_visible_to(snapshot_high) {
                deltas.push(delta.clone());
            }
            cursor = delta.prev_idx;
        }
    }
    deltas.sort_by_key(|delta| delta.commit_seq);
    trace!(
        pgno = page_number.get(),
        snapshot_high = snapshot_high.get(),
        delta_count = deltas.len(),
        "collected_visible_deltas"
    );
    deltas
}
/// Number of live deltas reachable from the chain heads of `page_number`.
#[must_use]
pub fn page_delta_count(&self, page_number: PageNumber) -> usize {
    let shard = &self.shards[Self::shard_index(page_number)];
    let arena = self.arena.lock();
    let heads = shard.heads.read();
    let total: usize = heads
        .iter()
        .filter(|(key, _)| key.0 == page_number)
        .map(|(_, entry)| {
            let mut chain_len = 0usize;
            let mut cursor = Some(entry.head_idx);
            while let Some(idx) = cursor {
                let Some(delta) = arena.get(idx) else {
                    break;
                };
                chain_len += 1;
                cursor = delta.prev_idx;
            }
            chain_len
        })
        .sum();
    // Bound to a local so the iterator borrowing the guards ends before they drop.
    total
}
/// Distinct pages that currently have at least one chain head, in
/// unspecified order.
#[must_use]
pub fn pages_with_deltas(&self) -> Vec<PageNumber> {
    let mut unique: std::collections::HashSet<PageNumber> = std::collections::HashSet::new();
    for shard in self.shards.iter() {
        unique.extend(shard.heads.read().keys().map(|(pgno, _)| *pgno));
    }
    unique.into_iter().collect()
}
/// Free every committed delta on `page_number` with
/// `commit_seq <= below_commit_seq`, returning how many were freed.
///
/// For each affected cell, the head moves to the newest delta that survives,
/// or the head entry is removed when nothing survives.
///
/// Scanning and updating now run in one pass while the arena lock and the
/// shard's heads write lock are held. The previous two-phase version released
/// both locks between the scan and the update. A delta recorded for the same
/// cell in that window was then dropped when the stale "new head" overwrote
/// (or removed) the head entry.
///
/// NOTE(review): surviving deltas that sat below a freed one keep a stale
/// `prev_idx`, as before; chain walks treat that as end-of-chain.
pub fn clear_page_deltas(&self, page_number: PageNumber, below_commit_seq: CommitSeq) -> usize {
    let shard_idx = Self::shard_index(page_number);
    let shard = &self.shards[shard_idx];
    let mut freed = 0usize;
    {
        let mut arena = self.arena.lock();
        let mut heads = shard.heads.write();
        heads.retain(|key, entry| {
            if key.0 != page_number {
                return true;
            }
            let mut to_free: Vec<CellDeltaIdx> = Vec::new();
            let mut new_head: Option<CellDeltaIdx> = None;
            let mut current_idx = Some(entry.head_idx);
            while let Some(idx) = current_idx {
                let Some(delta) = arena.get(idx) else {
                    break;
                };
                if delta.commit_seq.get() != 0 && delta.commit_seq <= below_commit_seq {
                    to_free.push(idx);
                } else if new_head.is_none() {
                    new_head = Some(idx);
                }
                current_idx = delta.prev_idx;
            }
            if to_free.is_empty() {
                return true;
            }
            for idx in to_free {
                if arena.free(idx).is_some() {
                    freed += 1;
                }
            }
            match new_head {
                Some(head) => {
                    entry.head_idx = head;
                    true
                }
                // Nothing left for this cell: drop its head entry.
                None => false,
            }
        });
    }
    if freed == 0 {
        return 0;
    }
    for _ in 0..freed {
        shard.decrement_page_count(page_number);
    }
    self.delta_count.fetch_sub(freed as u64, Ordering::Relaxed);
    debug!(
        pgno = page_number.get(),
        below_commit_seq = below_commit_seq.get(),
        freed_count = freed,
        "page_deltas_cleared"
    );
    freed
}
}
// Shards, arena and tracker are deliberately left out: they are large and
// lock-protected, so only summary counters are printed.
#[allow(clippy::missing_fields_in_debug)]
impl std::fmt::Debug for CellVisibilityLog {
    /// Print the budget, live delta count and live `cell_data` bytes.
    /// Reading the byte total takes the arena lock.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let delta_count = self.delta_count.load(Ordering::Relaxed);
        let cell_data_bytes = self.cell_data_bytes();
        let mut out = f.debug_struct("CellVisibilityLog");
        out.field("budget_bytes", &self.budget_bytes);
        out.field("delta_count", &delta_count);
        out.field("cell_data_bytes", &cell_data_bytes);
        out.finish_non_exhaustive()
    }
}
/// How a B-tree mutation affected page structure.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MutationOutcome {
    CellFit,
    CellRemovedPageNonEmpty,
    PageSplit,
    PageMerge,
    OverflowChainModified,
    InteriorCellReplaced,
}

impl MutationOutcome {
    /// `true` for outcomes that change page structure (split, merge,
    /// overflow-chain change, interior cell replacement).
    #[inline]
    #[must_use]
    pub const fn is_structural(self) -> bool {
        !self.is_logical()
    }

    /// `true` for outcomes confined to a single cell on a page that keeps
    /// its shape. Exhaustive on purpose, so a new variant must be classified.
    #[inline]
    #[must_use]
    pub const fn is_logical(self) -> bool {
        match self {
            Self::PageSplit
            | Self::PageMerge
            | Self::OverflowChainModified
            | Self::InteriorCellReplaced => false,
            Self::CellFit | Self::CellRemovedPageNonEmpty => true,
        }
    }
}
/// Whether an insert can be applied as a logical (in-place) cell insert.
///
/// Requires that the payload fits locally (`payload_size <= local_max`), so
/// no overflow chain is needed. It also requires that the payload plus
/// `cell_overhead` fits in `page_free_space`.
///
/// The size sum uses `checked_add`: an overflowing total cannot fit on any
/// page, so it yields `false` instead of panicking in debug builds or
/// wrapping to a small value in release builds.
#[must_use]
pub fn can_be_logical_insert(
    payload_size: usize,
    local_max: usize,
    page_free_space: usize,
    cell_overhead: usize,
) -> bool {
    payload_size <= local_max
        && payload_size
            .checked_add(cell_overhead)
            .is_some_and(|total_cell_size| total_cell_size <= page_free_space)
}
/// A delete stays logical (in-place) when the page keeps at least one cell
/// afterwards, i.e. it currently holds two or more.
#[inline]
#[must_use]
pub const fn will_be_logical_delete(current_cell_count: u16) -> bool {
    current_cell_count >= 2
}
/// Varint-encode `value` by reinterpreting its two's-complement bits as `u64`.
/// Negative values therefore always take the full 10 bytes.
fn encode_varint_i64(value: i64, buf: &mut [u8; 10]) -> usize {
    encode_varint_u64(u64::from_ne_bytes(value.to_ne_bytes()), buf)
}

/// Write `value` into `buf` as little-endian 7-bit groups, with the high bit
/// set on every byte except the last. Returns the number of bytes written
/// (1..=10; a `u64` needs at most ten 7-bit groups).
fn encode_varint_u64(mut value: u64, buf: &mut [u8; 10]) -> usize {
    let mut written = 0;
    while value > 0x7f {
        buf[written] = ((value as u8) & 0x7f) | 0x80;
        value >>= 7;
        written += 1;
    }
    buf[written] = value as u8;
    written + 1
}
#[cfg(test)]
mod tests {
use super::*;
use fsqlite_types::{TableId, TxnEpoch, TxnId};
fn test_txn_token() -> TxnToken {
TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(1))
}
#[test]
fn cell_key_from_rowid() {
let btree = BtreeRef::Table(TableId::new(1));
let key1 = CellKey::table_row(btree, 100);
let key2 = CellKey::table_row(btree, 100);
let key3 = CellKey::table_row(btree, 101);
assert_eq!(key1.key_digest, key2.key_digest, "Same rowid = same digest");
assert_ne!(
key1.key_digest, key3.key_digest,
"Different rowid = different digest"
);
}
#[test]
fn cell_delta_arena_alloc_free() {
let mut arena = CellDeltaArena::new();
let delta = CellDelta {
commit_seq: CommitSeq::new(1),
created_by: test_txn_token(),
cell_key: CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100),
kind: CellDeltaKind::Insert,
page_number: PageNumber::new(42).unwrap(),
cell_data: vec![1, 2, 3, 4],
prev_idx: None,
};
let idx = arena.alloc(delta);
assert!(arena.get(idx).is_some());
let freed = arena.free(idx);
assert!(freed.is_some());
assert!(arena.get(idx).is_none());
}
#[test]
fn cell_visibility_log_basic() {
let log = CellVisibilityLog::new(1024 * 1024); let btree = BtreeRef::Table(TableId::new(1));
let cell_key = CellKey::table_row(btree, 100);
let page_number = PageNumber::new(42).unwrap();
let token = test_txn_token();
let idx = log
.record_insert(cell_key, page_number, vec![1, 2, 3, 4], token)
.expect("insert should succeed");
let snapshot = CommitSeq::new(10);
assert!(log.resolve(page_number, &cell_key, snapshot).is_none());
log.commit_delta(idx, CommitSeq::new(5));
let result = log.resolve(page_number, &cell_key, snapshot);
assert_eq!(result, Some(vec![1, 2, 3, 4]));
let old_snapshot = CommitSeq::new(4);
assert!(log.resolve(page_number, &cell_key, old_snapshot).is_none());
}
#[test]
fn cell_visibility_log_delete() {
let log = CellVisibilityLog::new(1024 * 1024);
let btree = BtreeRef::Table(TableId::new(1));
let cell_key = CellKey::table_row(btree, 100);
let page_number = PageNumber::new(42).unwrap();
let token = test_txn_token();
let idx1 = log
.record_insert(cell_key, page_number, vec![1, 2, 3], token)
.expect("insert should succeed");
log.commit_delta(idx1, CommitSeq::new(5));
let idx2 = log
.record_delete(cell_key, page_number, token)
.expect("delete should succeed");
log.commit_delta(idx2, CommitSeq::new(10));
assert!(
log.resolve(page_number, &cell_key, CommitSeq::new(15))
.is_none()
);
let result = log.resolve(page_number, &cell_key, CommitSeq::new(7));
assert_eq!(result, Some(vec![1, 2, 3]));
}
#[test]
fn mutation_outcome_classification() {
assert!(MutationOutcome::CellFit.is_logical());
assert!(MutationOutcome::CellRemovedPageNonEmpty.is_logical());
assert!(!MutationOutcome::CellFit.is_structural());
assert!(!MutationOutcome::CellRemovedPageNonEmpty.is_structural());
assert!(MutationOutcome::PageSplit.is_structural());
assert!(MutationOutcome::PageMerge.is_structural());
assert!(MutationOutcome::OverflowChainModified.is_structural());
assert!(MutationOutcome::InteriorCellReplaced.is_structural());
assert!(!MutationOutcome::PageSplit.is_logical());
assert!(!MutationOutcome::PageMerge.is_logical());
}
#[test]
fn can_be_logical_insert_checks() {
assert!(can_be_logical_insert(50, 200, 100, 10));
assert!(!can_be_logical_insert(50, 200, 50, 10));
assert!(!can_be_logical_insert(250, 200, 1000, 10)); }
#[test]
fn will_be_logical_delete_checks() {
assert!(will_be_logical_delete(2)); assert!(will_be_logical_delete(10)); assert!(!will_be_logical_delete(1)); assert!(!will_be_logical_delete(0)); }
fn txn_token_n(n: u32) -> TxnToken {
TxnToken::new(TxnId::new(u64::from(n)).unwrap(), TxnEpoch::new(1))
}
#[test]
fn test_uncommitted_invisible() {
let log = CellVisibilityLog::new(1024 * 1024);
let btree = BtreeRef::Table(TableId::new(1));
let cell_key = CellKey::table_row(btree, 100);
let page_number = PageNumber::new(42).unwrap();
log.record_insert(cell_key, page_number, vec![1, 2, 3, 4], txn_token_n(1))
.expect("insert should succeed");
assert!(
log.resolve(page_number, &cell_key, CommitSeq::new(100))
.is_none()
);
assert!(
log.resolve(page_number, &cell_key, CommitSeq::new(1))
.is_none()
);
}
#[test]
fn test_rollback_removes() {
let log = CellVisibilityLog::new(1024 * 1024);
let btree = BtreeRef::Table(TableId::new(1));
let cell_key = CellKey::table_row(btree, 100);
let page_number = PageNumber::new(42).unwrap();
let token = txn_token_n(1);
log.record_insert(cell_key, page_number, vec![1, 2, 3, 4], token)
.expect("insert should succeed");
assert_eq!(log.delta_count(), 1);
let removed = log.rollback_txn(token);
assert_eq!(removed, 1);
assert_eq!(log.delta_count(), 0);
assert!(
log.resolve(page_number, &cell_key, CommitSeq::new(100))
.is_none()
);
}
#[test]
fn test_rollback_restores_previous_committed_version_after_same_txn_updates() {
let log = CellVisibilityLog::new(1024 * 1024);
let btree = BtreeRef::Table(TableId::new(1));
let cell_key = CellKey::table_row(btree, 100);
let page_number = PageNumber::new(42).unwrap();
let committed = log
.record_insert(cell_key, page_number, vec![1], txn_token_n(1))
.expect("committed insert should succeed");
log.commit_delta(committed, CommitSeq::new(10));
let rollback_token = txn_token_n(2);
log.record_update(cell_key, page_number, vec![2], rollback_token)
.expect("first update should succeed");
log.record_update(cell_key, page_number, vec![3], rollback_token)
.expect("second update should succeed");
assert_eq!(log.rollback_txn(rollback_token), 2);
assert_eq!(
log.resolve(page_number, &cell_key, CommitSeq::new(20)),
Some(vec![1])
);
}
#[test]
fn test_update_versioning() {
let log = CellVisibilityLog::new(1024 * 1024);
let btree = BtreeRef::Table(TableId::new(1));
let cell_key = CellKey::table_row(btree, 100);
let page_number = PageNumber::new(42).unwrap();
let token = txn_token_n(1);
let idx1 = log
.record_insert(cell_key, page_number, vec![1, 1, 1], token)
.expect("insert should succeed");
log.commit_delta(idx1, CommitSeq::new(5));
let idx2 = log
.record_update(cell_key, page_number, vec![2, 2, 2], token)
.expect("update should succeed");
log.commit_delta(idx2, CommitSeq::new(10));
assert_eq!(
log.resolve(page_number, &cell_key, CommitSeq::new(7)),
Some(vec![1, 1, 1])
);
assert_eq!(
log.resolve(page_number, &cell_key, CommitSeq::new(12)),
Some(vec![2, 2, 2])
);
}
#[test]
fn test_multi_txn_ordering() {
let log = CellVisibilityLog::new(1024 * 1024);
let btree = BtreeRef::Table(TableId::new(1));
let cell_key = CellKey::table_row(btree, 100);
let page_number = PageNumber::new(42).unwrap();
let idx1 = log
.record_insert(cell_key, page_number, vec![1], txn_token_n(1))
.expect("insert should succeed");
log.commit_delta(idx1, CommitSeq::new(5));
let idx2 = log
.record_update(cell_key, page_number, vec![2], txn_token_n(2))
.expect("update should succeed");
log.commit_delta(idx2, CommitSeq::new(10));
let idx3 = log
.record_update(cell_key, page_number, vec![3], txn_token_n(3))
.expect("update should succeed");
log.commit_delta(idx3, CommitSeq::new(15));
assert_eq!(
log.resolve(page_number, &cell_key, CommitSeq::new(7)),
Some(vec![1])
);
assert_eq!(
log.resolve(page_number, &cell_key, CommitSeq::new(12)),
Some(vec![2])
);
assert_eq!(
log.resolve(page_number, &cell_key, CommitSeq::new(20)),
Some(vec![3])
);
}
#[test]
fn test_different_cells_no_conflict() {
let log = CellVisibilityLog::new(1024 * 1024);
let btree = BtreeRef::Table(TableId::new(1));
let page_number = PageNumber::new(42).unwrap();
let cell_key1 = CellKey::table_row(btree, 100);
let cell_key2 = CellKey::table_row(btree, 101);
let token1 = txn_token_n(1);
let token2 = txn_token_n(2);
log.record_insert(cell_key1, page_number, vec![1], token1)
.expect("insert should succeed");
log.record_insert(cell_key2, page_number, vec![2], token2)
.expect("insert should succeed");
assert_eq!(log.check_conflict(token1, token2), CellConflict::None);
assert_eq!(log.check_conflict(token2, token1), CellConflict::None);
}
#[test]
fn test_same_cell_conflict() {
let log = CellVisibilityLog::new(1024 * 1024);
let btree = BtreeRef::Table(TableId::new(1));
let cell_key = CellKey::table_row(btree, 100);
let page_number = PageNumber::new(42).unwrap();
let token1 = txn_token_n(1);
let token2 = txn_token_n(2);
log.record_insert(cell_key, page_number, vec![1], token1)
.expect("insert should succeed");
log.record_insert(cell_key, page_number, vec![2], token2)
.expect("insert should succeed");
assert!(matches!(
log.check_conflict(token1, token2),
CellConflict::Conflict { .. }
));
}
/// With two committed versions of one cell (seq 5 and 10) and a GC horizon
/// of 15, exactly one delta is reclaimed and the newer payload still resolves.
#[test]
fn test_gc_reclaims_old() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();
    let txn = txn_token_n(1);

    let older = log
        .record_insert(key, pgno, vec![1, 2, 3], txn)
        .expect("insert should succeed");
    log.commit_delta(older, CommitSeq::new(5));

    let newer = log
        .record_update(key, pgno, vec![4, 5, 6], txn)
        .expect("update should succeed");
    log.commit_delta(newer, CommitSeq::new(10));

    assert_eq!(log.delta_count(), 2);
    let gc_stats = log.gc(CommitSeq::new(15));
    assert_eq!(gc_stats.reclaimed, 1);
    assert_eq!(log.delta_count(), 1);

    let visible = log.resolve(pgno, &key, CommitSeq::new(15));
    assert_eq!(visible, Some(vec![4, 5, 6]));
}
/// A GC horizon (5) below the only committed version (10) must reclaim
/// nothing, and that version remains resolvable at snapshot 15.
#[test]
fn test_gc_preserves_visible() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();

    let committed = log
        .record_insert(key, pgno, vec![1, 2, 3], txn_token_n(1))
        .expect("insert should succeed");
    log.commit_delta(committed, CommitSeq::new(10));
    assert_eq!(log.delta_count(), 1);

    let gc_stats = log.gc(CommitSeq::new(5));
    assert_eq!(gc_stats.reclaimed, 0);
    assert_eq!(log.delta_count(), 1);

    let visible = log.resolve(pgno, &key, CommitSeq::new(15));
    assert_eq!(visible, Some(vec![1, 2, 3]));
}
/// Sanity check that `shard_index` maps page numbers 1..=256 onto at least
/// half of the `CELL_LOG_SHARDS` shards.
#[test]
fn test_shard_distribution() {
    let mut hit = [false; CELL_LOG_SHARDS];
    (1..=256)
        .map(|pgno| CellVisibilityLog::shard_index(PageNumber::new(pgno).unwrap()))
        .for_each(|shard| hit[shard] = true);

    let distinct = hit.iter().copied().filter(|&used| used).count();
    assert!(distinct >= CELL_LOG_SHARDS / 2);
}
/// Ten 100-byte inserts must be reflected in `cell_data_bytes()`. Only a
/// lower bound is asserted, so the counter may report more than the raw
/// payload total.
#[test]
fn test_memory_tracking_accurate() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let table = BtreeRef::Table(TableId::new(1));
    let pgno = PageNumber::new(42).unwrap();
    let txn = txn_token_n(1);

    for rowid in 0..10 {
        log.record_insert(CellKey::table_row(table, rowid), pgno, vec![0; 100], txn)
            .expect("insert should succeed");
    }

    let expected_min_bytes: u64 = 10 * 100;
    let actual_bytes = log.cell_data_bytes();
    assert!(
        actual_bytes >= expected_min_bytes,
        "Expected at least {} bytes, got {}",
        expected_min_bytes,
        actual_bytes
    );
}
/// `commit_txn` publishes every delta recorded under one token at once.
/// No cell resolves before the commit (even at snapshot 100), and all five
/// resolve afterwards at snapshot 15.
#[test]
fn test_commit_txn_bulk() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let table = BtreeRef::Table(TableId::new(1));
    let pgno = PageNumber::new(42).unwrap();
    let txn = txn_token_n(1);
    let key_for = |rowid: i64| CellKey::table_row(table, rowid);

    for rowid in 0..5 {
        log.record_insert(key_for(rowid), pgno, vec![rowid as u8], txn)
            .expect("insert should succeed");
    }
    for rowid in 0..5 {
        let before_commit = log.resolve(pgno, &key_for(rowid), CommitSeq::new(100));
        assert!(before_commit.is_none());
    }

    log.commit_txn(txn, CommitSeq::new(10));

    for rowid in 0..5 {
        let after_commit = log.resolve(pgno, &key_for(rowid), CommitSeq::new(15));
        assert_eq!(after_commit, Some(vec![rowid as u8]));
    }
}
/// With a 200-byte limit passed as the second constructor argument
/// (presumably the per-transaction budget, per the test name), the first
/// 50-byte insert is accepted. A second insert from the same transaction is
/// refused with `None`.
#[test]
fn test_per_txn_budget_exceeded() {
    let log = CellVisibilityLog::with_per_txn_budget(1024 * 1024, 200);
    let table = BtreeRef::Table(TableId::new(1));
    let pgno = PageNumber::new(42).unwrap();
    let txn = txn_token_n(1);

    let within_budget = log.record_insert(CellKey::table_row(table, 1), pgno, vec![0; 50], txn);
    assert!(within_budget.is_some());

    let over_budget = log.record_insert(CellKey::table_row(table, 2), pgno, vec![0; 50], txn);
    assert!(over_budget.is_none());
}
/// Inserting 5 x 100 bytes against a 200-byte first constructor argument
/// (the global budget, per the assertion message) must invoke the registered
/// materialization callback at least once.
#[test]
fn test_budget_exceeded_triggers_materialization() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};

    let fired = Arc::new(AtomicBool::new(false));
    let mut log = CellVisibilityLog::with_per_txn_budget(200, 1024 * 1024);
    {
        let fired = Arc::clone(&fired);
        log.set_materialization_callback(Box::new(move |_pgno, _cells| {
            fired.store(true, AtomicOrdering::SeqCst);
        }));
    }

    let table = BtreeRef::Table(TableId::new(1));
    let pgno = PageNumber::new(42).unwrap();
    let txn = txn_token_n(1);
    for rowid in 0..5 {
        // Only the callback side effect matters here, so whether each insert
        // is accepted is intentionally not checked.
        let _ = log.record_insert(CellKey::table_row(table, rowid), pgno, vec![0; 100], txn);
    }

    assert!(
        fired.load(AtomicOrdering::SeqCst),
        "Materialization callback should be triggered when global budget exceeded"
    );
}
/// When a single 100-byte insert exceeds the 64-byte first constructor
/// argument, the materialization callback must receive exactly the
/// `CellKey` that was inserted.
#[test]
fn test_materialization_callback_preserves_real_cell_key() {
    use std::sync::Arc;
    use std::sync::Mutex as StdMutex;

    let captured = Arc::new(StdMutex::new(Vec::new()));
    let mut log = CellVisibilityLog::with_per_txn_budget(64, 1024 * 1024);
    let sink = Arc::clone(&captured);
    log.set_materialization_callback(Box::new(move |_pgno, cells| {
        sink.lock().unwrap().extend_from_slice(cells);
    }));

    let key = CellKey::table_row(BtreeRef::Table(TableId::new(7)), 4242);
    let pgno = PageNumber::new(42).unwrap();
    log.record_insert(key, pgno, vec![0; 100], txn_token_n(1))
        .expect("insert should succeed");

    let seen = captured.lock().unwrap();
    assert_eq!(seen.as_slice(), &[key]);
}
/// `find_high_delta_pages(1)` must pick the page with the most deltas in
/// total. One cell with three committed versions (page 42) outranks two
/// single-version cells (page 43). The reported cell list for the winner is
/// exactly that one key.
#[test]
fn test_find_high_delta_pages_ranks_by_total_delta_count() {
    let log = CellVisibilityLog::with_per_txn_budget(1024 * 1024, 1024 * 1024);
    let table = BtreeRef::Table(TableId::new(1));
    let hot_page = PageNumber::new(42).unwrap();
    let wide_page = PageNumber::new(43).unwrap();
    let hot_key = CellKey::table_row(table, 7);

    // Hot page: one cell, three committed versions (seq 1, 2, 3).
    let first = log
        .record_insert(hot_key, hot_page, vec![1; 32], txn_token_n(1))
        .expect("hot insert should succeed");
    log.commit_delta(first, CommitSeq::new(1));
    let updates: [(u8, u32, &str); 2] = [
        (2, 2, "first hot update should succeed"),
        (3, 3, "second hot update should succeed"),
    ];
    for (fill, txn_id, failure) in updates {
        let idx = log
            .record_update(hot_key, hot_page, vec![fill; 32], txn_token_n(txn_id))
            .expect(failure);
        log.commit_delta(idx, CommitSeq::new(u64::from(txn_id)));
    }

    // Wide page: two distinct cells, one committed version each.
    for (rowid, txn_id) in [(100_i64, 4_u32), (101_i64, 5_u32)] {
        let idx = log
            .record_insert(
                CellKey::table_row(table, rowid),
                wide_page,
                vec![9; 32],
                txn_token_n(txn_id),
            )
            .expect("wide-page insert should succeed");
        log.commit_delta(idx, CommitSeq::new(u64::from(txn_id)));
    }

    let ranked = log.find_high_delta_pages(1);
    assert_eq!(ranked.len(), 1);
    let top = &ranked[0];
    assert_eq!(top.0, hot_page);
    assert_eq!(top.1.as_slice(), &[hot_key]);
}
/// A freshly constructed log reports no deltas for a page it has never seen.
#[test]
fn c7_page_with_no_deltas_returns_false() {
    let empty_log = CellVisibilityLog::new(1024 * 1024);
    let untouched = PageNumber::new(42).unwrap();
    let has_any = empty_log.page_has_deltas(untouched);
    assert!(!has_any, "page without deltas should return false");
}
/// `page_has_deltas` is false before any delta is recorded on the page and
/// true after a single insert lands on it.
#[test]
fn c7_page_with_deltas_returns_true() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let btree = BtreeRef::Table(TableId::new(1));
    let cell_key = CellKey::table_row(btree, 100);
    let page_number = PageNumber::new(42).unwrap();
    let token = test_txn_token();
    assert!(!log.page_has_deltas(page_number));
    // Fail here with a precise message if the insert is refused, rather than
    // surfacing it as a misleading `page_has_deltas` failure below.
    log.record_insert(cell_key, page_number, vec![1, 2, 3], token)
        .expect("insert should succeed");
    assert!(
        log.page_has_deltas(page_number),
        "page with delta should return true"
    );
}
/// After `rollback_txn`, a page whose only deltas came from that transaction
/// must no longer report any deltas.
#[test]
fn c7_page_delta_count_tracks_rollback() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let btree = BtreeRef::Table(TableId::new(1));
    let page_number = PageNumber::new(42).unwrap();
    let token = test_txn_token();
    for i in 0..5 {
        let cell_key = CellKey::table_row(btree, i);
        // Every insert must land. Otherwise the rollback check below covers
        // fewer deltas than intended without anyone noticing.
        log.record_insert(cell_key, page_number, vec![i as u8], token)
            .expect("insert should succeed");
    }
    assert!(log.page_has_deltas(page_number));
    log.rollback_txn(token);
    assert!(
        !log.page_has_deltas(page_number),
        "page should have no deltas after rollback"
    );
}
/// Three distinct cells, each with a single version committed at seq 1..=3,
/// survive a GC pass at horizon 3. Nothing is reclaimed and the page still
/// reports deltas.
#[test]
fn c7_page_delta_count_tracks_gc() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let table = BtreeRef::Table(TableId::new(1));
    let pgno = PageNumber::new(42).unwrap();
    let txn = test_txn_token();

    for (seq, rowid) in (1u64..).zip(0..3i64) {
        let idx = log
            .record_insert(CellKey::table_row(table, rowid), pgno, vec![rowid as u8], txn)
            .expect("insert should succeed");
        log.commit_delta(idx, CommitSeq::new(seq));
    }
    assert!(log.page_has_deltas(pgno));

    let gc_stats = log.gc(CommitSeq::new(3));
    assert_eq!(
        gc_stats.reclaimed, 0,
        "single-version cells should not be GC'd"
    );
    assert!(log.page_has_deltas(pgno), "page should still have deltas");
}
/// A delta recorded on page 42 marks only that page. The neighbouring page 43
/// must still report no deltas.
#[test]
fn c7_different_pages_tracked_separately() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let btree = BtreeRef::Table(TableId::new(1));
    let page_a = PageNumber::new(42).unwrap();
    let page_b = PageNumber::new(43).unwrap();
    let token = test_txn_token();
    let cell_key = CellKey::table_row(btree, 100);
    // A refused insert should fail here, not as a confusing "page A" failure.
    log.record_insert(cell_key, page_a, vec![1], token)
        .expect("insert should succeed");
    assert!(log.page_has_deltas(page_a), "page A should have deltas");
    assert!(!log.page_has_deltas(page_b), "page B should have no deltas");
}
/// An insert (seq 5), update (seq 10) and delete (seq 15) of one cell form a
/// version chain. Each snapshot resolves to the version current at that
/// point: nothing before the insert, and nothing after the delete.
#[test]
fn c3_test_insert_update_delete_chain() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();

    let inserted = log
        .record_insert(key, pgno, vec![1, 1, 1], txn_token_n(1))
        .expect("insert should succeed");
    log.commit_delta(inserted, CommitSeq::new(5));
    let updated = log
        .record_update(key, pgno, vec![2, 2, 2], txn_token_n(2))
        .expect("update should succeed");
    log.commit_delta(updated, CommitSeq::new(10));
    let deleted = log
        .record_delete(key, pgno, txn_token_n(3))
        .expect("delete should succeed");
    log.commit_delta(deleted, CommitSeq::new(15));

    let expectations: [(u64, Option<Vec<u8>>, &str); 4] = [
        (3, None, "snapshot=3 should not see cell (before insert)"),
        (7, Some(vec![1, 1, 1]), "snapshot=7 should see insert version"),
        (12, Some(vec![2, 2, 2]), "snapshot=12 should see update version"),
        (20, None, "snapshot=20 should not see cell (deleted)"),
    ];
    for (snapshot, expected, why) in expectations {
        let seen = log.resolve(pgno, &key, CommitSeq::new(snapshot));
        match expected {
            None => assert!(seen.is_none(), "{}", why),
            Some(bytes) => assert_eq!(seen, Some(bytes), "{}", why),
        }
    }
}
/// One insert followed by 99 updates of the same cell, each committed at its
/// own sequence number 1..=100. Snapshot `i` must see exactly the payload
/// written at seq `i`, and snapshot 0 sees no version at all.
#[test]
fn c3_test_100_txns_same_cell() {
    let log = CellVisibilityLog::new(10 * 1024 * 1024);
    let btree = BtreeRef::Table(TableId::new(1));
    let cell_key = CellKey::table_row(btree, 100);
    let page_number = PageNumber::new(42).unwrap();
    let idx0 = log
        .record_insert(cell_key, page_number, vec![1], txn_token_n(1))
        .expect("insert should succeed");
    log.commit_delta(idx0, CommitSeq::new(1));
    for i in 2u64..=100 {
        let idx = log
            .record_update(cell_key, page_number, vec![i as u8], txn_token_n(i as u32))
            .expect("update should succeed");
        log.commit_delta(idx, CommitSeq::new(i));
    }
    for i in 1u64..=100 {
        let actual = log.resolve(page_number, &cell_key, CommitSeq::new(i));
        // The expected payload is used once, so it is built in place (no clone).
        assert_eq!(
            actual,
            Some(vec![i as u8]),
            "snapshot={} should see version with data={}",
            i,
            i
        );
    }
    assert!(
        log.resolve(page_number, &cell_key, CommitSeq::new(0))
            .is_none(),
        "snapshot=0 should not see any version"
    );
}
/// The visibility boundary is inclusive. A delta committed at 42 is seen by
/// snapshot 42 and not by snapshot 41.
#[test]
fn c3_test_exact_commit_seq_visible() {
    const COMMIT_AT: u64 = 42;
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();

    let idx = log
        .record_insert(key, pgno, vec![42], txn_token_n(1))
        .expect("insert should succeed");
    log.commit_delta(idx, CommitSeq::new(COMMIT_AT));

    let at_commit = log.resolve(pgno, &key, CommitSeq::new(COMMIT_AT));
    assert_eq!(
        at_commit,
        Some(vec![42]),
        "snapshot=42 (exact commit_seq) should see the cell"
    );
    let just_before = log.resolve(pgno, &key, CommitSeq::new(COMMIT_AT - 1));
    assert!(
        just_before.is_none(),
        "snapshot=41 (before commit_seq) should not see the cell"
    );
}
/// Snapshot 0 must not see a delta, even one committed at the earliest
/// sequence (1).
#[test]
fn c3_test_snapshot_zero_sees_nothing() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();
    let txn = txn_token_n(1);

    let idx = log
        .record_insert(key, pgno, vec![1], txn)
        .expect("insert should succeed");
    log.commit_delta(idx, CommitSeq::new(1));

    let at_zero = log.resolve(pgno, &key, CommitSeq::new(0));
    assert!(
        at_zero.is_none(),
        "snapshot=0 should never see any committed cell"
    );
}
/// After a committed insert, two uncommitted updates of the same cell by
/// different transactions conflict. `check_conflict` must report it in both
/// argument orders.
#[test]
fn c3_test_two_txns_update_same_cell_conflict() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();

    let base = log
        .record_insert(key, pgno, vec![1], txn_token_n(1))
        .expect("insert should succeed");
    log.commit_delta(base, CommitSeq::new(5));

    let (left, right) = (txn_token_n(2), txn_token_n(3));
    log.record_update(key, pgno, vec![2], left)
        .expect("update 2 should succeed");
    log.record_update(key, pgno, vec![3], right)
        .expect("update 3 should succeed");

    let is_conflict = |a: TxnToken, b: TxnToken| {
        matches!(log.check_conflict(a, b), CellConflict::Conflict { .. })
    };
    assert!(
        is_conflict(left, right),
        "two txns updating same cell should conflict"
    );
    assert!(is_conflict(right, left), "conflict should be symmetric");
}
/// Two transactions each record an uncommitted insert of the same cell, and
/// this must be flagged as a conflict.
///
/// NOTE(review): despite the name, no read is modelled here. Both
/// transactions only write.
#[test]
fn c3_test_read_before_commit_then_update_conflict() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();
    let txn_a = txn_token_n(1);
    let txn_b = txn_token_n(2);

    for (payload, txn) in [(1u8, txn_a), (2u8, txn_b)] {
        log.record_insert(key, pgno, vec![payload], txn)
            .expect("insert should succeed");
    }

    let outcome = log.check_conflict(txn_a, txn_b);
    assert!(
        matches!(outcome, CellConflict::Conflict { .. }),
        "txn A and B both touching same cell should conflict"
    );
}
/// Rowids 5 and 12 on the same page (47), written by two transactions, do not
/// conflict. After both commit, each cell resolves to its own payload.
#[test]
fn c3_test_different_cells_same_page_both_commit() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let table = BtreeRef::Table(TableId::new(1));
    let pgno = PageNumber::new(47).unwrap();
    let (key5, key12) = (CellKey::table_row(table, 5), CellKey::table_row(table, 12));
    let (txn_a, txn_b) = (txn_token_n(1), txn_token_n(2));

    let pending_a = log
        .record_insert(key5, pgno, vec![5], txn_a)
        .expect("insert cell 5 should succeed");
    let pending_b = log
        .record_insert(key12, pgno, vec![12], txn_b)
        .expect("insert cell 12 should succeed");

    assert_eq!(
        log.check_conflict(txn_a, txn_b),
        CellConflict::None,
        "different cells on same page should NOT conflict"
    );

    log.commit_delta(pending_a, CommitSeq::new(5));
    log.commit_delta(pending_b, CommitSeq::new(6));

    let seen5 = log.resolve(pgno, &key5, CommitSeq::new(10));
    assert_eq!(seen5, Some(vec![5]), "cell 5 should be visible");
    let seen12 = log.resolve(pgno, &key12, CommitSeq::new(10));
    assert_eq!(seen12, Some(vec![12]), "cell 12 should be visible");
}
/// Eight transactions each insert their own rowid (0..8) on one page. No
/// pair conflicts, and after each commits at seq rowid+1 every cell is
/// visible at snapshot 20.
#[test]
fn c3_test_8_txns_8_cells_all_commit() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let table = BtreeRef::Table(TableId::new(1));
    let pgno = PageNumber::new(42).unwrap();

    // (pending delta, owning token, rowid) for every transaction.
    let mut recorded = Vec::with_capacity(8);
    for rowid in 0..8i64 {
        let owner = txn_token_n((rowid + 1) as u32);
        let idx = log
            .record_insert(CellKey::table_row(table, rowid), pgno, vec![rowid as u8], owner)
            .expect("insert should succeed");
        recorded.push((idx, owner, rowid));
    }

    for (a, &(_, token_a, _)) in recorded.iter().enumerate() {
        for (b, &(_, token_b, _)) in recorded.iter().enumerate().skip(a + 1) {
            assert_eq!(
                log.check_conflict(token_a, token_b),
                CellConflict::None,
                "txn {} and {} should not conflict (different cells)",
                a + 1,
                b + 1
            );
        }
    }

    for &(idx, _, rowid) in &recorded {
        log.commit_delta(idx, CommitSeq::new((rowid + 1) as u64));
    }

    for rowid in 0..8i64 {
        let key = CellKey::table_row(table, rowid);
        assert_eq!(
            log.resolve(pgno, &key, CommitSeq::new(20)),
            Some(vec![rowid as u8]),
            "cell {} should be visible",
            rowid
        );
    }
}
/// A lone committed version is not reclaimed even when the GC horizon is past
/// it. Once a newer version is committed, the next GC pass reclaims the
/// older one.
#[test]
fn c3_test_gc_single_delta_below_horizon() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();
    let txn = txn_token_n(1);

    let v1 = log
        .record_insert(key, pgno, vec![1, 2, 3], txn)
        .expect("insert should succeed");
    log.commit_delta(v1, CommitSeq::new(5));
    assert_eq!(log.delta_count(), 1);

    let first_pass = log.gc(CommitSeq::new(10));
    assert_eq!(
        first_pass.reclaimed, 0,
        "single version should not be reclaimed even below horizon"
    );
    assert_eq!(log.delta_count(), 1);

    let v2 = log
        .record_update(key, pgno, vec![4, 5, 6], txn)
        .expect("update should succeed");
    log.commit_delta(v2, CommitSeq::new(15));
    assert_eq!(log.delta_count(), 2);

    let second_pass = log.gc(CommitSeq::new(20));
    assert_eq!(second_pass.reclaimed, 1, "older version should be reclaimed");
    assert_eq!(log.delta_count(), 1);
}
/// An uncommitted delta survives a GC pass even with a horizon (1000) far
/// beyond anything committed.
#[test]
fn c3_test_gc_uncommitted_never_reclaimed() {
    let log = CellVisibilityLog::new(1024 * 1024);
    let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 100);
    let pgno = PageNumber::new(42).unwrap();

    log.record_insert(key, pgno, vec![1, 2, 3], txn_token_n(1))
        .expect("insert should succeed");
    assert_eq!(log.delta_count(), 1);

    let gc_stats = log.gc(CommitSeq::new(1000));
    assert_eq!(
        gc_stats.reclaimed, 0,
        "uncommitted delta should never be reclaimed"
    );
    assert_eq!(log.delta_count(), 1);
}
/// Sustained churn on a single cell (10,000 committed versions) with a GC
/// pass every 1,000 commits must keep the live delta count bounded.
#[test]
fn c3_test_gc_sustained_load_bounded_memory() {
    let log = CellVisibilityLog::new(100 * 1024 * 1024);
    let btree = BtreeRef::Table(TableId::new(1));
    let page_number = PageNumber::new(42).unwrap();
    let token = txn_token_n(1);
    let cell_key = CellKey::table_row(btree, 1);
    let idx0 = log
        .record_insert(cell_key, page_number, vec![0; 100], token)
        .expect("insert should succeed");
    log.commit_delta(idx0, CommitSeq::new(1));
    for i in 2u64..=10_000 {
        let idx = log
            .record_update(cell_key, page_number, vec![(i % 256) as u8; 100], token)
            .expect("update should succeed");
        log.commit_delta(idx, CommitSeq::new(i));
        if i % 1000 == 0 {
            // The horizon trails the newest commit by 100, so each pass has on
            // the order of 900+ superseded versions below it. `> 800` leaves
            // slack.
            let horizon = CommitSeq::new(i.saturating_sub(100));
            let stats = log.gc(horizon);
            assert!(
                stats.reclaimed > 800,
                "GC should reclaim old versions (i={}, reclaimed={})",
                i,
                stats.reclaimed
            );
        }
    }
    let _stats = log.gc(CommitSeq::new(9_950));
    assert!(
        log.delta_count() < 200,
        "after sustained GC, delta_count={} should be bounded",
        log.delta_count()
    );
}
/// 16 threads hammer the same page (and therefore the same shard), each
/// inserting and committing 100 distinct cells. All threads must finish and
/// every delta must be counted.
#[test]
fn c3_test_high_contention_single_shard_no_deadlock() {
    use std::sync::Arc;
    use std::thread;

    let log = Arc::new(CellVisibilityLog::new(10 * 1024 * 1024));
    let table = BtreeRef::Table(TableId::new(1));
    let pgno = PageNumber::new(42).unwrap();

    let workers: Vec<_> = (0..16)
        .map(|t| {
            let shared = Arc::clone(&log);
            thread::spawn(move || {
                for i in 0..100 {
                    let rowid = t * 1000 + i;
                    let idx = shared
                        .record_insert(
                            CellKey::table_row(table, rowid),
                            pgno,
                            vec![t as u8],
                            txn_token_n((rowid + 1) as u32),
                        )
                        .expect("insert should succeed");
                    shared.commit_delta(idx, CommitSeq::new((rowid + 1) as u64));
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("thread should complete");
    }
    assert_eq!(log.delta_count(), 16 * 100);
}
/// 64 threads, each bound to its own page (1..=64), insert and commit 50
/// distinct cells apiece. Every delta must be accounted for afterwards.
#[test]
fn c3_test_64_thread_stress() {
    use std::sync::Arc;
    use std::thread;

    let log = Arc::new(CellVisibilityLog::new(100 * 1024 * 1024));
    let table = BtreeRef::Table(TableId::new(1));

    let workers: Vec<_> = (0..64)
        .map(|t| {
            let shared = Arc::clone(&log);
            thread::spawn(move || {
                // `t` is always below 64, so worker `t` owns page `t + 1`.
                let pgno = PageNumber::new(t + 1).unwrap();
                for i in 0..50 {
                    let key = CellKey::table_row(table, i64::from(t * 1000 + i));
                    let seq = t * 1000 + i + 1;
                    let idx = shared
                        .record_insert(key, pgno, vec![t as u8, i as u8], txn_token_n(seq as u32))
                        .expect("insert should succeed");
                    shared.commit_delta(idx, CommitSeq::new(seq as u64));
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("thread should complete");
    }
    assert_eq!(log.delta_count(), 64 * 50);
}
/// Pages 1..=1000 must give every shard at least 10 pages under
/// `shard_index`.
///
/// NOTE(review): despite the name, this checks shard distribution, not
/// cache-line padding or alignment.
#[test]
fn c3_test_shard_padding_alignment() {
    let mut per_shard = [0usize; CELL_LOG_SHARDS];
    for pgno in 1..=1000 {
        per_shard[CellVisibilityLog::shard_index(PageNumber::new(pgno).unwrap())] += 1;
    }
    for (shard, count) in per_shard.iter().copied().enumerate() {
        assert!(
            count >= 10,
            "shard {} only got {} pages, distribution is too skewed",
            shard,
            count
        );
    }
}
/// Property-based checks of `CellVisibilityLog`: snapshot visibility,
/// write-write conflict detection, and GC not reclaiming visible versions.
mod proptest_tests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(50))]
        // Transaction k (1-based) commits all of its cells at seq k. Each such
        // cell must be visible at every snapshot >= k and invisible below it.
        #[test]
        fn prop_snapshot_isolation(
            n_txns in 2usize..20,
            cells_per_txn in 1usize..10,
        ) {
            let log = CellVisibilityLog::new(100 * 1024 * 1024);
            let table = BtreeRef::Table(TableId::new(1));
            let pgno = PageNumber::new(42).unwrap();
            // (commit seq, rowids written by the transaction committed at it)
            let mut history: Vec<(u64, Vec<i64>)> = Vec::with_capacity(n_txns);
            for txn_idx in 0..n_txns {
                let seq = (txn_idx + 1) as u64;
                let owner = TxnToken::new(TxnId::new(seq).unwrap(), TxnEpoch::new(1));
                let mut rowids = Vec::with_capacity(cells_per_txn);
                for cell_idx in 0..cells_per_txn {
                    let rowid = (txn_idx * 1000 + cell_idx) as i64;
                    let payload = vec![txn_idx as u8, cell_idx as u8];
                    let idx = log
                        .record_insert(CellKey::table_row(table, rowid), pgno, payload, owner)
                        .expect("insert should succeed");
                    log.commit_delta(idx, CommitSeq::new(seq));
                    rowids.push(rowid);
                }
                history.push((seq, rowids));
            }
            for snapshot in 1u64..=(n_txns as u64 + 5) {
                for (commit_seq, rowids) in &history {
                    for &rowid in rowids {
                        let probe_key = CellKey::table_row(table, rowid);
                        let result = log.resolve(pgno, &probe_key, CommitSeq::new(snapshot));
                        if *commit_seq <= snapshot {
                            prop_assert!(
                                result.is_some(),
                                "cell {} committed at {} should be visible at snapshot {}",
                                rowid, commit_seq, snapshot
                            );
                        } else {
                            prop_assert!(
                                result.is_none(),
                                "cell {} committed at {} should NOT be visible at snapshot {}",
                                rowid, commit_seq, snapshot
                            );
                        }
                    }
                }
            }
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100))]
        // When every transaction inserts the same cell without committing,
        // every unordered pair of them must be reported as conflicting.
        #[test]
        fn prop_conflict_detection(
            n_txns in 2usize..8,
            target_cell in 0i64..100,
        ) {
            let log = CellVisibilityLog::new(100 * 1024 * 1024);
            let pgno = PageNumber::new(42).unwrap();
            let contested = CellKey::table_row(BtreeRef::Table(TableId::new(1)), target_cell);
            let mut writers = Vec::with_capacity(n_txns);
            for txn_idx in 0..n_txns {
                let owner = TxnToken::new(
                    TxnId::new((txn_idx + 1) as u64).unwrap(),
                    TxnEpoch::new(1),
                );
                log.record_insert(contested, pgno, vec![txn_idx as u8], owner)
                    .expect("insert should succeed");
                writers.push(owner);
            }
            for (i, &first) in writers.iter().enumerate() {
                for (j, &second) in writers.iter().enumerate().skip(i + 1) {
                    let conflict = log.check_conflict(first, second);
                    prop_assert!(
                        matches!(conflict, CellConflict::Conflict { .. }),
                        "txn {} and {} both wrote cell {} but no conflict detected",
                        i, j, target_cell
                    );
                }
            }
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(50))]
        // Build a chain of `n_versions` committed versions of one cell, run GC
        // at a horizon behind the newest commit, then check that the newest
        // version and the version just above the horizon still resolve.
        #[test]
        fn prop_gc_safety(
            n_versions in 5usize..50,
            gc_horizon_offset in 1usize..10,
        ) {
            let log = CellVisibilityLog::new(100 * 1024 * 1024);
            let key = CellKey::table_row(BtreeRef::Table(TableId::new(1)), 1);
            let pgno = PageNumber::new(42).unwrap();
            let newest = n_versions as u64;
            for seq in 1u64..=newest {
                let owner = TxnToken::new(TxnId::new(seq).unwrap(), TxnEpoch::new(1));
                let payload = vec![seq as u8];
                let idx = match seq {
                    1 => log
                        .record_insert(key, pgno, payload, owner)
                        .expect("insert should succeed"),
                    _ => log
                        .record_update(key, pgno, payload, owner)
                        .expect("update should succeed"),
                };
                log.commit_delta(idx, CommitSeq::new(seq));
            }

            let gc_horizon = n_versions.saturating_sub(gc_horizon_offset) as u64;
            log.gc(CommitSeq::new(gc_horizon));

            let result = log.resolve(pgno, &key, CommitSeq::new(newest));
            prop_assert!(
                result.is_some(),
                "GC should not reclaim the version visible at snapshot {}",
                newest
            );
            let expected_data = vec![n_versions as u8];
            prop_assert_eq!(
                result.as_ref(),
                Some(&expected_data),
                "visible version at snapshot {} should be {}",
                newest, n_versions
            );

            let probe = gc_horizon + 1;
            if probe <= newest {
                let above_result = log.resolve(pgno, &key, CommitSeq::new(probe));
                prop_assert!(
                    above_result.is_some(),
                    "version at snapshot {} (above GC horizon {}) should be visible",
                    probe, gc_horizon
                );
            }
        }
    }
}
}