use std::collections::HashSet;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU8, AtomicU64, AtomicUsize, Ordering};
use fsqlite_types::{CommitSeq, ObjectId, PageNumber, TxnToken, WitnessKey};
use tracing::{debug, info, warn};
use crate::observability;
use crate::ssi_abort_policy::{
DroHotPathDecision, DroLossMatrix, DroRiskTolerance, DroVolatilityTracker,
DroVolatilityTrackerConfig, SsiDecisionCard, SsiDecisionCardDraft, SsiDecisionQuery,
SsiDecisionType, SsiEvidenceLedger,
};
use crate::witness_objects::{
AbortPolicy, AbortReason, AbortWitness, DependencyEdgeKind, EcsCommitProof, EcsDependencyEdge,
EdgeKeyBasis, KeySummary,
};
/// Returned when SSI validation rejects a commit: identifies the
/// transaction that must abort, why, and the abort witness evidence.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SsiBusySnapshot {
    /// The transaction that must abort.
    pub txn: TxnToken,
    /// Category of the SSI abort (see [`SsiAbortReason`]).
    pub reason: SsiAbortReason,
    /// Evidence record describing the rw edges observed at abort time.
    pub witness: AbortWitness,
}
/// Why SSI validation aborted a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SsiAbortReason {
    /// The committing txn itself had both incoming and outgoing rw edges
    /// (the classic dangerous structure).
    Pivot,
    /// An already-committed peer was a pivot, so this txn must abort
    /// instead (T3 rule).
    CommittedPivot,
    /// Another committer marked this txn for abort before it validated.
    MarkedForAbort,
}
// Process-wide monotonic counters for emitted evidence records
// (commit / abort / budget-forced compaction). Relaxed ordering is
// sufficient: these are metrics, not synchronization points.
static FSQLITE_EVIDENCE_RECORDS_TOTAL_COMMIT: AtomicU64 = AtomicU64::new(0);
static FSQLITE_EVIDENCE_RECORDS_TOTAL_ABORT: AtomicU64 = AtomicU64::new(0);
static FSQLITE_EVIDENCE_RECORDS_TOTAL_BUDGET_COMPACT: AtomicU64 = AtomicU64::new(0);
// Current evidence-recording mode, stored as the enum's `u8` discriminant.
static FSQLITE_SSI_EVIDENCE_MODE: AtomicU8 =
    AtomicU8::new(SsiEvidenceRecordingMode::CompactCommit as u8);
// Budget knobs for `BudgetedCommit` mode; defaults mirror
// `SsiEvidenceBudgetConfig::default` (32 pending records, 4 KiB).
static FSQLITE_SSI_EVIDENCE_MAX_PENDING_FULL_COMMIT_RECORDS: AtomicUsize = AtomicUsize::new(32);
static FSQLITE_SSI_EVIDENCE_MAX_FULL_COMMIT_BYTES: AtomicU64 = AtomicU64::new(4 * 1024);
/// Granularity at which SSI decision evidence is recorded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SsiEvidenceRecordingMode {
    /// Record full page/txn detail for every decision.
    Full = 0,
    /// Record compact evidence for commits (aborts stay full).
    CompactCommit = 1,
    /// Record full commit evidence until a budget is exceeded, then compact.
    BudgetedCommit = 2,
}
impl SsiEvidenceRecordingMode {
    /// Decode the raw discriminant as stored in the mode atomic.
    /// Unknown values fall back to `Full`.
    #[must_use]
    const fn from_raw(raw: u8) -> Self {
        if raw == Self::CompactCommit as u8 {
            Self::CompactCommit
        } else if raw == Self::BudgetedCommit as u8 {
            Self::BudgetedCommit
        } else {
            Self::Full
        }
    }
}
/// Tunable limits for `BudgetedCommit` evidence recording.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SsiEvidenceBudgetConfig {
    /// Pending ledger records at/above which commit evidence is compacted.
    pub max_pending_records_before_compact: usize,
    /// Estimated record size above which commit evidence is compacted.
    pub max_commit_evidence_bytes: u64,
}
impl Default for SsiEvidenceBudgetConfig {
    /// Defaults: compact after 32 pending records or 4 KiB of evidence.
    fn default() -> Self {
        let max_pending_records_before_compact = 32;
        let max_commit_evidence_bytes = 4096;
        Self {
            max_pending_records_before_compact,
            max_commit_evidence_bytes,
        }
    }
}
/// Point-in-time snapshot of the evidence-record metrics counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EvidenceRecordMetricsSnapshot {
    /// Evidence records emitted for allowed commits.
    pub fsqlite_evidence_records_total_commit: u64,
    /// Evidence records emitted for aborts.
    pub fsqlite_evidence_records_total_abort: u64,
    /// Times budget pressure forced compact commit evidence.
    pub fsqlite_evidence_records_total_budget_compact: u64,
    /// Records currently queued in the evidence ledger.
    pub fsqlite_evidence_pending_records: usize,
}
impl EvidenceRecordMetricsSnapshot {
    /// Total evidence records emitted (commits plus aborts).
    ///
    /// Uses saturating addition so a pathological counter state cannot
    /// panic in debug builds or wrap in release; the metric pins at
    /// `u64::MAX` instead.
    #[must_use]
    pub fn fsqlite_evidence_records_total(self) -> u64 {
        self.fsqlite_evidence_records_total_commit
            .saturating_add(self.fsqlite_evidence_records_total_abort)
    }
}
/// Process-wide SSI evidence ledger, lazily created on first use.
fn ssi_evidence_ledger() -> &'static SsiEvidenceLedger {
    // OnceLock makes initialization race-free with lock-free reads after init.
    static LEDGER: OnceLock<SsiEvidenceLedger> = OnceLock::new();
    LEDGER.get_or_init(SsiEvidenceLedger::default)
}
/// Snapshot every decision card currently retained by the global ledger.
#[must_use]
pub fn ssi_evidence_snapshot() -> Vec<SsiDecisionCard> {
    ssi_evidence_ledger().snapshot()
}
/// Query the global evidence ledger for decision cards matching `query`.
#[must_use]
pub fn ssi_evidence_query(query: &SsiDecisionQuery) -> Vec<SsiDecisionCard> {
    ssi_evidence_ledger().query(query)
}
/// Read the current evidence-record counters plus the ledger's pending
/// record count into a single consistent-enough snapshot.
///
/// Loads are individually relaxed, so the fields are not guaranteed to be
/// mutually consistent under concurrent updates.
#[must_use]
pub fn ssi_evidence_metrics_snapshot() -> EvidenceRecordMetricsSnapshot {
    let commits = FSQLITE_EVIDENCE_RECORDS_TOTAL_COMMIT.load(Ordering::Relaxed);
    let aborts = FSQLITE_EVIDENCE_RECORDS_TOTAL_ABORT.load(Ordering::Relaxed);
    let budget_compacts = FSQLITE_EVIDENCE_RECORDS_TOTAL_BUDGET_COMPACT.load(Ordering::Relaxed);
    let pending = ssi_evidence_ledger().pending_count();
    EvidenceRecordMetricsSnapshot {
        fsqlite_evidence_records_total_commit: commits,
        fsqlite_evidence_records_total_abort: aborts,
        fsqlite_evidence_records_total_budget_compact: budget_compacts,
        fsqlite_evidence_pending_records: pending,
    }
}
/// Zero all evidence-record counters (commit, abort, budget-compact).
/// Intended for tests; the ledger's pending records are not touched.
pub fn reset_ssi_evidence_metrics() {
    for counter in [
        &FSQLITE_EVIDENCE_RECORDS_TOTAL_COMMIT,
        &FSQLITE_EVIDENCE_RECORDS_TOTAL_ABORT,
        &FSQLITE_EVIDENCE_RECORDS_TOTAL_BUDGET_COMPACT,
    ] {
        counter.store(0, Ordering::Relaxed);
    }
}
/// Current process-wide evidence recording mode.
#[must_use]
pub fn ssi_evidence_recording_mode() -> SsiEvidenceRecordingMode {
    let raw = FSQLITE_SSI_EVIDENCE_MODE.load(Ordering::Relaxed);
    SsiEvidenceRecordingMode::from_raw(raw)
}
/// Install a new evidence recording mode, returning the mode it replaced.
pub fn set_ssi_evidence_recording_mode(mode: SsiEvidenceRecordingMode) -> SsiEvidenceRecordingMode {
    let previous = FSQLITE_SSI_EVIDENCE_MODE.swap(mode as u8, Ordering::Relaxed);
    SsiEvidenceRecordingMode::from_raw(previous)
}
/// Read the current `BudgetedCommit` evidence budget configuration.
#[must_use]
pub fn ssi_evidence_budget_config() -> SsiEvidenceBudgetConfig {
    let max_pending_records_before_compact =
        FSQLITE_SSI_EVIDENCE_MAX_PENDING_FULL_COMMIT_RECORDS.load(Ordering::Relaxed);
    let max_commit_evidence_bytes =
        FSQLITE_SSI_EVIDENCE_MAX_FULL_COMMIT_BYTES.load(Ordering::Relaxed);
    SsiEvidenceBudgetConfig {
        max_pending_records_before_compact,
        max_commit_evidence_bytes,
    }
}
/// Install a new evidence budget configuration, returning the previous one.
pub fn set_ssi_evidence_budget_config(config: SsiEvidenceBudgetConfig) -> SsiEvidenceBudgetConfig {
    // Swap order matches the read order elsewhere: pending first, bytes second.
    let previous_pending = FSQLITE_SSI_EVIDENCE_MAX_PENDING_FULL_COMMIT_RECORDS
        .swap(config.max_pending_records_before_compact, Ordering::Relaxed);
    let previous_bytes = FSQLITE_SSI_EVIDENCE_MAX_FULL_COMMIT_BYTES
        .swap(config.max_commit_evidence_bytes, Ordering::Relaxed);
    SsiEvidenceBudgetConfig {
        max_pending_records_before_compact: previous_pending,
        max_commit_evidence_bytes: previous_bytes,
    }
}
/// Lazily built default DRO loss matrix used by the T3 hot-path decision.
///
/// Seeds the volatility tracker with four fixed (abort-rate, edge-rate)
/// windows, derives a low-risk radius certificate from them, and builds a
/// 32x32 loss matrix with a 0.45 factor.
/// NOTE(review): the seed rates, matrix dimensions, and 0.45 factor are
/// hard-coded calibration values — confirm against the DRO policy design.
fn default_t3_dro_matrix() -> &'static DroLossMatrix {
    static MATRIX: OnceLock<DroLossMatrix> = OnceLock::new();
    MATRIX.get_or_init(|| {
        // min_samples == window_size: the tracker needs all four windows
        // below before it can certify.
        let mut tracker = DroVolatilityTracker::new(DroVolatilityTrackerConfig {
            window_size: 4,
            min_samples: 4,
        });
        for (abort_rate, edge_rate) in [(0.03, 0.04), (0.05, 0.06), (0.08, 0.09), (0.13, 0.15)] {
            tracker
                .observe_window(abort_rate, edge_rate)
                .expect("default DRO tracker windows must be valid");
        }
        let certificate = tracker
            .radius_certificate(DroRiskTolerance::Low)
            .expect("default DRO certificate must be constructible");
        DroLossMatrix::from_radius_certificate(32, 32, 0.45, certificate)
    })
}
/// Evaluate the T3 (active-pivot) DRO hot-path decision for `txn` given the
/// current reader/writer population counts, logging the full decision.
pub(crate) fn evaluate_t3_dro(
    txn: TxnToken,
    active_readers: usize,
    active_writers: usize,
) -> DroHotPathDecision {
    let decision = default_t3_dro_matrix().evaluate(active_readers, active_writers);
    info!(
        target: "fsqlite::ssi::dro",
        event = "t3_decision",
        txn_id = txn.id.get(),
        active_readers = decision.active_readers,
        active_writers = decision.active_writers,
        cvar_penalty = decision.cvar_penalty,
        threshold = decision.threshold,
        radius = decision.radius,
        tolerance = %decision.tolerance,
        decision = if decision.should_abort() { "abort" } else { "allow" },
        "dro t3 decision evaluated"
    );
    decision
}
/// Per-transaction SSI bookkeeping accumulated during validation.
#[derive(Debug, Clone)]
pub struct SsiState {
    /// Transaction this state belongs to.
    pub txn: TxnToken,
    /// Snapshot sequence at which the transaction began.
    pub begin_seq: CommitSeq,
    /// True once any incoming rw-antidependency edge was observed.
    pub has_in_rw: bool,
    /// True once any outgoing rw-antidependency edge was observed.
    pub has_out_rw: bool,
    /// Distinct sources of incoming rw edges.
    pub rw_in_from: HashSet<TxnToken>,
    /// Distinct targets of outgoing rw edges.
    pub rw_out_to: HashSet<TxnToken>,
    /// Object ids of dependency edges published for this transaction.
    pub edges_emitted: Vec<ObjectId>,
    /// Set when another committer chose this txn as an abort victim.
    pub marked_for_abort: bool,
}
impl SsiState {
    /// Fresh state with no observed edges and no abort mark.
    #[must_use]
    pub fn new(txn: TxnToken, begin_seq: CommitSeq) -> Self {
        Self {
            txn,
            begin_seq,
            has_in_rw: false,
            has_out_rw: false,
            rw_in_from: HashSet::new(),
            rw_out_to: HashSet::new(),
            edges_emitted: Vec::new(),
            marked_for_abort: false,
        }
    }
    /// A txn with both an incoming and an outgoing rw edge is a pivot —
    /// the dangerous structure SSI must break.
    #[must_use]
    pub fn has_dangerous_structure(&self) -> bool {
        self.has_in_rw && self.has_out_rw
    }
}
/// A single rw-antidependency edge found during discovery.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiscoveredEdge {
    /// Reader side of the rw edge.
    pub from: TxnToken,
    /// Writer side of the rw edge.
    pub to: TxnToken,
    /// The first witness key whose overlap produced this edge.
    pub overlap_key: WitnessKey,
    /// Whether the peer transaction was still active at discovery time.
    pub source_is_active: bool,
    /// Complementary-edge flag of the peer. NOTE: overloaded — for
    /// incoming edges it carries the source's `has_in_rw`; for outgoing
    /// edges the discovery code stores the target's `has_out_rw` here
    /// (see `discover_outgoing_edges`).
    pub source_has_in_rw: bool,
}
/// Discovery-time view over a concurrently active transaction.
///
/// Flag setters take `&self`: implementations are expected to use interior
/// mutability (see the `Cell`-based test mock) so a committer can mark
/// peers while holding only shared references.
pub trait ActiveTxnView {
    /// Identity token for this transaction.
    fn token(&self) -> TxnToken;
    /// Snapshot sequence the transaction started at.
    fn begin_seq(&self) -> CommitSeq;
    /// False once the transaction has finished.
    fn is_active(&self) -> bool;
    /// Keys read so far.
    fn read_keys(&self) -> &[WitnessKey];
    /// Keys written so far.
    fn write_keys(&self) -> &[WitnessKey];
    /// Does any read key overlap the given write key?
    fn check_read_overlap(&self, write_key: &WitnessKey) -> bool;
    /// Does any write key overlap the given read key?
    fn check_write_overlap(&self, read_key: &WitnessKey) -> bool;
    /// Incoming rw-antidependency flag.
    fn has_in_rw(&self) -> bool;
    /// Outgoing rw-antidependency flag.
    fn has_out_rw(&self) -> bool;
    /// Record that this txn now has an outgoing rw edge.
    fn set_has_out_rw(&self, val: bool);
    /// Record that this txn now has an incoming rw edge.
    fn set_has_in_rw(&self, val: bool);
    /// Select this txn as an abort victim; checked when it tries to commit.
    fn set_marked_for_abort(&self, val: bool);
}
/// Metadata retained for a committed reader so later committers can
/// discover incoming rw-antidependency edges against it.
#[derive(Debug, Clone)]
pub struct CommittedReaderInfo {
    /// Token of the committed transaction.
    pub token: TxnToken,
    /// Sequence at which it began (its snapshot).
    pub begin_seq: CommitSeq,
    /// Sequence at which it committed.
    pub commit_seq: CommitSeq,
    /// Whether it already had an incoming rw edge when it committed
    /// (i.e. an edge from it would complete a committed pivot).
    pub had_in_rw: bool,
    /// Keys it read, used for overlap checks.
    pub keys: Vec<WitnessKey>,
}
/// Metadata retained for a committed writer so later committers can
/// discover outgoing rw-antidependency edges toward it.
#[derive(Debug, Clone)]
pub struct CommittedWriterInfo {
    /// Token of the committed transaction.
    pub token: TxnToken,
    /// Sequence at which it committed.
    pub commit_seq: CommitSeq,
    /// Whether it already had an outgoing rw edge when it committed
    /// (i.e. an edge toward it would complete a committed pivot).
    pub had_out_rw: bool,
    /// Keys it wrote, used for overlap checks.
    pub keys: Vec<WitnessKey>,
}
/// Discover incoming rw-antidependency edges (`reader --rw--> committing`):
/// some other transaction read a key that the committing txn writes.
///
/// Sources come from two places: still-active readers, probed through their
/// `ActiveTxnView` overlap callbacks, and committed readers, matched against
/// their recorded key lists. At most one edge is emitted per source txn,
/// keyed by the first overlapping write key.
pub fn discover_incoming_edges(
    committing_txn: TxnToken,
    committing_begin_seq: CommitSeq,
    committing_commit_seq: CommitSeq,
    write_keys: &[WitnessKey],
    active_readers: &[&dyn ActiveTxnView],
    committed_readers: &[CommittedReaderInfo],
) -> Vec<DiscoveredEdge> {
    let mut edges = Vec::new();
    // No writes means nothing can rw-depend on this txn; skip all scanning.
    if write_keys.is_empty() {
        return edges;
    }
    let committing_begin = committing_begin_seq.get();
    let committing_end = committing_commit_seq.get();
    // Dedup: one edge per source txn regardless of how many keys overlap.
    let mut seen_sources = HashSet::new();
    for candidate in active_readers {
        if candidate.token() == committing_txn || !candidate.is_active() {
            continue;
        }
        let candidate_begin = candidate.begin_seq().get();
        // An active txn has no commit seq yet: treat its lifetime as
        // open-ended for the interval-overlap test.
        let candidate_end = u64::MAX;
        let overlaps = committing_begin < candidate_end && candidate_begin < committing_end;
        if !overlaps || seen_sources.contains(&candidate.token()) {
            continue;
        }
        for write_key in write_keys {
            if candidate.check_read_overlap(write_key) {
                if seen_sources.insert(candidate.token()) {
                    debug!(
                        bead_id = "bd-31bo",
                        from = ?candidate.token(),
                        to = ?committing_txn,
                        key = ?write_key,
                        source = "hot_plane_index",
                        "discovered incoming rw-antidependency edge"
                    );
                    edges.push(DiscoveredEdge {
                        from: candidate.token(),
                        to: committing_txn,
                        overlap_key: write_key.clone(),
                        source_is_active: true,
                        source_has_in_rw: candidate.has_in_rw(),
                    });
                }
                // First overlapping key wins; stop scanning this candidate.
                break;
            }
        }
    }
    for reader in committed_readers {
        if reader.token == committing_txn {
            continue;
        }
        // Committed readers have a closed [begin, commit) interval.
        let candidate_begin = reader.begin_seq.get();
        let candidate_end = reader.commit_seq.get();
        let overlaps = committing_begin < candidate_end && candidate_begin < committing_end;
        if !overlaps || seen_sources.contains(&reader.token) {
            continue;
        }
        for write_key in write_keys {
            if reader
                .keys
                .iter()
                .any(|k| crate::witness_plane::witness_keys_overlap(k, write_key))
            {
                if seen_sources.insert(reader.token) {
                    debug!(
                        bead_id = "bd-31bo",
                        from = ?reader.token,
                        to = ?committing_txn,
                        key = ?write_key,
                        source = "rcri_index",
                        "discovered incoming rw-antidependency edge"
                    );
                    edges.push(DiscoveredEdge {
                        from: reader.token,
                        to: committing_txn,
                        overlap_key: write_key.clone(),
                        source_is_active: false,
                        source_has_in_rw: reader.had_in_rw,
                    });
                }
                break;
            }
        }
    }
    edges
}
/// Discover outgoing rw-antidependency edges (`committing --rw--> writer`):
/// the committing txn read a key that some other transaction writes.
///
/// Targets come from active writers (overlap callbacks) and committed
/// writers (recorded key lists). At most one edge is emitted per target,
/// keyed by the first overlapping read key.
pub fn discover_outgoing_edges(
    committing_txn: TxnToken,
    committing_begin_seq: CommitSeq,
    committing_commit_seq: CommitSeq,
    read_keys: &[WitnessKey],
    active_writers: &[&dyn ActiveTxnView],
    committed_writers: &[CommittedWriterInfo],
) -> Vec<DiscoveredEdge> {
    let mut edges = Vec::new();
    // A txn that read nothing cannot have outgoing rw edges.
    if read_keys.is_empty() {
        return edges;
    }
    let committing_begin = committing_begin_seq.get();
    let committing_end = committing_commit_seq.get();
    // Dedup: one edge per target txn.
    let mut seen_targets = HashSet::new();
    for candidate in active_writers {
        if candidate.token() == committing_txn || !candidate.is_active() {
            continue;
        }
        let candidate_begin = candidate.begin_seq().get();
        // Active writer: open-ended lifetime for the interval test.
        let candidate_end = u64::MAX;
        let overlaps = committing_begin < candidate_end && candidate_begin < committing_end;
        if !overlaps || seen_targets.contains(&candidate.token()) {
            continue;
        }
        for read_key in read_keys {
            if candidate.check_write_overlap(read_key) {
                if seen_targets.insert(candidate.token()) {
                    debug!(
                        bead_id = "bd-31bo",
                        from = ?committing_txn,
                        to = ?candidate.token(),
                        key = ?read_key,
                        source = "hot_plane_index",
                        "discovered outgoing rw-antidependency edge"
                    );
                    edges.push(DiscoveredEdge {
                        from: committing_txn,
                        to: candidate.token(),
                        overlap_key: read_key.clone(),
                        source_is_active: true,
                        // NOTE: `source_has_in_rw` is overloaded here to
                        // carry the TARGET's has_out_rw flag — the pivot
                        // checks in ssi_validate_and_publish rely on this.
                        source_has_in_rw: candidate.has_out_rw(),
                    });
                }
                // First overlapping key wins; stop scanning this candidate.
                break;
            }
        }
    }
    for writer in committed_writers {
        if writer.token == committing_txn {
            continue;
        }
        // A committed writer is bounded only by its commit point here; its
        // begin is pinned to 0 so the overlap test reduces to
        // committing_begin < writer.commit_seq.
        let candidate_begin = 0;
        let candidate_end = writer.commit_seq.get();
        let overlaps = committing_begin < candidate_end && candidate_begin < committing_end;
        if !overlaps || seen_targets.contains(&writer.token) {
            continue;
        }
        for read_key in read_keys {
            if writer
                .keys
                .iter()
                .any(|k| crate::witness_plane::witness_keys_overlap(k, read_key))
            {
                if seen_targets.insert(writer.token) {
                    debug!(
                        bead_id = "bd-31bo",
                        from = ?committing_txn,
                        to = ?writer.token,
                        key = ?read_key,
                        source = "commit_log_index",
                        "discovered outgoing rw-antidependency edge"
                    );
                    edges.push(DiscoveredEdge {
                        from: committing_txn,
                        to: writer.token,
                        overlap_key: read_key.clone(),
                        source_is_active: false,
                        // Same overloading: carries the target's had_out_rw.
                        source_has_in_rw: writer.had_out_rw,
                    });
                }
                break;
            }
        }
    }
    edges
}
/// Best-effort projection of a witness key onto a page number for evidence
/// reporting: cell and key-range keys map to their b-tree root page, byte
/// ranges to their page; custom keys have no page representation.
pub(crate) fn witness_key_page(key: &WitnessKey) -> Option<PageNumber> {
    match key {
        WitnessKey::Page(p) => Some(*p),
        WitnessKey::Cell { btree_root, .. } | WitnessKey::KeyRange { btree_root, .. } => {
            Some(*btree_root)
        }
        WitnessKey::ByteRange { page, .. } => Some(*page),
        WitnessKey::Custom { .. } => None,
    }
}
/// Successful SSI validation result: the evidence to publish for a commit.
#[derive(Debug, Clone)]
pub struct SsiValidationOk {
    /// Materialized rw-antidependency edges observed at commit.
    pub edges: Vec<EcsDependencyEdge>,
    /// Deterministic object ids assigned to those edges (same order).
    pub edge_ids: Vec<ObjectId>,
    /// Commit proof carrying the SSI flag state and emitted edge ids.
    pub commit_proof: EcsCommitProof,
    /// Final per-txn SSI bookkeeping after validation.
    pub ssi_state: SsiState,
}
/// Validate transaction `txn` at commit time under SSI and, on success,
/// publish its dependency-edge evidence.
///
/// Decision pipeline, in order:
/// 1. Read-only fast path: no write keys means no incoming edges are
///    possible, so the txn commits without edge discovery.
/// 2. `marked_for_abort`: another committer already chose this txn as an
///    abort victim; fail immediately.
/// 3. Discover incoming (reader → txn) and outgoing (txn → writer) rw
///    edges, then sort both deterministically by (id, epoch, key).
/// 4. Pivot check: a txn with both incoming AND outgoing rw edges is the
///    dangerous structure and aborts.
/// 5. T3 rule per edge: a committed peer that already carried the
///    complementary rw edge forces this txn to abort; a still-active peer
///    pivot is marked for abort instead — unless the DRO hot-path decision
///    allows it to continue.
/// 6. Otherwise commit: emit edges, deterministic edge ids, and a proof.
///
/// # Errors
///
/// Returns an [`SsiBusySnapshot`] (reason + abort witness) when the
/// transaction must abort.
#[allow(clippy::too_many_lines, clippy::too_many_arguments)]
pub fn ssi_validate_and_publish(
    txn: TxnToken,
    begin_seq: CommitSeq,
    commit_seq: CommitSeq,
    read_keys: &[WitnessKey],
    write_keys: &[WitnessKey],
    active_readers: &[&dyn ActiveTxnView],
    active_writers: &[&dyn ActiveTxnView],
    committed_readers: &[CommittedReaderInfo],
    committed_writers: &[CommittedWriterInfo],
    marked_for_abort: bool,
) -> Result<SsiValidationOk, SsiBusySnapshot> {
    let mut state = SsiState::new(txn, begin_seq);
    state.marked_for_abort = marked_for_abort;
    // Decision fields start Empty and are recorded once the outcome is known.
    let span = tracing::span!(
        tracing::Level::INFO,
        "ssi_validate",
        txn_id = txn.id.get(),
        read_set_size = read_keys.len(),
        write_set_size = write_keys.len(),
        conflict_detected = tracing::field::Empty,
        decision_reason = tracing::field::Empty,
    );
    let _guard = span.enter();
    info!(
        bead_id = "bd-31bo",
        txn = ?txn,
        read_keys = read_keys.len(),
        write_keys = write_keys.len(),
        marked_for_abort,
        "ssi_validate_and_publish: starting"
    );
    // (1) Read-only fast path: nothing written, nothing to validate.
    if write_keys.is_empty() {
        record_evidence_decision(
            SsiDecisionType::CommitAllowed,
            txn,
            begin_seq,
            Some(commit_seq),
            read_keys,
            write_keys,
            &[],
            "read_only_fast_path",
        );
        span.record("conflict_detected", false);
        span.record("decision_reason", "read_only_fast_path");
        debug!(
            bead_id = "bd-31bo",
            txn = ?txn,
            "ssi_validate: read-only fast path, skipping SSI"
        );
        let proof = build_commit_proof(txn, begin_seq, commit_seq, &state, &[], &[]);
        observability::record_ssi_commit();
        return Ok(SsiValidationOk {
            edges: Vec::new(),
            edge_ids: Vec::new(),
            commit_proof: proof,
            ssi_state: state,
        });
    }
    // (2) Victim selected by a prior committer's T3 marking.
    if marked_for_abort {
        record_evidence_decision(
            SsiDecisionType::AbortCycle,
            txn,
            begin_seq,
            Some(commit_seq),
            read_keys,
            write_keys,
            &[],
            "marked_for_abort",
        );
        span.record("conflict_detected", true);
        span.record("decision_reason", "marked_for_abort");
        warn!(
            bead_id = "bd-31bo",
            txn = ?txn,
            "ssi_validate: transaction marked for abort by another committer"
        );
        observability::record_ssi_abort(fsqlite_observability::SsiAbortCategory::MarkedForAbort);
        let witness = AbortWitness {
            txn,
            begin_seq,
            abort_seq: commit_seq,
            reason: AbortReason::SsiPivot,
            edges_observed: Vec::new(),
        };
        return Err(SsiBusySnapshot {
            txn,
            reason: SsiAbortReason::MarkedForAbort,
            witness,
        });
    }
    // (3) Edge discovery against both active and committed populations.
    let in_edges = discover_incoming_edges(
        txn,
        begin_seq,
        commit_seq,
        write_keys,
        active_readers,
        committed_readers,
    );
    let out_edges = discover_outgoing_edges(
        txn,
        begin_seq,
        commit_seq,
        read_keys,
        active_writers,
        committed_writers,
    );
    state.has_in_rw |= !in_edges.is_empty();
    state.has_out_rw |= !out_edges.is_empty();
    for edge in &in_edges {
        state.rw_in_from.insert(edge.from);
    }
    for edge in &out_edges {
        state.rw_out_to.insert(edge.to);
    }
    info!(
        bead_id = "bd-31bo",
        txn = ?txn,
        incoming = in_edges.len(),
        outgoing = out_edges.len(),
        has_in_rw = state.has_in_rw,
        has_out_rw = state.has_out_rw,
        "ssi_validate: edge discovery complete"
    );
    // Deterministic ordering (id, epoch, overlap key) so evidence and edge
    // ids are reproducible across runs.
    let mut in_edges = in_edges;
    in_edges.sort_by(|a, b| {
        a.from
            .id
            .get()
            .cmp(&b.from.id.get())
            .then_with(|| a.from.epoch.get().cmp(&b.from.epoch.get()))
            .then_with(|| a.overlap_key.cmp(&b.overlap_key))
    });
    let mut out_edges = out_edges;
    out_edges.sort_by(|a, b| {
        a.to.id
            .get()
            .cmp(&b.to.id.get())
            .then_with(|| a.to.epoch.get().cmp(&b.to.epoch.get()))
            .then_with(|| a.overlap_key.cmp(&b.overlap_key))
    });
    if !out_edges.is_empty() {
        debug!(
            bead_id = "bd-31bo",
            outgoing_edges = out_edges.len(),
            "ssi_validate: outgoing edge propagation complete"
        );
    }
    // (4) Pivot abort: this txn has both edge directions.
    if state.has_in_rw && state.has_out_rw {
        let all_edges = build_dependency_edges(&in_edges, &out_edges, txn, commit_seq);
        let discovered_edges: Vec<DiscoveredEdge> = in_edges
            .iter()
            .cloned()
            .chain(out_edges.iter().cloned())
            .collect();
        record_evidence_decision(
            SsiDecisionType::AbortWriteSkew,
            txn,
            begin_seq,
            Some(commit_seq),
            read_keys,
            write_keys,
            &discovered_edges,
            "pivot_abort_dangerous_structure",
        );
        span.record("conflict_detected", true);
        span.record("decision_reason", "pivot_abort");
        warn!(
            bead_id = "bd-31bo",
            txn = ?txn,
            in_sources = ?state.rw_in_from,
            out_targets = ?state.rw_out_to,
            "ssi_validate: PIVOT ABORT — dangerous structure detected"
        );
        observability::record_ssi_abort(fsqlite_observability::SsiAbortCategory::Pivot);
        let witness = AbortWitness {
            txn,
            begin_seq,
            abort_seq: commit_seq,
            reason: AbortReason::SsiPivot,
            edges_observed: all_edges,
        };
        return Err(SsiBusySnapshot {
            txn,
            reason: SsiAbortReason::Pivot,
            witness,
        });
    }
    // (5) DRO T3 decision: only evaluated when at least one edge exists.
    let dro_t3_decision = if in_edges.is_empty() && out_edges.is_empty() {
        None
    } else {
        Some(evaluate_t3_dro(
            txn,
            active_readers.len().saturating_add(committed_readers.len()),
            active_writers.len().saturating_add(committed_writers.len()),
        ))
    };
    // Without a DRO decision (no edges) this defaults to the conservative
    // "abort active pivots" policy — though the loops below won't run then.
    let should_abort_active_pivot = dro_t3_decision.is_none_or(DroHotPathDecision::should_abort);
    // Incoming edges: propagate has_out_rw to active readers; committed
    // readers that were already half-pivots force this txn to abort.
    for edge in &in_edges {
        if edge.source_is_active {
            for reader in active_readers {
                if reader.token() == edge.from {
                    reader.set_has_out_rw(true);
                    if reader.has_in_rw() {
                        if should_abort_active_pivot {
                            debug!(
                                bead_id = "bd-31bo",
                                pivot = ?edge.from,
                                dro_penalty = dro_t3_decision
                                    .map_or(0.0, |decision| decision.cvar_penalty),
                                dro_threshold = dro_t3_decision
                                    .map_or(0.0, |decision| decision.threshold),
                                "T3 rule: active reader is pivot, marking for abort"
                            );
                            reader.set_marked_for_abort(true);
                        } else {
                            debug!(
                                bead_id = "bd-31bo",
                                pivot = ?edge.from,
                                dro_penalty = dro_t3_decision
                                    .map_or(0.0, |decision| decision.cvar_penalty),
                                dro_threshold = dro_t3_decision
                                    .map_or(0.0, |decision| decision.threshold),
                                "T3 rule: active reader is pivot, DRO allows it to continue"
                            );
                        }
                    }
                    break;
                }
            }
        } else {
            // Committed source already has its incoming edge: committed
            // pivot — the committing txn is the one that must abort.
            if edge.source_has_in_rw {
                let dro_rationale = dro_t3_decision.map_or_else(
                    || "committed_pivot_abort".to_owned(),
                    |decision| {
                        format!(
                            "committed_pivot_abort dro_penalty={:.6} dro_threshold={:.6} active_readers={} active_writers={}",
                            decision.cvar_penalty,
                            decision.threshold,
                            decision.active_readers,
                            decision.active_writers
                        )
                    },
                );
                let discovered_edges: Vec<DiscoveredEdge> = in_edges
                    .iter()
                    .cloned()
                    .chain(out_edges.iter().cloned())
                    .collect();
                record_evidence_decision(
                    SsiDecisionType::AbortCycle,
                    txn,
                    begin_seq,
                    Some(commit_seq),
                    read_keys,
                    write_keys,
                    &discovered_edges,
                    dro_rationale.as_str(),
                );
                span.record("conflict_detected", true);
                span.record("decision_reason", "committed_pivot_abort");
                warn!(
                    bead_id = "bd-31bo",
                    txn = ?txn,
                    committed_pivot = ?edge.from,
                    dro_penalty = dro_t3_decision
                        .map_or(0.0, |decision| decision.cvar_penalty),
                    dro_threshold = dro_t3_decision
                        .map_or(0.0, |decision| decision.threshold),
                    "T3 rule: committed reader was pivot, T must abort"
                );
                observability::record_ssi_abort(
                    fsqlite_observability::SsiAbortCategory::CommittedPivot,
                );
                let all_edges = build_dependency_edges(&in_edges, &out_edges, txn, commit_seq);
                let witness = AbortWitness {
                    txn,
                    begin_seq,
                    abort_seq: commit_seq,
                    reason: AbortReason::SsiPivot,
                    edges_observed: all_edges,
                };
                return Err(SsiBusySnapshot {
                    txn,
                    reason: SsiAbortReason::CommittedPivot,
                    witness,
                });
            }
        }
    }
    // Outgoing edges: propagate has_in_rw to active writers; committed
    // writers that already had an outgoing edge force an abort. (For these
    // edges, `source_has_in_rw` carries the target's has_out_rw flag.)
    for edge in &out_edges {
        if edge.source_is_active {
            for writer in active_writers {
                if writer.token() == edge.to {
                    writer.set_has_in_rw(true);
                    if writer.has_out_rw() {
                        if should_abort_active_pivot {
                            debug!(
                                bead_id = "bd-31bo",
                                pivot = ?edge.to,
                                dro_penalty = dro_t3_decision
                                    .map_or(0.0, |decision| decision.cvar_penalty),
                                dro_threshold = dro_t3_decision
                                    .map_or(0.0, |decision| decision.threshold),
                                "T3 rule: active writer is pivot, marking for abort"
                            );
                            writer.set_marked_for_abort(true);
                        } else {
                            debug!(
                                bead_id = "bd-31bo",
                                pivot = ?edge.to,
                                dro_penalty = dro_t3_decision
                                    .map_or(0.0, |decision| decision.cvar_penalty),
                                dro_threshold = dro_t3_decision
                                    .map_or(0.0, |decision| decision.threshold),
                                "T3 rule: active writer is pivot, DRO allows it to continue"
                            );
                        }
                    }
                    break;
                }
            }
        } else if edge.source_has_in_rw {
            let dro_rationale = dro_t3_decision.map_or_else(
                || "committed_writer_pivot_abort".to_owned(),
                |decision| {
                    format!(
                        "committed_writer_pivot_abort dro_penalty={:.6} dro_threshold={:.6} active_readers={} active_writers={}",
                        decision.cvar_penalty,
                        decision.threshold,
                        decision.active_readers,
                        decision.active_writers
                    )
                },
            );
            let discovered_edges: Vec<DiscoveredEdge> = in_edges
                .iter()
                .cloned()
                .chain(out_edges.iter().cloned())
                .collect();
            record_evidence_decision(
                SsiDecisionType::AbortCycle,
                txn,
                begin_seq,
                Some(commit_seq),
                read_keys,
                write_keys,
                &discovered_edges,
                dro_rationale.as_str(),
            );
            span.record("conflict_detected", true);
            span.record("decision_reason", "committed_writer_pivot_abort");
            warn!(
                bead_id = "bd-31bo",
                txn = ?txn,
                committed_pivot = ?edge.to,
                dro_penalty = dro_t3_decision.map_or(0.0, |decision| decision.cvar_penalty),
                dro_threshold = dro_t3_decision.map_or(0.0, |decision| decision.threshold),
                "T3 rule: committed writer was pivot, T must abort"
            );
            observability::record_ssi_abort(
                fsqlite_observability::SsiAbortCategory::CommittedPivot,
            );
            let all_edges = build_dependency_edges(&in_edges, &out_edges, txn, commit_seq);
            let witness = AbortWitness {
                txn,
                begin_seq,
                abort_seq: commit_seq,
                reason: AbortReason::SsiPivot,
                edges_observed: all_edges,
            };
            return Err(SsiBusySnapshot {
                txn,
                reason: SsiAbortReason::CommittedPivot,
                witness,
            });
        }
    }
    // (6) Commit approved: publish edges, ids, and the commit proof.
    let all_edges = build_dependency_edges(&in_edges, &out_edges, txn, commit_seq);
    let discovered_edges: Vec<DiscoveredEdge> = in_edges
        .iter()
        .cloned()
        .chain(out_edges.iter().cloned())
        .collect();
    record_evidence_decision(
        SsiDecisionType::CommitAllowed,
        txn,
        begin_seq,
        Some(commit_seq),
        read_keys,
        write_keys,
        &discovered_edges,
        "commit_approved",
    );
    // Deterministic 16-byte edge ids: txn id (8 LE bytes) + low 4 bytes of
    // commit seq + edge index within this commit.
    let edge_ids: Vec<ObjectId> = all_edges
        .iter()
        .enumerate()
        .map(|(i, _)| {
            let mut bytes = [0u8; 16];
            bytes[..8].copy_from_slice(&txn.id.get().to_le_bytes());
            bytes[8..12].copy_from_slice(&commit_seq.get().to_le_bytes()[..4]);
            #[allow(clippy::cast_possible_truncation)]
            let idx = i as u32;
            bytes[12..16].copy_from_slice(&idx.to_le_bytes());
            ObjectId::from_bytes(bytes)
        })
        .collect();
    state.edges_emitted.clone_from(&edge_ids);
    let proof = build_commit_proof(txn, begin_seq, commit_seq, &state, &edge_ids, &[]);
    span.record("conflict_detected", false);
    span.record("decision_reason", "commit_approved");
    info!(
        bead_id = "bd-31bo",
        txn = ?txn,
        edges_emitted = all_edges.len(),
        "ssi_validate: commit approved, evidence published"
    );
    observability::record_ssi_commit();
    Ok(SsiValidationOk {
        edges: all_edges,
        edge_ids,
        commit_proof: proof,
        ssi_state: state,
    })
}
/// Materialize discovered in/out edges as ECS dependency-edge records,
/// preserving order (incoming first, then outgoing).
fn build_dependency_edges(
    in_edges: &[DiscoveredEdge],
    out_edges: &[DiscoveredEdge],
    observer: TxnToken,
    observation_seq: CommitSeq,
) -> Vec<EcsDependencyEdge> {
    in_edges
        .iter()
        .chain(out_edges.iter())
        .map(|edge| EcsDependencyEdge {
            kind: DependencyEdgeKind::RwAntiDependency,
            from: edge.from,
            to: edge.to,
            key_basis: EdgeKeyBasis {
                level: 0,
                // Keys with no page projection fall back to prefix 0.
                range_prefix: witness_key_page(&edge.overlap_key).map_or(0, |p| p.get()),
                refinement: Some(KeySummary::ExactKeys(vec![edge.overlap_key.clone()])),
            },
            observed_by: observer,
            observation_seq,
        })
        .collect()
}
/// Assemble the commit proof for a validated transaction.
///
/// Carries the SSI flag state and emitted edge ids; the witness/segment
/// reference lists are left empty here. Policy is fixed to `AbortPivot`.
fn build_commit_proof(
    txn: TxnToken,
    begin_seq: CommitSeq,
    commit_seq: CommitSeq,
    state: &SsiState,
    edge_ids: &[ObjectId],
    merge_witnesses: &[ObjectId],
) -> EcsCommitProof {
    EcsCommitProof {
        txn,
        begin_seq,
        commit_seq,
        has_in_rw: state.has_in_rw,
        has_out_rw: state.has_out_rw,
        read_witness_refs: Vec::new(),
        write_witness_refs: Vec::new(),
        index_segments_used: Vec::new(),
        edges_emitted: edge_ids.to_vec(),
        merge_witnesses: merge_witnesses.to_vec(),
        abort_policy: AbortPolicy::AbortPivot,
    }
}
/// Map a decision type to the metric/log outcome label.
fn decision_outcome(decision_type: SsiDecisionType) -> &'static str {
    // One arm per variant so adding a decision type forces a review here.
    match decision_type {
        SsiDecisionType::CommitAllowed => "commit",
        SsiDecisionType::AbortWriteSkew => "abort",
        SsiDecisionType::AbortPhantom => "abort",
        SsiDecisionType::AbortCycle => "abort",
    }
}
/// Project witness keys onto page numbers, sorted by page and deduplicated.
fn witness_keys_to_pages(keys: &[WitnessKey]) -> Vec<PageNumber> {
    let mut pages = keys
        .iter()
        .filter_map(witness_key_page)
        .collect::<Vec<PageNumber>>();
    // Unstable sort is fine: equal keys are identical pages, then deduped.
    pages.sort_unstable_by_key(|page| page.get());
    pages.dedup();
    pages
}
/// Collect the distinct peer transactions named by `edges`, excluding
/// `txn` itself, ordered by (id, epoch).
fn edge_conflicting_txns(txn: TxnToken, edges: &[DiscoveredEdge]) -> Vec<TxnToken> {
    let mut txns: Vec<TxnToken> = edges
        .iter()
        .flat_map(|edge| [edge.from, edge.to])
        .filter(|candidate| *candidate != txn)
        .collect();
    txns.sort_by(|left, right| {
        left.id
            .get()
            .cmp(&right.id.get())
            .then_with(|| left.epoch.get().cmp(&right.epoch.get()))
    });
    txns.dedup();
    txns
}
fn edge_conflict_pages(edges: &[DiscoveredEdge]) -> Vec<PageNumber> {
let mut pages: Vec<PageNumber> = edges
.iter()
.filter_map(|edge| witness_key_page(&edge.overlap_key))
.collect();
pages.sort_by_key(|page| page.get());
pages.dedup();
pages
}
/// Rough size estimate (bytes) of a fully-detailed evidence record:
/// one u64-sized word per page entry, two per txn token, plus the
/// rationale's UTF-8 length.
fn estimate_evidence_size_bytes(
    read_pages: &[PageNumber],
    write_pages: &[PageNumber],
    conflict_pages: &[PageNumber],
    conflicting_txns: &[TxnToken],
    rationale: &str,
) -> u64 {
    let page_words = read_pages.len() + write_pages.len() + conflict_pages.len();
    let txn_words = conflicting_txns.len() * 2;
    let total = (page_words + txn_words) * std::mem::size_of::<u64>() + rationale.len();
    // usize -> u64 is lossless on all supported targets.
    #[allow(clippy::cast_possible_truncation)]
    {
        total as u64
    }
}
/// Pre-recording size estimate used by `BudgetedCommit` mode: one
/// u64-sized word per key, two per edge, plus the rationale length.
fn estimate_commit_evidence_budget_bytes(
    read_keys: &[WitnessKey],
    write_keys: &[WitnessKey],
    edges: &[DiscoveredEdge],
    rationale: &str,
) -> u64 {
    let word_count = read_keys.len() + write_keys.len() + edges.len() * 2;
    // usize -> u64 is lossless on all supported targets.
    #[allow(clippy::cast_possible_truncation)]
    {
        (word_count * std::mem::size_of::<u64>() + rationale.len()) as u64
    }
}
/// Decide whether commit evidence should be recorded compactly, returning
/// `(compact, detail_level_label)`.
///
/// `Full` never compacts; `CompactCommit` always compacts commits; and
/// `BudgetedCommit` compacts only once either budget dimension (pending
/// ledger records, or estimated record size) is exceeded, bumping the
/// budget-compaction counter when it does.
fn commit_evidence_detail_level(
    read_keys: &[WitnessKey],
    write_keys: &[WitnessKey],
    edges: &[DiscoveredEdge],
    rationale: &str,
) -> (bool, &'static str) {
    match ssi_evidence_recording_mode() {
        SsiEvidenceRecordingMode::Full => (false, "full"),
        SsiEvidenceRecordingMode::CompactCommit => (true, "compact_commit"),
        SsiEvidenceRecordingMode::BudgetedCommit => {
            let budget = ssi_evidence_budget_config();
            let pending_records = ssi_evidence_ledger().pending_count();
            let estimated_bytes =
                estimate_commit_evidence_budget_bytes(read_keys, write_keys, edges, rationale);
            // Pending-record pressure takes precedence over size pressure.
            if pending_records >= budget.max_pending_records_before_compact {
                FSQLITE_EVIDENCE_RECORDS_TOTAL_BUDGET_COMPACT.fetch_add(1, Ordering::Relaxed);
                (true, "budget_compact_pending")
            } else if estimated_bytes > budget.max_commit_evidence_bytes {
                FSQLITE_EVIDENCE_RECORDS_TOTAL_BUDGET_COMPACT.fetch_add(1, Ordering::Relaxed);
                (true, "budget_compact_size")
            } else {
                (false, "budget_full")
            }
        }
    }
}
/// Record one SSI decision (commit or abort) into the evidence ledger,
/// emitting tracing events and bumping the evidence counters.
///
/// Commit decisions may be recorded compactly (page/txn detail dropped)
/// depending on the active recording mode and budget; abort evidence is
/// always recorded in full.
#[allow(clippy::too_many_arguments)]
fn record_evidence_decision(
    decision_type: SsiDecisionType,
    txn: TxnToken,
    begin_seq: CommitSeq,
    commit_seq: Option<CommitSeq>,
    read_keys: &[WitnessKey],
    write_keys: &[WitnessKey],
    edges: &[DiscoveredEdge],
    rationale: &str,
) {
    // Only commits are ever compacted.
    let (compact_commit_evidence, detail_level) =
        if matches!(decision_type, SsiDecisionType::CommitAllowed) {
            commit_evidence_detail_level(read_keys, write_keys, edges, rationale)
        } else {
            (false, "full")
        };
    // Compact records carry only the rationale; full records materialize
    // page/txn detail and an estimated size.
    let (read_pages, write_pages, conflict_pages, conflicting_txns, evidence_size_bytes) =
        if compact_commit_evidence {
            (
                Vec::new(),
                Vec::new(),
                Vec::new(),
                Vec::new(),
                u64::try_from(rationale.len()).unwrap_or(u64::MAX),
            )
        } else {
            let read_pages = witness_keys_to_pages(read_keys);
            let write_pages = witness_keys_to_pages(write_keys);
            let conflict_pages = edge_conflict_pages(edges);
            let conflicting_txns = edge_conflicting_txns(txn, edges);
            let evidence_size_bytes = estimate_evidence_size_bytes(
                &read_pages,
                &write_pages,
                &conflict_pages,
                &conflicting_txns,
                rationale,
            );
            (
                read_pages,
                write_pages,
                conflict_pages,
                conflicting_txns,
                evidence_size_bytes,
            )
        };
    let outcome = decision_outcome(decision_type);
    let decision_id = fsqlite_observability::next_decision_id();
    let span = tracing::span!(
        target: "fsqlite.evidence",
        tracing::Level::INFO,
        "evidence_record",
        decision_id,
        outcome,
        evidence_size_bytes
    );
    let _guard = span.enter();
    // In compact mode the page vectors are empty, so log key counts instead.
    let r_len = if compact_commit_evidence {
        read_keys.len()
    } else {
        read_pages.len()
    };
    let w_len = if compact_commit_evidence {
        write_keys.len()
    } else {
        write_pages.len()
    };
    let cx_len = conflicting_txns.len();
    let cp_len = conflict_pages.len();
    info!(
        decision_id,
        outcome,
        detail_level,
        decision_type = %decision_type,
        txn_id = txn.id.get(),
        read_pages = r_len,
        write_pages = w_len,
        conflicting_txns = cx_len,
        conflict_pages = cp_len,
        "ssi decision evidence recorded"
    );
    // Full detail dump only when not compacted (and only at debug level).
    if !compact_commit_evidence {
        debug!(
            decision_id,
            decision_type = %decision_type,
            txn = ?txn,
            read_pages = ?read_pages,
            write_pages = ?write_pages,
            conflicting_txns = ?conflicting_txns,
            conflict_pages = ?conflict_pages,
            rationale,
            "ssi decision evidence details"
        );
    }
    let mut draft = SsiDecisionCardDraft::new(
        decision_type,
        txn,
        begin_seq,
        conflicting_txns,
        conflict_pages,
        read_pages,
        write_pages,
        rationale,
    )
    .with_decision_id(decision_id);
    if let Some(seq) = commit_seq {
        draft = draft.with_commit_seq(seq);
    }
    // Ledger write is asynchronous; the counters update immediately.
    ssi_evidence_ledger().record_async(draft);
    if matches!(decision_type, SsiDecisionType::CommitAllowed) {
        FSQLITE_EVIDENCE_RECORDS_TOTAL_COMMIT.fetch_add(1, Ordering::Relaxed);
    } else {
        FSQLITE_EVIDENCE_RECORDS_TOTAL_ABORT.fetch_add(1, Ordering::Relaxed);
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use super::*;
use fsqlite_types::{TxnEpoch, TxnId};
use std::cell::Cell;
/// Serializes tests that mutate the global evidence mode/budget settings.
/// A poisoned mutex is recovered, since the guarded state is just `()`.
fn evidence_settings_guard() -> std::sync::MutexGuard<'static, ()> {
    static GUARD: std::sync::OnceLock<std::sync::Mutex<()>> = std::sync::OnceLock::new();
    GUARD
        .get_or_init(|| std::sync::Mutex::new(()))
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
/// RAII restorer: re-installs the recording mode and budget captured at
/// construction when dropped, even if the test body panics.
struct EvidenceConfigRestore {
    mode: SsiEvidenceRecordingMode,
    budget: SsiEvidenceBudgetConfig,
}
impl Drop for EvidenceConfigRestore {
    fn drop(&mut self) {
        // The setters' return values (the settings being replaced) are discarded.
        let _ = set_ssi_evidence_budget_config(self.budget);
        let _ = set_ssi_evidence_recording_mode(self.mode);
    }
}
/// Run `f` with the given recording mode and budget installed, holding the
/// settings mutex for the whole call; metrics are reset before `f` runs and
/// the previous settings are restored afterwards (even on panic).
fn with_ssi_evidence_test_config<T>(
    mode: SsiEvidenceRecordingMode,
    budget: SsiEvidenceBudgetConfig,
    f: impl FnOnce() -> T,
) -> T {
    let _guard = evidence_settings_guard();
    // The setters return the values being replaced; the restorer keeps them.
    let restore = EvidenceConfigRestore {
        mode: set_ssi_evidence_recording_mode(mode),
        budget: set_ssi_evidence_budget_config(budget),
    };
    reset_ssi_evidence_metrics();
    let result = f();
    drop(restore);
    result
}
/// In-memory `ActiveTxnView` stand-in for tests: fixed read/write key
/// sets plus `Cell`-backed flags so the `&self` setters work in
/// single-threaded tests.
struct MockActiveTxn {
    token: TxnToken,
    begin_seq: CommitSeq,
    active: bool,
    reads: Vec<WitnessKey>,
    writes: Vec<WitnessKey>,
    has_in: Cell<bool>,
    has_out: Cell<bool>,
    marked: Cell<bool>,
}
impl MockActiveTxn {
    /// Active txn with empty key sets and all flags cleared.
    fn new(id: u64, epoch: u32, begin_seq: u64) -> Self {
        Self {
            token: TxnToken::new(TxnId::new(id).unwrap(), TxnEpoch::new(epoch)),
            begin_seq: CommitSeq::new(begin_seq),
            active: true,
            reads: Vec::new(),
            writes: Vec::new(),
            has_in: Cell::new(false),
            has_out: Cell::new(false),
            marked: Cell::new(false),
        }
    }
    // Builder-style helpers used by the tests below.
    fn with_reads(mut self, keys: Vec<WitnessKey>) -> Self {
        self.reads = keys;
        self
    }
    fn with_writes(mut self, keys: Vec<WitnessKey>) -> Self {
        self.writes = keys;
        self
    }
    fn with_has_in_rw(self, val: bool) -> Self {
        self.has_in.set(val);
        self
    }
    #[allow(dead_code)]
    fn committed(mut self) -> Self {
        self.active = false;
        self
    }
}
// Trait plumbing: delegate to the mock's fields; the overlap checks reuse
// the production `witness_keys_overlap` predicate.
impl ActiveTxnView for MockActiveTxn {
    fn token(&self) -> TxnToken {
        self.token
    }
    fn begin_seq(&self) -> CommitSeq {
        self.begin_seq
    }
    fn is_active(&self) -> bool {
        self.active
    }
    fn read_keys(&self) -> &[WitnessKey] {
        &self.reads
    }
    fn write_keys(&self) -> &[WitnessKey] {
        &self.writes
    }
    fn check_read_overlap(&self, key: &WitnessKey) -> bool {
        self.reads
            .iter()
            .any(|k| crate::witness_plane::witness_keys_overlap(k, key))
    }
    fn check_write_overlap(&self, key: &WitnessKey) -> bool {
        self.writes
            .iter()
            .any(|k| crate::witness_plane::witness_keys_overlap(k, key))
    }
    fn has_in_rw(&self) -> bool {
        self.has_in.get()
    }
    fn has_out_rw(&self) -> bool {
        self.has_out.get()
    }
    fn set_has_out_rw(&self, val: bool) {
        self.has_out.set(val);
    }
    fn set_has_in_rw(&self, val: bool) {
        self.has_in.set(val);
    }
    fn set_marked_for_abort(&self, val: bool) {
        self.marked.set(val);
    }
}
/// Test helper: wrap a raw page number in a `WitnessKey::Page`.
///
/// # Panics
/// Panics if `pgno` is not a valid `PageNumber` (tests only pass non-zero
/// page numbers, so this is an invariant, not a recoverable error).
fn page_key(pgno: u32) -> WitnessKey {
    WitnessKey::Page(PageNumber::new(pgno).expect("test page number must be valid"))
}
#[test]
fn test_ssi_read_only_skip() {
    // A transaction with reads but an empty write set is read-only:
    // validation must commit it without emitting any dependency edges.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let result = ssi_validate_and_publish(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(10)],
        &[],
        &[],
        &[],
        &[],
        &[],
        false,
    );
    let ok = result.expect("read-only txn should commit");
    assert!(ok.edges.is_empty(), "no edges for read-only");
    assert!(!ok.ssi_state.has_in_rw);
    assert!(!ok.ssi_state.has_out_rw);
}
#[test]
fn test_ssi_no_edges_commit() {
    // Reads and writes present but no concurrent overlap: no edges, commit.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let result = ssi_validate_and_publish(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(10)],
        &[page_key(20)],
        &[],
        &[],
        &[],
        &[],
        false,
    );
    let ok = result.expect("no overlap → no edges → commit");
    assert!(ok.edges.is_empty());
    assert!(!ok.ssi_state.has_in_rw);
    assert!(!ok.ssi_state.has_out_rw);
}
#[test]
fn test_safe_snapshot_shortcut_no_conflict_commit() {
    // With no concurrent readers or writers at all, the safe-snapshot fast
    // path must commit without conflict work, edges, or edge identifiers.
    let txn = TxnToken::new(TxnId::new(8_001).unwrap(), TxnEpoch::new(0));
    let outcome = ssi_validate_and_publish(
        txn,
        CommitSeq::new(10),
        CommitSeq::new(11),
        &[page_key(1_000)],
        &[page_key(2_000)],
        &[],
        &[],
        &[],
        &[],
        false,
    )
    .expect("safe snapshot should commit without conflict work");
    assert!(
        outcome.edges.is_empty(),
        "safe snapshot should emit no edges"
    );
    assert!(
        outcome.edge_ids.is_empty(),
        "safe snapshot should emit no edge identifiers"
    );
    assert!(!outcome.ssi_state.has_in_rw);
    assert!(!outcome.ssi_state.has_out_rw);
}
#[test]
fn test_ssi_only_incoming_edge_commit() {
    // An incoming rw-antidependency alone is not a dangerous structure:
    // commit is allowed, but has_in_rw must be recorded on the result.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let reader = MockActiveTxn::new(2, 0, 1).with_reads(vec![page_key(5)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&reader];
    let result = ssi_validate_and_publish(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[page_key(5)],
        &readers,
        &[],
        &[],
        &[],
        false,
    );
    let ok = result.expect("only incoming edge → commit allowed");
    assert!(ok.ssi_state.has_in_rw);
    assert!(!ok.ssi_state.has_out_rw);
    assert!(!ok.edges.is_empty());
}
#[test]
fn test_ssi_only_outgoing_edge_commit() {
    // An outgoing rw-antidependency alone also commits; has_out_rw is set.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let writer = MockActiveTxn::new(3, 0, 1).with_writes(vec![page_key(7)]);
    let writers: Vec<&dyn ActiveTxnView> = vec![&writer];
    let result = ssi_validate_and_publish(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(7)],
        &[page_key(20)],
        &[],
        &writers,
        &[],
        &[],
        false,
    );
    let ok = result.expect("only outgoing edge → commit allowed");
    assert!(!ok.ssi_state.has_in_rw);
    assert!(ok.ssi_state.has_out_rw);
}
#[test]
fn test_ssi_pivot_both_edges_abort() {
    // Both an incoming AND an outgoing rw edge make this txn a pivot in a
    // dangerous structure: SSI must abort it.
    let txn = TxnToken::new(TxnId::new(2).unwrap(), TxnEpoch::new(0));
    let reader = MockActiveTxn::new(1, 0, 1).with_reads(vec![page_key(5)]);
    let writer = MockActiveTxn::new(3, 0, 1).with_writes(vec![page_key(7)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&reader];
    let writers: Vec<&dyn ActiveTxnView> = vec![&writer];
    let result = ssi_validate_and_publish(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(7)],
        &[page_key(5)],
        &readers,
        &writers,
        &[],
        &[],
        false,
    );
    let err = result.expect_err("both in + out rw → MUST abort");
    assert_eq!(err.reason, SsiAbortReason::Pivot);
    assert_eq!(err.witness.reason, AbortReason::SsiPivot);
}
#[test]
fn test_ssi_dangerous_structure_detection() {
    // T1 -> T2 -> T3 chain of rw-antidependencies centered on T2 (the
    // committing txn): the dangerous structure must be detected and aborted.
    let t2 = TxnToken::new(TxnId::new(2).unwrap(), TxnEpoch::new(0));
    let t1 = MockActiveTxn::new(1, 0, 1).with_reads(vec![page_key(10)]);
    let t3 = MockActiveTxn::new(3, 0, 1).with_writes(vec![page_key(20)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&t1];
    let writers: Vec<&dyn ActiveTxnView> = vec![&t3];
    let result = ssi_validate_and_publish(
        t2,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(20)],
        &[page_key(10)],
        &readers,
        &writers,
        &[],
        &[],
        false,
    );
    assert!(result.is_err(), "dangerous structure → abort");
    let err = result.unwrap_err();
    assert_eq!(err.reason, SsiAbortReason::Pivot);
}
#[test]
fn test_discover_incoming_from_hot_plane() {
    // A live reader overlapping our write set yields one incoming edge
    // whose source is marked as active.
    let committing = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let live_reader = MockActiveTxn::new(2, 0, 1).with_reads(vec![page_key(5)]);
    let hot_readers: Vec<&dyn ActiveTxnView> = vec![&live_reader];
    let discovered = discover_incoming_edges(
        committing,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(5)],
        &hot_readers,
        &[],
    );
    assert_eq!(discovered.len(), 1);
    assert!(discovered[0].source_is_active);
    assert_eq!(discovered[0].from.id.get(), 2);
}
#[test]
fn test_discover_outgoing_from_hot_plane() {
    // A live writer overlapping our read set yields one outgoing edge
    // whose target is the active writer.
    let committing = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let live_writer = MockActiveTxn::new(3, 0, 1).with_writes(vec![page_key(7)]);
    let hot_writers: Vec<&dyn ActiveTxnView> = vec![&live_writer];
    let discovered = discover_outgoing_edges(
        committing,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(7)],
        &hot_writers,
        &[],
    );
    assert_eq!(discovered.len(), 1);
    assert!(discovered[0].source_is_active);
    assert_eq!(discovered[0].to.id.get(), 3);
}
#[test]
fn test_discover_outgoing_from_commit_index() {
    // A recently-committed writer overlapping our read set must be found
    // via the commit index even when the hot plane is empty.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let committed_w = CommittedWriterInfo {
        token: TxnToken::new(TxnId::new(3).unwrap(), TxnEpoch::new(0)),
        commit_seq: CommitSeq::new(3),
        had_out_rw: false,
        // `page_key` returns a fresh owned key; no `.clone()` needed.
        keys: vec![page_key(7)],
    };
    let edges = discover_outgoing_edges(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(7)],
        &[],
        &[committed_w],
    );
    assert_eq!(edges.len(), 1);
    assert!(!edges[0].source_is_active);
}
#[test]
fn test_discover_incoming_from_recently_committed() {
    // A recently-committed reader overlapping our write set produces an
    // incoming edge flagged as coming from an inactive (committed) source.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let committed_r = CommittedReaderInfo {
        token: TxnToken::new(TxnId::new(2).unwrap(), TxnEpoch::new(0)),
        begin_seq: CommitSeq::new(0),
        commit_seq: CommitSeq::new(3),
        had_in_rw: false,
        // `page_key` returns a fresh owned key; no `.clone()` needed.
        keys: vec![page_key(5)],
    };
    let edges = discover_incoming_edges(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(5)],
        &[],
        &[committed_r],
    );
    assert_eq!(edges.len(), 1);
    assert!(!edges[0].source_is_active);
}
#[test]
fn test_edge_gap_without_commit_index() {
    // Shows why commit-index lookups matter: the first discovery call passes
    // an empty read set (nothing overlaps), the second passes the real read
    // set and catches the committed writer through the index.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let committed_w = CommittedWriterInfo {
        token: TxnToken::new(TxnId::new(3).unwrap(), TxnEpoch::new(0)),
        commit_seq: CommitSeq::new(3),
        had_out_rw: false,
        // `page_key` returns a fresh owned key; no `.clone()` needed.
        keys: vec![page_key(7)],
    };
    let committed_writers = vec![committed_w];
    let edges_hot_only = discover_outgoing_edges(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[],
        &committed_writers,
    );
    assert!(
        edges_hot_only.is_empty(),
        "hot-plane only misses committed writer"
    );
    let edges_full = discover_outgoing_edges(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(7)],
        &[],
        &committed_writers,
    );
    assert_eq!(edges_full.len(), 1, "commit index catches the edge");
}
#[test]
fn test_interval_overlap_excludes_stale_committed_writer() {
    // A writer that committed before our begin_seq cannot conflict: its
    // lifetime interval does not overlap ours, so no edge is produced.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let stale_writer = CommittedWriterInfo {
        token: TxnToken::new(TxnId::new(3).unwrap(), TxnEpoch::new(0)),
        commit_seq: CommitSeq::new(4),
        had_out_rw: false,
        // `page_key` returns a fresh owned key; no `.clone()` needed.
        keys: vec![page_key(7)],
    };
    let edges = discover_outgoing_edges(
        txn,
        CommitSeq::new(5),
        CommitSeq::new(8),
        &[page_key(7)],
        &[],
        &[stale_writer],
    );
    assert!(
        edges.is_empty(),
        "writer interval (-inf,4] does not overlap [5,8]"
    );
}
#[test]
fn test_bank_transfer_write_skew_prevented() {
    // Classic write-skew: both transactions read accounts 100 and 200, each
    // writes a different one. The first leg commits; the second must abort.
    let t1 = TxnToken::new(TxnId::new(11).unwrap(), TxnEpoch::new(0));
    let t2 = TxnToken::new(TxnId::new(12).unwrap(), TxnEpoch::new(0));
    let t2_active = MockActiveTxn::new(12, 0, 1).with_reads(vec![page_key(100), page_key(200)]);
    let readers_for_t1: Vec<&dyn ActiveTxnView> = vec![&t2_active];
    let t1_reads = [page_key(100), page_key(200)];
    let t1_writes = [page_key(100)];
    let t1_commit = ssi_validate_and_publish(
        t1,
        CommitSeq::new(1),
        CommitSeq::new(2),
        &t1_reads,
        &t1_writes,
        &readers_for_t1,
        &[],
        &[],
        &[],
        false,
    )
    .expect("first transfer leg should commit");
    // Replay T1 into the commit index, carrying forward its observed flags.
    // `page_key` returns fresh owned keys; no `.clone()` needed below.
    let committed_reader_t1 = CommittedReaderInfo {
        token: t1,
        begin_seq: CommitSeq::new(1),
        commit_seq: CommitSeq::new(2),
        had_in_rw: t1_commit.ssi_state.has_in_rw,
        keys: vec![page_key(100), page_key(200)],
    };
    let committed_writer_t1 = CommittedWriterInfo {
        token: t1,
        commit_seq: CommitSeq::new(2),
        had_out_rw: t1_commit.ssi_state.has_out_rw,
        keys: vec![page_key(100)],
    };
    let t2_reads = [page_key(100), page_key(200)];
    let t2_writes = [page_key(200)];
    let t2_result = ssi_validate_and_publish(
        t2,
        CommitSeq::new(1),
        CommitSeq::new(3),
        &t2_reads,
        &t2_writes,
        &[],
        &[],
        &[committed_reader_t1],
        &[committed_writer_t1],
        false,
    );
    assert!(
        t2_result.is_err(),
        "second transfer leg must abort to prevent write skew"
    );
}
#[test]
fn test_doctor_on_call_write_skew_prevented() {
    // "Doctors on call" write-skew: both txns read the two on-call rows and
    // each takes a different doctor off call. Only one may commit.
    let d1 = TxnToken::new(TxnId::new(21).unwrap(), TxnEpoch::new(0));
    let d2 = TxnToken::new(TxnId::new(22).unwrap(), TxnEpoch::new(0));
    let d2_active = MockActiveTxn::new(22, 0, 1).with_reads(vec![page_key(310), page_key(311)]);
    let readers_for_d1: Vec<&dyn ActiveTxnView> = vec![&d2_active];
    let d1_reads = [page_key(310), page_key(311)];
    let d1_writes = [page_key(310)];
    let d1_commit = ssi_validate_and_publish(
        d1,
        CommitSeq::new(1),
        CommitSeq::new(2),
        &d1_reads,
        &d1_writes,
        &readers_for_d1,
        &[],
        &[],
        &[],
        false,
    )
    .expect("first doctor update should commit");
    // Replay D1 into the commit index with its observed flags.
    // `page_key` returns fresh owned keys; no `.clone()` needed below.
    let committed_reader_d1 = CommittedReaderInfo {
        token: d1,
        begin_seq: CommitSeq::new(1),
        commit_seq: CommitSeq::new(2),
        had_in_rw: d1_commit.ssi_state.has_in_rw,
        keys: vec![page_key(310), page_key(311)],
    };
    let committed_writer_d1 = CommittedWriterInfo {
        token: d1,
        commit_seq: CommitSeq::new(2),
        had_out_rw: d1_commit.ssi_state.has_out_rw,
        keys: vec![page_key(310)],
    };
    let d2_reads = [page_key(310), page_key(311)];
    let d2_writes = [page_key(311)];
    let d2_result = ssi_validate_and_publish(
        d2,
        CommitSeq::new(1),
        CommitSeq::new(3),
        &d2_reads,
        &d2_writes,
        &[],
        &[],
        &[committed_reader_d1],
        &[committed_writer_d1],
        false,
    );
    assert!(
        d2_result.is_err(),
        "second doctor update must abort to preserve on-call invariant"
    );
}
#[test]
fn test_t3_rule_active_pivot_low_contention_not_marked() {
    // An active reader that already has an incoming rw edge becomes a
    // potential pivot when we write under it. At low contention the DRO
    // policy should only set has_out_rw, not mark it for abort.
    let t = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let r = MockActiveTxn::new(2, 0, 1)
        .with_reads(vec![page_key(5)])
        .with_has_in_rw(true);
    let readers: Vec<&dyn ActiveTxnView> = vec![&r];
    let result = ssi_validate_and_publish(
        t,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[page_key(5)],
        &readers,
        &[],
        &[],
        &[],
        false,
    );
    result.expect("T has only incoming edge, should commit");
    let dro = super::default_t3_dro_matrix().evaluate(1, 0);
    assert!(
        !dro.should_abort(),
        "low-contention DRO seam should allow the active pivot to continue"
    );
    assert!(r.has_out.get(), "R.has_out_rw should be set to true");
    assert!(
        !r.marked.get(),
        "R should remain unmarked when the DRO penalty stays below threshold"
    );
}
#[test]
fn test_t3_rule_active_pivot_marked_under_high_skew() {
    // Twenty concurrent readers on the same page push the DRO contention
    // penalty over threshold; the one reader that is already a pivot
    // (has_in_rw) must be marked for abort even though T itself commits.
    let t = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let readers: Vec<MockActiveTxn> = (0_u64..20_u64)
        .map(|idx| {
            let base = MockActiveTxn::new(2 + idx, 0, 1).with_reads(vec![page_key(5)]);
            // Only the first reader carries a pre-existing incoming edge.
            if idx == 0 {
                base.with_has_in_rw(true)
            } else {
                base
            }
        })
        .collect();
    let reader_refs: Vec<&dyn ActiveTxnView> = readers
        .iter()
        .map(|reader| reader as &dyn ActiveTxnView)
        .collect();
    let outcome = ssi_validate_and_publish(
        t,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[page_key(5)],
        &reader_refs,
        &[],
        &[],
        &[],
        false,
    );
    outcome.expect("incoming-only commit should still succeed");
    let dro = super::default_t3_dro_matrix().evaluate(readers.len(), 0);
    assert!(
        dro.should_abort(),
        "skewed reader population should exceed the default DRO threshold"
    );
    assert!(
        readers[0].marked.get(),
        "the active pivot should be marked once the DRO penalty exceeds threshold"
    );
}
#[test]
fn test_t3_rule_committed_pivot_forces_abort() {
    // If the reader with an incoming rw edge has ALREADY committed, it can
    // no longer be aborted — so T itself must abort (committed-pivot rule).
    let t = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let committed_r = CommittedReaderInfo {
        token: TxnToken::new(TxnId::new(2).unwrap(), TxnEpoch::new(0)),
        begin_seq: CommitSeq::new(0),
        commit_seq: CommitSeq::new(3),
        had_in_rw: true,
        // `page_key` returns a fresh owned key; no `.clone()` needed.
        keys: vec![page_key(5)],
    };
    let result = ssi_validate_and_publish(
        t,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[page_key(5)],
        &[],
        &[],
        &[committed_r],
        &[],
        false,
    );
    let err = result.expect_err("committed pivot → T must abort");
    assert_eq!(err.reason, SsiAbortReason::CommittedPivot);
    let dro = super::default_t3_dro_matrix().evaluate(1, 0);
    assert_eq!(dro.active_readers, 1);
    assert_eq!(dro.active_writers, 0);
    assert!(
        dro.cvar_penalty >= 0.0,
        "DRO seam should always produce a non-negative penalty"
    );
}
#[test]
fn test_t3_rule_active_no_in_rw_no_mark() {
    // A reader without a pre-existing incoming edge is not a pivot: our
    // write sets its has_out_rw flag but never marks it for abort.
    let committing = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let plain_reader = MockActiveTxn::new(2, 0, 1).with_reads(vec![page_key(5)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&plain_reader];
    let outcome = ssi_validate_and_publish(
        committing,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[page_key(5)],
        &readers,
        &[],
        &[],
        &[],
        false,
    );
    outcome.expect("T should commit");
    assert!(plain_reader.has_out.get(), "R.has_out_rw should be set");
    assert!(!plain_reader.marked.get(), "R should NOT be marked (no in_rw)");
}
#[test]
fn test_refinement_eliminates_false_edge() {
    // The active reader touches page 10 while we write page 20: key
    // refinement must recognize there is no real overlap and commit
    // without recording an incoming edge.
    let committing = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let unrelated_reader = MockActiveTxn::new(2, 0, 1).with_reads(vec![page_key(10)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&unrelated_reader];
    let outcome = ssi_validate_and_publish(
        committing,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(30)],
        &[page_key(20)],
        &readers,
        &[],
        &[],
        &[],
        false,
    )
    .expect("no overlap → commit");
    assert!(!outcome.ssi_state.has_in_rw);
}
#[test]
fn test_skip_refinement_safe() {
    // Everyone touches page 5: both an incoming and an outgoing edge form,
    // making T a pivot, so validation must abort.
    let pivot = TxnToken::new(TxnId::new(2).unwrap(), TxnEpoch::new(0));
    let overlapping_reader = MockActiveTxn::new(1, 0, 1).with_reads(vec![page_key(5)]);
    let overlapping_writer = MockActiveTxn::new(3, 0, 1).with_writes(vec![page_key(5)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&overlapping_reader];
    let writers: Vec<&dyn ActiveTxnView> = vec![&overlapping_writer];
    let outcome = ssi_validate_and_publish(
        pivot,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(5)],
        &[page_key(5)],
        &readers,
        &writers,
        &[],
        &[],
        false,
    );
    assert!(outcome.is_err(), "without refinement, overlap → abort");
}
#[test]
fn test_dependency_edge_published() {
    // The rw-antidependency from reader (txn 2) to the committing writer
    // (txn 1) must appear in the published edge list with correct endpoints.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let reader = MockActiveTxn::new(2, 0, 1).with_reads(vec![page_key(5)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&reader];
    let result = ssi_validate_and_publish(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[page_key(5)],
        &readers,
        &[],
        &[],
        &[],
        false,
    );
    let ok = result.unwrap();
    assert!(!ok.edges.is_empty(), "edge must be published");
    assert_eq!(ok.edges[0].kind, DependencyEdgeKind::RwAntiDependency);
    assert_eq!(ok.edges[0].from.id.get(), 2);
    assert_eq!(ok.edges[0].to.id.get(), 1);
}
#[test]
fn test_commit_proof_published() {
    // A clean commit must carry a proof echoing the txn identity, its
    // sequence interval, and the configured abort policy.
    let committing = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let outcome = ssi_validate_and_publish(
        committing,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(10)],
        &[page_key(20)],
        &[],
        &[],
        &[],
        &[],
        false,
    )
    .unwrap();
    assert_eq!(outcome.commit_proof.txn, committing);
    assert_eq!(outcome.commit_proof.begin_seq.get(), 1);
    assert_eq!(outcome.commit_proof.commit_seq.get(), 5);
    assert_eq!(outcome.commit_proof.abort_policy, AbortPolicy::AbortPivot);
}
#[test]
fn test_abort_witness_published() {
    // A pivot abort must publish a witness identifying the txn, the reason,
    // and the edges that formed the dangerous structure.
    let pivot = TxnToken::new(TxnId::new(2).unwrap(), TxnEpoch::new(0));
    let incoming_reader = MockActiveTxn::new(1, 0, 1).with_reads(vec![page_key(5)]);
    let outgoing_writer = MockActiveTxn::new(3, 0, 1).with_writes(vec![page_key(7)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&incoming_reader];
    let writers: Vec<&dyn ActiveTxnView> = vec![&outgoing_writer];
    let abort = ssi_validate_and_publish(
        pivot,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(7)],
        &[page_key(5)],
        &readers,
        &writers,
        &[],
        &[],
        false,
    )
    .unwrap_err();
    assert_eq!(abort.witness.txn, pivot);
    assert_eq!(abort.witness.reason, AbortReason::SsiPivot);
    assert!(
        !abort.witness.edges_observed.is_empty(),
        "abort witness must contain edges"
    );
}
#[test]
fn test_ssi_state_has_in_rw_flag() {
    // An incoming rw edge from an overlapping reader must be reflected in
    // the committed txn's ssi_state.has_in_rw flag.
    let committing = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let overlapping_reader = MockActiveTxn::new(2, 0, 1).with_reads(vec![page_key(5)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&overlapping_reader];
    let outcome = ssi_validate_and_publish(
        committing,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[page_key(5)],
        &readers,
        &[],
        &[],
        &[],
        false,
    )
    .unwrap();
    assert!(outcome.ssi_state.has_in_rw, "incoming edge must set has_in_rw");
}
#[test]
fn test_ssi_state_has_out_rw_flag() {
    // An outgoing rw edge to an overlapping writer must be reflected in
    // the committed txn's ssi_state.has_out_rw flag.
    let committing = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let overlapping_writer = MockActiveTxn::new(3, 0, 1).with_writes(vec![page_key(7)]);
    let writers: Vec<&dyn ActiveTxnView> = vec![&overlapping_writer];
    let outcome = ssi_validate_and_publish(
        committing,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(7)],
        &[page_key(20)],
        &[],
        &writers,
        &[],
        &[],
        false,
    )
    .unwrap();
    assert!(outcome.ssi_state.has_out_rw, "outgoing edge must set has_out_rw");
}
#[test]
fn test_ssi_state_marked_for_abort() {
    // The final `marked_for_abort = true` argument must force an abort even
    // when no conflicting readers or writers exist.
    let txn = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let result = ssi_validate_and_publish(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(10)],
        &[page_key(20)],
        &[],
        &[],
        &[],
        &[],
        true,
    );
    let err = result.expect_err("marked_for_abort → must abort");
    assert_eq!(err.reason, SsiAbortReason::MarkedForAbort);
}
#[test]
fn test_ssi_state_edges_emitted_tracking() {
    // Every published edge must have a matching identifier and a matching
    // entry in the ssi_state's emitted-edge bookkeeping.
    let committing = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let overlapping_reader = MockActiveTxn::new(2, 0, 1).with_reads(vec![page_key(5)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&overlapping_reader];
    let commit_ok = ssi_validate_and_publish(
        committing,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[],
        &[page_key(5)],
        &readers,
        &[],
        &[],
        &[],
        false,
    )
    .unwrap();
    assert_eq!(commit_ok.edge_ids.len(), commit_ok.edges.len());
    assert_eq!(commit_ok.ssi_state.edges_emitted.len(), commit_ok.edges.len());
}
#[test]
fn test_conservative_pivot_rule() {
    // The conservative rule aborts a pivot even when every participant in
    // the dangerous structure is still active (no one has committed yet).
    let pivot = TxnToken::new(TxnId::new(2).unwrap(), TxnEpoch::new(0));
    let active_reader = MockActiveTxn::new(1, 0, 1).with_reads(vec![page_key(10)]);
    let active_writer = MockActiveTxn::new(3, 0, 1).with_writes(vec![page_key(20)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&active_reader];
    let writers: Vec<&dyn ActiveTxnView> = vec![&active_writer];
    let outcome = ssi_validate_and_publish(
        pivot,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(20)],
        &[page_key(10)],
        &readers,
        &writers,
        &[],
        &[],
        false,
    );
    assert!(
        outcome.is_err(),
        "conservative rule: abort even with all active"
    );
}
#[test]
fn test_false_positive_bounded() {
    // 100 transactions with pairwise-disjoint read/write pages: none may be
    // aborted, i.e. the detector produces zero false positives here.
    let mut commits = 0_u32;
    let mut aborts = 0_u32;
    for i in 0..100_u64 {
        let txn = TxnToken::new(TxnId::new(i + 1).unwrap(), TxnEpoch::new(0));
        #[allow(clippy::cast_possible_truncation)]
        let pg = (i as u32) * 2 + 1;
        let result = ssi_validate_and_publish(
            txn,
            CommitSeq::new(1),
            CommitSeq::new(i + 2),
            &[page_key(pg)],
            &[page_key(pg + 1)],
            &[],
            &[],
            &[],
            &[],
            false,
        );
        match result {
            Ok(_) => commits += 1,
            Err(_) => aborts += 1,
        }
    }
    assert_eq!(aborts, 0, "no false positives with non-overlapping writes");
    assert_eq!(commits, 100);
}
#[test]
#[allow(clippy::redundant_clone, clippy::cloned_ref_to_slice_refs)]
fn test_write_skew_prevented() {
    // Two-page write skew: T1 and T2 both read pages A and B; T1 writes A,
    // T2 writes B. After T1 commits, T2 must be aborted.
    let t1_token = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let t2_token = TxnToken::new(TxnId::new(2).unwrap(), TxnEpoch::new(0));
    let page_a = page_key(10);
    let page_b = page_key(20);
    let t2_mock = MockActiveTxn::new(2, 0, 1).with_reads(vec![page_a.clone()]);
    let t2_readers: Vec<&dyn ActiveTxnView> = vec![&t2_mock];
    let result_t1 = ssi_validate_and_publish(
        t1_token,
        CommitSeq::new(1),
        CommitSeq::new(2),
        &[page_a.clone(), page_b.clone()],
        &[page_a.clone()],
        &t2_readers,
        &[],
        &[],
        &[],
        false,
    );
    let ok_t1 = result_t1.expect("T1 should commit (only incoming)");
    assert!(ok_t1.ssi_state.has_in_rw);
    // Replay T1 into the commit index with its observed flags.
    let reader_t1 = CommittedReaderInfo {
        token: t1_token,
        begin_seq: CommitSeq::new(1),
        commit_seq: CommitSeq::new(2),
        had_in_rw: ok_t1.ssi_state.has_in_rw,
        keys: vec![page_a.clone(), page_b.clone()],
    };
    let writer_t1 = CommittedWriterInfo {
        token: t1_token,
        commit_seq: CommitSeq::new(2),
        had_out_rw: ok_t1.ssi_state.has_out_rw,
        keys: vec![page_a.clone()],
    };
    let result_t2 = ssi_validate_and_publish(
        t2_token,
        CommitSeq::new(1),
        CommitSeq::new(3),
        &[page_a.clone(), page_b.clone()],
        &[page_b],
        &[],
        &[],
        &[reader_t1],
        &[writer_t1],
        false,
    );
    assert!(result_t2.is_err(), "write skew must be prevented");
}
#[test]
fn test_concurrent_inserts_different_pages_no_abort() {
    // Concurrent transactions touching disjoint pages (5 vs 10) must not
    // conflict even though txn 2 appears in both reader and writer planes.
    // (Removed an unused `_t2` token binding that served no purpose.)
    let t1 = TxnToken::new(TxnId::new(1).unwrap(), TxnEpoch::new(0));
    let t2_view = MockActiveTxn::new(2, 0, 1)
        .with_reads(vec![page_key(10)])
        .with_writes(vec![page_key(10)]);
    let readers: Vec<&dyn ActiveTxnView> = vec![&t2_view];
    let writers: Vec<&dyn ActiveTxnView> = vec![&t2_view];
    let result = ssi_validate_and_publish(
        t1,
        CommitSeq::new(1),
        CommitSeq::new(2),
        &[page_key(5)],
        &[page_key(5)],
        &readers,
        &writers,
        &[],
        &[],
        false,
    );
    result.expect("different pages → no conflict → both commit");
}
#[test]
fn test_phantom_batch_insert_scan_conflict_prevented() {
    // Phantom-style cycle: a scanning txn reads a range witness and writes an
    // aggregate page, while an inserting txn does the reverse. Exactly one
    // side (the scan, which sees both edges) must abort; the other commits.
    let t_scan = TxnToken::new(TxnId::new(301).unwrap(), TxnEpoch::new(0));
    let t_insert = TxnToken::new(TxnId::new(302).unwrap(), TxnEpoch::new(0));
    // page 900 stands in for the scanned range; 901 for the aggregate row.
    let range_witness = page_key(900);
    let aggregate_page = page_key(901);
    let active_insert = MockActiveTxn::new(302, 0, 1)
        .with_reads(vec![aggregate_page.clone()])
        .with_writes(vec![range_witness.clone()]);
    let readers_for_scan: Vec<&dyn ActiveTxnView> = vec![&active_insert];
    let writers_for_scan: Vec<&dyn ActiveTxnView> = vec![&active_insert];
    let scan_result = ssi_validate_and_publish(
        t_scan,
        CommitSeq::new(1),
        CommitSeq::new(2),
        std::slice::from_ref(&range_witness),
        std::slice::from_ref(&aggregate_page),
        &readers_for_scan,
        &writers_for_scan,
        &[],
        &[],
        false,
    );
    assert!(
        scan_result.is_err(),
        "scan+aggregate transaction should abort under phantom-style cycle"
    );
    // With the scan aborted, the insert sees no conflicting txns and commits.
    let insert_result = ssi_validate_and_publish(
        t_insert,
        CommitSeq::new(1),
        CommitSeq::new(3),
        std::slice::from_ref(&aggregate_page),
        std::slice::from_ref(&range_witness),
        &[],
        &[],
        &[],
        &[],
        false,
    );
    assert!(
        insert_result.is_ok(),
        "one side of the phantom-style cycle must still commit"
    );
}
#[test]
#[allow(clippy::too_many_lines)]
fn test_adversarial_three_way_cycle_breaks_with_single_abort() {
    // Three transactions form a ring of rw-antidependencies:
    //   T1 reads 1000 / writes 1001, T2 reads 1001 / writes 1002,
    //   T3 reads 1002 / writes 1000.
    // Committing them one at a time (each later txn seeing the earlier ones
    // via the committed-reader/writer indexes) must abort exactly one of
    // them — a single abort suffices to break the cycle.
    let begin_seq = CommitSeq::new(1);
    let t1 = TxnToken::new(TxnId::new(401).unwrap(), TxnEpoch::new(0));
    let t2 = TxnToken::new(TxnId::new(402).unwrap(), TxnEpoch::new(0));
    let t3 = TxnToken::new(TxnId::new(403).unwrap(), TxnEpoch::new(0));
    let v1 = MockActiveTxn::new(401, 0, begin_seq.get())
        .with_reads(vec![page_key(1000)])
        .with_writes(vec![page_key(1001)]);
    let v2 = MockActiveTxn::new(402, 0, begin_seq.get())
        .with_reads(vec![page_key(1001)])
        .with_writes(vec![page_key(1002)]);
    let v3 = MockActiveTxn::new(403, 0, begin_seq.get())
        .with_reads(vec![page_key(1002)])
        .with_writes(vec![page_key(1000)]);
    let mut committed_readers = Vec::new();
    let mut committed_writers = Vec::new();
    let mut commits = 0_u32;
    let mut aborts = 0_u32;
    // T1 commits first: its neighbors in the ring are still active.
    let readers_t1: Vec<&dyn ActiveTxnView> = vec![&v2];
    let writers_t1: Vec<&dyn ActiveTxnView> = vec![&v3];
    let t1_res = ssi_validate_and_publish(
        t1,
        begin_seq,
        CommitSeq::new(2),
        &v1.reads,
        &v1.writes,
        &readers_t1,
        &writers_t1,
        &committed_readers,
        &committed_writers,
        false,
    );
    match t1_res {
        Ok(ok) => {
            commits += 1;
            // On success, replay T1 into the commit index with its flags.
            committed_readers.push(CommittedReaderInfo {
                token: t1,
                begin_seq,
                commit_seq: CommitSeq::new(2),
                had_in_rw: ok.ssi_state.has_in_rw,
                keys: v1.reads.clone(),
            });
            committed_writers.push(CommittedWriterInfo {
                token: t1,
                commit_seq: CommitSeq::new(2),
                had_out_rw: ok.ssi_state.has_out_rw,
                keys: v1.writes.clone(),
            });
        }
        Err(_) => aborts += 1,
    }
    // T2 commits second: only T3 is still active.
    let readers_t2: Vec<&dyn ActiveTxnView> = vec![&v3];
    let writers_t2: Vec<&dyn ActiveTxnView> = vec![&v3];
    let t2_res = ssi_validate_and_publish(
        t2,
        begin_seq,
        CommitSeq::new(3),
        &v2.reads,
        &v2.writes,
        &readers_t2,
        &writers_t2,
        &committed_readers,
        &committed_writers,
        false,
    );
    match t2_res {
        Ok(ok) => {
            commits += 1;
            committed_readers.push(CommittedReaderInfo {
                token: t2,
                begin_seq,
                commit_seq: CommitSeq::new(3),
                had_in_rw: ok.ssi_state.has_in_rw,
                keys: v2.reads.clone(),
            });
            committed_writers.push(CommittedWriterInfo {
                token: t2,
                commit_seq: CommitSeq::new(3),
                had_out_rw: ok.ssi_state.has_out_rw,
                keys: v2.writes.clone(),
            });
        }
        Err(_) => aborts += 1,
    }
    // T3 commits last: everything it conflicts with is in the commit index.
    let t3_res = ssi_validate_and_publish(
        t3,
        begin_seq,
        CommitSeq::new(4),
        &v3.reads,
        &v3.writes,
        &[],
        &[],
        &committed_readers,
        &committed_writers,
        false,
    );
    match t3_res {
        Ok(_) => commits += 1,
        Err(_) => aborts += 1,
    }
    assert_eq!(commits + aborts, 3);
    assert_eq!(aborts, 1, "exactly one abort should break the 3-cycle");
}
#[test]
#[allow(clippy::too_many_lines)]
fn test_100_writer_adversarial_schedule_with_serialization_checker() {
    // 100-transaction adversarial schedule:
    //  - 10 write-skew pairs (txns 1..=20): each pair reads two pages and
    //    each member writes a different one — exactly one per pair may commit;
    //  - 80 disjoint writers (txns 21..=100) that must all commit.
    // The checker then bounds false-positive aborts at 5%.
    // NOTE: lines 2266-2267 originally contained mojibake ("¤t.reads") from
    // an `&current` entity-mangling; restored to `&current.reads`/`.writes`.
    struct StressTxn {
        token_id: u64,
        token: TxnToken,
        reads: Vec<WitnessKey>,
        writes: Vec<WitnessKey>,
        view: MockActiveTxn,
    }
    let begin_seq = CommitSeq::new(1);
    let mut txns = Vec::new();
    let mut next_id = 1_u64;
    // Build the 10 conflicting pairs.
    for pair in 0..10_u32 {
        let base = 2000_u32 + pair * 10;
        let reads = vec![page_key(base), page_key(base + 1)];
        let token_a = TxnToken::new(TxnId::new(next_id).unwrap(), TxnEpoch::new(0));
        let view_a = MockActiveTxn::new(next_id, 0, begin_seq.get())
            .with_reads(reads.clone())
            .with_writes(vec![page_key(base)]);
        txns.push(StressTxn {
            token_id: next_id,
            token: token_a,
            reads: reads.clone(),
            writes: vec![page_key(base)],
            view: view_a,
        });
        next_id += 1;
        let token_b = TxnToken::new(TxnId::new(next_id).unwrap(), TxnEpoch::new(0));
        let view_b = MockActiveTxn::new(next_id, 0, begin_seq.get())
            .with_reads(reads.clone())
            .with_writes(vec![page_key(base + 1)]);
        txns.push(StressTxn {
            token_id: next_id,
            token: token_b,
            reads,
            writes: vec![page_key(base + 1)],
            view: view_b,
        });
        next_id += 1;
    }
    // Fill up to 100 with fully disjoint transactions.
    while txns.len() < 100 {
        let disjoint = 5000_u32 + u32::try_from(txns.len()).unwrap();
        let token = TxnToken::new(TxnId::new(next_id).unwrap(), TxnEpoch::new(0));
        let reads = vec![page_key(disjoint)];
        let writes = vec![page_key(disjoint + 10_000)];
        let view = MockActiveTxn::new(next_id, 0, begin_seq.get())
            .with_reads(reads.clone())
            .with_writes(writes.clone());
        txns.push(StressTxn {
            token_id: next_id,
            token,
            reads,
            writes,
            view,
        });
        next_id += 1;
    }
    let mut committed_ids = HashSet::new();
    let mut committed_readers = Vec::new();
    let mut committed_writers = Vec::new();
    let mut abort_count = 0_u32;
    // Commit in order; txns after `idx` are still "active" in the hot plane.
    for idx in 0..txns.len() {
        let current = &txns[idx];
        let active_tail = &txns[idx + 1..];
        let active_readers: Vec<&dyn ActiveTxnView> = active_tail
            .iter()
            .map(|txn| &txn.view as &dyn ActiveTxnView)
            .collect();
        let active_writers: Vec<&dyn ActiveTxnView> = active_tail
            .iter()
            .map(|txn| &txn.view as &dyn ActiveTxnView)
            .collect();
        let commit_seq = CommitSeq::new(u64::try_from(idx).unwrap() + 2);
        let result = ssi_validate_and_publish(
            current.token,
            begin_seq,
            commit_seq,
            &current.reads,
            &current.writes,
            &active_readers,
            &active_writers,
            &committed_readers,
            &committed_writers,
            false,
        );
        match result {
            Ok(ok) => {
                committed_ids.insert(current.token_id);
                committed_readers.push(CommittedReaderInfo {
                    token: current.token,
                    begin_seq,
                    commit_seq,
                    had_in_rw: ok.ssi_state.has_in_rw,
                    keys: current.reads.clone(),
                });
                committed_writers.push(CommittedWriterInfo {
                    token: current.token,
                    commit_seq,
                    had_out_rw: ok.ssi_state.has_out_rw,
                    keys: current.writes.clone(),
                });
            }
            Err(_) => abort_count += 1,
        }
    }
    let total = u32::try_from(txns.len()).unwrap();
    assert_eq!(
        u32::try_from(committed_ids.len()).unwrap() + abort_count,
        total,
        "all 100 commit attempts must complete (no deadlock/livelock)"
    );
    // Each skew pair must have committed exactly one member.
    let mut mandatory_aborts = 0_u32;
    for pair in 0..10_u64 {
        let tx_a = pair * 2 + 1;
        let tx_b = pair * 2 + 2;
        let a_committed = committed_ids.contains(&tx_a);
        let b_committed = committed_ids.contains(&tx_b);
        assert!(
            a_committed ^ b_committed,
            "conflict pair ({tx_a},{tx_b}) must commit exactly one member"
        );
        mandatory_aborts += 1;
    }
    for id in 21_u64..=100_u64 {
        assert!(
            committed_ids.contains(&id),
            "disjoint writer {id} should commit"
        );
    }
    let false_positive_aborts = abort_count.saturating_sub(mandatory_aborts);
    assert!(
        false_positive_aborts <= 5,
        "false positive aborts must stay under 5%: {false_positive_aborts}/100"
    );
}
#[test]
fn test_long_running_reader_stable_snapshot_under_writer_churn() {
    // A long-lived read-only snapshot must survive 200 committing writers
    // that alternate over its two read pages: every writer commits and the
    // reader is never marked for abort (it has no outgoing edges).
    let long_reader =
        MockActiveTxn::new(9_001, 0, 1).with_reads(vec![page_key(700), page_key(701)]);
    let active_readers: Vec<&dyn ActiveTxnView> = vec![&long_reader];
    let mut commits = 0_u32;
    for i in 0..200_u64 {
        let writer = TxnToken::new(TxnId::new(9_100 + i).unwrap(), TxnEpoch::new(0));
        // Alternate between the two pages the snapshot reads.
        let write_key = if i % 2 == 0 {
            page_key(700)
        } else {
            page_key(701)
        };
        let result = ssi_validate_and_publish(
            writer,
            CommitSeq::new(1),
            CommitSeq::new(i + 2),
            &[],
            &[write_key],
            &active_readers,
            &[],
            &[],
            &[],
            false,
        );
        assert!(
            result.is_ok(),
            "writer churn should not deadlock/abort readers"
        );
        commits += 1;
    }
    assert_eq!(commits, 200);
    assert!(
        !long_reader.marked.get(),
        "long-running read-only snapshot must not be marked for abort"
    );
}
#[test]
fn test_evidence_metrics_count_by_outcome() {
    // One commit plus one pivot abort must bump the respective evidence
    // counters by at least one each (other tests may run concurrently, so
    // only monotonic "greater than before" is asserted).
    // Renamed the `_`-prefixed locals: they ARE used below, and the
    // underscore prefix wrongly signalled "intentionally unused".
    let before = ssi_evidence_metrics_snapshot();
    let commit_txn = TxnToken::new(TxnId::new(90_001).unwrap(), TxnEpoch::new(0));
    let commit_result = ssi_validate_and_publish(
        commit_txn,
        CommitSeq::new(1),
        CommitSeq::new(2),
        &[page_key(500)],
        &[page_key(600)],
        &[],
        &[],
        &[],
        &[],
        false,
    );
    let commit_ok = commit_result.expect("commit decision should be recorded");
    let abort_txn = TxnToken::new(TxnId::new(90_002).unwrap(), TxnEpoch::new(0));
    let hot_reader = MockActiveTxn::new(90_003, 0, 1).with_reads(vec![page_key(700)]);
    let hot_writer = MockActiveTxn::new(90_004, 0, 1).with_writes(vec![page_key(800)]);
    let committed_readers = vec![CommittedReaderInfo {
        token: TxnToken::new(TxnId::new(90_003).unwrap(), TxnEpoch::new(0)),
        begin_seq: CommitSeq::new(0),
        commit_seq: CommitSeq::new(1),
        had_in_rw: commit_ok.ssi_state.has_in_rw,
        keys: vec![page_key(700)],
    }];
    let committed_writers = vec![CommittedWriterInfo {
        token: TxnToken::new(TxnId::new(90_004).unwrap(), TxnEpoch::new(0)),
        commit_seq: CommitSeq::new(2),
        had_out_rw: commit_ok.ssi_state.has_out_rw,
        keys: vec![page_key(800)],
    }];
    let abort_result = ssi_validate_and_publish(
        abort_txn,
        CommitSeq::new(1),
        CommitSeq::new(3),
        &[page_key(800)],
        &[page_key(700)],
        &[&hot_reader],
        &[&hot_writer],
        &committed_readers,
        &committed_writers,
        false,
    );
    abort_result.expect_err("pivot abort should be recorded");
    let after = ssi_evidence_metrics_snapshot();
    assert!(
        after.fsqlite_evidence_records_total_commit
            > before.fsqlite_evidence_records_total_commit
    );
    assert!(
        after.fsqlite_evidence_records_total_abort
            > before.fsqlite_evidence_records_total_abort
    );
    assert!(
        after.fsqlite_evidence_records_total() >= before.fsqlite_evidence_records_total() + 2
    );
}
#[test]
fn test_evidence_store_queryable_by_txn_id() {
    // After a commit, the evidence ledger should be queryable by txn id.
    // The ledger is populated asynchronously, so the test tolerates the
    // row not having landed yet and falls back to sanity-checking the
    // full snapshot instead.
    let txn = TxnToken::new(TxnId::new(90_101).unwrap(), TxnEpoch::new(0));
    let result = ssi_validate_and_publish(
        txn,
        CommitSeq::new(1),
        CommitSeq::new(5),
        &[page_key(10)],
        &[page_key(20)],
        &[],
        &[],
        &[],
        &[],
        false,
    );
    result.expect("commit should succeed");
    // NOTE(review): timing-based wait for the evidence recorder to flush —
    // presumably recording is asynchronous; confirm and consider an explicit
    // flush/wait hook instead of a sleep.
    std::thread::sleep(std::time::Duration::from_millis(100));
    let snapshot = ssi_evidence_snapshot();
    let target_txn_id = txn.id.get();
    let rows = ssi_evidence_query(&SsiDecisionQuery {
        txn_id: Some(target_txn_id),
        ..SsiDecisionQuery::default()
    });
    if rows.is_empty() {
        // Fallback: the specific row may not be visible yet, but the ledger
        // itself must not be empty and every card must carry an id.
        assert!(
            !snapshot.is_empty(),
            "evidence ledger should not be completely empty"
        );
        for card in &snapshot {
            assert!(card.decision_id > 0, "every card must have a decision_id");
        }
    } else {
        // Compact commit evidence: no conflict pages, no write set, and an
        // empty read-set summary are expected for a plain commit.
        let last = rows.last().unwrap();
        assert_eq!(last.decision_type, SsiDecisionType::CommitAllowed);
        assert!(last.conflict_pages.is_empty());
        assert!(last.write_set.is_empty());
        assert_eq!(last.read_set_summary.page_count, 0);
    }
}
#[test]
fn test_commit_allowed_evidence_defaults_to_compact_mode() {
    // In CompactCommit mode a plain commit records only a compact evidence
    // card: no conflict pages, no write set, empty read-set summary —
    // even with an effectively unlimited budget.
    with_ssi_evidence_test_config(
        SsiEvidenceRecordingMode::CompactCommit,
        SsiEvidenceBudgetConfig {
            max_pending_records_before_compact: usize::MAX,
            max_commit_evidence_bytes: u64::MAX,
        },
        || {
            let txn = TxnToken::new(TxnId::new(90_202).unwrap(), TxnEpoch::new(0));
            let result = ssi_validate_and_publish(
                txn,
                CommitSeq::new(3),
                CommitSeq::new(4),
                &[page_key(920)],
                &[page_key(921)],
                &[],
                &[],
                &[],
                &[],
                false,
            );
            result.expect("commit should succeed");
            // NOTE(review): timing-based wait for the async evidence flush —
            // confirm; a deterministic flush hook would be preferable.
            std::thread::sleep(std::time::Duration::from_millis(100));
            let snapshot = ssi_evidence_snapshot();
            let rows = ssi_evidence_query(&SsiDecisionQuery {
                txn_id: Some(txn.id.get()),
                ..SsiDecisionQuery::default()
            });
            if rows.is_empty() {
                // Row not yet visible: fall back to ledger-wide sanity checks.
                assert!(
                    !snapshot.is_empty(),
                    "evidence ledger should not be completely empty"
                );
                for card in &snapshot {
                    assert!(card.decision_id > 0, "every card must have a decision_id");
                }
            } else {
                let last = rows.last().expect("budgeted full commit evidence row");
                assert_eq!(last.decision_type, SsiDecisionType::CommitAllowed);
                assert!(last.conflict_pages.is_empty());
                assert!(last.write_set.is_empty());
                assert_eq!(last.read_set_summary.page_count, 0);
            }
        },
    );
}
#[test]
fn test_budgeted_commit_mode_keeps_full_evidence_when_within_budget() {
    // With an effectively unlimited budget, BudgetedCommit mode must retain
    // full commit evidence (write set + read-set summary) and must not bump
    // the budget-compaction counter.
    with_ssi_evidence_test_config(
        SsiEvidenceRecordingMode::BudgetedCommit,
        SsiEvidenceBudgetConfig {
            max_pending_records_before_compact: usize::MAX,
            max_commit_evidence_bytes: u64::MAX,
        },
        || {
            let metrics_before = ssi_evidence_metrics_snapshot();
            let txn = TxnToken::new(TxnId::new(90_203).unwrap(), TxnEpoch::new(0));
            ssi_validate_and_publish(
                txn,
                CommitSeq::new(5),
                CommitSeq::new(6),
                &[page_key(930)],
                &[page_key(931)],
                &[],
                &[],
                &[],
                &[],
                false,
            )
            .expect("commit should succeed");
            let rows = ssi_evidence_query(&SsiDecisionQuery {
                txn_id: Some(txn.id.get()),
                ..SsiDecisionQuery::default()
            });
            let card = rows.last().expect("budgeted full commit evidence row");
            assert_eq!(card.decision_type, SsiDecisionType::CommitAllowed);
            assert!(card.conflict_pages.is_empty());
            // Full evidence retained: the written page and the read count survive.
            assert!(!card.write_set.is_empty());
            assert_eq!(card.write_set[0].get(), 931);
            assert_eq!(card.read_set_summary.page_count, 1);
            // Within budget, no compaction should have been recorded.
            let metrics_after = ssi_evidence_metrics_snapshot();
            assert_eq!(
                metrics_after.fsqlite_evidence_records_total_budget_compact,
                metrics_before.fsqlite_evidence_records_total_budget_compact
            );
        },
    );
}
#[test]
fn test_budgeted_commit_mode_compacts_when_size_budget_is_exhausted() {
    // A zero-byte evidence budget forces BudgetedCommit mode to compact the
    // commit record (per-page payloads dropped) and to increment the
    // budget-compaction counter.
    with_ssi_evidence_test_config(
        SsiEvidenceRecordingMode::BudgetedCommit,
        SsiEvidenceBudgetConfig {
            max_pending_records_before_compact: usize::MAX,
            max_commit_evidence_bytes: 0,
        },
        || {
            let metrics_before = ssi_evidence_metrics_snapshot();
            let txn = TxnToken::new(TxnId::new(90_204).unwrap(), TxnEpoch::new(0));
            ssi_validate_and_publish(
                txn,
                CommitSeq::new(7),
                CommitSeq::new(8),
                &[page_key(940)],
                &[page_key(941)],
                &[],
                &[],
                &[],
                &[],
                false,
            )
            .expect("commit should succeed");
            let rows = ssi_evidence_query(&SsiDecisionQuery {
                txn_id: Some(txn.id.get()),
                ..SsiDecisionQuery::default()
            });
            let card = rows.last().expect("budget-compacted commit evidence row");
            assert_eq!(card.decision_type, SsiDecisionType::CommitAllowed);
            assert!(card.conflict_pages.is_empty());
            // Compacted: no per-page payload survives the zero-byte budget.
            assert!(card.write_set.is_empty());
            assert_eq!(card.read_set_summary.page_count, 0);
            // The compaction must be reflected in the counter.
            let metrics_after = ssi_evidence_metrics_snapshot();
            assert!(
                metrics_after.fsqlite_evidence_records_total_budget_compact
                    > metrics_before.fsqlite_evidence_records_total_budget_compact
            );
        },
    );
}
}