use std::cell::Cell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use fsqlite_types::{
CommitSeq, PageData, PageNumber, Snapshot, TxnEpoch, TxnId, TxnToken, WitnessKey,
};
use parking_lot::{Mutex, MutexGuard};
use crate::core_types::{CommitIndex, InProcessPageLockTable, TransactionMode, TransactionState};
use crate::lifecycle::MvccError;
use crate::ssi_validation::{
ActiveTxnView, CommittedReaderInfo, CommittedWriterInfo, DiscoveredEdge, SsiAbortReason,
discover_incoming_edges, discover_outgoing_edges, evaluate_t3_dro, witness_key_page,
};
/// Upper bound on simultaneously active concurrent sessions.
///
/// `ConcurrentRegistry::begin_concurrent` returns `MvccError::Busy` once this
/// many sessions are registered as active.
pub const MAX_CONCURRENT_WRITERS: usize = 128;
/// A [`ConcurrentHandle`] behind a `parking_lot` mutex, shared via `Arc` so the
/// registry and callers can hold the same handle.
pub type SharedConcurrentHandle = Arc<Mutex<ConcurrentHandle>>;
/// Outcome of first-committer-wins validation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FcwResult {
/// No tracked page has a committed version newer than the transaction's snapshot.
Clean,
/// At least one tracked page was committed after the snapshot was taken.
Conflict {
/// Pages that drifted; `validate_first_committer_wins` returns them sorted ascending.
conflicting_pages: Vec<PageNumber>,
/// Highest commit sequence observed among the conflicting pages.
conflicting_commit_seq: CommitSeq,
},
/// The transaction must abort for an SSI reason.
// NOTE(review): no code in the visible section constructs this variant — confirm producers.
Abort {
/// Why the transaction was aborted.
reason: crate::ssi_validation::SsiAbortReason,
},
}
/// Per-transaction state for one concurrent session: snapshot, per-page write
/// bookkeeping, read/write witnesses and SSI flags.
#[derive(Debug)]
pub struct ConcurrentHandle {
// Snapshot the transaction runs against; `snapshot.high` is reported as its begin sequence.
snapshot: Snapshot,
// Per-page staged bytes, freed/conflict-only markers and page-lock ownership.
page_states: HashMap<PageNumber, PageTxnState>,
// Lifecycle state (`Active`, `Committed`, `Aborted`).
state: TransactionState,
// Pages for which at least one page-resolvable read witness was recorded.
read_set: HashSet<PageNumber>,
// Read witnesses grouped by the page `witness_key_page` resolves them to.
read_index: HashMap<PageNumber, smallvec::SmallVec<[WitnessKey; 4]>>,
// Read witnesses that `witness_key_page` cannot map to a single page.
global_read_witnesses: Vec<WitnessKey>,
// Write witnesses grouped by the page `witness_key_page` resolves them to.
write_index: HashMap<PageNumber, smallvec::SmallVec<[WitnessKey; 4]>>,
// Write witnesses that `witness_key_page` cannot map to a single page.
global_write_witnesses: Vec<WitnessKey>,
// Identity (id + epoch) of the transaction currently using this handle.
txn_token: TxnToken,
// SSI flags; `Cell` lets the `ActiveTxnView` setters update them through `&self`.
has_in_rw: Cell<bool>,
has_out_rw: Cell<bool>,
marked_for_abort: Cell<bool>,
}
/// Transaction-local bookkeeping for a single page.
#[derive(Debug, Clone, Default)]
struct PageTxnState {
    /// Bytes staged for this page, if the transaction has written it.
    staged_data: Option<PageData>,
    /// The transaction freed this page.
    is_freed: bool,
    /// The page is tracked for write conflicts without staged bytes.
    is_conflict_only: bool,
    /// The transaction is recorded as owning the page lock.
    held_lock: bool,
}

impl PageTxnState {
    /// True when the page takes part in write-conflict tracking: it has staged
    /// bytes, was freed, or is marked conflict-only.
    #[must_use]
    fn tracks_write_conflict(&self) -> bool {
        let has_staged_bytes = self.staged_data.is_some();
        has_staged_bytes || self.is_freed || self.is_conflict_only
    }

    /// True when the entry carries no information (no lock, no conflict
    /// tracking) and can be dropped from the map.
    #[must_use]
    fn is_empty(&self) -> bool {
        !(self.held_lock || self.tracks_write_conflict())
    }
}
/// Read-only view over the pages that currently have staged bytes.
pub struct WriteSetView<'a> {
    page_states: &'a HashMap<PageNumber, PageTxnState>,
}

impl WriteSetView<'_> {
    /// Returns `true` when `page` has staged bytes.
    #[must_use]
    pub fn contains_key(&self, page: &PageNumber) -> bool {
        matches!(
            self.page_states.get(page),
            Some(state) if state.staged_data.is_some()
        )
    }

    /// Returns `true` when no page has staged bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        !self
            .page_states
            .values()
            .any(|state| state.staged_data.is_some())
    }

    /// Number of pages with staged bytes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.page_states
            .values()
            .map(|state| usize::from(state.staged_data.is_some()))
            .sum()
    }
}
/// Read-only view over the pages whose lock the transaction is recorded as holding.
pub struct HeldLocksView<'a> {
    page_states: &'a HashMap<PageNumber, PageTxnState>,
}

impl HeldLocksView<'_> {
    /// Returns `true` when the lock on `page` is recorded as held.
    #[must_use]
    pub fn contains(&self, page: &PageNumber) -> bool {
        matches!(self.page_states.get(page), Some(state) if state.held_lock)
    }

    /// Returns `true` when no page lock is recorded as held.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        !self.page_states.values().any(|state| state.held_lock)
    }

    /// Number of pages whose lock is recorded as held.
    #[must_use]
    pub fn len(&self) -> usize {
        self.page_states
            .values()
            .map(|state| usize::from(state.held_lock))
            .sum()
    }
}
/// Owned copy of one page's transaction-local state, as captured by
/// `concurrent_page_state` and consumed by `concurrent_restore_page_state`.
#[derive(Debug, Clone)]
pub struct ConcurrentPageState {
    /// Page this capture describes.
    page: PageNumber,
    /// Staged bytes at capture time, if any.
    staged_data: Option<PageData>,
    /// Whether the page was marked freed at capture time.
    was_freed: bool,
    /// Whether the page was marked conflict-only at capture time.
    was_conflict_only: bool,
    /// Whether the page lock was recorded as held at capture time.
    held_lock: bool,
}

impl ConcurrentPageState {
    /// True when the captured page was tracked purely for conflicts: marked
    /// conflict-only, with no staged bytes and not freed.
    #[must_use]
    pub fn is_synthetic_conflict_only(&self) -> bool {
        matches!(
            (self.was_conflict_only, &self.staged_data, self.was_freed),
            (true, None, false)
        )
    }
}
/// Per-page state stored inside a [`ConcurrentSavepoint`].
///
/// Mirrors `PageTxnState` without the `held_lock` flag.
#[derive(Debug, Clone, Default)]
struct SavepointPageState {
// Staged bytes captured for the page, if any.
staged_data: Option<PageData>,
// Whether the page was marked freed.
is_freed: bool,
// Whether the page was marked conflict-only.
is_conflict_only: bool,
}
impl ConcurrentHandle {
    /// Builds a handle in the `Active` state with empty page, read and write tracking.
    #[must_use]
    pub fn new(snapshot: Snapshot, txn_token: TxnToken) -> Self {
        Self {
            snapshot,
            page_states: HashMap::new(),
            state: TransactionState::Active,
            read_set: HashSet::new(),
            read_index: HashMap::new(),
            global_read_witnesses: Vec::new(),
            write_index: HashMap::new(),
            global_write_witnesses: Vec::new(),
            txn_token,
            has_in_rw: Cell::new(false),
            has_out_rw: Cell::new(false),
            marked_for_abort: Cell::new(false),
        }
    }

    /// Reinitialises the handle for a new transaction.
    ///
    /// All collections are cleared in place (keeping their allocations), the
    /// state returns to `Active` and every SSI flag is reset to `false`.
    pub fn reset_for_new_transaction(&mut self, snapshot: Snapshot, txn_token: TxnToken) {
        self.snapshot = snapshot;
        self.page_states.clear();
        self.state = TransactionState::Active;
        self.read_set.clear();
        self.read_index.clear();
        self.global_read_witnesses.clear();
        self.write_index.clear();
        self.global_write_witnesses.clear();
        self.txn_token = txn_token;
        self.has_in_rw.set(false);
        self.has_out_rw.set(false);
        self.marked_for_abort.set(false);
    }

    /// Snapshot the transaction runs against.
    #[must_use]
    pub const fn snapshot(&self) -> &Snapshot {
        &self.snapshot
    }

    /// Current lifecycle state.
    #[must_use]
    pub const fn state(&self) -> TransactionState {
        self.state
    }

    /// Pages taking part in write-conflict tracking (staged, freed or
    /// conflict-only), sorted ascending.
    #[must_use]
    pub fn write_set_pages(&self) -> smallvec::SmallVec<[PageNumber; 16]> {
        let mut pages: smallvec::SmallVec<[PageNumber; 16]> =
            self.tracked_write_conflict_pages_iter().collect();
        pages.sort_unstable();
        pages
    }

    /// Iterates (in map order) over pages taking part in write-conflict tracking.
    fn tracked_write_conflict_pages_iter(&self) -> impl Iterator<Item = PageNumber> + '_ {
        self.page_states
            .iter()
            .filter_map(|(&page, state)| state.tracks_write_conflict().then_some(page))
    }

    /// Iterates (in map order) over pages whose lock is recorded as held.
    fn held_lock_pages_iter(&self) -> impl Iterator<Item = PageNumber> + '_ {
        self.page_states
            .iter()
            .filter_map(|(&page, state)| state.held_lock.then_some(page))
    }

    /// Number of pages with staged bytes (freed and conflict-only pages are not counted).
    #[must_use]
    pub fn write_set_len(&self) -> usize {
        self.write_set().len()
    }

    /// View over the pages that currently have staged bytes.
    #[must_use]
    pub fn write_set(&self) -> WriteSetView<'_> {
        WriteSetView {
            page_states: &self.page_states,
        }
    }

    /// Returns `true` when `page` is marked freed by this transaction.
    #[must_use]
    pub fn is_page_freed(&self, page: PageNumber) -> bool {
        self.page_states
            .get(&page)
            .is_some_and(|state| state.is_freed)
    }

    /// Returns `true` when `page` takes part in write-conflict tracking.
    #[must_use]
    pub fn tracks_write_conflict_page(&self, page: PageNumber) -> bool {
        self.page_states
            .get(&page)
            .is_some_and(PageTxnState::tracks_write_conflict)
    }

    /// View over the pages whose lock is recorded as held.
    #[must_use]
    pub fn held_locks(&self) -> HeldLocksView<'_> {
        HeldLocksView {
            page_states: &self.page_states,
        }
    }

    /// Pages whose lock is recorded as held, sorted ascending.
    #[must_use]
    pub fn held_lock_pages(&self) -> Vec<PageNumber> {
        let mut pages = self.held_lock_pages_iter().collect::<Vec<_>>();
        pages.sort_unstable();
        pages
    }

    /// Returns `true` when the lock on `page` is recorded as held.
    #[must_use]
    pub fn holds_page_lock(&self, page: PageNumber) -> bool {
        self.held_locks().contains(&page)
    }

    /// Returns `true` while the transaction is in the `Active` state.
    #[must_use]
    pub const fn is_active(&self) -> bool {
        matches!(self.state, TransactionState::Active)
    }

    /// Identity of the transaction currently using this handle.
    #[must_use]
    pub const fn txn_token(&self) -> TxnToken {
        self.txn_token
    }

    /// Moves the handle to the `Committed` state.
    pub fn mark_committed(&mut self) {
        self.state = TransactionState::Committed;
    }

    /// Moves the handle to the `Aborted` state.
    pub fn mark_aborted(&mut self) {
        self.state = TransactionState::Aborted;
    }

    /// Records a page-level read of `page`.
    ///
    /// The `WitnessKey::Page` witness is added even when the page is already in
    /// the read set because of a finer-grained witness from
    /// [`Self::record_read_witness`]. Otherwise `check_read_overlap` would only
    /// consult the finer-grained witnesses and the whole-page read would not be
    /// represented. Repeated calls do not add duplicates.
    pub fn record_read(&mut self, page: PageNumber) {
        self.read_set.insert(page);
        let page_key = WitnessKey::Page(page);
        let witnesses = self.read_index.entry(page).or_default();
        if !witnesses.iter().any(|existing| existing == &page_key) {
            witnesses.push(page_key);
        }
    }

    /// Records an arbitrary read witness.
    ///
    /// Keys that `witness_key_page` resolves to a page are indexed under that
    /// page (and the page joins the read set); all others go to the global
    /// list. Duplicate keys are ignored, matching [`Self::record_write_witness`].
    pub fn record_read_witness(&mut self, key: WitnessKey) {
        if let Some(p) = witness_key_page(&key) {
            self.read_set.insert(p);
            let witnesses = self.read_index.entry(p).or_default();
            if !witnesses.iter().any(|existing| existing == &key) {
                witnesses.push(key);
            }
        } else if !self
            .global_read_witnesses
            .iter()
            .any(|existing| existing == &key)
        {
            self.global_read_witnesses.push(key);
        }
    }

    /// Records an arbitrary write witness, ignoring keys already recorded.
    ///
    /// Keys that `witness_key_page` resolves to a page are indexed under that
    /// page; all others go to the global list.
    pub fn record_write_witness(&mut self, key: WitnessKey) {
        if let Some(p) = witness_key_page(&key) {
            let witnesses = self.write_index.entry(p).or_default();
            if !witnesses.iter().any(|existing| existing == &key) {
                witnesses.push(key);
            }
        } else if !self
            .global_write_witnesses
            .iter()
            .any(|existing| existing == &key)
        {
            self.global_write_witnesses.push(key);
        }
    }

    /// Pages with at least one page-resolvable read witness.
    #[must_use]
    pub fn read_set(&self) -> &HashSet<PageNumber> {
        &self.read_set
    }

    /// Number of pages in the read set.
    #[must_use]
    pub fn read_set_len(&self) -> usize {
        self.read_set.len()
    }

    /// All read witnesses: the per-page ones (in map order) followed by the global ones.
    #[must_use]
    pub fn read_witness_keys(&self) -> Vec<WitnessKey> {
        let mut keys: Vec<_> = self.read_index.values().flatten().cloned().collect();
        keys.extend(self.global_read_witnesses.iter().cloned());
        keys
    }

    /// All write witnesses: the per-page ones (in map order) followed by the global ones.
    #[must_use]
    pub fn write_witness_keys(&self) -> Vec<WitnessKey> {
        let mut keys: Vec<_> = self.write_index.values().flatten().cloned().collect();
        keys.extend(self.global_write_witnesses.iter().cloned());
        keys
    }

    /// Returns `true` when at least one global read witness was recorded.
    #[must_use]
    pub fn has_global_read_witnesses(&self) -> bool {
        !self.global_read_witnesses.is_empty()
    }

    /// Returns `true` when at least one global write witness was recorded.
    #[must_use]
    pub fn has_global_write_witnesses(&self) -> bool {
        !self.global_write_witnesses.is_empty()
    }

    /// Current value of the incoming rw-edge flag.
    #[must_use]
    pub fn has_in_rw(&self) -> bool {
        self.has_in_rw.get()
    }

    /// Current value of the outgoing rw-edge flag.
    #[must_use]
    pub fn has_out_rw(&self) -> bool {
        self.has_out_rw.get()
    }

    /// Returns `true` when the transaction has been flagged for abort.
    #[must_use]
    pub fn is_marked_for_abort(&self) -> bool {
        self.marked_for_abort.get()
    }

    /// Looks up the transaction-local state of `page`, if any.
    fn page_state(&self, page: PageNumber) -> Option<&PageTxnState> {
        self.page_states.get(&page)
    }

    /// Returns the state entry for `page`, inserting a default one when missing.
    fn ensure_page_state(&mut self, page: PageNumber) -> &mut PageTxnState {
        self.page_states.entry(page).or_default()
    }

    /// Drops the entry for `page` when it no longer carries any information.
    fn remove_page_state_if_empty(&mut self, page: PageNumber) {
        if self
            .page_states
            .get(&page)
            .is_some_and(PageTxnState::is_empty)
        {
            self.page_states.remove(&page);
        }
    }
}
impl ActiveTxnView for ConcurrentHandle {
    /// Identity of the transaction using this handle.
    fn token(&self) -> TxnToken {
        self.txn_token
    }

    /// Begin sequence, taken from the upper bound of the snapshot.
    fn begin_seq(&self) -> CommitSeq {
        self.snapshot.high
    }

    /// Returns `true` while the transaction is in the `Active` state.
    fn is_active(&self) -> bool {
        matches!(self.state, TransactionState::Active)
    }

    /// Always empty: overlap queries are answered by `check_read_overlap`.
    fn read_keys(&self) -> &[WitnessKey] {
        &[]
    }

    /// Always empty: overlap queries are answered by `check_write_overlap`.
    fn write_keys(&self) -> &[WitnessKey] {
        &[]
    }

    /// Reports whether `key` overlaps anything this transaction has read.
    fn check_read_overlap(&self, key: &WitnessKey) -> bool {
        let overlaps = |witness: &WitnessKey| {
            crate::witness_plane::witness_keys_overlap(witness, key)
        };

        // Global witnesses are compared against every incoming key.
        if self.global_read_witnesses.iter().any(overlaps) {
            return true;
        }

        // A key without a page overlaps as soon as anything was read at all.
        let Some(page) = witness_key_page(key) else {
            return !(self.read_set.is_empty() && self.global_read_witnesses.is_empty());
        };

        if !self.read_set.contains(&page) {
            return false;
        }

        // A page in the read set without indexed witnesses counts as an overlap.
        match self.read_index.get(&page) {
            Some(witnesses) => witnesses.iter().any(overlaps),
            None => true,
        }
    }

    /// Reports whether `key` overlaps anything this transaction has written
    /// or tracks for write conflicts.
    fn check_write_overlap(&self, key: &WitnessKey) -> bool {
        let overlaps = |witness: &WitnessKey| {
            crate::witness_plane::witness_keys_overlap(witness, key)
        };

        if self.global_write_witnesses.iter().any(overlaps) {
            return true;
        }

        // A key without a page overlaps as soon as any write footprint exists.
        let Some(page) = witness_key_page(key) else {
            let has_indexed_writes = !self.write_index.is_empty();
            return has_indexed_writes
                || self
                    .page_states
                    .values()
                    .any(PageTxnState::tracks_write_conflict)
                || !self.global_write_witnesses.is_empty();
        };

        match self.write_index.get(&page) {
            Some(witnesses) => witnesses.iter().any(overlaps),
            // No indexed witness: fall back to the page-state tracking flags.
            None => self.tracks_write_conflict_page(page),
        }
    }

    /// Current value of the incoming rw-edge flag.
    fn has_in_rw(&self) -> bool {
        self.has_in_rw.get()
    }

    /// Current value of the outgoing rw-edge flag.
    fn has_out_rw(&self) -> bool {
        self.has_out_rw.get()
    }

    /// Sets the outgoing rw-edge flag.
    fn set_has_out_rw(&self, val: bool) {
        self.has_out_rw.set(val);
    }

    /// Sets the incoming rw-edge flag.
    fn set_has_in_rw(&self, val: bool) {
        self.has_in_rw.set(val);
    }

    /// Sets or clears the abort mark.
    fn set_marked_for_abort(&self, val: bool) {
        self.marked_for_abort.set(val);
    }
}
/// Named savepoint holding a copy of per-page transaction state.
#[derive(Debug, Clone)]
pub struct ConcurrentSavepoint {
/// Savepoint name.
pub name: String,
// Per-page state captured for this savepoint (populated outside the visible section).
page_states_snapshot: HashMap<PageNumber, SavepointPageState>,
// Value reported by `captured_len`; presumably the write-set size at capture time — TODO confirm.
write_set_len: usize,
}
impl ConcurrentSavepoint {
/// Returns the `write_set_len` value stored in this savepoint.
#[must_use]
pub fn captured_len(&self) -> usize {
self.write_set_len
}
}
/// Registry of active concurrent sessions plus the committed reader/writer
/// history used for SSI conflict discovery.
#[derive(Debug)]
pub struct ConcurrentRegistry {
// Active sessions keyed by session id.
active: HashMap<u64, SharedConcurrentHandle>,
// `snapshot.high` of each active session, cached at `begin_concurrent`.
active_snapshot_highs: HashMap<u64, CommitSeq>,
// Number of active sessions per `snapshot.high`; the smallest key is the GC horizon.
gc_horizon_counts: BTreeMap<CommitSeq, usize>,
// Committed readers retained for conflict discovery.
committed_readers: Vec<CommittedReaderInfo>,
// Indexes into `committed_readers`, grouped by page.
committed_readers_by_page: HashMap<PageNumber, Vec<usize>>,
// Indexes into `committed_readers` whose key summary reports global keys.
committed_readers_with_global_keys: Vec<usize>,
// Committed writers retained for conflict discovery.
committed_writers: Vec<CommittedWriterInfo>,
// Indexes into `committed_writers`, grouped by page.
committed_writers_by_page: HashMap<PageNumber, Vec<usize>>,
// Indexes into `committed_writers` whose key summary reports global keys.
committed_writers_with_global_keys: Vec<usize>,
// Handles kept for reuse by `begin_concurrent`.
recycled_handles: Vec<SharedConcurrentHandle>,
// Next session id to hand out (starts at 1, advances with wrapping add).
next_session_id: u64,
// Counter used to build the `TxnEpoch` of each new transaction token.
epoch_counter: u32,
}
impl ConcurrentRegistry {
    /// Creates an empty registry; session ids start at 1.
    #[must_use]
    pub fn new() -> Self {
        Self {
            active: HashMap::new(),
            active_snapshot_highs: HashMap::new(),
            gc_horizon_counts: BTreeMap::new(),
            committed_readers: Vec::new(),
            committed_readers_by_page: HashMap::new(),
            committed_readers_with_global_keys: Vec::new(),
            committed_writers: Vec::new(),
            committed_writers_by_page: HashMap::new(),
            committed_writers_with_global_keys: Vec::new(),
            recycled_handles: Vec::new(),
            next_session_id: 1,
            epoch_counter: 0,
        }
    }

    /// Number of sessions currently registered as active.
    #[must_use]
    pub fn active_count(&self) -> usize {
        self.active.len()
    }

    /// Registers a new concurrent session running against `snapshot` and
    /// returns its session id.
    ///
    /// A recycled handle is reused when one is available; otherwise a new one
    /// is allocated.
    ///
    /// # Errors
    ///
    /// * `MvccError::Busy` when `MAX_CONCURRENT_WRITERS` sessions are already active.
    /// * `MvccError::InvalidState` when `TxnId::new` rejects the session id.
    pub fn begin_concurrent(&mut self, snapshot: Snapshot) -> Result<u64, MvccError> {
        if self.active.len() >= MAX_CONCURRENT_WRITERS {
            return Err(MvccError::Busy);
        }
        // Both counters advance before the id is validated, so a rejected id is
        // skipped on the next call instead of being retried forever.
        let session_id = self.next_session_id;
        self.next_session_id = self.next_session_id.wrapping_add(1);
        self.epoch_counter = self.epoch_counter.wrapping_add(1);
        let txn_id = TxnId::new(session_id).ok_or(MvccError::InvalidState)?;
        let txn_token = TxnToken::new(txn_id, TxnEpoch::new(self.epoch_counter));
        let handle = if let Some(handle) = self.recycled_handles.pop() {
            handle.lock().reset_for_new_transaction(snapshot, txn_token);
            handle
        } else {
            Arc::new(Mutex::new(ConcurrentHandle::new(snapshot, txn_token)))
        };
        self.active.insert(session_id, handle);
        self.active_snapshot_highs.insert(session_id, snapshot.high);
        self.increment_gc_horizon_count(snapshot.high);
        Ok(session_id)
    }

    /// Iterates over `(session id, handle)` pairs; each handle is a cloned `Arc`.
    pub fn iter_active(&self) -> impl Iterator<Item = (u64, SharedConcurrentHandle)> + '_ {
        self.active
            .iter()
            .map(|(&id, handle)| (id, Arc::clone(handle)))
    }

    /// Returns a cloned `Arc` to the handle of `session_id`, if it is active.
    #[must_use]
    pub fn handle(&self, session_id: u64) -> Option<SharedConcurrentHandle> {
        self.active.get(&session_id).map(Arc::clone)
    }

    /// Locks and returns the handle of `session_id`, if it is active.
    #[must_use]
    pub fn get(&self, session_id: u64) -> Option<MutexGuard<'_, ConcurrentHandle>> {
        self.active.get(&session_id).map(|handle| handle.lock())
    }

    /// Locks and returns the handle of `session_id`, if it is active.
    ///
    /// Identical to [`Self::get`]; the returned guard already allows mutation.
    pub fn get_mut(&self, session_id: u64) -> Option<MutexGuard<'_, ConcurrentHandle>> {
        self.active.get(&session_id).map(|handle| handle.lock())
    }

    /// Unregisters `session_id` and returns its handle, updating the GC-horizon counts.
    pub fn remove(&mut self, session_id: u64) -> Option<SharedConcurrentHandle> {
        let handle = self.active.remove(&session_id)?;
        if let Some(snapshot_high) = self.active_snapshot_highs.remove(&session_id) {
            self.decrement_gc_horizon_count(snapshot_high);
        } else {
            debug_assert!(
                false,
                "active concurrent session {session_id} missing cached snapshot.high"
            );
        }
        Some(handle)
    }

    /// Unregisters `session_id` and offers its handle for reuse.
    ///
    /// Returns `true` when the session was active.
    pub fn remove_and_recycle(&mut self, session_id: u64) -> bool {
        match self.remove(session_id) {
            Some(handle) => {
                self.recycle_handle(handle);
                true
            }
            None => false,
        }
    }

    /// Keeps `handle` for reuse by a later `begin_concurrent`.
    ///
    /// The handle is dropped instead when the pool already holds
    /// `MAX_RECYCLED_HANDLES` entries or when another `Arc` clone still exists.
    pub fn recycle_handle(&mut self, handle: SharedConcurrentHandle) {
        const MAX_RECYCLED_HANDLES: usize = 8;
        if self.recycled_handles.len() >= MAX_RECYCLED_HANDLES || Arc::strong_count(&handle) != 1 {
            return;
        }
        self.recycled_handles.push(handle);
    }

    /// True when the last retained committed reader and the last retained
    /// committed writer (if any) both committed at or before `begin_seq`.
    fn committed_history_is_at_or_before(&self, begin_seq: CommitSeq) -> bool {
        self.committed_readers
            .last()
            .is_none_or(|reader| reader.commit_seq <= begin_seq)
            && self
                .committed_writers
                .last()
                .is_none_or(|writer| writer.commit_seq <= begin_seq)
    }

    /// True when `session_id` is the only active session and the retained
    /// history ends at or before `begin_seq`.
    fn can_use_uncontended_prepare_fast_path(&self, session_id: u64, begin_seq: CommitSeq) -> bool {
        self.active.len() == 1
            && self.active.contains_key(&session_id)
            && self.committed_history_is_at_or_before(begin_seq)
    }

    /// Like the prepare fast-path check, but also accepts an empty active set.
    fn can_use_uncontended_finalize_fast_path(
        &self,
        session_id: u64,
        begin_seq: CommitSeq,
    ) -> bool {
        (self.active.is_empty()
            || (self.active.len() == 1 && self.active.contains_key(&session_id)))
            && self.committed_history_is_at_or_before(begin_seq)
    }

    /// Drops the whole committed history together with its indexes.
    fn clear_committed_conflict_history(&mut self) {
        self.committed_readers.clear();
        self.committed_readers_by_page.clear();
        self.committed_readers_with_global_keys.clear();
        self.committed_writers.clear();
        self.committed_writers_by_page.clear();
        self.committed_writers_with_global_keys.clear();
    }

    /// Keeps only history entries committed strictly after `horizon`.
    ///
    /// The indexes are NOT updated; callers must rebuild them afterwards.
    fn retain_committed_history_newer_than(&mut self, horizon: CommitSeq) {
        self.committed_readers
            .retain(|reader| reader.commit_seq > horizon);
        self.committed_writers
            .retain(|writer| writer.commit_seq > horizon);
    }

    /// Trims the committed history to what live transactions can still conflict with.
    ///
    /// Entries at or before the retention horizon are dropped. While more than
    /// `MAX_HISTORY_ENTRIES` remain, the active, unmarked transaction with the
    /// smallest `snapshot.high` is marked for abort and the history is trimmed
    /// against the new horizon. When no such transaction is left, the history
    /// is cleared outright. Indexes are rebuilt at the end.
    fn prune_committed_conflict_history(&mut self) {
        const MAX_HISTORY_ENTRIES: usize = 4096;

        let Some(min_active_begin) = self.history_retention_horizon() else {
            // Nothing can conflict with the history any more.
            self.clear_committed_conflict_history();
            return;
        };
        self.retain_committed_history_newer_than(min_active_begin);

        while self.committed_readers.len() + self.committed_writers.len() > MAX_HISTORY_ENTRIES {
            let mut oldest_id = None;
            let mut oldest_seq = CommitSeq::new(u64::MAX);
            for (&id, handle) in &self.active {
                let handle = handle.lock();
                if handle.is_active()
                    && !handle.is_marked_for_abort()
                    && handle.snapshot.high < oldest_seq
                {
                    oldest_seq = handle.snapshot.high;
                    oldest_id = Some(id);
                }
            }

            let Some(id) = oldest_id else {
                tracing::warn!("prune_committed_conflict_history: forced to clear SSI history");
                self.clear_committed_conflict_history();
                break;
            };

            tracing::warn!(
                session_id = id,
                snapshot_high = oldest_seq.get(),
                "prune_committed_conflict_history: marking long-running transaction for abort due to SSI history limit"
            );
            // A shared lookup suffices: the flag is set through the mutex guard.
            if let Some(handle) = self.active.get(&id) {
                handle.lock().set_marked_for_abort(true);
            }
            let new_horizon = self
                .history_retention_horizon()
                .unwrap_or(CommitSeq::new(u64::MAX));
            self.retain_committed_history_newer_than(new_horizon);
        }

        let reader_count = self.committed_readers.len();
        let writer_count = self.committed_writers.len();
        if reader_count + writer_count > 0 {
            tracing::debug!(
                reader_entries = reader_count,
                writer_entries = writer_count,
                total = reader_count + writer_count,
                active_txns = self.active.len(),
                "ssi_history_status"
            );
        }
        self.rebuild_committed_history_indexes();
    }

    /// Recomputes the per-page and global-key indexes from the retained history.
    fn rebuild_committed_history_indexes(&mut self) {
        self.committed_readers_by_page.clear();
        self.committed_readers_with_global_keys.clear();
        for idx in 0..self.committed_readers.len() {
            self.index_committed_reader(idx);
        }

        self.committed_writers_by_page.clear();
        self.committed_writers_with_global_keys.clear();
        for idx in 0..self.committed_writers.len() {
            self.index_committed_writer(idx);
        }
    }

    /// Counts one more active session at `snapshot_high`.
    fn increment_gc_horizon_count(&mut self, snapshot_high: CommitSeq) {
        *self.gc_horizon_counts.entry(snapshot_high).or_default() += 1;
    }

    /// Counts one fewer active session at `snapshot_high`, removing the entry
    /// when the last one goes away.
    fn decrement_gc_horizon_count(&mut self, snapshot_high: CommitSeq) {
        let Some(count) = self.gc_horizon_counts.get_mut(&snapshot_high) else {
            debug_assert!(
                false,
                "missing gc-horizon count for snapshot.high={}",
                snapshot_high.get()
            );
            return;
        };
        if *count > 1 {
            *count -= 1;
            return;
        }
        // Last session at this horizon. A stray zero count is removed too,
        // rather than underflowing.
        self.gc_horizon_counts.remove(&snapshot_high);
    }

    /// Adds the reader at `entry_idx` to the reader indexes; out-of-range indexes are ignored.
    fn index_committed_reader(&mut self, entry_idx: usize) {
        let Some(reader) = self.committed_readers.get(entry_idx) else {
            return;
        };
        let summary = summarize_witness_keys(&reader.keys);
        if summary.has_global_keys {
            self.committed_readers_with_global_keys.push(entry_idx);
        }
        for page in summary.pages {
            self.committed_readers_by_page
                .entry(page)
                .or_default()
                .push(entry_idx);
        }
    }

    /// Adds the writer at `entry_idx` to the writer indexes; out-of-range indexes are ignored.
    fn index_committed_writer(&mut self, entry_idx: usize) {
        let Some(writer) = self.committed_writers.get(entry_idx) else {
            return;
        };
        let summary = summarize_witness_keys(&writer.keys);
        if summary.has_global_keys {
            self.committed_writers_with_global_keys.push(entry_idx);
        }
        for page in summary.pages {
            self.committed_writers_by_page
                .entry(page)
                .or_default()
                .push(entry_idx);
        }
    }

    /// Committed readers that may conflict with the committing transaction's writes.
    ///
    /// Candidates come from the indexes, or from the whole history when the
    /// write summary has global keys. A candidate is kept when it is a
    /// different transaction that committed after `committing_begin_seq` and
    /// began before `committing_commit_seq`.
    fn committed_reader_candidates(
        &self,
        committing_txn: TxnToken,
        committing_begin_seq: CommitSeq,
        committing_commit_seq: CommitSeq,
        write_key_summary: &WitnessKeySummary,
    ) -> Vec<CommittedReaderInfo> {
        if self.committed_readers.is_empty() {
            return Vec::new();
        }
        let candidate_indexes = if write_key_summary.has_global_keys {
            (0..self.committed_readers.len()).collect::<Vec<_>>()
        } else {
            collect_indexed_candidates(
                &self.committed_readers_with_global_keys,
                &self.committed_readers_by_page,
                &write_key_summary.pages,
            )
        };
        let committing_begin = committing_begin_seq.get();
        let committing_end = committing_commit_seq.get();
        candidate_indexes
            .into_iter()
            .filter_map(|idx| self.committed_readers.get(idx))
            .filter(|reader| {
                reader.token != committing_txn
                    && committing_begin < reader.commit_seq.get()
                    && reader.begin_seq.get() < committing_end
            })
            .cloned()
            .collect()
    }

    /// Committed writers that may conflict with the committing transaction's reads.
    ///
    /// Candidates come from the indexes, or from the whole history when the
    /// read summary has global keys. A candidate is kept when it is a
    /// different transaction that committed after `committing_begin_seq`.
    fn committed_writer_candidates(
        &self,
        committing_txn: TxnToken,
        committing_begin_seq: CommitSeq,
        _committing_commit_seq: CommitSeq,
        read_key_summary: &WitnessKeySummary,
    ) -> Vec<CommittedWriterInfo> {
        if self.committed_writers.is_empty() {
            return Vec::new();
        }
        let candidate_indexes = if read_key_summary.has_global_keys {
            (0..self.committed_writers.len()).collect::<Vec<_>>()
        } else {
            collect_indexed_candidates(
                &self.committed_writers_with_global_keys,
                &self.committed_writers_by_page,
                &read_key_summary.pages,
            )
        };
        let committing_begin = committing_begin_seq.get();
        candidate_indexes
            .into_iter()
            .filter_map(|idx| self.committed_writers.get(idx))
            .filter(|writer| {
                writer.token != committing_txn && committing_begin < writer.commit_seq.get()
            })
            .cloned()
            .collect()
    }

    /// Smallest `snapshot.high` among registered sessions, or `None` when there are none.
    #[must_use]
    pub fn gc_horizon(&self) -> Option<CommitSeq> {
        self.gc_horizon_counts.keys().next().copied()
    }

    /// Smallest `snapshot.high` among sessions that are active and not marked
    /// for abort; each handle is locked briefly to read it.
    #[must_use]
    fn history_retention_horizon(&self) -> Option<CommitSeq> {
        self.active
            .values()
            .filter_map(|handle| {
                let handle = handle.lock();
                (handle.is_active() && !handle.is_marked_for_abort())
                    .then_some(handle.snapshot.high)
            })
            .min()
    }
}
impl Default for ConcurrentRegistry {
/// Equivalent to [`ConcurrentRegistry::new`].
fn default() -> Self {
Self::new()
}
}
/// Prepares `page` for a write: makes sure the page lock is held, clears any
/// freed/conflict-only marker and records the page-level write witness.
///
/// No bytes are staged here; see `concurrent_stage_prepared_write_page`.
///
/// # Errors
///
/// * `MvccError::InvalidState` when the handle is not active or `session_id`
///   is rejected by `TxnId::new`.
/// * `MvccError::Busy` when the page lock cannot be acquired.
pub fn concurrent_prepare_write_page(
    handle: &mut ConcurrentHandle,
    lock_table: &InProcessPageLockTable,
    session_id: u64,
    page: PageNumber,
) -> Result<(), MvccError> {
    if !handle.is_active() {
        return Err(MvccError::InvalidState);
    }

    // Take a flat copy of the flags so the handle can be mutated below.
    let current = handle.page_state(page);
    let holds_lock = current.is_some_and(|state| state.held_lock);
    let was_freed = current.is_some_and(|state| state.is_freed);
    let was_conflict_only = current.is_some_and(|state| state.is_conflict_only);
    let has_staged_data = current.is_some_and(|state| state.staged_data.is_some());

    // Already staged: nothing to prepare.
    if has_staged_data {
        debug_assert!(holds_lock, "staged write pages must retain their page lock");
        debug_assert!(!was_freed, "staged write pages cannot also be marked freed");
        debug_assert!(
            !was_conflict_only,
            "staged write pages cannot also be conflict-only"
        );
        return Ok(());
    }

    // Lock already held for a freed/conflict-only page: just drop the markers.
    if holds_lock && (was_freed || was_conflict_only) {
        let entry = handle.ensure_page_state(page);
        entry.is_freed = false;
        entry.is_conflict_only = false;
        handle.remove_page_state_if_empty(page);
        return Ok(());
    }

    let already_tracked = handle.tracks_write_conflict_page(page);
    if !holds_lock {
        let txn_id = TxnId::new(session_id).ok_or(MvccError::InvalidState)?;
        if lock_table.try_acquire(page, txn_id).is_err() {
            handle.remove_page_state_if_empty(page);
            return Err(MvccError::Busy);
        }
        handle.ensure_page_state(page).held_lock = true;
    }

    let entry = handle.ensure_page_state(page);
    entry.is_freed = false;
    entry.is_conflict_only = false;
    if !already_tracked {
        handle.record_write_witness(fsqlite_types::WitnessKey::Page(page));
    }
    handle.remove_page_state_if_empty(page);
    Ok(())
}
/// Stores `data` as the staged bytes of an already prepared `page`.
///
/// Debug builds assert that the page lock is held and that the page is
/// neither conflict-only nor freed.
///
/// # Errors
///
/// `MvccError::InvalidState` when the handle is not active.
pub fn concurrent_stage_prepared_write_page(
    handle: &mut ConcurrentHandle,
    page: PageNumber,
    data: PageData,
) -> Result<(), MvccError> {
    if !handle.is_active() {
        return Err(MvccError::InvalidState);
    }
    debug_assert!(
        handle.holds_page_lock(page),
        "prepared page writes must retain their page lock before staging bytes"
    );
    debug_assert!(
        handle
            .page_state(page)
            .is_none_or(|state| !state.is_conflict_only),
        "prepared write staging must clear conflict-only tracking first"
    );
    debug_assert!(
        handle.page_state(page).is_none_or(|state| !state.is_freed),
        "prepared write staging must clear freed-page tracking first"
    );
    let entry = handle.ensure_page_state(page);
    entry.staged_data = Some(data);
    Ok(())
}
/// Prepares `page` for writing and then stages `data` for it.
///
/// # Errors
///
/// Propagates the errors of `concurrent_prepare_write_page` and
/// `concurrent_stage_prepared_write_page`; nothing is staged when preparation fails.
pub fn concurrent_write_page(
    handle: &mut ConcurrentHandle,
    lock_table: &InProcessPageLockTable,
    session_id: u64,
    page: PageNumber,
    data: PageData,
) -> Result<(), MvccError> {
    concurrent_prepare_write_page(handle, lock_table, session_id, page)
        .and_then(|()| concurrent_stage_prepared_write_page(handle, page, data))
}
/// Captures an owned copy of the transaction-local state of `page`.
///
/// A page without an entry yields a capture with no staged bytes and every flag `false`.
#[must_use]
pub fn concurrent_page_state(handle: &ConcurrentHandle, page: PageNumber) -> ConcurrentPageState {
    match handle.page_state(page) {
        Some(current) => ConcurrentPageState {
            page,
            staged_data: current.staged_data.clone(),
            was_freed: current.is_freed,
            was_conflict_only: current.is_conflict_only,
            held_lock: current.held_lock,
        },
        None => ConcurrentPageState {
            page,
            staged_data: None,
            was_freed: false,
            was_conflict_only: false,
            held_lock: false,
        },
    }
}
/// Restores a page's transaction-local state from a previously captured
/// [`ConcurrentPageState`].
///
/// Staged bytes, the freed/conflict-only markers and the `held_lock` flag are
/// overwritten with the captured values. When the capture did not hold the
/// page lock, `lock_table.release` is called for the page. The entry is
/// dropped afterwards if it ends up empty.
///
/// # Errors
///
/// `MvccError::InvalidState` when the handle is not active or `session_id` is
/// rejected by `TxnId::new`.
pub fn concurrent_restore_page_state(
handle: &mut ConcurrentHandle,
lock_table: &InProcessPageLockTable,
session_id: u64,
state: &ConcurrentPageState,
) -> Result<(), MvccError> {
if !handle.is_active() {
return Err(MvccError::InvalidState);
}
let txn_id = TxnId::new(session_id).ok_or(MvccError::InvalidState)?;
{
let restored = handle.ensure_page_state(state.page);
// `clone_from` may reuse the existing allocation of the staged bytes.
restored.staged_data.clone_from(&state.staged_data);
restored.is_freed = state.was_freed;
restored.is_conflict_only = state.was_conflict_only;
// NOTE(review): when `state.held_lock` is true only the flag is set; no
// `try_acquire` happens here — presumably the caller guarantees the lock
// is still held. Confirm against callers.
restored.held_lock = state.held_lock;
}
// The capture did not own the lock, so release it regardless of whether the
// handle held it before this call.
if !state.held_lock {
lock_table.release(state.page, txn_id);
}
handle.remove_page_state_if_empty(state.page);
Ok(())
}
/// Forgets everything the transaction tracks for `page`, releasing the page
/// lock first when it is recorded as held.
///
/// # Errors
///
/// `MvccError::InvalidState` when the handle is not active or `session_id` is
/// rejected by `TxnId::new`.
pub fn concurrent_clear_page_state(
    handle: &mut ConcurrentHandle,
    lock_table: &InProcessPageLockTable,
    session_id: u64,
    page: PageNumber,
) -> Result<(), MvccError> {
    if !handle.is_active() {
        return Err(MvccError::InvalidState);
    }
    let txn_id = TxnId::new(session_id).ok_or(MvccError::InvalidState)?;
    let lock_is_held = handle
        .page_state(page)
        .is_some_and(|current| current.held_lock);
    if lock_is_held {
        lock_table.release(page, txn_id);
    }
    handle.page_states.remove(&page);
    Ok(())
}
/// Registers `page` for write-conflict tracking without staging bytes.
///
/// The page lock is acquired when it is not already held. Unless the page
/// already has staged bytes or is freed, it is marked conflict-only. A
/// page-level write witness is recorded the first time the page becomes tracked.
///
/// # Errors
///
/// * `MvccError::InvalidState` when the handle is not active or `session_id`
///   is rejected by `TxnId::new`.
/// * `MvccError::Busy` when the page lock cannot be acquired.
pub fn concurrent_track_write_conflict_page(
    handle: &mut ConcurrentHandle,
    lock_table: &InProcessPageLockTable,
    session_id: u64,
    page: PageNumber,
) -> Result<(), MvccError> {
    if !handle.is_active() {
        return Err(MvccError::InvalidState);
    }
    let already_tracked = handle.tracks_write_conflict_page(page);
    let holds_lock = handle.holds_page_lock(page);

    // Tracked and locked: nothing left to do.
    if already_tracked && holds_lock {
        return Ok(());
    }

    if !holds_lock {
        let txn_id = TxnId::new(session_id).ok_or(MvccError::InvalidState)?;
        if lock_table.try_acquire(page, txn_id).is_err() {
            handle.remove_page_state_if_empty(page);
            return Err(MvccError::Busy);
        }
    }

    // From here on the lock is held, either from before or freshly acquired.
    let state = handle.ensure_page_state(page);
    state.held_lock = true;
    if state.staged_data.is_none() && !state.is_freed {
        state.is_conflict_only = true;
    }
    if !already_tracked {
        handle.record_write_witness(fsqlite_types::WitnessKey::Page(page));
    }
    Ok(())
}
/// Marks `page` as freed by this transaction.
///
/// The page lock is acquired when it is not already held. Staged bytes and
/// the conflict-only marker are discarded. A page-level write witness is
/// recorded when the page was not tracked before. Freeing an already freed
/// page is a no-op.
///
/// # Errors
///
/// * `MvccError::InvalidState` when the handle is not active or `session_id`
///   is rejected by `TxnId::new`.
/// * `MvccError::Busy` when the page lock cannot be acquired.
pub fn concurrent_free_page(
    handle: &mut ConcurrentHandle,
    lock_table: &InProcessPageLockTable,
    session_id: u64,
    page: PageNumber,
) -> Result<(), MvccError> {
    if !handle.is_active() {
        return Err(MvccError::InvalidState);
    }

    if handle.is_page_freed(page) {
        debug_assert!(
            handle.holds_page_lock(page),
            "freed pages must retain their page lock"
        );
        debug_assert!(
            handle
                .page_state(page)
                .is_none_or(|state| state.staged_data.is_none()),
            "freed pages cannot retain staged page bytes"
        );
        return Ok(());
    }

    let already_tracked = handle.tracks_write_conflict_page(page);
    if !handle.holds_page_lock(page) {
        let txn_id = TxnId::new(session_id).ok_or(MvccError::InvalidState)?;
        if lock_table.try_acquire(page, txn_id).is_err() {
            handle.remove_page_state_if_empty(page);
            return Err(MvccError::Busy);
        }
    }

    // The lock is held at this point, either from before or freshly acquired.
    let entry = handle.ensure_page_state(page);
    entry.held_lock = true;
    entry.staged_data = None;
    entry.is_conflict_only = false;
    entry.is_freed = true;
    if !already_tracked {
        handle.record_write_witness(fsqlite_types::WitnessKey::Page(page));
    }
    Ok(())
}
/// Returns the bytes this transaction has staged for `page`, if any.
#[must_use]
pub fn concurrent_read_page(handle: &ConcurrentHandle, page: PageNumber) -> Option<&PageData> {
    let current = handle.page_state(page)?;
    current.staged_data.as_ref()
}
/// Returns `true` when this transaction has marked `page` as freed.
#[must_use]
pub fn concurrent_page_is_freed(handle: &ConcurrentHandle, page: PageNumber) -> bool {
    handle
        .page_state(page)
        .is_some_and(|current| current.is_freed)
}
/// First-committer-wins validation.
///
/// Every page tracked for write conflicts is checked against `commit_index`.
/// A page conflicts when its latest committed sequence is newer than the
/// snapshot's upper bound. Returns `FcwResult::Clean` when nothing conflicts.
/// Otherwise returns `FcwResult::Conflict` with the pages sorted ascending
/// and the highest conflicting commit sequence.
pub fn validate_first_committer_wins(
    handle: &ConcurrentHandle,
    commit_index: &CommitIndex,
) -> FcwResult {
    let snapshot_seq = handle.snapshot.high;
    let mut drifted = smallvec::SmallVec::<[PageNumber; 8]>::new();
    let mut newest_conflict = CommitSeq::ZERO;

    for page in handle.tracked_write_conflict_pages_iter() {
        let Some(committed_seq) = commit_index.latest(page) else {
            continue;
        };
        if committed_seq <= snapshot_seq {
            continue;
        }
        drifted.push(page);
        newest_conflict = newest_conflict.max(committed_seq);
    }

    if !drifted.is_empty() {
        drifted.sort_unstable();
        tracing::warn!(
            conflicting_page_count = drifted.len(),
            max_conflicting_seq = newest_conflict.get(),
            snapshot_seq = snapshot_seq.get(),
            "fcw_validation: base drift detected"
        );
        return FcwResult::Conflict {
            conflicting_pages: drifted.into_vec(),
            conflicting_commit_seq: newest_conflict,
        };
    }

    tracing::debug!(
        write_set_size = handle.write_set_len(),
        snapshot_seq = snapshot_seq.get(),
        "fcw_validation: clean (no base drift)"
    );
    FcwResult::Clean
}
/// Releases, on behalf of `txn_id`, every page lock the handle records as held.
///
/// Only the lock table is updated; the handle's `held_lock` flags are left untouched.
fn release_tracked_page_locks(
    lock_table: &InProcessPageLockTable,
    handle: &ConcurrentHandle,
    txn_id: TxnId,
) {
    let locked_pages = handle.held_lock_pages_iter();
    lock_table.release_set(locked_pages, txn_id);
}
/// Appends the discovered edges whose source transaction has not been seen
/// yet, recording each new source in `seen_sources`.
fn merge_unique_incoming_edges(
    incoming_edges: &mut Vec<DiscoveredEdge>,
    seen_sources: &mut HashSet<TxnToken>,
    discovered_edges: impl IntoIterator<Item = DiscoveredEdge>,
) {
    let fresh = discovered_edges
        .into_iter()
        .filter(|edge| seen_sources.insert(edge.from));
    incoming_edges.extend(fresh);
}
/// Appends the discovered edges whose target transaction has not been seen
/// yet, recording each new target in `seen_targets`.
fn merge_unique_outgoing_edges(
    outgoing_edges: &mut Vec<DiscoveredEdge>,
    seen_targets: &mut HashSet<TxnToken>,
    discovered_edges: impl IntoIterator<Item = DiscoveredEdge>,
) {
    let fresh = discovered_edges
        .into_iter()
        .filter(|edge| seen_targets.insert(edge.to));
    outgoing_edges.extend(fresh);
}
/// Outcome of SSI validation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SsiResult {
/// Validation found no reason to abort.
Clean,
/// The transaction must abort for the given reason.
Abort { reason: SsiAbortReason },
}
/// Snapshot of everything gathered while preparing a concurrent commit.
// NOTE(review): the code that fills this struct lies outside the visible
// section; field notes below are inferred from names and accessors — confirm.
#[derive(Debug, Clone)]
pub struct PreparedConcurrentCommit {
// Session the commit belongs to.
session_id: u64,
// Commit sequence planned for this commit.
planned_commit_seq: CommitSeq,
// Identity of the committing transaction.
txn_token: TxnToken,
// Begin sequence of the committing transaction.
begin_seq: CommitSeq,
// Read witnesses and their page/global summary.
read_keys: Vec<WitnessKey>,
read_key_summary: WitnessKeySummary,
// Write witnesses and their page/global summary.
write_keys: Vec<WitnessKey>,
write_key_summary: WitnessKeySummary,
// Pages in the write set; used as the fallback of `conflict_pages`.
write_set_pages: Vec<PageNumber>,
// Pages whose lock was held.
held_lock_pages: Vec<PageNumber>,
// SSI rw-edge flags.
has_in_rw: bool,
has_out_rw: bool,
// Discovered rw edges pointing to / from the committing transaction.
incoming_edges: Vec<DiscoveredEdge>,
outgoing_edges: Vec<DiscoveredEdge>,
// Optional DRO T3 decision.
dro_t3_decision: Option<crate::ssi_abort_policy::DroHotPathDecision>,
// Whether the uncontended prepare fast path was taken.
used_uncontended_prepare_fast_path: bool,
}
impl PreparedConcurrentCommit {
    /// Session the commit belongs to.
    #[must_use]
    pub const fn session_id(&self) -> u64 {
        self.session_id
    }

    /// Commit sequence planned for this commit.
    #[must_use]
    pub const fn planned_commit_seq(&self) -> CommitSeq {
        self.planned_commit_seq
    }

    /// Identity of the committing transaction.
    #[must_use]
    pub const fn txn_token(&self) -> TxnToken {
        self.txn_token
    }

    /// Begin sequence of the committing transaction.
    #[must_use]
    pub const fn begin_seq(&self) -> CommitSeq {
        self.begin_seq
    }

    /// Stored incoming rw-edge flag.
    #[must_use]
    pub const fn has_in_rw(&self) -> bool {
        self.has_in_rw
    }

    /// Stored outgoing rw-edge flag.
    #[must_use]
    pub const fn has_out_rw(&self) -> bool {
        self.has_out_rw
    }

    /// Whether the uncontended prepare fast path was taken.
    #[must_use]
    pub const fn used_uncontended_prepare_fast_path(&self) -> bool {
        self.used_uncontended_prepare_fast_path
    }

    /// Read witnesses captured for the commit.
    #[must_use]
    pub fn read_keys(&self) -> &[WitnessKey] {
        self.read_keys.as_slice()
    }

    /// Write witnesses captured for the commit.
    #[must_use]
    pub fn write_keys(&self) -> &[WitnessKey] {
        self.write_keys.as_slice()
    }

    /// Copy of the pages listed in the read-key summary.
    #[must_use]
    pub fn read_pages(&self) -> Vec<PageNumber> {
        self.read_key_summary.pages.to_vec()
    }

    /// Pages in the write set.
    #[must_use]
    pub fn write_set_pages(&self) -> &[PageNumber] {
        self.write_set_pages.as_slice()
    }

    /// Pages whose lock was held.
    #[must_use]
    pub fn held_lock_pages(&self) -> &[PageNumber] {
        self.held_lock_pages.as_slice()
    }

    /// Transactions at the far end of any discovered edge, sorted by
    /// `(id, epoch)` with adjacent duplicates removed.
    #[must_use]
    pub fn conflicting_txns(&self) -> Vec<TxnToken> {
        let sources = self.incoming_edges.iter().map(|edge| edge.from);
        let targets = self.outgoing_edges.iter().map(|edge| edge.to);
        let mut txns: Vec<TxnToken> = sources.chain(targets).collect();
        txns.sort_by_key(|token| (token.id.get(), token.epoch.get()));
        txns.dedup();
        txns
    }

    /// Pages named by the overlap keys of the discovered edges, sorted and
    /// deduplicated; falls back to the write-set pages when no edge names a page.
    #[must_use]
    pub fn conflict_pages(&self) -> Vec<PageNumber> {
        let mut pages: Vec<PageNumber> = self
            .incoming_edges
            .iter()
            .chain(&self.outgoing_edges)
            .filter_map(|edge| witness_key_page(&edge.overlap_key))
            .collect();
        if pages.is_empty() {
            pages.extend_from_slice(&self.write_set_pages);
        }
        pages.sort_unstable();
        pages.dedup();
        pages
    }

    /// DRO T3 decision recorded for the commit, if any.
    #[must_use]
    pub const fn dro_t3_decision(&self) -> Option<crate::ssi_abort_policy::DroHotPathDecision> {
        self.dro_t3_decision
    }
}
/// Compact bit set of the boolean properties a `HandleView` copies from a handle.
#[derive(Debug, Clone, Copy, Default)]
struct HandleViewFlags(u8);

impl HandleViewFlags {
    /// The handle was in the `Active` state.
    const ACTIVE: u8 = 1 << 0;
    /// The handle had at least one global read witness.
    const GLOBAL_READ_WITNESSES: u8 = 1 << 1;
    /// The handle had at least one global write witness.
    const GLOBAL_WRITE_WITNESSES: u8 = 1 << 2;

    /// Captures the flags from the current state of `handle`.
    fn from_handle(handle: &ConcurrentHandle) -> Self {
        let bits = [
            (handle.is_active(), Self::ACTIVE),
            (
                handle.has_global_read_witnesses(),
                Self::GLOBAL_READ_WITNESSES,
            ),
            (
                handle.has_global_write_witnesses(),
                Self::GLOBAL_WRITE_WITNESSES,
            ),
        ]
        .into_iter()
        .filter(|&(is_set, _)| is_set)
        .fold(0u8, |acc, (_, flag)| acc | flag);
        Self(bits)
    }

    /// True when any bit of `flag` is set.
    const fn contains(self, flag: u8) -> bool {
        (self.0 & flag) != 0
    }

    const fn is_active(self) -> bool {
        self.contains(Self::ACTIVE)
    }

    const fn has_global_read_witnesses(self) -> bool {
        self.contains(Self::GLOBAL_READ_WITNESSES)
    }

    const fn has_global_write_witnesses(self) -> bool {
        self.contains(Self::GLOBAL_WRITE_WITNESSES)
    }
}
/// Owned copy of the handle state consulted during edge discovery.
///
/// Built from a [`ConcurrentHandle`] by [`HandleView::new`]; it holds no
/// reference to the handle afterwards.
struct HandleView {
    /// Token of the viewed transaction.
    token: TxnToken,
    /// Begin sequence of the viewed transaction.
    begin_seq: CommitSeq,
    /// Activity and global-witness flags sampled from the handle.
    flags: HandleViewFlags,
    /// Clone of the handle's read set.
    read_pages: HashSet<PageNumber>,
    /// Tracked write-conflict pages plus the keys of the handle's write index.
    write_pages: HashSet<PageNumber>,
    /// Local copy of the handle's incoming-edge flag.
    has_in_rw: Cell<bool>,
    /// Local copy of the handle's outgoing-edge flag.
    has_out_rw: Cell<bool>,
}

impl HandleView {
    /// Copies the relevant state out of `handle`.
    fn new(handle: &ConcurrentHandle) -> Self {
        let write_pages = handle
            .tracked_write_conflict_pages_iter()
            .chain(handle.write_index.keys().copied())
            .collect::<HashSet<PageNumber>>();
        Self {
            token: handle.token(),
            begin_seq: handle.begin_seq(),
            flags: HandleViewFlags::from_handle(handle),
            read_pages: handle.read_set().clone(),
            write_pages,
            has_in_rw: Cell::new(handle.has_in_rw()),
            has_out_rw: Cell::new(handle.has_out_rw()),
        }
    }

    /// Whether the handle was active when the view was taken.
    const fn is_currently_active(&self) -> bool {
        self.flags.is_active()
    }

    /// True when the view has any read page or a global read witness.
    fn has_read_witnesses(&self) -> bool {
        self.has_global_read_witnesses() || !self.read_pages.is_empty()
    }

    /// True when the view has any write page or a global write witness.
    fn has_write_witnesses(&self) -> bool {
        self.has_global_write_witnesses() || !self.write_pages.is_empty()
    }

    /// Whether the handle reported global read witnesses.
    const fn has_global_read_witnesses(&self) -> bool {
        self.flags.has_global_read_witnesses()
    }

    /// Whether the handle reported global write witnesses.
    const fn has_global_write_witnesses(&self) -> bool {
        self.flags.has_global_write_witnesses()
    }
}
/// Page-level digest of a list of witness keys, as built by
/// `summarize_witness_keys`.
#[derive(Debug, Clone, Default)]
struct WitnessKeySummary {
/// Sorted, de-duplicated pages for which `witness_key_page` returned a page.
pages: Vec<PageNumber>,
/// True when at least one key had no page (`witness_key_page` returned `None`).
has_global_keys: bool,
}
fn summarize_witness_keys(keys: &[WitnessKey]) -> WitnessKeySummary {
let mut pages = Vec::new();
let mut has_global_keys = false;
for key in keys {
if let Some(page) = witness_key_page(key) {
pages.push(page);
} else {
has_global_keys = true;
}
}
pages.sort_unstable();
pages.dedup();
WitnessKeySummary {
pages,
has_global_keys,
}
}
/// Reads the witness keys of `session_id` and summarises them.
///
/// Returns `(read_keys, read_summary, write_keys, write_summary)` with both
/// key lists sorted, or `None` when the registry has no handle for the
/// session or the handle is not active.
fn hydrate_finalize_witness_state(
    registry: &ConcurrentRegistry,
    session_id: u64,
) -> Option<(
    Vec<WitnessKey>,
    WitnessKeySummary,
    Vec<WitnessKey>,
    WitnessKeySummary,
)> {
    let handle = registry
        .get(session_id)
        .filter(|handle| handle.is_active())?;
    let into_sorted = |mut keys: Vec<WitnessKey>| {
        keys.sort_unstable();
        keys
    };
    let read_keys = into_sorted(handle.read_witness_keys());
    let read_key_summary = summarize_witness_keys(&read_keys);
    let write_keys = into_sorted(handle.write_witness_keys());
    let write_key_summary = summarize_witness_keys(&write_keys);
    Some((read_keys, read_key_summary, write_keys, write_key_summary))
}
/// Merges `global_indexes` with every index registered under one of `pages`.
///
/// The result is sorted ascending and contains each index once. Pages that
/// are absent from `indexes_by_page` contribute nothing.
fn collect_indexed_candidates(
    global_indexes: &[usize],
    indexes_by_page: &HashMap<PageNumber, Vec<usize>>,
    pages: &[PageNumber],
) -> Vec<usize> {
    let per_page = pages
        .iter()
        .filter_map(|page| indexes_by_page.get(page))
        .flatten()
        .copied();
    let mut merged: Vec<usize> = global_indexes.iter().copied().chain(per_page).collect();
    merged.sort_unstable();
    merged.dedup();
    merged
}
/// Lookup tables over a slice of [`HandleView`]s, keyed by position in that
/// slice, used to narrow the set of views worth checking for rw edges.
#[derive(Debug, Default)]
struct ActiveEdgeDiscoveryIndex {
    /// Positions of all views that have any read witness.
    all_readers: Vec<usize>,
    /// Positions of all views that have any write witness.
    all_writers: Vec<usize>,
    /// Positions of views that have global read witnesses.
    readers_with_global_keys: Vec<usize>,
    /// Positions of views that have global write witnesses.
    writers_with_global_keys: Vec<usize>,
    /// Positions of views grouped by each page in their read set.
    readers_by_page: HashMap<PageNumber, Vec<usize>>,
    /// Positions of views grouped by each page in their write set.
    writers_by_page: HashMap<PageNumber, Vec<usize>>,
}

impl ActiveEdgeDiscoveryIndex {
    /// Indexes `views` by position. The resulting index is only meaningful
    /// when queried with the same slice.
    fn build(views: &[HandleView]) -> Self {
        let mut index = Self::default();
        for (idx, view) in views.iter().enumerate() {
            if view.has_read_witnesses() {
                index.all_readers.push(idx);
                if view.has_global_read_witnesses() {
                    index.readers_with_global_keys.push(idx);
                }
                for &page in &view.read_pages {
                    index.readers_by_page.entry(page).or_default().push(idx);
                }
            }
            if view.has_write_witnesses() {
                index.all_writers.push(idx);
                if view.has_global_write_witnesses() {
                    index.writers_with_global_keys.push(idx);
                }
                for &page in &view.write_pages {
                    index.writers_by_page.entry(page).or_default().push(idx);
                }
            }
        }
        index
    }

    /// Reader views that may overlap the committing transaction's writes.
    ///
    /// When `write_key_summary` has global keys every reader is a candidate;
    /// otherwise only readers with global keys or with one of the summary's
    /// pages are. `_committing_begin_seq` is accepted but not used.
    fn incoming_candidate_refs<'a>(
        &'a self,
        views: &'a [HandleView],
        committing_txn: TxnToken,
        _committing_begin_seq: CommitSeq,
        committing_commit_seq: CommitSeq,
        write_key_summary: &WitnessKeySummary,
    ) -> Vec<&'a dyn ActiveTxnView> {
        let candidate_indexes = Self::select_candidate_indexes(
            &self.all_readers,
            &self.readers_with_global_keys,
            &self.readers_by_page,
            write_key_summary,
        );
        Self::resolve_overlapping_candidates(
            views,
            candidate_indexes,
            committing_txn,
            committing_commit_seq,
        )
    }

    /// Writer views that may overlap the committing transaction's reads.
    ///
    /// Mirror image of [`Self::incoming_candidate_refs`], driven by
    /// `read_key_summary` and the writer tables. `_committing_begin_seq` is
    /// accepted but not used.
    fn outgoing_candidate_refs<'a>(
        &'a self,
        views: &'a [HandleView],
        committing_txn: TxnToken,
        _committing_begin_seq: CommitSeq,
        committing_commit_seq: CommitSeq,
        read_key_summary: &WitnessKeySummary,
    ) -> Vec<&'a dyn ActiveTxnView> {
        let candidate_indexes = Self::select_candidate_indexes(
            &self.all_writers,
            &self.writers_with_global_keys,
            &self.writers_by_page,
            read_key_summary,
        );
        Self::resolve_overlapping_candidates(
            views,
            candidate_indexes,
            committing_txn,
            committing_commit_seq,
        )
    }

    /// Picks the positions to examine for one direction of edge discovery.
    fn select_candidate_indexes(
        all: &[usize],
        with_global_keys: &[usize],
        by_page: &HashMap<PageNumber, Vec<usize>>,
        summary: &WitnessKeySummary,
    ) -> Vec<usize> {
        if summary.has_global_keys {
            // A global key cannot be narrowed by page, so consider everyone.
            all.to_vec()
        } else {
            collect_indexed_candidates(with_global_keys, by_page, &summary.pages)
        }
    }

    /// Turns positions into view references, keeping only views that are not
    /// the committing transaction itself, are active, and began strictly
    /// before `committing_commit_seq`. Out-of-range positions are skipped.
    fn resolve_overlapping_candidates<'a>(
        views: &'a [HandleView],
        candidate_indexes: Vec<usize>,
        committing_txn: TxnToken,
        committing_commit_seq: CommitSeq,
    ) -> Vec<&'a dyn ActiveTxnView> {
        let committing_end = committing_commit_seq.get();
        candidate_indexes
            .into_iter()
            .filter_map(|idx| views.get(idx))
            .filter(|view| {
                view.token != committing_txn
                    && view.is_currently_active()
                    && view.begin_seq.get() < committing_end
            })
            .map(|view| view as &dyn ActiveTxnView)
            .collect()
    }
}
/// Page-granular [`ActiveTxnView`] backed by the view's copied state.
impl ActiveTxnView for HandleView {
    fn token(&self) -> TxnToken {
        self.token
    }

    fn begin_seq(&self) -> CommitSeq {
        self.begin_seq
    }

    fn is_active(&self) -> bool {
        self.is_currently_active()
    }

    /// Always empty: the view keeps page sets, not individual keys.
    fn read_keys(&self) -> &[WitnessKey] {
        &[]
    }

    /// Always empty: the view keeps page sets, not individual keys.
    fn write_keys(&self) -> &[WitnessKey] {
        &[]
    }

    /// Page-bearing keys overlap when their page is in the read set; a
    /// `Custom` key overlaps whenever the view has any read witness at all.
    fn check_read_overlap(&self, key: &WitnessKey) -> bool {
        let page = match key {
            WitnessKey::Custom { .. } => return self.has_read_witnesses(),
            WitnessKey::Page(page)
            | WitnessKey::Cell {
                btree_root: page, ..
            }
            | WitnessKey::ByteRange { page, .. }
            | WitnessKey::KeyRange {
                btree_root: page, ..
            } => page,
        };
        self.read_pages.contains(page)
    }

    /// Page-bearing keys overlap when their page is in the write set; a
    /// `Custom` key overlaps whenever the view has any write witness at all.
    fn check_write_overlap(&self, key: &WitnessKey) -> bool {
        let page = match key {
            WitnessKey::Custom { .. } => return self.has_write_witnesses(),
            WitnessKey::Page(page)
            | WitnessKey::Cell {
                btree_root: page, ..
            }
            | WitnessKey::ByteRange { page, .. }
            | WitnessKey::KeyRange {
                btree_root: page, ..
            } => page,
        };
        self.write_pages.contains(page)
    }

    fn has_in_rw(&self) -> bool {
        self.has_in_rw.get()
    }

    fn has_out_rw(&self) -> bool {
        self.has_out_rw.get()
    }

    /// Updates the view's local copy only.
    fn set_has_out_rw(&self, val: bool) {
        self.has_out_rw.set(val);
    }

    /// Updates the view's local copy only.
    fn set_has_in_rw(&self, val: bool) {
        self.has_in_rw.set(val);
    }

    /// Intentionally a no-op: the view stores no abort mark.
    fn set_marked_for_abort(&self, _val: bool) {}
}
/// Runs the T3 DRO evaluation for `txn`, or returns `None` when there are no
/// edges at all.
///
/// Each population passed to `evaluate_t3_dro` is the larger of "active
/// transactions other than this one" and the edge count for that direction,
/// plus the matching committed count, using saturating arithmetic.
fn evaluate_prepare_t3_dro(
    txn: TxnToken,
    incoming_edges: &[DiscoveredEdge],
    outgoing_edges: &[DiscoveredEdge],
    active_txn_count: usize,
    committed_reader_count: usize,
    committed_writer_count: usize,
) -> Option<crate::ssi_abort_policy::DroHotPathDecision> {
    let has_any_edge = !(incoming_edges.is_empty() && outgoing_edges.is_empty());
    if !has_any_edge {
        return None;
    }
    let other_active = active_txn_count.saturating_sub(1);
    let population = |edge_count: usize, committed_count: usize| {
        other_active.max(edge_count).saturating_add(committed_count)
    };
    let reader_population = population(incoming_edges.len(), committed_reader_count);
    let writer_population = population(outgoing_edges.len(), committed_writer_count);
    Some(evaluate_t3_dro(txn, reader_population, writer_population))
}
/// Commits `handle` at `assign_commit_seq` after first-committer-wins
/// validation and a check of the handle's own SSI flags.
///
/// On success the tracked write-conflict pages are published to
/// `commit_index`, the tracked page locks are released and the handle is
/// marked committed.
///
/// # Errors
///
/// * `(InvalidState, Clean)` when the handle is not active or `session_id`
///   is not a valid transaction id; nothing is released in that case.
/// * `(BusySnapshot, <fcw result>)` when validation is not clean.
/// * `(BusySnapshot, Clean)` when the handle is marked for abort or has both
///   an incoming and an outgoing rw edge.
///
/// Every `BusySnapshot` path releases the tracked locks and marks the handle
/// aborted before returning.
pub fn concurrent_commit(
    handle: &mut ConcurrentHandle,
    commit_index: &CommitIndex,
    lock_table: &InProcessPageLockTable,
    session_id: u64,
    assign_commit_seq: CommitSeq,
) -> Result<CommitSeq, (MvccError, FcwResult)> {
    if !handle.is_active() {
        return Err((MvccError::InvalidState, FcwResult::Clean));
    }
    let Some(txn_id) = TxnId::new(session_id) else {
        return Err((MvccError::InvalidState, FcwResult::Clean));
    };

    let fcw_result = validate_first_committer_wins(handle, commit_index);
    if !matches!(fcw_result, FcwResult::Clean) {
        release_tracked_page_locks(lock_table, handle, txn_id);
        handle.mark_aborted();
        return Err((MvccError::BusySnapshot, fcw_result));
    }

    if handle.is_marked_for_abort() {
        tracing::warn!(
            txn = %txn_id,
            "concurrent_commit: SSI marked_for_abort"
        );
        release_tracked_page_locks(lock_table, handle, txn_id);
        handle.mark_aborted();
        return Err((MvccError::BusySnapshot, FcwResult::Clean));
    }

    let is_pivot = handle.has_in_rw() && handle.has_out_rw();
    if is_pivot {
        tracing::warn!(
            txn = %txn_id,
            "concurrent_commit: SSI pivot (in+out rw edges)"
        );
        release_tracked_page_locks(lock_table, handle, txn_id);
        handle.mark_aborted();
        return Err((MvccError::BusySnapshot, FcwResult::Clean));
    }

    // Publish before releasing the locks so the commit index is current by
    // the time another writer can take these pages.
    let published_pages = handle
        .tracked_write_conflict_pages_iter()
        .collect::<smallvec::SmallVec<[PageNumber; 16]>>();
    commit_index.batch_update(&published_pages, assign_commit_seq);
    release_tracked_page_locks(lock_table, handle, txn_id);
    handle.mark_committed();
    Ok(assign_commit_seq)
}
#[allow(clippy::too_many_lines)]
pub fn prepare_concurrent_commit_with_ssi(
registry: &mut ConcurrentRegistry,
commit_index: &CommitIndex,
lock_table: &InProcessPageLockTable,
session_id: u64,
planned_commit_seq: CommitSeq,
) -> Result<PreparedConcurrentCommit, (MvccError, FcwResult)> {
let txn_id = TxnId::new(session_id).ok_or((MvccError::InvalidState, FcwResult::Clean))?;
let commit_view = {
let handle = registry
.get(session_id)
.ok_or((MvccError::InvalidState, FcwResult::Clean))?;
if !handle.is_active() {
return Err((MvccError::InvalidState, FcwResult::Clean));
}
let fcw_result = validate_first_committer_wins(&handle, commit_index);
if !matches!(fcw_result, FcwResult::Clean) {
Err(fcw_result)
} else {
Ok((
handle.token(),
handle.begin_seq(),
handle.write_set_pages().into_vec(),
handle.held_lock_pages(),
handle.is_marked_for_abort(),
))
}
};
let (txn, begin_seq, write_set_pages, held_lock_pages, marked_for_abort) = match commit_view {
Ok(view) => view,
Err(fcw_result) => {
if let Some(mut handle) = registry.get_mut(session_id) {
release_tracked_page_locks(lock_table, &handle, txn_id);
handle.mark_aborted();
} else {
lock_table.release_all(txn_id);
}
return Err((MvccError::BusySnapshot, fcw_result));
}
};
if marked_for_abort {
tracing::warn!(
txn = %txn_id,
"prepare_concurrent_commit_with_ssi: marked_for_abort"
);
if let Some(mut handle) = registry.get_mut(session_id) {
release_tracked_page_locks(lock_table, &handle, txn_id);
handle.mark_aborted();
} else {
lock_table.release_all(txn_id);
}
return Err((MvccError::BusySnapshot, FcwResult::Clean));
}
if registry.can_use_uncontended_prepare_fast_path(session_id, begin_seq) {
if write_set_pages.is_empty() {
let Some((sorted_read_keys, read_key_summary, sorted_write_keys, write_key_summary)) =
hydrate_finalize_witness_state(registry, session_id)
else {
if let Some(mut handle) = registry.get_mut(session_id) {
release_tracked_page_locks(lock_table, &handle, txn_id);
handle.mark_aborted();
} else {
lock_table.release_all(txn_id);
}
return Err((MvccError::InvalidState, FcwResult::Clean));
};
return Ok(PreparedConcurrentCommit {
session_id,
planned_commit_seq,
txn_token: txn,
begin_seq,
read_keys: sorted_read_keys,
read_key_summary,
write_keys: sorted_write_keys,
write_key_summary,
write_set_pages,
held_lock_pages,
has_in_rw: false,
has_out_rw: false,
incoming_edges: Vec::new(),
outgoing_edges: Vec::new(),
dro_t3_decision: None,
used_uncontended_prepare_fast_path: true,
});
}
return Ok(PreparedConcurrentCommit {
session_id,
planned_commit_seq,
txn_token: txn,
begin_seq,
read_keys: Vec::new(),
read_key_summary: WitnessKeySummary::default(),
write_keys: Vec::new(),
write_key_summary: WitnessKeySummary::default(),
write_set_pages,
held_lock_pages,
has_in_rw: false,
has_out_rw: false,
incoming_edges: Vec::new(),
outgoing_edges: Vec::new(),
dro_t3_decision: None,
used_uncontended_prepare_fast_path: true,
});
}
let Some((sorted_read_keys, _read_key_summary, sorted_write_keys, _write_key_summary)) =
hydrate_finalize_witness_state(registry, session_id)
else {
if let Some(mut handle) = registry.get_mut(session_id) {
release_tracked_page_locks(lock_table, &handle, txn_id);
handle.mark_aborted();
} else {
lock_table.release_all(txn_id);
}
return Err((MvccError::InvalidState, FcwResult::Clean));
};
let views = registry
.iter_active()
.filter_map(|(_, handle)| {
let guard = handle.lock();
guard.is_active().then_some(HandleView::new(&guard))
})
.collect::<Vec<_>>();
let active_index = ActiveEdgeDiscoveryIndex::build(&views);
let read_key_summary = summarize_witness_keys(&sorted_read_keys);
let write_key_summary = summarize_witness_keys(&sorted_write_keys);
let active_reader_candidates = active_index.incoming_candidate_refs(
&views,
txn,
begin_seq,
planned_commit_seq,
&write_key_summary,
);
let committed_reader_candidates = registry.committed_reader_candidates(
txn,
begin_seq,
planned_commit_seq,
&write_key_summary,
);
let active_writer_candidates = active_index.outgoing_candidate_refs(
&views,
txn,
begin_seq,
planned_commit_seq,
&read_key_summary,
);
let committed_writer_candidates =
registry.committed_writer_candidates(txn, begin_seq, planned_commit_seq, &read_key_summary);
let incoming_edges = discover_incoming_edges(
txn,
begin_seq,
planned_commit_seq,
&sorted_write_keys,
&active_reader_candidates,
&committed_reader_candidates,
);
let outgoing_edges = discover_outgoing_edges(
txn,
begin_seq,
planned_commit_seq,
&sorted_read_keys,
&active_writer_candidates,
&committed_writer_candidates,
);
let has_in_rw = !incoming_edges.is_empty();
let has_out_rw = !outgoing_edges.is_empty();
let dro_t3_decision = evaluate_prepare_t3_dro(
txn,
&incoming_edges,
&outgoing_edges,
registry.active.len(),
registry.committed_readers.len(),
registry.committed_writers.len(),
);
let abort_reason = if has_in_rw && has_out_rw {
Some(SsiAbortReason::Pivot)
} else if incoming_edges
.iter()
.chain(&outgoing_edges)
.any(|edge| !edge.source_is_active && edge.source_has_in_rw)
{
Some(SsiAbortReason::CommittedPivot)
} else {
None
};
if let Some(reason) = abort_reason {
tracing::warn!(
?txn,
?reason,
dro_penalty = dro_t3_decision.map_or(0.0, |decision| decision.cvar_penalty),
dro_threshold = dro_t3_decision.map_or(0.0, |decision| decision.threshold),
"SSI validation aborted"
);
return Err((MvccError::BusySnapshot, FcwResult::Abort { reason }));
}
Ok(PreparedConcurrentCommit {
session_id,
planned_commit_seq,
txn_token: txn,
begin_seq,
read_keys: sorted_read_keys,
read_key_summary,
write_keys: sorted_write_keys,
write_key_summary,
write_set_pages,
held_lock_pages,
has_in_rw,
has_out_rw,
incoming_edges,
outgoing_edges,
dro_t3_decision,
used_uncontended_prepare_fast_path: false,
})
}
/// Applies a previously prepared commit at its final sequence `committed_seq`.
///
/// Returns without any effect when `prepared.session_id` is not a valid
/// transaction id. Otherwise the commit index is updated for the prepared
/// write-set pages and the prepared held locks are released even when the
/// session's handle is missing or no longer active (a warning is logged).
///
/// Two paths:
/// * Uncontended fast path, taken when prepare used its fast path and the
///   registry still allows it at finalize: clears the handle's rw flags,
///   publishes, releases, marks committed and prunes history. No edge
///   discovery and no committed reader/writer record.
/// * Full path: re-runs edge discovery against `committed_seq`, merges the
///   result with the edges found at prepare, propagates rw flags (and abort
///   marks) to the active transactions at the other end of each edge, then
///   publishes, releases, marks committed and records this transaction in
///   the committed reader/writer history.
#[allow(clippy::too_many_lines)]
pub fn finalize_prepared_concurrent_commit_with_ssi(
registry: &mut ConcurrentRegistry,
commit_index: &CommitIndex,
lock_table: &InProcessPageLockTable,
prepared: &PreparedConcurrentCommit,
committed_seq: CommitSeq,
) {
debug_assert!(
committed_seq >= prepared.planned_commit_seq,
"final commit sequence must not move backwards from the planning frontier"
);
let Some(txn_id) = TxnId::new(prepared.session_id) else {
return;
};
// ---- Uncontended fast path -------------------------------------------
if prepared.used_uncontended_prepare_fast_path()
&& registry.can_use_uncontended_finalize_fast_path(prepared.session_id, prepared.begin_seq)
{
let mut mark_committed = false;
if let Some(handle) = registry.get_mut(prepared.session_id) {
if handle.is_active() {
handle.has_in_rw.set(false);
handle.has_out_rw.set(false);
mark_committed = true;
} else {
tracing::warn!(
session_id = prepared.session_id,
"finalize_prepared_concurrent_commit_with_ssi: uncontended fast-path session inactive during finalize; applying commit-index/lock-table side effects"
);
}
} else {
tracing::warn!(
session_id = prepared.session_id,
"finalize_prepared_concurrent_commit_with_ssi: uncontended fast-path session missing during finalize; applying commit-index/lock-table side effects"
);
}
// Publish first, then release the locks, then flip the handle state.
commit_index.batch_update(&prepared.write_set_pages, committed_seq);
lock_table.release_set(prepared.held_lock_pages.iter().copied(), txn_id);
if mark_committed {
if let Some(mut handle) = registry.get_mut(prepared.session_id) {
if handle.is_active() {
handle.mark_committed();
}
}
}
registry.prune_committed_conflict_history();
return;
}
// ---- Full path --------------------------------------------------------
// Choose the witness keys to validate with. If prepare took the fast path
// but finalize cannot, try to read the keys from the handle now and fall
// back to whatever prepare captured when that is not possible.
let hydrated_witness_state;
let (read_keys, read_key_summary, write_keys, write_key_summary): (
&[WitnessKey],
&WitnessKeySummary,
&[WitnessKey],
&WitnessKeySummary,
) = if prepared.used_uncontended_prepare_fast_path() {
hydrated_witness_state = hydrate_finalize_witness_state(registry, prepared.session_id)
.unwrap_or_else(|| {
(
prepared.read_keys.clone(),
prepared.read_key_summary.clone(),
prepared.write_keys.clone(),
prepared.write_key_summary.clone(),
)
});
(
&hydrated_witness_state.0,
&hydrated_witness_state.1,
&hydrated_witness_state.2,
&hydrated_witness_state.3,
)
} else {
(
&prepared.read_keys,
&prepared.read_key_summary,
&prepared.write_keys,
&prepared.write_key_summary,
)
};
// Views are taken for every registered handle; inactive ones are filtered
// out later by the candidate selection.
let active_views: Vec<HandleView> = registry
.active
.values()
.map(|handle| {
let guard = handle.lock();
HandleView::new(&guard)
})
.collect();
let active_index = ActiveEdgeDiscoveryIndex::build(&active_views);
let active_reader_candidates = active_index.incoming_candidate_refs(
&active_views,
prepared.txn_token,
prepared.begin_seq,
committed_seq,
write_key_summary,
);
let committed_reader_candidates = registry.committed_reader_candidates(
prepared.txn_token,
prepared.begin_seq,
committed_seq,
write_key_summary,
);
let active_writer_candidates = active_index.outgoing_candidate_refs(
&active_views,
prepared.txn_token,
prepared.begin_seq,
committed_seq,
read_key_summary,
);
let committed_writer_candidates = registry.committed_writer_candidates(
prepared.txn_token,
prepared.begin_seq,
committed_seq,
read_key_summary,
);
// Start from the edges found at prepare and merge in those discovered at
// the final commit sequence; the `*_sources` / `*_targets` collections are
// handed to the merge helpers alongside the edge lists.
let mut incoming_edges = prepared.incoming_edges.clone();
let mut incoming_sources = incoming_edges.iter().map(|edge| edge.from).collect();
merge_unique_incoming_edges(
&mut incoming_edges,
&mut incoming_sources,
discover_incoming_edges(
prepared.txn_token,
prepared.begin_seq,
committed_seq,
write_keys,
&active_reader_candidates,
&committed_reader_candidates,
),
);
let mut outgoing_edges = prepared.outgoing_edges.clone();
let mut outgoing_targets = outgoing_edges.iter().map(|edge| edge.to).collect();
merge_unique_outgoing_edges(
&mut outgoing_edges,
&mut outgoing_targets,
discover_outgoing_edges(
prepared.txn_token,
prepared.begin_seq,
committed_seq,
read_keys,
&active_writer_candidates,
&committed_writer_candidates,
),
);
let has_in_rw = !incoming_edges.is_empty();
let has_out_rw = !outgoing_edges.is_empty();
let dro_t3_decision = evaluate_prepare_t3_dro(
prepared.txn_token,
&incoming_edges,
&outgoing_edges,
registry.active.len(),
registry.committed_readers.len(),
registry.committed_writers.len(),
);
// With no DRO decision the default is to abort active pivots.
let should_abort_active_pivot = dro_t3_decision.is_none_or(|decision| decision.should_abort());
// Incoming edges: the active transaction at `edge.from` gains an outgoing
// edge; if it already has an incoming one it is a pivot.
for edge in &incoming_edges {
if !edge.source_is_active {
continue;
}
for reader in registry.active.values() {
let reader = reader.lock();
if !reader.is_active() || reader.token() != edge.from {
continue;
}
reader.set_has_out_rw(true);
if reader.has_in_rw() {
if should_abort_active_pivot {
tracing::debug!(
pivot = ?edge.from,
dro_penalty = dro_t3_decision.map_or(0.0, |decision| decision.cvar_penalty),
dro_threshold = dro_t3_decision.map_or(0.0, |decision| decision.threshold),
"prepare/finalize T3 rule: active reader is pivot, marking for abort"
);
reader.set_marked_for_abort(true);
} else {
tracing::debug!(
pivot = ?edge.from,
dro_penalty = dro_t3_decision.map_or(0.0, |decision| decision.cvar_penalty),
dro_threshold = dro_t3_decision.map_or(0.0, |decision| decision.threshold),
"prepare/finalize T3 rule: active reader is pivot, DRO allows it to continue"
);
}
}
// Only the first matching active handle is updated per edge.
break;
}
}
// Outgoing edges: the active transaction at `edge.to` gains an incoming
// edge; if it already has an outgoing one it is a pivot.
for edge in &outgoing_edges {
if !edge.source_is_active {
continue;
}
for writer in registry.active.values() {
let writer = writer.lock();
if !writer.is_active() || writer.token() != edge.to {
continue;
}
writer.set_has_in_rw(true);
if writer.has_out_rw() {
if should_abort_active_pivot {
tracing::debug!(
pivot = ?edge.to,
dro_penalty = dro_t3_decision.map_or(0.0, |decision| decision.cvar_penalty),
dro_threshold = dro_t3_decision.map_or(0.0, |decision| decision.threshold),
"prepare/finalize T3 rule: active writer is pivot, marking for abort"
);
writer.set_marked_for_abort(true);
} else {
tracing::debug!(
pivot = ?edge.to,
dro_penalty = dro_t3_decision.map_or(0.0, |decision| decision.cvar_penalty),
dro_threshold = dro_t3_decision.map_or(0.0, |decision| decision.threshold),
"prepare/finalize T3 rule: active writer is pivot, DRO allows it to continue"
);
}
}
// Only the first matching active handle is updated per edge.
break;
}
}
// Record the final edge flags on the committing handle, if it is still
// present and active.
let mut mark_committed = false;
if let Some(handle) = registry.get_mut(prepared.session_id) {
if handle.is_active() {
handle.has_in_rw.set(has_in_rw);
handle.has_out_rw.set(has_out_rw);
mark_committed = true;
} else {
tracing::warn!(
session_id = prepared.session_id,
"finalize_prepared_concurrent_commit_with_ssi: session inactive during finalize; applying commit-index/lock-table side effects"
);
}
} else {
tracing::warn!(
session_id = prepared.session_id,
"finalize_prepared_concurrent_commit_with_ssi: session missing during finalize; applying commit-index/lock-table side effects"
);
}
// Publish first, then release the locks, then flip the handle state.
commit_index.batch_update(&prepared.write_set_pages, committed_seq);
lock_table.release_set(prepared.held_lock_pages.iter().copied(), txn_id);
if mark_committed {
if let Some(mut handle) = registry.get_mut(prepared.session_id) {
if handle.is_active() {
handle.mark_committed();
}
}
}
// Append this transaction to the committed reader/writer history and index
// the entry that was just pushed.
if !read_keys.is_empty() {
registry.committed_readers.push(CommittedReaderInfo {
token: prepared.txn_token,
begin_seq: prepared.begin_seq,
commit_seq: committed_seq,
had_in_rw: has_in_rw,
keys: read_keys.to_vec(),
});
registry.index_committed_reader(registry.committed_readers.len() - 1);
}
if !write_keys.is_empty() {
registry.committed_writers.push(CommittedWriterInfo {
token: prepared.txn_token,
commit_seq: committed_seq,
had_out_rw: has_out_rw,
keys: write_keys.to_vec(),
});
registry.index_committed_writer(registry.committed_writers.len() - 1);
}
registry.prune_committed_conflict_history();
}
/// One-shot commit: prepares and immediately finalizes at `assign_commit_seq`.
///
/// # Errors
///
/// Forwards the error from `prepare_concurrent_commit_with_ssi`. Before
/// returning it, the session's handle (if still registered) is removed from
/// the registry, aborted via `concurrent_abort` and handed back to the
/// registry for recycling.
#[allow(clippy::too_many_lines)]
pub fn concurrent_commit_with_ssi(
    registry: &mut ConcurrentRegistry,
    commit_index: &CommitIndex,
    lock_table: &InProcessPageLockTable,
    session_id: u64,
    assign_commit_seq: CommitSeq,
) -> Result<CommitSeq, (MvccError, FcwResult)> {
    let prepared = prepare_concurrent_commit_with_ssi(
        registry,
        commit_index,
        lock_table,
        session_id,
        assign_commit_seq,
    )
    .map_err(|failure| {
        if let Some(shared_handle) = registry.remove(session_id) {
            // The guard is a temporary, so it is dropped before the handle
            // is handed back for recycling.
            concurrent_abort(&mut shared_handle.lock(), lock_table, session_id);
            registry.recycle_handle(shared_handle);
        }
        failure
    })?;

    finalize_prepared_concurrent_commit_with_ssi(
        registry,
        commit_index,
        lock_table,
        &prepared,
        assign_commit_seq,
    );
    Ok(assign_commit_seq)
}
/// Aborts `handle`: releases its tracked page locks, then marks it aborted.
///
/// The lock release is skipped when `session_id` is not a valid transaction
/// id; the handle is marked aborted either way.
pub fn concurrent_abort(
    handle: &mut ConcurrentHandle,
    lock_table: &InProcessPageLockTable,
    session_id: u64,
) {
    let lock_owner = TxnId::new(session_id);
    if let Some(txn_id) = lock_owner {
        release_tracked_page_locks(lock_table, handle, txn_id);
    }
    handle.mark_aborted();
}
/// Captures a named savepoint of the handle's write state.
///
/// Only pages whose state tracks a write conflict are recorded; for each of
/// them the staged data and the freed / conflict-only flags are copied. The
/// handle's current write-set length is stored alongside.
///
/// # Errors
///
/// Returns `MvccError::InvalidState` when the handle is not active.
pub fn concurrent_savepoint(
    handle: &ConcurrentHandle,
    name: &str,
) -> Result<ConcurrentSavepoint, MvccError> {
    if !handle.is_active() {
        return Err(MvccError::InvalidState);
    }
    let page_states_snapshot = handle
        .page_states
        .iter()
        .filter(|(_, state)| state.tracks_write_conflict())
        .map(|(&page, state)| {
            let saved = SavepointPageState {
                staged_data: state.staged_data.clone(),
                is_freed: state.is_freed,
                is_conflict_only: state.is_conflict_only,
            };
            (page, saved)
        })
        .collect();
    Ok(ConcurrentSavepoint {
        name: name.to_owned(),
        page_states_snapshot,
        write_set_len: handle.write_set_len(),
    })
}
pub fn concurrent_rollback_to_savepoint(
handle: &mut ConcurrentHandle,
lock_table: &InProcessPageLockTable,
session_id: u64,
savepoint: &ConcurrentSavepoint,
) -> Result<(), MvccError> {
if !handle.is_active() {
return Err(MvccError::InvalidState);
}
let txn_id = TxnId::new(session_id).ok_or(MvccError::InvalidState)?;
let mut reacquired_pages = Vec::new();
for &page in savepoint.page_states_snapshot.keys() {
if !handle.page_state(page).is_some_and(|state| state.held_lock) {
if lock_table.try_acquire(page, txn_id).is_err() {
for reacquired in reacquired_pages {
lock_table.release(reacquired, txn_id);
}
return Err(MvccError::Busy);
}
reacquired_pages.push(page);
}
}
let mut restored = HashMap::with_capacity(
handle
.page_states
.len()
.max(savepoint.page_states_snapshot.len()),
);
for (&page, snapshot_state) in &savepoint.page_states_snapshot {
restored.insert(
page,
PageTxnState {
staged_data: snapshot_state.staged_data.clone(),
is_freed: snapshot_state.is_freed,
is_conflict_only: snapshot_state.is_conflict_only,
held_lock: true,
},
);
}
for (&page, state) in &handle.page_states {
if state.held_lock {
restored.entry(page).or_insert_with(|| PageTxnState {
held_lock: true,
..PageTxnState::default()
});
}
}
handle.page_states = restored;
Ok(())
}
/// Returns `true` exactly when `mode` is `TransactionMode::Concurrent`.
#[must_use]
pub const fn is_concurrent_mode(mode: TransactionMode) -> bool {
matches!(mode, TransactionMode::Concurrent)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use fsqlite_types::{
CommitSeq, PageData, PageNumber, PageSize, SchemaEpoch, Snapshot, TxnEpoch, TxnId,
TxnToken, WitnessKey,
};
use crate::core_types::{CommitIndex, InProcessPageLockTable};
use crate::lifecycle::MvccError;
use crate::ssi_validation::ActiveTxnView;
use super::{
ActiveEdgeDiscoveryIndex, CommittedReaderInfo, CommittedWriterInfo, ConcurrentHandle,
ConcurrentRegistry, FcwResult, HandleView, MAX_CONCURRENT_WRITERS, concurrent_abort,
concurrent_clear_page_state, concurrent_commit, concurrent_commit_with_ssi,
concurrent_free_page, concurrent_page_is_freed, concurrent_page_state,
concurrent_read_page, concurrent_restore_page_state, concurrent_rollback_to_savepoint,
concurrent_savepoint, concurrent_track_write_conflict_page, concurrent_write_page,
finalize_prepared_concurrent_commit_with_ssi, prepare_concurrent_commit_with_ssi,
summarize_witness_keys, validate_first_committer_wins,
};
fn test_snapshot(high: u64) -> Snapshot {
Snapshot {
high: CommitSeq::new(high),
schema_epoch: SchemaEpoch::ZERO,
}
}
fn test_page(n: u32) -> PageNumber {
PageNumber::new(n).expect("page number must be nonzero")
}
fn test_data() -> PageData {
PageData::zeroed(PageSize::DEFAULT)
}
fn test_token(id: u64) -> TxnToken {
TxnToken::new(
TxnId::new(id).expect("test transaction id"),
TxnEpoch::new(id as u32 + 1),
)
}
#[test]
fn test_begin_concurrent_multiple_writers() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 1");
let s2 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 2");
{
let mut h1 = registry.get_mut(s1).expect("handle 1");
concurrent_write_page(&mut h1, &lock_table, s1, test_page(5), test_data())
.expect("write page 5");
}
{
let mut h2 = registry.get_mut(s2).expect("handle 2");
concurrent_write_page(&mut h2, &lock_table, s2, test_page(10), test_data())
.expect("write page 10");
}
{
let mut h1 = registry.get_mut(s1).expect("handle 1");
let seq1 =
concurrent_commit(&mut h1, &commit_index, &lock_table, s1, CommitSeq::new(11))
.expect("commit 1");
assert_eq!(seq1, CommitSeq::new(11));
}
{
let mut h2 = registry.get_mut(s2).expect("handle 2");
let seq2 =
concurrent_commit(&mut h2, &commit_index, &lock_table, s2, CommitSeq::new(12))
.expect("commit 2");
assert_eq!(seq2, CommitSeq::new(12));
}
}
#[test]
fn test_begin_concurrent_page_conflict_busy_snapshot() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 1");
let s2 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 2");
{
let mut h1 = registry.get_mut(s1).expect("handle 1");
concurrent_write_page(&mut h1, &lock_table, s1, test_page(5), test_data())
.expect("s1 write page 5");
}
{
let mut h1 = registry.get_mut(s1).expect("handle 1");
concurrent_commit(&mut h1, &commit_index, &lock_table, s1, CommitSeq::new(11))
.expect("s1 commits first");
}
{
let mut h2 = registry.get_mut(s2).expect("handle 2");
concurrent_write_page(&mut h2, &lock_table, s2, test_page(5), test_data())
.expect("s2 write page 5");
}
{
let mut h2 = registry.get_mut(s2).expect("handle 2");
let result =
concurrent_commit(&mut h2, &commit_index, &lock_table, s2, CommitSeq::new(12));
assert!(result.is_err());
let (err, fcw) = result.unwrap_err();
assert_eq!(err, MvccError::BusySnapshot);
assert!(matches!(fcw, FcwResult::Conflict { .. }));
}
}
#[test]
fn test_begin_concurrent_first_committer_wins() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 1");
let s2 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 2");
let s3 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 3");
{
let mut h1 = registry.get_mut(s1).expect("h1");
concurrent_write_page(&mut h1, &lock_table, s1, test_page(5), test_data()).unwrap();
}
{
let mut h3 = registry.get_mut(s3).expect("h3");
concurrent_write_page(&mut h3, &lock_table, s3, test_page(10), test_data()).unwrap();
}
{
let mut h1 = registry.get_mut(s1).expect("h1");
concurrent_commit(&mut h1, &commit_index, &lock_table, s1, CommitSeq::new(11))
.expect("s1 commits");
}
{
let mut h2 = registry.get_mut(s2).expect("h2");
concurrent_write_page(&mut h2, &lock_table, s2, test_page(5), test_data()).unwrap();
}
{
let mut h2 = registry.get_mut(s2).expect("h2");
let result =
concurrent_commit(&mut h2, &commit_index, &lock_table, s2, CommitSeq::new(12));
assert!(result.is_err());
let (err, _) = result.unwrap_err();
assert_eq!(err, MvccError::BusySnapshot);
}
{
let mut h3 = registry.get_mut(s3).expect("h3");
let seq3 =
concurrent_commit(&mut h3, &commit_index, &lock_table, s3, CommitSeq::new(13))
.expect("s3 commits");
assert_eq!(seq3, CommitSeq::new(13));
}
}
#[test]
fn test_savepoint_within_concurrent() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry
.begin_concurrent(test_snapshot(10))
.expect("session");
{
let mut handle = registry.get_mut(s1).expect("handle");
concurrent_write_page(&mut handle, &lock_table, s1, test_page(1), test_data()).unwrap();
}
let sp = {
let handle = registry.get(s1).expect("handle");
concurrent_savepoint(&handle, "sp1").unwrap()
};
assert_eq!(sp.captured_len(), 1);
{
let mut handle = registry.get_mut(s1).expect("handle");
concurrent_write_page(&mut handle, &lock_table, s1, test_page(2), test_data()).unwrap();
assert_eq!(handle.write_set_len(), 2);
}
{
let mut handle = registry.get_mut(s1).expect("handle");
concurrent_rollback_to_savepoint(&mut handle, &lock_table, s1, &sp).unwrap();
assert_eq!(handle.write_set_len(), 1);
assert!(handle.held_locks().contains(&test_page(2))); }
{
let mut handle = registry.get_mut(s1).expect("handle");
concurrent_write_page(&mut handle, &lock_table, s1, test_page(3), test_data()).unwrap();
}
{
let handle = registry.get_mut(s1).expect("handle");
let mut pages = handle.write_set_pages();
pages.sort();
assert_eq!(pages.as_slice(), &[test_page(1), test_page(3)]);
}
{
let mut handle = registry.get_mut(s1).expect("handle");
concurrent_commit(
&mut handle,
&commit_index,
&lock_table,
s1,
CommitSeq::new(11),
)
.expect("commit succeeds");
}
let s2 = registry
.begin_concurrent(test_snapshot(11))
.expect("session 2");
let mut handle2 = registry.get_mut(s2).expect("handle 2");
concurrent_write_page(&mut handle2, &lock_table, s2, test_page(2), test_data())
.expect("savepoint-preserved lock must be released on commit");
}
/// Rolling back to a savepoint taken before a page was freed must clear the
/// freed marker and make the staged page readable again.
#[test]
fn test_savepoint_within_concurrent_restores_freed_pages() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = test_page(1);

    // Stage the page so it exists in the write set when the savepoint is taken.
    {
        let mut txn = registry.get_mut(session).expect("handle");
        concurrent_write_page(&mut txn, &lock_table, session, page, test_data()).unwrap();
    }

    let savepoint = {
        let txn = registry.get(session).expect("handle");
        concurrent_savepoint(&txn, "sp1").unwrap()
    };

    {
        let mut txn = registry.get_mut(session).expect("handle");

        // Free the page after the savepoint...
        concurrent_free_page(&mut txn, &lock_table, session, page).unwrap();
        assert!(concurrent_page_is_freed(&txn, page));

        // ...and verify the rollback undoes the free.
        concurrent_rollback_to_savepoint(&mut txn, &lock_table, session, &savepoint).unwrap();
        assert!(!concurrent_page_is_freed(&txn, page));
        assert!(concurrent_read_page(&txn, page).is_some());
        assert_eq!(txn.write_set_pages().as_slice(), &[page]);
    }
}
/// `concurrent_read_page` returns data only for pages this transaction has
/// staged; any other page yields `None`.
#[test]
fn test_concurrent_read_local_vs_mvcc() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry
        .begin_concurrent(test_snapshot(10))
        .expect("session");
    let staged_page = test_page(5);
    let untouched_page = test_page(6);

    // Before any write there is nothing local to read.
    {
        let txn = registry.get(session).expect("handle");
        assert!(concurrent_read_page(&txn, staged_page).is_none());
    }

    {
        let mut txn = registry.get_mut(session).expect("handle");
        concurrent_write_page(&mut txn, &lock_table, session, staged_page, test_data()).unwrap();
    }

    // Only the page that was written is visible afterwards.
    {
        let txn = registry.get(session).expect("handle");
        assert!(concurrent_read_page(&txn, staged_page).is_some());
        assert!(concurrent_read_page(&txn, untouched_page).is_none());
    }
}
/// Freeing a locally staged page hides it from local reads and from
/// `write_set_len`, while `write_set_pages` still reports it.
#[test]
fn test_concurrent_free_page_removes_local_read_and_still_tracks_conflict_page() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = test_page(5);
    let mut txn = registry.get_mut(session).expect("handle");

    // Stage the page, then free it within the same transaction.
    concurrent_write_page(&mut txn, &lock_table, session, page, test_data()).unwrap();
    concurrent_free_page(&mut txn, &lock_table, session, page).unwrap();

    assert!(concurrent_page_is_freed(&txn, page));
    assert!(concurrent_read_page(&txn, page).is_none());
    assert_eq!(txn.write_set_len(), 0);
    assert_eq!(txn.write_set_pages().as_slice(), &[page]);
}
/// A page that was only freed (never written) still takes part in
/// first-committer-wins validation.
#[test]
fn test_validate_first_committer_wins_considers_freed_pages() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = test_page(5);
    let mut txn = registry.get_mut(session).expect("handle");

    concurrent_free_page(&mut txn, &lock_table, session, page).unwrap();

    // Record a commit on the same page at sequence 11, past the snapshot (10).
    commit_index.update(page, CommitSeq::new(11));

    let expected = FcwResult::Conflict {
        conflicting_pages: vec![page],
        conflicting_commit_seq: CommitSeq::new(11),
    };
    assert_eq!(validate_first_committer_wins(&txn, &commit_index), expected);
}
/// Restoring a page-state snapshot captured before a free must undo the free
/// and leave the page lock available to other transactions.
#[test]
fn test_restore_page_state_releases_new_lock_and_clears_failed_free() {
let lock_table = InProcessPageLockTable::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
let mut handle = registry.get_mut(s1).expect("handle");
// Capture page 8's state before the transaction has touched it.
let saved = concurrent_page_state(&handle, test_page(8));
concurrent_free_page(&mut handle, &lock_table, s1, test_page(8)).unwrap();
assert!(concurrent_page_is_freed(&handle, test_page(8)));
// Put the captured (untouched) state back.
concurrent_restore_page_state(&mut handle, &lock_table, s1, &saved).unwrap();
assert!(!concurrent_page_is_freed(&handle, test_page(8)));
assert!(concurrent_read_page(&handle, test_page(8)).is_none());
assert!(!handle.held_locks().contains(&test_page(8)));
// A different transaction id must now be able to acquire the lock directly
// from the lock table, and release it again.
let other_txn = fsqlite_types::TxnId::new(999).unwrap();
assert!(
lock_table.try_acquire(test_page(8), other_txn).is_ok(),
"restoring clean state must release the transient page lock"
);
assert!(lock_table.release(test_page(8), other_txn));
}
/// Clearing a page's state must drop both the conflict tracking and the page
/// lock, so that another transaction can take the lock.
#[test]
fn test_clear_page_state_releases_lock_and_conflict_tracking() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = PageNumber::ONE;
    let mut txn = registry.get_mut(session).expect("handle");

    // Track the page for conflicts only; this also takes the page lock.
    concurrent_track_write_conflict_page(&mut txn, &lock_table, session, page).unwrap();
    assert!(txn.tracks_write_conflict_page(page));
    assert!(txn.held_locks().contains(&page));

    concurrent_clear_page_state(&mut txn, &lock_table, session, page).unwrap();

    let still_tracked = txn.tracks_write_conflict_page(page);
    assert!(
        !still_tracked,
        "clearing the page state must remove the synthetic conflict surface"
    );
    let still_locked = txn.held_locks().contains(&page);
    assert!(
        !still_locked,
        "clearing the page state must release the page lock"
    );

    // Confirm against the lock table itself using an unrelated transaction id.
    let intruder = fsqlite_types::TxnId::new(999).unwrap();
    assert!(
        lock_table.try_acquire(page, intruder).is_ok(),
        "clearing the page state must make the page lock available to other writers"
    );
    assert!(lock_table.release(page, intruder));
}
/// If a page's state is cleared after a savepoint that captured it, rolling
/// back to that savepoint must restore both the conflict tracking and the
/// page lock.
#[test]
fn test_rollback_to_savepoint_reacquires_cleared_page_state_lock() {
let lock_table = InProcessPageLockTable::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
// Track page 1 for conflicts, then take the savepoint while it is tracked.
let savepoint = {
let mut handle = registry.get_mut(s1).expect("handle");
concurrent_track_write_conflict_page(&mut handle, &lock_table, s1, PageNumber::ONE)
.unwrap();
concurrent_savepoint(&handle, "sp1").unwrap()
};
{
let mut handle = registry.get_mut(s1).expect("handle");
// Clearing drops tracking and lock ownership...
concurrent_clear_page_state(&mut handle, &lock_table, s1, PageNumber::ONE).unwrap();
assert!(!handle.tracks_write_conflict_page(PageNumber::ONE));
assert!(!handle.holds_page_lock(PageNumber::ONE));
// ...and the rollback brings both back.
concurrent_rollback_to_savepoint(&mut handle, &lock_table, s1, &savepoint).unwrap();
assert!(handle.tracks_write_conflict_page(PageNumber::ONE));
assert!(handle.holds_page_lock(PageNumber::ONE));
}
// The lock table must agree: another transaction id cannot acquire the page.
let other_txn = fsqlite_types::TxnId::new(999).unwrap();
assert!(
lock_table.try_acquire(PageNumber::ONE, other_txn).is_err(),
"rollback-to-savepoint must reacquire the cleared page lock"
);
}
/// Committing a transaction that only freed a page must still record that
/// page in the commit index at the commit sequence.
#[test]
fn test_concurrent_commit_updates_commit_index_for_freed_pages() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let freed_page = test_page(11);
    let mut txn = registry.get_mut(session).expect("handle");

    concurrent_free_page(&mut txn, &lock_table, session, freed_page).unwrap();
    concurrent_commit(&mut txn, &commit_index, &lock_table, session, CommitSeq::new(11))
        .expect("commit should succeed");

    let latest = commit_index.latest(freed_page);
    assert_eq!(latest, Some(CommitSeq::new(11)));
}
/// Committing a transaction that tracked a page for conflicts only (no staged
/// data) must still record that page in the commit index.
#[test]
fn test_concurrent_commit_updates_commit_index_for_conflict_only_pages() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = PageNumber::ONE;
    let mut txn = registry.get_mut(session).expect("handle");

    concurrent_track_write_conflict_page(&mut txn, &lock_table, session, page).unwrap();
    concurrent_commit(&mut txn, &commit_index, &lock_table, session, CommitSeq::new(11))
        .expect("commit should succeed");

    let latest = commit_index.latest(page);
    assert_eq!(latest, Some(CommitSeq::new(11)));
}
/// Writing the same page twice must replace the staged data, keep a single
/// held lock, and leave exactly one page witness for that page.
#[test]
fn test_concurrent_write_page_fast_path_reuses_owned_page_without_duplicate_witnesses() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = test_page(5);
    let updated = PageData::from_vec(vec![0x7A; PageSize::DEFAULT.as_usize()]);
    let mut txn = registry.get_mut(session).expect("handle");

    // First write stages the page; second write overwrites it.
    concurrent_write_page(&mut txn, &lock_table, session, page, test_data()).unwrap();
    concurrent_write_page(&mut txn, &lock_table, session, page, updated.clone()).unwrap();

    assert_eq!(concurrent_read_page(&txn, page), Some(&updated));
    assert_eq!(txn.held_locks().len(), 1);

    let witness_keys = txn.write_witness_keys();
    let page_witness_count = witness_keys
        .iter()
        .filter(|key| matches!(key, WitnessKey::Page(witness_page) if *witness_page == page))
        .count();
    assert_eq!(
        page_witness_count, 1,
        "rewriting an already-owned page should not duplicate page witnesses"
    );
}
/// Tracking the same conflict-only page twice must keep a single held lock
/// and exactly one page witness for that page.
#[test]
fn test_concurrent_track_write_conflict_page_fast_path_reuses_tracked_page_without_duplicate_witnesses()
 {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = PageNumber::ONE;
    let mut txn = registry.get_mut(session).expect("handle");

    // Track twice; the second call hits the already-tracked page.
    concurrent_track_write_conflict_page(&mut txn, &lock_table, session, page).unwrap();
    concurrent_track_write_conflict_page(&mut txn, &lock_table, session, page).unwrap();

    assert!(txn.tracks_write_conflict_page(page));
    assert_eq!(txn.held_locks().len(), 1);

    let witness_keys = txn.write_witness_keys();
    let page_witness_count = witness_keys
        .iter()
        .filter(|key| matches!(key, WitnessKey::Page(witness_page) if *witness_page == page))
        .count();
    assert_eq!(
        page_witness_count, 1,
        "retracking an already-owned conflict-only page should not duplicate page witnesses"
    );
}
/// After a rollback leaves page 12's lock held but untracked, writing page 12
/// again must succeed and leave exactly one page witness for it.
#[test]
fn test_concurrent_write_page_reuses_savepoint_preserved_lock_without_duplicate_witnesses() {
let lock_table = InProcessPageLockTable::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
let page = test_page(11);
let expected = test_data();
let mut handle = registry.get_mut(s1).expect("handle");
// Page 11 is written before the savepoint, page 12 after it.
concurrent_write_page(&mut handle, &lock_table, s1, page, expected.clone()).unwrap();
let savepoint = concurrent_savepoint(&handle, "sp1").unwrap();
concurrent_write_page(&mut handle, &lock_table, s1, test_page(12), test_data()).unwrap();
concurrent_rollback_to_savepoint(&mut handle, &lock_table, s1, &savepoint).unwrap();
// Precondition for the scenario: the rollback kept page 12's lock but the
// page is no longer tracked as a write conflict.
assert!(handle.held_locks().contains(&test_page(12)));
assert!(!handle.tracks_write_conflict_page(test_page(12)));
// Write page 12 again on top of the still-held lock.
concurrent_write_page(
&mut handle,
&lock_table,
s1,
test_page(12),
expected.clone(),
)
.unwrap();
assert_eq!(
concurrent_read_page(&handle, test_page(12)),
Some(&expected)
);
// Count only the page-scoped witnesses that refer to page 12.
assert_eq!(
handle
.write_witness_keys()
.iter()
.filter(
|key| matches!(key, WitnessKey::Page(witness_page) if *witness_page == test_page(12))
)
.count(),
1,
"reusing a savepoint-preserved lock should not duplicate page witnesses"
);
}
/// Writing a page that was previously tracked for conflicts only must stage
/// the data, clear the conflict-only flag, and keep a single page witness.
#[test]
fn test_concurrent_write_page_promotes_conflict_only_page_without_duplicate_witnesses() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = PageNumber::ONE;
    let expected = test_data();
    let mut txn = registry.get_mut(session).expect("handle");

    // Conflict-only tracking first, then a real write to the same page.
    concurrent_track_write_conflict_page(&mut txn, &lock_table, session, page).unwrap();
    concurrent_write_page(&mut txn, &lock_table, session, page, expected.clone()).unwrap();

    assert_eq!(concurrent_read_page(&txn, page), Some(&expected));

    let conflict_only = txn
        .page_state(page)
        .is_some_and(|state| state.is_conflict_only);
    assert!(!conflict_only);

    let witness_keys = txn.write_witness_keys();
    let page_witness_count = witness_keys
        .iter()
        .filter(|key| matches!(key, WitnessKey::Page(witness_page) if *witness_page == page))
        .count();
    assert_eq!(
        page_witness_count, 1,
        "promoting a conflict-only page into the write set should not duplicate page witnesses"
    );
}
/// Write, free, then write the same page again: the page must end up staged,
/// not freed, with exactly one page witness.
#[test]
fn test_concurrent_write_page_rewrites_freed_page_without_duplicate_witnesses() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let page = test_page(13);
    let expected = test_data();
    let mut txn = registry.get_mut(session).expect("handle");

    concurrent_write_page(&mut txn, &lock_table, session, page, expected.clone()).unwrap();
    concurrent_free_page(&mut txn, &lock_table, session, page).unwrap();
    concurrent_write_page(&mut txn, &lock_table, session, page, expected.clone()).unwrap();

    assert_eq!(concurrent_read_page(&txn, page), Some(&expected));
    assert!(!concurrent_page_is_freed(&txn, page));

    let witness_keys = txn.write_witness_keys();
    let page_witness_count = witness_keys
        .iter()
        .filter(|key| matches!(key, WitnessKey::Page(witness_page) if *witness_page == page))
        .count();
    assert_eq!(
        page_witness_count, 1,
        "rewriting a previously freed page should not duplicate page witnesses"
    );
}
/// `remove` hands back the same shared handle that `handle` returned earlier,
/// and that handle is still active after it leaves the registry.
#[test]
fn test_concurrent_registry_remove_keeps_shared_handle_alive_for_existing_clones() {
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();

    let held_clone = registry.handle(session).expect("shared handle");
    let taken_out = registry.remove(session).expect("removed handle");

    // Both values must point at the same allocation.
    assert!(Arc::ptr_eq(&held_clone, &taken_out));
    assert_eq!(registry.active_count(), 0);
    assert!(taken_out.lock().is_active());
}
/// Aborting a transaction must deactivate it and free its page locks so that
/// a later session can write the same page.
#[test]
fn test_concurrent_abort_releases_locks() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let first = registry
        .begin_concurrent(test_snapshot(10))
        .expect("session");

    // Take two page locks through ordinary writes.
    {
        let mut txn = registry.get_mut(first).expect("handle");
        concurrent_write_page(&mut txn, &lock_table, first, test_page(5), test_data()).unwrap();
        concurrent_write_page(&mut txn, &lock_table, first, test_page(6), test_data()).unwrap();
        assert_eq!(txn.held_locks().len(), 2);
    }

    // Abort through a freshly obtained handle.
    {
        let mut txn = registry.get_mut(first).expect("handle");
        concurrent_abort(&mut txn, &lock_table, first);
        assert!(!txn.is_active());
    }

    let second = registry
        .begin_concurrent(test_snapshot(10))
        .expect("session 2");
    let mut second_txn = registry.get_mut(second).expect("handle 2");
    concurrent_write_page(&mut second_txn, &lock_table, second, test_page(5), test_data())
        .expect("lock should be available after abort");
}
/// A lock that stays held after a rollback-to-savepoint (while its page is no
/// longer tracked) must also be released when the transaction aborts.
#[test]
fn test_concurrent_abort_releases_savepoint_preserved_lock() {
let lock_table = InProcessPageLockTable::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry
.begin_concurrent(test_snapshot(10))
.expect("session");
{
let mut handle = registry.get_mut(s1).expect("handle");
// Page 5 is written before the savepoint, page 6 after it.
concurrent_write_page(&mut handle, &lock_table, s1, test_page(5), test_data()).unwrap();
let savepoint = concurrent_savepoint(&handle, "sp1").unwrap();
concurrent_write_page(&mut handle, &lock_table, s1, test_page(6), test_data()).unwrap();
concurrent_rollback_to_savepoint(&mut handle, &lock_table, s1, &savepoint).unwrap();
// Precondition: page 6's lock is still held but the page is untracked.
assert!(handle.held_locks().contains(&test_page(6)));
assert!(!handle.tracks_write_conflict_page(test_page(6)));
concurrent_abort(&mut handle, &lock_table, s1);
assert!(!handle.is_active());
}
// A new session must be able to write page 6 once s1 has aborted.
let s2 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 2");
let mut handle2 = registry.get_mut(s2).expect("handle 2");
concurrent_write_page(&mut handle2, &lock_table, s2, test_page(6), test_data())
.expect("savepoint-preserved lock should be available after abort");
}
/// The registry accepts `MAX_CONCURRENT_WRITERS` sessions and rejects the next
/// one with `MvccError::Busy`.
#[test]
fn test_registry_max_concurrent_writers() {
    let mut registry = ConcurrentRegistry::new();

    // Fill the registry up to its limit.
    (0..MAX_CONCURRENT_WRITERS).for_each(|_| {
        registry
            .begin_concurrent(test_snapshot(1))
            .expect("should succeed");
    });

    // One more session must be refused.
    let overflow = registry.begin_concurrent(test_snapshot(1));
    assert_eq!(overflow.unwrap_err(), MvccError::Busy);
}
/// With an empty commit index, first-committer-wins validation of a written
/// page reports `FcwResult::Clean`.
#[test]
fn test_fcw_validation_clean() {
    let commit_index = CommitIndex::new();
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry
        .begin_concurrent(test_snapshot(10))
        .expect("session");

    {
        let mut txn = registry.get_mut(session).expect("handle");
        concurrent_write_page(&mut txn, &lock_table, session, test_page(5), test_data()).unwrap();
    }

    let txn = registry.get(session).expect("handle");
    let outcome = validate_first_committer_wins(&txn, &commit_index);
    assert_eq!(outcome, FcwResult::Clean);
}
/// When the commit index already holds a newer commit (15) for a page written
/// under snapshot 10, validation reports a conflict naming that page and
/// that commit sequence.
#[test]
fn test_fcw_validation_conflict() {
    let commit_index = CommitIndex::new();
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let page = test_page(5);

    // Seed the commit index before the session starts.
    commit_index.update(page, CommitSeq::new(15));

    let session = registry
        .begin_concurrent(test_snapshot(10))
        .expect("session");
    {
        let mut txn = registry.get_mut(session).expect("handle");
        concurrent_write_page(&mut txn, &lock_table, session, page, test_data()).unwrap();
    }

    let txn = registry.get(session).expect("handle");
    match validate_first_committer_wins(&txn, &commit_index) {
        FcwResult::Clean => panic!("expected conflict"),
        FcwResult::Abort { .. } => panic!("expected conflict, got abort"),
        FcwResult::Conflict {
            conflicting_pages,
            conflicting_commit_seq,
        } => {
            assert_eq!(conflicting_pages, vec![page]);
            assert_eq!(conflicting_commit_seq, CommitSeq::new(15));
        }
    }
}
/// `BusySnapshot` and `Busy` are distinct error values with distinct display
/// strings.
#[test]
fn test_busy_snapshot_vs_busy() {
    assert_ne!(MvccError::BusySnapshot, MvccError::Busy);

    let snapshot_text = MvccError::BusySnapshot.to_string();
    assert_eq!(snapshot_text, "SQLITE_BUSY_SNAPSHOT");

    let busy_text = MvccError::Busy.to_string();
    assert_eq!(busy_text, "SQLITE_BUSY");
}
/// Beginning a session raises the active count to one and yields an active
/// handle; removing it brings the count back to zero.
#[test]
fn test_concurrent_session_lifecycle() {
    let mut registry = ConcurrentRegistry::new();
    assert_eq!(registry.active_count(), 0);

    let session = registry
        .begin_concurrent(test_snapshot(10))
        .expect("session");
    assert_eq!(registry.active_count(), 1);

    // Scope the handle so it is released before the session is removed.
    {
        let txn = registry.get(session).expect("handle");
        assert!(txn.is_active());
    }

    let taken_out = registry.remove(session);
    assert!(taken_out.is_some());
    assert_eq!(registry.active_count(), 0);
}
/// A session flagged as marked-for-abort still counts toward `gc_horizon`,
/// but is ignored by `history_retention_horizon`, which lets committed
/// conflict history be pruned.
#[test]
fn test_marked_for_abort_txn_still_pins_gc_but_not_history_retention() {
let mut registry = ConcurrentRegistry::new();
// Two sessions: the older one (snapshot 10) will be doomed, the newer one
// (snapshot 20) survives.
let doomed = registry
.begin_concurrent(test_snapshot(10))
.expect("doomed session");
let survivor = registry
.begin_concurrent(test_snapshot(20))
.expect("survivor session");
registry
.get_mut(doomed)
.expect("doomed handle")
.set_marked_for_abort(true);
let survivor_token = registry.get(survivor).expect("survivor handle").txn_token();
// Inject a committed-reader record directly into the registry's history;
// its commit_seq (15) lies between the two snapshots.
registry
.committed_readers
.push(crate::ssi_validation::CommittedReaderInfo {
token: survivor_token,
begin_seq: CommitSeq::new(20),
commit_seq: CommitSeq::new(15),
had_in_rw: false,
keys: vec![WitnessKey::Page(test_page(7))],
});
assert_eq!(
registry.gc_horizon(),
Some(CommitSeq::new(10)),
"marked-for-abort transactions remain active until they actually abort"
);
assert_eq!(
registry.history_retention_horizon(),
Some(CommitSeq::new(20)),
"committed SSI history may ignore transactions already doomed to abort"
);
// With the retention horizon at 20, pruning must drop the injected record.
registry.prune_committed_conflict_history();
assert!(
registry.committed_readers.is_empty(),
"history older than the surviving retention horizon should be pruned"
);
}
/// `gc_horizon` keeps reporting a committed session's snapshot until that
/// session is removed, then advances; with no sessions left it is `None`.
#[test]
fn test_gc_horizon_cache_advances_on_remove_and_is_conservative_until_then() {
    let mut registry = ConcurrentRegistry::new();
    let older = registry
        .begin_concurrent(test_snapshot(10))
        .expect("earlier session");
    let newer = registry
        .begin_concurrent(test_snapshot(20))
        .expect("later session");
    assert_eq!(registry.gc_horizon(), Some(CommitSeq::new(10)));

    // Marking the older session committed does not move the horizon yet.
    registry
        .get_mut(older)
        .expect("earlier handle")
        .mark_committed();
    let horizon_after_commit = registry.gc_horizon();
    assert_eq!(
        horizon_after_commit,
        Some(CommitSeq::new(10)),
        "cached horizon remains conservative until the committed session is removed"
    );

    // Removing the older session advances the horizon to the newer one.
    let removed_older = registry.remove(older);
    assert!(removed_older.is_some());
    assert_eq!(registry.gc_horizon(), Some(CommitSeq::new(20)));

    // Removing the last session leaves no horizon at all.
    let removed_newer = registry.remove(newer);
    assert!(removed_newer.is_some());
    assert_eq!(registry.gc_horizon(), None);
}
/// Checks the three cases of `can_use_uncontended_prepare_fast_path`:
/// allowed for a lone session, refused while a second session is active, and
/// refused when committed-writer history newer than the begin sequence exists.
#[test]
fn test_can_use_uncontended_prepare_fast_path_only_for_single_active_session_without_overlap() {
let mut registry = ConcurrentRegistry::new();
let session_id = registry
.begin_concurrent(test_snapshot(10))
.expect("session");
// Case 1: only one session exists.
assert!(
registry.can_use_uncontended_prepare_fast_path(session_id, CommitSeq::new(10)),
"single active session with no newer committed history should use the fast path"
);
// Case 2: a second session is active at the same time.
let other_session_id = registry
.begin_concurrent(test_snapshot(10))
.expect("other session");
assert!(
!registry.can_use_uncontended_prepare_fast_path(session_id, CommitSeq::new(10)),
"multiple active sessions must force full SSI edge discovery"
);
// Case 3: back to a single session, but with a committed-writer record at
// commit_seq 11 injected directly into the registry's history.
registry
.remove(other_session_id)
.expect("remove other session");
let session_token = registry.get(session_id).expect("handle").txn_token();
registry
.committed_writers
.push(crate::ssi_validation::CommittedWriterInfo {
token: session_token,
commit_seq: CommitSeq::new(11),
had_out_rw: false,
keys: vec![WitnessKey::Page(test_page(7))],
});
assert!(
!registry.can_use_uncontended_prepare_fast_path(session_id, CommitSeq::new(10)),
"committed history newer than the begin sequence must disable the fast path"
);
}
/// A handle given back via `recycle_handle` must be reused by the next
/// `begin_concurrent` (same allocation) with all per-transaction state reset.
#[test]
fn test_begin_concurrent_reuses_recycled_handle_and_clears_state() {
let mut registry = ConcurrentRegistry::new();
let session1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
// Dirty every piece of state the assertions below check, then mark the
// transaction aborted.
{
let mut handle = registry.get_mut(session1).unwrap();
handle.record_read(test_page(5));
handle.record_write_witness(WitnessKey::Page(test_page(7)));
handle.has_in_rw.set(true);
handle.has_out_rw.set(true);
handle.set_marked_for_abort(true);
handle.mark_aborted();
}
let removed = registry.remove(session1).unwrap();
// Remember the allocation's address so reuse can be verified by pointer.
let recycled_ptr = Arc::as_ptr(&removed);
registry.recycle_handle(removed);
let session2 = registry.begin_concurrent(test_snapshot(20)).unwrap();
let handle = registry.handle(session2).unwrap();
assert_eq!(Arc::as_ptr(&handle), recycled_ptr);
// The reused handle must carry the new snapshot and token and none of the
// state set on the first session.
let handle = handle.lock();
assert_eq!(handle.snapshot().high, CommitSeq::new(20));
assert!(handle.is_active());
assert!(handle.read_set().is_empty());
assert!(handle.write_set_pages().is_empty());
assert!(!handle.has_in_rw());
assert!(!handle.has_out_rw());
assert!(!handle.is_marked_for_abort());
assert_eq!(handle.txn_token().id.get(), session2);
}
/// After an earlier session has committed and been removed (leaving no
/// committed-writer history), preparing a lone new session must take the
/// uncontended fast path and defer witness materialization.
#[test]
fn test_prepare_concurrent_commit_with_ssi_uses_uncontended_fast_path_after_stale_history_is_pruned()
{
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
// Seed session: reads page 3, writes page 5, commits at sequence 11.
let seed_session = registry.begin_concurrent(test_snapshot(10)).unwrap();
{
let mut seed = registry.get_mut(seed_session).unwrap();
seed.record_read(test_page(3));
concurrent_write_page(
&mut seed,
&lock_table,
seed_session,
test_page(5),
test_data(),
)
.unwrap();
}
concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
seed_session,
CommitSeq::new(11),
)
.unwrap();
registry
.remove(seed_session)
.expect("remove committed seed");
// Precondition for the scenario: the seed left no committed-writer history.
assert!(
registry.committed_writers.is_empty(),
"once no active sessions remain, stale committed history should already be pruned"
);
// Session under test: snapshot 11, reads page 7, writes page 9.
let session_id = registry.begin_concurrent(test_snapshot(11)).unwrap();
{
let mut handle = registry.get_mut(session_id).unwrap();
handle.record_read(test_page(7));
concurrent_write_page(
&mut handle,
&lock_table,
session_id,
test_page(9),
test_data(),
)
.unwrap();
}
let prepared = prepare_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
session_id,
CommitSeq::new(12),
)
.expect("prepare should succeed");
// The prepared plan must show no conflicts, still carry the written page,
// skip DRO evaluation, be tagged as fast-path, and hold no witness keys.
assert!(
prepared.conflicting_txns().is_empty(),
"the uncontended fast path should not manufacture SSI conflicts"
);
assert!(
prepared.conflict_pages().contains(&test_page(9)),
"the plan should still carry the write set pages forward to finalize"
);
assert_eq!(
prepared.dro_t3_decision(),
None,
"uncontended fast path should bypass DRO evaluation"
);
assert!(
prepared.used_uncontended_prepare_fast_path(),
"prepare should tag uncontended plans so finalize can re-check for the matching fast path"
);
assert!(
prepared.read_keys().is_empty() && prepared.write_keys().is_empty(),
"uncontended prepare should defer witness materialization until slow finalize actually needs it"
);
}
/// When a fast-path plan is finalized while the session is still alone,
/// finalize must publish the page version and release the lock without
/// setting SSI edge flags or recording committed reader/writer history.
#[test]
fn test_finalize_uncontended_fast_path_skips_ssi_history_when_still_uncontended() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
// Single session: reads page 7, writes page 9.
let session_id = registry.begin_concurrent(test_snapshot(11)).unwrap();
{
let mut handle = registry.get_mut(session_id).unwrap();
handle.record_read(test_page(7));
concurrent_write_page(
&mut handle,
&lock_table,
session_id,
test_page(9),
test_data(),
)
.unwrap();
}
let prepared = prepare_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
session_id,
CommitSeq::new(12),
)
.expect("prepare should succeed");
// Guard: the rest of the test is only meaningful on the fast path.
assert!(
prepared.used_uncontended_prepare_fast_path(),
"setup should hit the uncontended prepare fast path"
);
finalize_prepared_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
&prepared,
CommitSeq::new(12),
);
let committed = registry.get(session_id).expect("committed handle");
assert!(
!committed.has_in_rw() && !committed.has_out_rw(),
"uncontended finalize should not manufacture SSI edges"
);
// Release the handle before touching the registry again.
drop(committed);
assert_eq!(
commit_index.latest(test_page(9)),
Some(CommitSeq::new(12)),
"finalize must still publish the committed page version"
);
assert!(
registry.committed_readers.is_empty(),
"uncontended finalize should skip committed reader history publication"
);
assert!(
registry.committed_writers.is_empty(),
"uncontended finalize should skip committed writer history publication"
);
// A follow-up session writing the same page proves the lock was released.
let next_session = registry.begin_concurrent(test_snapshot(12)).unwrap();
let mut next_handle = registry.get_mut(next_session).unwrap();
concurrent_write_page(
&mut next_handle,
&lock_table,
next_session,
test_page(9),
test_data(),
)
.expect("uncontended finalize must still release the page lock");
}
/// If another session becomes active between a fast-path prepare and its
/// finalize, finalize must still publish one committed-reader and one
/// committed-writer record even though prepare left the witness keys empty.
#[test]
fn test_finalize_rehydrates_deferred_witnesses_when_fast_path_plan_loses_eligibility() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
// Session under test: reads page 7, writes page 9.
let session_id = registry.begin_concurrent(test_snapshot(11)).unwrap();
{
let mut handle = registry.get_mut(session_id).unwrap();
handle.record_read(test_page(7));
concurrent_write_page(
&mut handle,
&lock_table,
session_id,
test_page(9),
test_data(),
)
.unwrap();
}
let prepared = prepare_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
session_id,
CommitSeq::new(12),
)
.expect("prepare should succeed");
// Guards: prepare took the fast path and materialized no witness keys.
assert!(
prepared.used_uncontended_prepare_fast_path(),
"setup should hit the uncontended prepare fast path"
);
assert!(
prepared.read_keys().is_empty() && prepared.write_keys().is_empty(),
"prepare should not materialize witness vectors on the uncontended path"
);
// Start a second session after prepare so the first is no longer alone
// when it finalizes.
let blocker = registry.begin_concurrent(test_snapshot(11)).unwrap();
{
let blocker_handle = registry.get(blocker).unwrap();
assert!(blocker_handle.is_active(), "blocker txn should stay active");
}
finalize_prepared_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
&prepared,
CommitSeq::new(12),
);
assert_eq!(
registry.committed_readers.len(),
1,
"slow finalize fallback should rehydrate deferred read witnesses before publishing history"
);
assert_eq!(
registry.committed_writers.len(),
1,
"slow finalize fallback should rehydrate deferred write witnesses before publishing history"
);
}
/// Finalizing a prepared commit whose session was removed from the registry
/// must still update the commit index and release the page lock.
#[test]
fn test_finalize_releases_locks_and_updates_commit_index_when_session_missing() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
let s1 = registry
.begin_concurrent(test_snapshot(10))
.expect("session 1");
{
let mut h1 = registry.get_mut(s1).expect("handle 1");
concurrent_write_page(&mut h1, &lock_table, s1, test_page(5), test_data())
.expect("session 1 writes page 5");
}
let prepared = prepare_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
s1,
CommitSeq::new(11),
)
.expect("prepare should succeed");
// Remove the session between prepare and finalize so finalize cannot find
// its handle in the registry.
let removed = registry.remove(s1);
assert!(
removed.is_some(),
"session should be removable to simulate handle disappearance"
);
finalize_prepared_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
&prepared,
CommitSeq::new(11),
);
assert_eq!(
commit_index.latest(test_page(5)),
Some(CommitSeq::new(11)),
"finalize must update commit index even if handle is missing"
);
// A new session writing page 5 proves the lock is free again.
let s2 = registry
.begin_concurrent(test_snapshot(11))
.expect("session 2");
let mut h2 = registry.get_mut(s2).expect("handle 2");
concurrent_write_page(&mut h2, &lock_table, s2, test_page(5), test_data())
.expect("page lock should be released during finalize");
}
/// A writer that commits between s1's prepare and finalize, and that wrote a
/// page s1 had read, must cause finalize to set s1's `has_out_rw` flag.
#[test]
fn test_finalize_rechecks_committed_writers_that_commit_after_prepare() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
// s1 reads page 7 and writes page 9.
let s1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
{
let mut h1 = registry.get_mut(s1).unwrap();
h1.record_read(test_page(7));
concurrent_write_page(&mut h1, &lock_table, s1, test_page(9), test_data()).unwrap();
}
let prepared = prepare_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
s1,
CommitSeq::new(11),
)
.expect("prepare should succeed before the late writer commits");
// s2 starts after s1's prepare, writes page 7 (the page s1 read), and
// commits at sequence 11 before s1 finalizes.
let s2 = registry.begin_concurrent(test_snapshot(10)).unwrap();
{
let mut h2 = registry.get_mut(s2).unwrap();
concurrent_write_page(&mut h2, &lock_table, s2, test_page(7), test_data()).unwrap();
}
concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
s2,
CommitSeq::new(11),
)
.expect("late writer should commit before the prepared txn finalizes");
// s1 now finalizes at sequence 12, using the plan prepared earlier.
finalize_prepared_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
&prepared,
CommitSeq::new(12),
);
let committed = registry
.get(s1)
.expect("prepared txn handle should remain present until explicit removal");
assert!(
committed.has_out_rw(),
"finalize must discover committed writers that were not present during prepare"
);
}
/// A reader that commits between s1's prepare and finalize, and that read a
/// page s1 wrote, must cause finalize to set s1's `has_in_rw` flag.
#[test]
fn test_finalize_rechecks_committed_readers_that_commit_after_prepare() {
let lock_table = InProcessPageLockTable::new();
let commit_index = CommitIndex::new();
let mut registry = ConcurrentRegistry::new();
// s1 reads page 3 and writes page 9.
let s1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
{
let mut h1 = registry.get_mut(s1).unwrap();
h1.record_read(test_page(3));
concurrent_write_page(&mut h1, &lock_table, s1, test_page(9), test_data()).unwrap();
}
let prepared = prepare_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
s1,
CommitSeq::new(11),
)
.expect("prepare should succeed before the late reader commits");
// s2 starts after s1's prepare, reads page 9 (the page s1 wrote), writes
// the unrelated page 11, and commits at sequence 11 before s1 finalizes.
let s2 = registry.begin_concurrent(test_snapshot(10)).unwrap();
{
let mut h2 = registry.get_mut(s2).unwrap();
h2.record_read(test_page(9));
concurrent_write_page(&mut h2, &lock_table, s2, test_page(11), test_data()).unwrap();
}
concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
s2,
CommitSeq::new(11),
)
.expect("late reader should commit before the prepared txn finalizes");
// s1 now finalizes at sequence 12, using the plan prepared earlier.
finalize_prepared_concurrent_commit_with_ssi(
&mut registry,
&commit_index,
&lock_table,
&prepared,
CommitSeq::new(12),
);
let committed = registry
.get(s1)
.expect("prepared txn handle should remain present until explicit removal");
assert!(
committed.has_in_rw(),
"finalize must discover committed readers that were not present during prepare"
);
}
/// Of three indexed committed readers — one on an unrelated page, one on the
/// queried page, one with a custom (non-page) key — candidate lookup for the
/// queried page must return only the latter two.
#[test]
fn test_committed_reader_candidates_include_matching_page_and_global_entries_only() {
    let mut registry = ConcurrentRegistry::new();
    let relevant = test_token(101);
    let global = test_token(102);
    let unrelated = test_token(103);

    // (index slot, owning token, single witness key) for each seeded reader,
    // in insertion order.
    let seeded_readers = [
        (0, unrelated, WitnessKey::Page(test_page(99))),
        (1, relevant, WitnessKey::Page(test_page(7))),
        (
            2,
            global,
            WitnessKey::Custom {
                namespace: 7,
                bytes: b"global-reader".to_vec(),
            },
        ),
    ];
    for (slot, token, key) in seeded_readers {
        registry.committed_readers.push(CommittedReaderInfo {
            token,
            begin_seq: CommitSeq::new(5),
            commit_seq: CommitSeq::new(12),
            had_in_rw: false,
            keys: vec![key],
        });
        registry.index_committed_reader(slot);
    }

    let probe_keys = [WitnessKey::Page(test_page(7))];
    let summary = summarize_witness_keys(&probe_keys);
    let mut found: Vec<_> = registry
        .committed_reader_candidates(
            test_token(999),
            CommitSeq::new(10),
            CommitSeq::new(20),
            &summary,
        )
        .into_iter()
        .map(|reader| reader.token)
        .collect();

    // Sort by token id so the comparison is independent of lookup order.
    found.sort_by_key(|token| token.id.get());
    assert_eq!(found, vec![relevant, global]);
}
/// Of three indexed committed writers — one on an unrelated page, one on the
/// queried page, one with a custom (non-page) key — candidate lookup for the
/// queried page must return only the latter two.
#[test]
fn test_committed_writer_candidates_include_matching_page_and_global_entries_only() {
    let mut registry = ConcurrentRegistry::new();
    let relevant = test_token(201);
    let global = test_token(202);
    let unrelated = test_token(203);

    // (index slot, owning token, single witness key) for each seeded writer,
    // in insertion order.
    let seeded_writers = [
        (0, unrelated, WitnessKey::Page(test_page(88))),
        (1, relevant, WitnessKey::Page(test_page(9))),
        (
            2,
            global,
            WitnessKey::Custom {
                namespace: 9,
                bytes: b"global-writer".to_vec(),
            },
        ),
    ];
    for (slot, token, key) in seeded_writers {
        registry.committed_writers.push(CommittedWriterInfo {
            token,
            commit_seq: CommitSeq::new(12),
            had_out_rw: false,
            keys: vec![key],
        });
        registry.index_committed_writer(slot);
    }

    let probe_keys = [WitnessKey::Page(test_page(9))];
    let summary = summarize_witness_keys(&probe_keys);
    let mut found: Vec<_> = registry
        .committed_writer_candidates(
            test_token(999),
            CommitSeq::new(10),
            CommitSeq::new(20),
            &summary,
        )
        .into_iter()
        .map(|writer| writer.token)
        .collect();

    // Sort by token id so the comparison is independent of lookup order.
    found.sort_by_key(|token| token.id.get());
    assert_eq!(found, vec![relevant, global]);
}
/// Records one `Custom` read witness and one `Custom` write witness on a handle, then
/// asserts that its `HandleView` reports read and write overlap for `Custom` probe keys
/// whose namespace and bytes differ from the recorded ones.
#[test]
fn test_handle_view_custom_overlap_respects_global_witnesses() {
    let mut txn = ConcurrentHandle::new(test_snapshot(10), test_token(301));

    let recorded_read = WitnessKey::Custom {
        namespace: 3,
        bytes: b"read-global".to_vec(),
    };
    txn.record_read_witness(recorded_read);
    let recorded_write = WitnessKey::Custom {
        namespace: 4,
        bytes: b"write-global".to_vec(),
    };
    txn.record_write_witness(recorded_write);

    let view = HandleView::new(&txn);

    let read_probe = WitnessKey::Custom {
        namespace: 5,
        bytes: b"candidate".to_vec(),
    };
    assert!(view.check_read_overlap(&read_probe));

    let write_probe = WitnessKey::Custom {
        namespace: 6,
        bytes: b"candidate".to_vec(),
    };
    assert!(view.check_write_overlap(&write_probe));
}
/// Records a page-scoped write witness and asserts that the handle does not report
/// holding that page's lock, while write-overlap checks match the witnessed page and
/// reject a different page.
#[test]
fn test_page_write_witness_preserves_overlap_without_claiming_lock() {
    let witnessed_page = test_page(17);
    let mut txn = ConcurrentHandle::new(test_snapshot(10), test_token(302));
    txn.record_write_witness(WitnessKey::Page(witnessed_page));

    let claims_lock = txn.holds_page_lock(witnessed_page);
    assert!(
        !claims_lock,
        "page-scoped SSI witnesses must not claim physical page-lock ownership"
    );

    let same_page_key = WitnessKey::Page(witnessed_page);
    assert!(txn.check_write_overlap(&same_page_key));
    let other_page_key = WitnessKey::Page(test_page(18));
    assert!(!txn.check_write_overlap(&other_page_key));
}
/// Builds an `ActiveEdgeDiscoveryIndex` over three writers that only recorded write
/// witnesses (page 7, a `Custom` key, page 9) and asserts that the outgoing candidates
/// for a page-7 read summary are the page-7 writer and the `Custom`-key writer.
#[test]
fn test_active_edge_discovery_index_indexes_page_write_witnesses_without_page_state() {
    let probed_page = test_page(7);

    let mut page_writer = ConcurrentHandle::new(test_snapshot(10), test_token(303));
    page_writer.record_write_witness(WitnessKey::Page(probed_page));
    assert!(
        !page_writer.holds_page_lock(probed_page),
        "page-scoped SSI witnesses must not manufacture a held page lock"
    );

    let mut global_writer = ConcurrentHandle::new(test_snapshot(10), test_token(304));
    let custom_key = WitnessKey::Custom {
        namespace: 8,
        bytes: b"global-writer".to_vec(),
    };
    global_writer.record_write_witness(custom_key);

    let mut unrelated_writer = ConcurrentHandle::new(test_snapshot(10), test_token(305));
    unrelated_writer.record_write_witness(WitnessKey::Page(test_page(9)));

    let views = vec![
        HandleView::new(&page_writer),
        HandleView::new(&global_writer),
        HandleView::new(&unrelated_writer),
    ];
    let index = ActiveEdgeDiscoveryIndex::build(&views);

    let read_summary = summarize_witness_keys(&[WitnessKey::Page(probed_page)]);
    let outgoing = index.outgoing_candidate_refs(
        &views,
        test_token(399),
        CommitSeq::new(10),
        CommitSeq::new(20),
        &read_summary,
    );
    let mut candidate_tokens: Vec<_> = outgoing
        .into_iter()
        .map(|candidate| candidate.token())
        .collect();

    // Sort by token id so the comparison does not depend on lookup order.
    candidate_tokens.sort_by_key(|token| token.id.get());
    assert_eq!(candidate_tokens, vec![test_token(303), test_token(304)]);
}
/// Flags page 11 as conflict-only directly on the handle's page state (leaving
/// `write_index` empty) and asserts that the discovery index still returns that handle
/// as an outgoing candidate for a page-11 read summary.
#[test]
fn test_active_edge_discovery_index_keeps_tracked_write_pages_without_write_index() {
    let tracked_page = test_page(11);
    let mut tracked_writer = ConcurrentHandle::new(test_snapshot(10), test_token(306));
    tracked_writer
        .ensure_page_state(tracked_page)
        .is_conflict_only = true;

    // Precondition: the page is tracked for write conflicts, yet no write witness exists.
    assert!(tracked_writer.tracks_write_conflict_page(tracked_page));
    assert!(tracked_writer.write_index.is_empty());

    let views = vec![HandleView::new(&tracked_writer)];
    let index = ActiveEdgeDiscoveryIndex::build(&views);
    let read_summary = summarize_witness_keys(&[WitnessKey::Page(tracked_page)]);
    let outgoing = index.outgoing_candidate_refs(
        &views,
        test_token(399),
        CommitSeq::new(10),
        CommitSeq::new(20),
        &read_summary,
    );
    let candidate_tokens: Vec<_> = outgoing
        .into_iter()
        .map(|candidate| candidate.token())
        .collect();
    assert_eq!(candidate_tokens, vec![test_token(306)]);
}
/// Builds an `ActiveEdgeDiscoveryIndex` over three readers (page 7 via `record_read`, a
/// `Custom` read witness, page 9 via `record_read`) and asserts that the incoming
/// candidates for a page-7 write summary are the page-7 reader and the `Custom` reader.
#[test]
fn test_active_edge_discovery_index_uses_presence_flags_without_materialized_keys() {
    let probed_page = test_page(7);

    let mut page_reader = ConcurrentHandle::new(test_snapshot(10), test_token(311));
    page_reader.record_read(probed_page);

    let mut global_reader = ConcurrentHandle::new(test_snapshot(10), test_token(312));
    let custom_key = WitnessKey::Custom {
        namespace: 7,
        bytes: b"global-reader".to_vec(),
    };
    global_reader.record_read_witness(custom_key);

    let mut unrelated_reader = ConcurrentHandle::new(test_snapshot(10), test_token(313));
    unrelated_reader.record_read(test_page(9));

    let views = vec![
        HandleView::new(&page_reader),
        HandleView::new(&global_reader),
        HandleView::new(&unrelated_reader),
    ];
    let index = ActiveEdgeDiscoveryIndex::build(&views);

    let write_summary = summarize_witness_keys(&[WitnessKey::Page(probed_page)]);
    let incoming = index.incoming_candidate_refs(
        &views,
        test_token(399),
        CommitSeq::new(10),
        CommitSeq::new(20),
        &write_summary,
    );
    let mut candidate_tokens: Vec<_> = incoming
        .into_iter()
        .map(|candidate| candidate.token())
        .collect();

    // Sort by token id so the comparison does not depend on lookup order.
    candidate_tokens.sort_by_key(|token| token.id.get());
    assert_eq!(candidate_tokens, vec![test_token(311), test_token(312)]);
}
/// Prepares a commit for a session whose read page (20) is written by a second active
/// session, and asserts that the prepared commit carries a DRO decision reporting one
/// active reader, one active writer, and a non-negative threshold.
#[test]
fn test_prepare_materializes_dro_decision_for_edgeful_commit() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let first = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let second = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        // First session: reads page 20, writes page 10.
        let mut handle = registry.get_mut(first).unwrap();
        handle.record_read(test_page(20));
        concurrent_write_page(&mut handle, &lock_table, first, test_page(10), test_data())
            .unwrap();
    }
    {
        // Second session: reads page 30, writes page 20 (the page the first one read).
        let mut handle = registry.get_mut(second).unwrap();
        handle.record_read(test_page(30));
        concurrent_write_page(&mut handle, &lock_table, second, test_page(20), test_data())
            .unwrap();
    }

    let prepare_outcome = prepare_concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        first,
        CommitSeq::new(11),
    );
    let prepared = prepare_outcome.expect("prepare should succeed");

    let decision = prepared
        .dro_t3_decision()
        .expect("edgeful prepare should materialize a DRO decision");
    assert_eq!(decision.active_readers, 1);
    assert_eq!(decision.active_writers, 1);
    assert!(decision.threshold >= 0.0);
}
/// Prepares a commit for the only active session (a single page write, no reads) and
/// asserts that the prepared commit carries no DRO decision.
#[test]
fn test_prepare_skips_dro_decision_for_edge_free_commit() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        let mut handle = registry.get_mut(session).unwrap();
        concurrent_write_page(&mut handle, &lock_table, session, test_page(5), test_data())
            .unwrap();
    }

    let prepare_outcome = prepare_concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        session,
        CommitSeq::new(11),
    );
    let prepared = prepare_outcome.expect("prepare should succeed");

    let decision = prepared.dro_t3_decision();
    assert!(
        decision.is_none(),
        "edge-free prepare should not emit a DRO decision"
    );
}
/// Writes page 5 and additionally acquires a lock on page 99 (flagged only as
/// `held_lock`), then asserts that the prepared commit reports page 5 alone as its
/// write set while listing both pages as held locks.
#[test]
fn test_prepare_captures_held_lock_pages_separately_from_write_set() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session_id = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let txn_id = TxnId::new(session_id).unwrap();
    let written_page = test_page(5);
    let extra_lock_page = test_page(99);

    {
        let mut handle = registry.get_mut(session_id).unwrap();
        let write_outcome = concurrent_write_page(
            &mut handle,
            &lock_table,
            session_id,
            written_page,
            test_data(),
        );
        write_outcome.unwrap();

        // Take a lock with no staged write behind it and record it on the handle.
        lock_table.try_acquire(extra_lock_page, txn_id).unwrap();
        handle.ensure_page_state(extra_lock_page).held_lock = true;
    }

    let prepare_outcome = prepare_concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        session_id,
        CommitSeq::new(11),
    );
    let prepared = prepare_outcome.expect("prepare should succeed");

    assert_eq!(prepared.write_set_pages(), &[written_page]);
    assert_eq!(prepared.held_lock_pages(), &[written_page, extra_lock_page]);
}
/// Prepares a commit holding locks on pages 7 (written) and 101 (lock only), removes and
/// recycles the session handle, and asserts that finalizing the prepared commit still
/// lets another transaction acquire both pages.
#[test]
fn test_finalize_releases_prepared_held_lock_pages_when_session_missing() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session_id = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let txn_id = TxnId::new(session_id).unwrap();
    let written_page = test_page(7);
    let extra_lock_page = test_page(101);

    {
        let mut handle = registry.get_mut(session_id).unwrap();
        let write_outcome = concurrent_write_page(
            &mut handle,
            &lock_table,
            session_id,
            written_page,
            test_data(),
        );
        write_outcome.unwrap();

        // Take a lock with no staged write behind it and record it on the handle.
        lock_table.try_acquire(extra_lock_page, txn_id).unwrap();
        handle.ensure_page_state(extra_lock_page).held_lock = true;
    }

    let prepare_outcome = prepare_concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        session_id,
        CommitSeq::new(11),
    );
    let prepared = prepare_outcome.expect("prepare should succeed");

    // Drop the session from the registry before finalize runs.
    let shared_handle = registry.remove(session_id).expect("active handle");
    registry.recycle_handle(shared_handle);

    let other_txn = TxnId::new(999).unwrap();
    let contended_before = lock_table.try_acquire(extra_lock_page, other_txn).is_err();
    assert!(
        contended_before,
        "extra held lock should still be owned before finalize"
    );

    finalize_prepared_concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        &prepared,
        CommitSeq::new(11),
    );

    let extra_released = lock_table.try_acquire(extra_lock_page, other_txn).is_ok();
    assert!(
        extra_released,
        "finalize should release held locks captured at prepare time"
    );
    let written_released = lock_table.try_acquire(written_page, other_txn).is_ok();
    assert!(
        written_released,
        "finalize should also release normal write-set locks"
    );
}
/// Aborts a session and asserts that a subsequent page write and a subsequent savepoint
/// request on the same handle both fail with `MvccError::InvalidState`.
#[test]
fn test_operations_on_inactive_handle() {
    let lock_table = InProcessPageLockTable::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry
        .begin_concurrent(test_snapshot(10))
        .expect("session");

    {
        let mut handle = registry.get_mut(session).expect("handle");
        concurrent_abort(&mut handle, &lock_table, session);
    }

    // Writing after the abort is rejected.
    {
        let mut handle = registry.get_mut(session).expect("handle");
        let write_outcome = concurrent_write_page(
            &mut handle,
            &lock_table,
            session,
            test_page(1),
            test_data(),
        );
        assert_eq!(write_outcome.unwrap_err(), MvccError::InvalidState);
    }

    // Creating a savepoint after the abort is rejected as well.
    {
        let handle = registry.get(session).expect("handle");
        let savepoint_outcome = concurrent_savepoint(&handle, "sp1");
        assert_eq!(savepoint_outcome.unwrap_err(), MvccError::InvalidState);
    }
}
/// Records reads of pages 5, 10 and 5 again, then asserts the read set starts empty,
/// ends with two entries, and contains both pages.
#[test]
fn test_ssi_read_tracking() {
    let mut registry = ConcurrentRegistry::new();
    let session = registry
        .begin_concurrent(test_snapshot(10))
        .expect("session");
    let mut handle = registry.get_mut(session).expect("handle");
    assert_eq!(handle.read_set_len(), 0);

    let page_five = test_page(5);
    let page_ten = test_page(10);
    // Page 5 is recorded twice; the length assertion below expects it counted once.
    for page in [page_five, page_ten, page_five] {
        handle.record_read(page);
    }

    assert_eq!(handle.read_set_len(), 2);
    assert!(handle.read_set().contains(&page_five));
    assert!(handle.read_set().contains(&page_ten));
}
/// Runs two sessions whose read and write pages are all distinct and asserts that both
/// commit, each returning the commit sequence it was assigned.
#[test]
fn test_ssi_no_conflict_disjoint() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let first = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let second = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        // First session: reads page 5, writes page 10.
        let mut handle = registry.get_mut(first).unwrap();
        handle.record_read(test_page(5));
        concurrent_write_page(&mut handle, &lock_table, first, test_page(10), test_data())
            .unwrap();
    }
    {
        // Second session: reads page 20, writes page 30.
        let mut handle = registry.get_mut(second).unwrap();
        handle.record_read(test_page(20));
        concurrent_write_page(&mut handle, &lock_table, second, test_page(30), test_data())
            .unwrap();
    }

    let first_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        first,
        CommitSeq::new(11),
    );
    let first_seq = first_outcome.expect("T1 commits");
    assert_eq!(first_seq, CommitSeq::new(11));

    let second_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        second,
        CommitSeq::new(12),
    );
    let second_seq = second_outcome.expect("T2 commits");
    assert_eq!(second_seq, CommitSeq::new(12));
}
/// Commits two overlapping sessions in turn and asserts that the registry keeps one
/// committed-reader and one committed-writer entry while the second session is still
/// active, and that both lists are empty once the second session has committed.
#[test]
fn test_ssi_committed_history_pruned_on_completion() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let first = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let second = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        // First session: reads page 5, writes page 10.
        let mut handle = registry.get_mut(first).unwrap();
        handle.record_read(test_page(5));
        concurrent_write_page(&mut handle, &lock_table, first, test_page(10), test_data())
            .unwrap();
    }
    {
        // Second session: reads page 10 (written by the first), writes page 20.
        let mut handle = registry.get_mut(second).unwrap();
        handle.record_read(test_page(10));
        concurrent_write_page(&mut handle, &lock_table, second, test_page(20), test_data())
            .unwrap();
    }

    let first_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        first,
        CommitSeq::new(11),
    );
    first_outcome.expect("first txn commits while second is still active");

    let retained_readers = registry.committed_readers.len();
    assert_eq!(
        retained_readers, 1,
        "reader history retained while overlapping txn is active"
    );
    let retained_writers = registry.committed_writers.len();
    assert_eq!(
        retained_writers, 1,
        "writer history retained while overlapping txn is active"
    );

    // NOTE(review): this commit reuses sequence 11 from the first commit — confirm that
    // is intentional rather than a stand-in for 12.
    let second_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        second,
        CommitSeq::new(11),
    );
    second_outcome.expect("second txn commits");

    assert!(
        registry.committed_readers.is_empty(),
        "reader history pruned once no active transactions remain"
    );
    assert!(
        registry.committed_writers.is_empty(),
        "writer history pruned once no active transactions remain"
    );
}
/// Sets up two sessions with crossed read/write pages (each reads the page the other
/// writes), asserts that the first commit attempt fails with `BusySnapshot`, and that
/// the second session then commits.
#[test]
fn test_ssi_pivot_abort() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let first = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let second = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        // T1: reads page 5, writes page 10.
        let mut handle = registry.get_mut(first).unwrap();
        handle.record_read(test_page(5));
        concurrent_write_page(&mut handle, &lock_table, first, test_page(10), test_data())
            .unwrap();
    }
    {
        // T2: reads page 10, writes page 5.
        let mut handle = registry.get_mut(second).unwrap();
        handle.record_read(test_page(10));
        concurrent_write_page(&mut handle, &lock_table, second, test_page(5), test_data())
            .unwrap();
    }

    let first_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        first,
        CommitSeq::new(11),
    );
    assert!(
        first_outcome.is_err(),
        "T1 should abort as pivot (both in and out edges)"
    );
    let (abort_error, _) = first_outcome.unwrap_err();
    assert_eq!(abort_error, MvccError::BusySnapshot);

    let second_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        second,
        CommitSeq::new(11),
    );
    assert!(second_outcome.is_ok(), "T2 should commit after T1 aborted");
}
/// Sets `marked_for_abort` on a handle with one staged write and asserts that
/// `concurrent_commit` fails with `MvccError::BusySnapshot`.
#[test]
fn test_ssi_marked_for_abort() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let mut handle = registry.get_mut(session).unwrap();
    concurrent_write_page(&mut handle, &lock_table, session, test_page(5), test_data())
        .unwrap();

    // Force the flag directly instead of driving it through edge discovery.
    handle.marked_for_abort.set(true);

    let outcome = concurrent_commit(
        &mut handle,
        &commit_index,
        &lock_table,
        session,
        CommitSeq::new(11),
    );
    assert!(outcome.is_err());
    let (abort_error, _) = outcome.unwrap_err();
    assert_eq!(abort_error, MvccError::BusySnapshot);
}
/// Sets `has_in_rw` (and clears `has_out_rw`) on a handle with one staged write and
/// asserts that `concurrent_commit` succeeds.
#[test]
fn test_ssi_only_incoming_edge_commits() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let mut handle = registry.get_mut(session).unwrap();
    concurrent_write_page(&mut handle, &lock_table, session, test_page(5), test_data())
        .unwrap();

    // Force the edge flags directly: incoming only.
    handle.has_in_rw.set(true);
    handle.has_out_rw.set(false);

    let outcome = concurrent_commit(
        &mut handle,
        &commit_index,
        &lock_table,
        session,
        CommitSeq::new(11),
    );
    assert!(outcome.is_ok(), "only incoming edge should allow commit");
}
/// Sets `has_out_rw` (and clears `has_in_rw`) on a handle with one staged write and
/// asserts that `concurrent_commit` succeeds.
#[test]
fn test_ssi_only_outgoing_edge_commits() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let mut handle = registry.get_mut(session).unwrap();
    concurrent_write_page(&mut handle, &lock_table, session, test_page(5), test_data())
        .unwrap();

    // Force the edge flags directly: outgoing only.
    handle.has_in_rw.set(false);
    handle.has_out_rw.set(true);

    let outcome = concurrent_commit(
        &mut handle,
        &commit_index,
        &lock_table,
        session,
        CommitSeq::new(11),
    );
    assert!(outcome.is_ok(), "only outgoing edge should allow commit");
}
/// Sets both `has_in_rw` and `has_out_rw` on a handle with one staged write and asserts
/// that `concurrent_commit` fails with `MvccError::BusySnapshot`.
#[test]
fn test_ssi_both_edges_aborts() {
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let mut handle = registry.get_mut(session).unwrap();
    concurrent_write_page(&mut handle, &lock_table, session, test_page(5), test_data())
        .unwrap();

    // Force the edge flags directly: incoming and outgoing.
    handle.has_in_rw.set(true);
    handle.has_out_rw.set(true);

    let outcome = concurrent_commit(
        &mut handle,
        &commit_index,
        &lock_table,
        session,
        CommitSeq::new(11),
    );
    assert!(outcome.is_err());
    let (abort_error, _) = outcome.unwrap_err();
    assert_eq!(abort_error, MvccError::BusySnapshot);
}
/// Records two page reads and two page writes on one handle and asserts that
/// `read_witness_keys` and `write_witness_keys` each return two entries.
#[test]
fn test_ssi_witness_keys() {
    let mut registry = ConcurrentRegistry::new();
    let lock_table = InProcessPageLockTable::new();
    let session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let mut handle = registry.get_mut(session).unwrap();

    for read_page in [test_page(5), test_page(10)] {
        handle.record_read(read_page);
    }
    for written_page in [test_page(15), test_page(20)] {
        concurrent_write_page(&mut handle, &lock_table, session, written_page, test_data())
            .unwrap();
    }

    let read_keys = handle.read_witness_keys();
    let write_keys = handle.write_witness_keys();
    assert_eq!(read_keys.len(), 2);
    assert_eq!(write_keys.len(), 2);
}
/// Three-session scenario: T1 reads pages 10 and 20 and writes 30, T2 reads 50 and
/// writes 10, T3 reads 30 and writes 40. Commits run in the order T3, T2, T1; the test
/// checks T1's edge flags after each of the first two commits and asserts that T1's own
/// commit fails with `BusySnapshot`.
#[test]
fn test_ssi_three_txn_pivot_abort_real_components() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let t1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let t2 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let t3 = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        // T1: reads pages 10 and 20, writes page 30.
        let mut handle = registry.get_mut(t1).unwrap();
        handle.record_read(test_page(10));
        handle.record_read(test_page(20));
        concurrent_write_page(&mut handle, &lock_table, t1, test_page(30), test_data())
            .unwrap();
    }
    {
        // T2: reads page 50, writes page 10.
        let mut handle = registry.get_mut(t2).unwrap();
        handle.record_read(test_page(50));
        concurrent_write_page(&mut handle, &lock_table, t2, test_page(10), test_data())
            .unwrap();
    }
    {
        // T3: reads page 30, writes page 40.
        let mut handle = registry.get_mut(t3).unwrap();
        handle.record_read(test_page(30));
        concurrent_write_page(&mut handle, &lock_table, t3, test_page(40), test_data())
            .unwrap();
    }

    // Step 1: T3 commits first.
    let t3_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t3,
        CommitSeq::new(11),
    );
    assert!(t3_outcome.is_ok(), "T3 commits (only outgoing edge)");
    {
        let pivot = registry.get(t1).unwrap();
        assert!(
            pivot.has_in_rw(),
            "T1 must have has_in_rw: T1 writes 30, T3 reads 30"
        );
        assert!(!pivot.has_out_rw(), "T1 must NOT have has_out_rw yet");
        assert!(
            !pivot.is_marked_for_abort(),
            "T1 must NOT be marked_for_abort yet"
        );
    }

    // Step 2: T2 commits next.
    let t2_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t2,
        CommitSeq::new(12),
    );
    assert!(
        t2_outcome.is_ok(),
        "T2 commits (only incoming edge, not pivot)"
    );
    {
        let pivot = registry.get(t1).unwrap();
        assert!(
            pivot.has_in_rw(),
            "T1 still has has_in_rw (from T3's commit)"
        );
        assert!(
            pivot.has_out_rw(),
            "T1 now has has_out_rw (T2's incoming edge scan set it)"
        );
        assert!(
            !pivot.is_marked_for_abort(),
            "low-contention DRO should defer the active-pivot abort mark"
        );
    }

    // Step 3: T1 attempts to commit last and must be rejected.
    let t1_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t1,
        CommitSeq::new(13),
    );
    assert!(
        t1_outcome.is_err(),
        "T1 must still abort when its own commit observes the full pivot"
    );
    let (abort_error, _) = t1_outcome.unwrap_err();
    assert_eq!(abort_error, MvccError::BusySnapshot);
}
/// Three-session scenario asserting that after T3 and T2 commit, T1 carries both
/// `has_in_rw` and `has_out_rw` yet is not marked for abort, and that T1's own commit
/// attempt then fails with `BusySnapshot`.
///
/// NOTE(review): the steps here match `test_ssi_three_txn_pivot_abort_real_components`
/// statement for statement — confirm whether a distinct scenario was intended.
#[test]
fn test_ssi_low_contention_dro_defers_marked_for_abort() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let pivot_session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let writer_session = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let reader_session = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        // Pivot candidate (T1): reads pages 10 and 20, writes page 30.
        let mut pivot = registry.get_mut(pivot_session).unwrap();
        pivot.record_read(test_page(10));
        pivot.record_read(test_page(20));
        concurrent_write_page(
            &mut pivot,
            &lock_table,
            pivot_session,
            test_page(30),
            test_data(),
        )
        .unwrap();
    }
    {
        // T2: reads page 50, writes page 10 (a page T1 read).
        let mut writer = registry.get_mut(writer_session).unwrap();
        writer.record_read(test_page(50));
        concurrent_write_page(
            &mut writer,
            &lock_table,
            writer_session,
            test_page(10),
            test_data(),
        )
        .unwrap();
    }
    {
        // T3: reads page 30 (the page T1 wrote), writes page 40.
        let mut reader = registry.get_mut(reader_session).unwrap();
        reader.record_read(test_page(30));
        concurrent_write_page(
            &mut reader,
            &lock_table,
            reader_session,
            test_page(40),
            test_data(),
        )
        .unwrap();
    }

    let reader_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        reader_session,
        CommitSeq::new(11),
    );
    assert!(reader_outcome.is_ok(), "T3 commits (only outgoing edge)");
    {
        let pivot = registry.get(pivot_session).unwrap();
        assert!(
            pivot.has_in_rw(),
            "T1 must have has_in_rw: T1 writes 30, T3 reads 30"
        );
        assert!(!pivot.has_out_rw(), "T1 must NOT have has_out_rw yet");
        assert!(
            !pivot.is_marked_for_abort(),
            "T1 must NOT be marked_for_abort yet"
        );
    }

    let writer_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        writer_session,
        CommitSeq::new(12),
    );
    assert!(
        writer_outcome.is_ok(),
        "T2 commits (only incoming edge, not pivot)"
    );
    {
        let pivot = registry.get(pivot_session).unwrap();
        assert!(
            pivot.has_in_rw(),
            "T1 still has has_in_rw (from T3's commit)"
        );
        assert!(
            pivot.has_out_rw(),
            "T1 now has has_out_rw (T2's incoming edge scan set it)"
        );
        // Both flags are set at this point, but the abort mark is expected to be absent.
        assert!(
            !pivot.is_marked_for_abort(),
            "low-contention DRO should defer the active-pivot abort mark"
        );
    }

    let pivot_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        pivot_session,
        CommitSeq::new(13),
    );
    assert!(
        pivot_outcome.is_err(),
        "T1 must still abort when its own commit observes the full pivot"
    );
    let (abort_error, _) = pivot_outcome.unwrap_err();
    assert_eq!(abort_error, MvccError::BusySnapshot);
}
/// Verifies that committing T1 (which wrote page 200) sets `has_out_rw` — but not
/// `has_in_rw` — on the still-active T2 that read page 200, with all flags clear
/// beforehand, and that T2 then commits.
#[test]
fn test_ssi_edge_propagation_sets_flags_automatically() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let t1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let t2 = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        // T1: reads page 100, writes page 200.
        let mut handle = registry.get_mut(t1).unwrap();
        handle.record_read(test_page(100));
        concurrent_write_page(&mut handle, &lock_table, t1, test_page(200), test_data())
            .unwrap();
    }
    {
        // T2: reads page 200, writes page 300.
        let mut handle = registry.get_mut(t2).unwrap();
        handle.record_read(test_page(200));
        concurrent_write_page(&mut handle, &lock_table, t2, test_page(300), test_data())
            .unwrap();
    }

    // Before any commit, neither handle carries an edge flag.
    {
        let first = registry.get(t1).unwrap();
        let second = registry.get(t2).unwrap();
        assert!(!first.has_in_rw());
        assert!(!first.has_out_rw());
        assert!(!second.has_in_rw());
        assert!(!second.has_out_rw());
    }

    let t1_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t1,
        CommitSeq::new(11),
    );
    assert!(t1_outcome.is_ok(), "T1 commits (only incoming edge)");
    {
        let second = registry.get(t2).unwrap();
        assert!(
            second.has_out_rw(),
            "T2.has_out_rw must be set: T2 read page 200 that T1 wrote"
        );
        assert!(
            !second.has_in_rw(),
            "T2.has_in_rw must NOT be set: no outgoing edge from T1 to T2"
        );
    }

    let t2_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t2,
        CommitSeq::new(12),
    );
    assert!(
        t2_outcome.is_ok(),
        "T2 commits (only outgoing edge, not pivot)"
    );
}
/// Has T1 write and commit page 42, then has T2 (same starting snapshot) write page 42
/// and attempt to commit; asserts T2 fails with `BusySnapshot` and an
/// `FcwResult::Conflict`.
#[test]
fn test_fcw_real_commit_index_conflict() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let t1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let t2 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let contested_page = test_page(42);

    {
        let mut handle = registry.get_mut(t1).unwrap();
        concurrent_write_page(&mut handle, &lock_table, t1, contested_page, test_data())
            .unwrap();
    }
    let t1_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t1,
        CommitSeq::new(11),
    );
    assert!(t1_outcome.is_ok(), "T1 first-committer wins");

    // T2 stages its write to the same page only after T1 has committed.
    {
        let mut handle = registry.get_mut(t2).unwrap();
        concurrent_write_page(&mut handle, &lock_table, t2, contested_page, test_data())
            .unwrap();
    }
    let t2_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t2,
        CommitSeq::new(11),
    );
    assert!(t2_outcome.is_err(), "T2 must fail: FCW conflict on page 42");

    let (commit_error, fcw) = t2_outcome.unwrap_err();
    assert_eq!(commit_error, MvccError::BusySnapshot);
    assert!(
        matches!(fcw, FcwResult::Conflict { .. }),
        "FCW must report conflict"
    );
}
/// Picks the session with the lower transaction id as the winner, lets it write and
/// commit page 77 first, then asserts that the other session's write-and-commit of the
/// same page fails with an FCW conflict and that the winner's token is the one recorded
/// in `committed_writers`.
#[test]
fn test_fcw_deterministic_tiebreak_lower_txn_id_wins() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let s1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let s2 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let token1 = registry.get(s1).unwrap().txn_token();
    let token2 = registry.get(s2).unwrap().txn_token();

    // Ties go to the first session; otherwise the smaller id is the winner.
    let (winner_session, loser_session, winner_token) = if token1.id > token2.id {
        (s2, s1, token2)
    } else {
        (s1, s2, token1)
    };
    let contested_page = test_page(77);

    {
        let mut winner_handle = registry.get_mut(winner_session).unwrap();
        let write_outcome = concurrent_write_page(
            &mut winner_handle,
            &lock_table,
            winner_session,
            contested_page,
            test_data(),
        );
        write_outcome.unwrap();
    }
    let winner_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        winner_session,
        CommitSeq::new(11),
    );
    assert!(
        winner_outcome.is_ok(),
        "lower txn_id should deterministically win tie window"
    );

    {
        let mut loser_handle = registry.get_mut(loser_session).unwrap();
        let write_outcome = concurrent_write_page(
            &mut loser_handle,
            &lock_table,
            loser_session,
            contested_page,
            test_data(),
        );
        write_outcome.unwrap();
    }
    let loser_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        loser_session,
        CommitSeq::new(11),
    );
    assert!(
        loser_outcome.is_err(),
        "higher txn_id should lose deterministic tie"
    );
    let (commit_error, fcw) = loser_outcome.unwrap_err();
    assert_eq!(commit_error, MvccError::BusySnapshot);
    assert!(matches!(fcw, FcwResult::Conflict { .. }));

    // The committed-writer history must hold the winner's token.
    let recorded_entry = registry
        .committed_writers
        .iter()
        .find(|entry| entry.token.id.get() == winner_session);
    let committed = recorded_entry
        .map(|entry| entry.token)
        .expect("winning writer should be recorded");
    assert_eq!(committed, winner_token);
}
/// Commits T1 (reads page 20, writes page 10) while T2 (writes page 20) is still
/// active, asserts T1's committed-writer entry has `had_out_rw` set, then asserts that
/// T3 — which reads page 10 and writes page 40 — fails to commit with `BusySnapshot`.
#[test]
fn test_prepare_aborts_on_committed_writer_pivot() {
    use super::concurrent_commit_with_ssi;
    let lock_table = InProcessPageLockTable::new();
    let commit_index = CommitIndex::new();
    let mut registry = ConcurrentRegistry::new();
    let t1 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let t2 = registry.begin_concurrent(test_snapshot(10)).unwrap();
    let t3 = registry.begin_concurrent(test_snapshot(10)).unwrap();

    {
        // T1: reads page 20, writes page 10.
        let mut handle = registry.get_mut(t1).unwrap();
        handle.record_read(test_page(20));
        concurrent_write_page(&mut handle, &lock_table, t1, test_page(10), test_data())
            .unwrap();
    }
    {
        // T2: reads page 30, writes page 20 (the page T1 read).
        let mut handle = registry.get_mut(t2).unwrap();
        handle.record_read(test_page(30));
        concurrent_write_page(&mut handle, &lock_table, t2, test_page(20), test_data())
            .unwrap();
    }

    let t1_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t1,
        CommitSeq::new(11),
    );
    assert!(
        t1_outcome.is_ok(),
        "T1 should commit with only outgoing rw edge"
    );

    let t1_writer = registry
        .committed_writers
        .iter()
        .find(|entry| entry.token.id.get() == t1)
        .expect("T1 writer history should be present");
    let recorded_out_edge = t1_writer.had_out_rw;
    assert!(recorded_out_edge, "T1 should be recorded with had_out_rw");

    {
        // T3 stages its work only after T1 has committed: reads page 10, writes page 40.
        let mut handle = registry.get_mut(t3).unwrap();
        handle.record_read(test_page(10));
        concurrent_write_page(&mut handle, &lock_table, t3, test_page(40), test_data())
            .unwrap();
    }
    let t3_outcome = concurrent_commit_with_ssi(
        &mut registry,
        &commit_index,
        &lock_table,
        t3,
        CommitSeq::new(12),
    );
    assert!(
        t3_outcome.is_err(),
        "T3 must abort when it depends on committed writer pivot T1"
    );
    let (abort_error, _) = t3_outcome.unwrap_err();
    assert_eq!(abort_error, MvccError::BusySnapshot);
}
}