//! Witness publication protocol, commit-marker discipline, witness GC, and
//! proof-carrying commit validation.
//!
//! Mutexes are recovered from poisoning via `unwrap_or_else(|e| e.into_inner())`
//! throughout, on the assumption that the guarded state is still structurally
//! valid after a panic.
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use fsqlite_types::{
CommitMarker, CommitProof, CommitSeq, DependencyEdge, ReadWitness, TxnId, WriteWitness,
};
use tracing::{debug, error, info, warn};
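/// Opaque identifier for a publication reservation, allocated monotonically
/// by `WitnessPublisher::reserve`.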
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ReservationId(u64);
impl ReservationId {
#[must_use]
pub const fn new(raw: u64) -> Self {
Self(raw)
}
#[must_use]
pub const fn get(self) -> u64 {
self.0
}
}
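/// Lifecycle phase of a publication. The normal progression is `Reserved`
/// -> `Writing` -> `Committed`; `Aborted` is reachable from either
/// non-terminal phase.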
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublicationPhase {
Reserved,
Writing,
Committed,
Aborted,
}
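/// RAII guard for an in-flight publication. Dropping a token that was never
/// consumed by `commit` (or an explicit `abort`) logs a warning and records
/// an implicit abort; see the `Drop` impl below.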
pub struct ReservationToken {
id: ReservationId,
publisher: Arc<WitnessPublisherInner>,
phase: PublicationPhase,
consumed: bool,
}
impl ReservationToken {
#[must_use]
pub fn id(&self) -> ReservationId {
self.id
}
#[must_use]
pub fn phase(&self) -> PublicationPhase {
self.phase
}
}
impl Drop for ReservationToken {
fn drop(&mut self) {
if !self.consumed {
warn!(
reservation_id = self.id.0,
phase = ?self.phase,
"ReservationToken dropped without commit — implicit abort"
);
self.publisher.mark_aborted(self.id);
}
}
}
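/// Evidence accumulated under a reservation that has not yet committed or
/// aborted.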
#[derive(Debug)]
struct PendingPublication {
phase: PublicationPhase,
txn_id: TxnId,
reads: Vec<ReadWitness>,
writes: Vec<WriteWitness>,
edges: Vec<DependencyEdge>,
}
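/// State shared between a `WitnessPublisher` and its outstanding tokens.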
#[derive(Debug)]
struct WitnessPublisherInner {
next_id: AtomicU64,
pending: Mutex<BTreeMap<u64, PendingPublication>>,
committed: Mutex<Vec<CommittedPublication>>,
aborted: Mutex<Vec<u64>>,
}
impl WitnessPublisherInner {
fn mark_aborted(&self, id: ReservationId) {
let mut pending = self.pending.lock().unwrap_or_else(|e| e.into_inner());
if let Some(mut pub_) = pending.remove(&id.0) {
pub_.phase = PublicationPhase::Aborted;
drop(pending);
self.aborted
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(id.0);
info!(
reservation_id = id.0,
txn_id = pub_.txn_id.get(),
reads = pub_.reads.len(),
writes = pub_.writes.len(),
edges = pub_.edges.len(),
"publication aborted — partial writes are GC-able"
);
}
}
}
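/// Immutable record of a committed publication: the published evidence plus
/// its commit proof.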
#[derive(Debug, Clone)]
pub struct CommittedPublication {
pub reservation_id: ReservationId,
pub txn_id: TxnId,
pub reads: Vec<ReadWitness>,
pub writes: Vec<WriteWitness>,
pub edges: Vec<DependencyEdge>,
pub proof: CommitProof,
}
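/// Coordinates the reserve -> write -> commit/abort publication protocol.
///
/// Happy-path sketch (illustrative; the witness vectors and `CommitProof`
/// are assumed to come from the caller's transaction machinery):
///
/// ```ignore
/// let publisher = WitnessPublisher::new();
/// let mut token = publisher.reserve(txn_id);
/// publisher.write(&mut token, reads, writes, edges)?;
/// let committed = publisher.commit(&mut token, proof)?;
/// ```
///
/// A token dropped before `commit` is recorded as an implicit abort.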
#[derive(Clone)]
pub struct WitnessPublisher {
inner: Arc<WitnessPublisherInner>,
}
impl WitnessPublisher {
#[must_use]
pub fn new() -> Self {
Self {
inner: Arc::new(WitnessPublisherInner {
next_id: AtomicU64::new(1),
pending: Mutex::new(BTreeMap::new()),
committed: Mutex::new(Vec::new()),
aborted: Mutex::new(Vec::new()),
}),
}
}
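/// Reserves a publication slot for `txn_id`. The returned token must be
/// driven to `commit` or `abort`; dropping it unconsumed is an implicit
/// abort.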
pub fn reserve(&self, txn_id: TxnId) -> ReservationToken {
let id = ReservationId(self.inner.next_id.fetch_add(1, Ordering::Relaxed));
let pub_ = PendingPublication {
phase: PublicationPhase::Reserved,
txn_id,
reads: Vec::new(),
writes: Vec::new(),
edges: Vec::new(),
};
{
let mut pending = self.inner.pending.lock().unwrap_or_else(|e| e.into_inner());
pending.insert(id.0, pub_);
}
debug!(
reservation_id = id.0,
txn_id = txn_id.get(),
phase = "reserve",
"publication reservation obtained"
);
ReservationToken {
id,
publisher: Arc::clone(&self.inner),
phase: PublicationPhase::Reserved,
consumed: false,
}
}
#[allow(clippy::significant_drop_tightening)]
pub fn write(
&self,
token: &mut ReservationToken,
reads: Vec<ReadWitness>,
writes: Vec<WriteWitness>,
edges: Vec<DependencyEdge>,
) -> Result<(), PublicationError> {
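// Writes are accepted from `Reserved` (first batch) or `Writing`
// (follow-up batches); terminal phases are rejected.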
if token.phase != PublicationPhase::Reserved && token.phase != PublicationPhase::Writing {
return Err(PublicationError::InvalidPhase {
expected: PublicationPhase::Reserved,
actual: token.phase,
});
}
let read_count = reads.len();
let write_count = writes.len();
let edge_count = edges.len();
{
let mut pending = self.inner.pending.lock().unwrap_or_else(|e| e.into_inner());
let Some(pub_) = pending.get_mut(&token.id.0) else {
return Err(PublicationError::ReservationNotFound(token.id));
};
pub_.reads.extend(reads);
pub_.writes.extend(writes);
pub_.edges.extend(edges);
pub_.phase = PublicationPhase::Writing;
}
token.phase = PublicationPhase::Writing;
debug!(
reservation_id = token.id.0,
phase = "write",
new_reads = read_count,
new_writes = write_count,
new_edges = edge_count,
"publication write phase"
);
Ok(())
}
#[allow(clippy::significant_drop_tightening)]
pub fn commit(
&self,
token: &mut ReservationToken,
commit_proof: CommitProof,
) -> Result<CommittedPublication, PublicationError> {
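// Commits are accepted from `Writing`, or directly from `Reserved` for a
// publication with no write batches; terminal phases are rejected.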
if token.phase != PublicationPhase::Writing && token.phase != PublicationPhase::Reserved {
return Err(PublicationError::InvalidPhase {
expected: PublicationPhase::Writing,
actual: token.phase,
});
}
let committed = {
let mut pending = self.inner.pending.lock().unwrap_or_else(|e| e.into_inner());
let Some(pub_) = pending.remove(&token.id.0) else {
return Err(PublicationError::ReservationNotFound(token.id));
};
CommittedPublication {
reservation_id: token.id,
txn_id: pub_.txn_id,
reads: pub_.reads,
writes: pub_.writes,
edges: pub_.edges,
proof: commit_proof,
}
};
self.inner
.committed
.lock()
.unwrap_or_else(|e| e.into_inner())
.push(committed.clone());
token.phase = PublicationPhase::Committed;
token.consumed = true;
info!(
reservation_id = token.id.0,
txn_id = committed.txn_id.get(),
phase = "commit",
reads = committed.reads.len(),
writes = committed.writes.len(),
edges = committed.edges.len(),
"publication committed — evidence now visible"
);
Ok(committed)
}
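/// Explicitly aborts the reservation. A no-op once the token has been
/// consumed by `commit` or a previous `abort`.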
pub fn abort(&self, token: &mut ReservationToken) {
if token.consumed {
return;
}
self.inner.mark_aborted(token.id);
token.phase = PublicationPhase::Aborted;
token.consumed = true;
}
#[must_use]
pub fn committed_publications(&self) -> Vec<CommittedPublication> {
let committed = self
.inner
.committed
.lock()
.unwrap_or_else(|e| e.into_inner());
committed.clone()
}
#[must_use]
pub fn is_aborted(&self, reservation_id: ReservationId) -> bool {
let aborted = self.inner.aborted.lock().unwrap_or_else(|e| e.into_inner());
aborted.contains(&reservation_id.0)
}
#[must_use]
pub fn is_pending(&self, reservation_id: ReservationId) -> bool {
let pending = self.inner.pending.lock().unwrap_or_else(|e| e.into_inner());
pending.contains_key(&reservation_id.0)
}
#[must_use]
pub fn aborted_count(&self) -> usize {
let aborted = self.inner.aborted.lock().unwrap_or_else(|e| e.into_inner());
aborted.len()
}
}
impl Default for WitnessPublisher {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for WitnessPublisher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let pending = self.inner.pending.lock().unwrap_or_else(|e| e.into_inner());
let committed = self
.inner
.committed
.lock()
.unwrap_or_else(|e| e.into_inner());
let aborted = self.inner.aborted.lock().unwrap_or_else(|e| e.into_inner());
f.debug_struct("WitnessPublisher")
.field("pending", &pending.len())
.field("committed", &committed.len())
.field("aborted", &aborted.len())
.finish()
}
}
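/// Errors surfaced by the publication protocol.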
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublicationError {
InvalidPhase {
expected: PublicationPhase,
actual: PublicationPhase,
},
ReservationNotFound(ReservationId),
}
impl std::fmt::Display for PublicationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InvalidPhase { expected, actual } => {
write!(
f,
"publication phase error: expected {expected:?}, got {actual:?}"
)
}
Self::ReservationNotFound(id) => {
write!(f, "reservation {id:?} not found")
}
}
}
}
impl std::error::Error for PublicationError {}
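/// Ordered store of commit markers, keyed by commit sequence. Under the
/// commit-marker discipline, a transaction counts as committed only once its
/// marker is published here; witness evidence may exist for transactions
/// that never commit.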
#[derive(Debug, Clone, Default)]
pub struct CommitMarkerStore {
markers: BTreeMap<u64, CommitMarker>,
}
impl CommitMarkerStore {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn publish(&mut self, marker: CommitMarker) {
let seq = marker.commit_seq.get();
info!(
commit_seq = seq,
proof_object_id = ?marker.proof_object_id,
"commit marker published"
);
self.markers.insert(seq, marker);
}
#[must_use]
pub fn is_committed(&self, commit_seq: CommitSeq) -> bool {
self.markers.contains_key(&commit_seq.get())
}
#[must_use]
pub fn get(&self, commit_seq: CommitSeq) -> Option<&CommitMarker> {
self.markers.get(&commit_seq.get())
}
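/// Resolves the highest commit sequence whose marker timestamp is at or
/// before `target_unix_ns`. Assumes marker timestamps are monotone in
/// commit sequence, so the reverse scan can stop at the first match.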
#[must_use]
pub fn resolve_seq_at_or_before_timestamp(&self, target_unix_ns: u64) -> Option<CommitSeq> {
self.markers.iter().rev().find_map(|(seq, marker)| {
(marker.commit_time_unix_ns <= target_unix_ns).then_some(CommitSeq::new(*seq))
})
}
#[must_use]
pub fn len(&self) -> usize {
self.markers.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.markers.is_empty()
}
}
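/// Decides which witness evidence is safe to prune given the set of active
/// transaction slots, and whether the witness epoch may advance.
///
/// A minimal sketch of the intended call pattern (illustrative; the slot
/// snapshots and witness sequences are assumed to come from the caller):
///
/// ```ignore
/// let gc = WitnessGcCoordinator::new(retention_count);
/// let horizon = gc.compute_safe_gc_seq(&active_slots);
/// let eligibility = gc.check_eligibility(horizon, &witness_seqs);
/// ```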
#[derive(Debug)]
pub struct WitnessGcCoordinator {
retention_count: u64,
enabled: AtomicBool,
}
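/// Result of a GC eligibility check against a safe GC horizon.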
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GcEligibility {
pub safe_gc_seq: CommitSeq,
pub eligible_count: usize,
pub retained_count: usize,
pub prunable_count: usize,
}
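/// Point-in-time view of an active transaction slot. Serialized slots set
/// `is_concurrent` to false and `witness_epoch` to 0.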
#[derive(Debug, Clone, Copy)]
pub struct ActiveSlotSnapshot {
pub begin_seq: CommitSeq,
pub is_concurrent: bool,
pub witness_epoch: u32,
}
impl WitnessGcCoordinator {
#[must_use]
pub fn new(retention_count: u64) -> Self {
Self {
retention_count,
enabled: AtomicBool::new(true),
}
}
pub fn set_enabled(&self, enabled: bool) {
self.enabled.store(enabled, Ordering::Release);
}
#[must_use]
pub fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Acquire)
}
/// Returns the lowest `begin_seq` across active slots. Witnesses whose
/// commit sequence falls strictly below this horizon can no longer be
/// observed by any active transaction.
#[must_use]
pub fn compute_safe_gc_seq(&self, active_slots: &[ActiveSlotSnapshot]) -> CommitSeq {
// `min()` on an empty iterator is `None`, so an empty slot list resolves
// conservatively to `CommitSeq::ZERO` (nothing becomes prunable).
active_slots
.iter()
.map(|s| s.begin_seq)
.min()
.unwrap_or(CommitSeq::ZERO)
}
/// Counts witnesses strictly below `safe_gc_seq`; up to `retention_count`
/// of them are retained and the remainder are reported prunable.
#[must_use]
pub fn check_eligibility(
&self,
safe_gc_seq: CommitSeq,
witness_commit_seqs: &[CommitSeq],
) -> GcEligibility {
let eligible_count = witness_commit_seqs
.iter()
.filter(|&&seq| seq.get() < safe_gc_seq.get())
.count();
#[allow(clippy::cast_possible_truncation)]
let retained_count = if self.retention_count > 0 {
eligible_count.min(self.retention_count as usize)
} else {
0
};
let prunable_count = eligible_count.saturating_sub(retained_count);
GcEligibility {
safe_gc_seq,
eligible_count,
retained_count,
prunable_count,
}
}
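/// The witness epoch may advance only once no concurrent transaction is
/// still pinned to the epoch being drained.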
#[must_use]
pub fn is_epoch_advancement_safe(
&self,
current_epoch: u32,
active_slots: &[ActiveSlotSnapshot],
) -> bool {
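// The epoch being drained is `current_epoch - 1` (wrapping). Epoch 0 is
// the serialized-mode sentinel that nothing pins, so draining it is
// trivially safe.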
let old_epoch = current_epoch.wrapping_sub(1);
if old_epoch == 0 {
return true;
}
!active_slots
.iter()
.any(|s| s.is_concurrent && s.witness_epoch == old_epoch)
}
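/// Prunes read/write witnesses and dependency edges for transactions whose
/// commits fall strictly below `safe_gc_seq`; returns the number of entries
/// pruned. Uncommitted transactions (lookup returns `None`) are never
/// pruned.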
pub fn apply_gc(
&self,
store: &mut super::hot_witness_index::ColdWitnessStore,
safe_gc_seq: CommitSeq,
commit_seq_lookup: &dyn Fn(TxnId) -> Option<CommitSeq>,
) -> usize {
if !self.is_enabled() {
return 0;
}
let before_reads = store.read_witnesses.len();
let before_writes = store.write_witnesses.len();
let before_edges = store.dependency_edges.len();
store
.read_witnesses
.retain(|w| commit_seq_lookup(w.txn).is_none_or(|seq| seq.get() >= safe_gc_seq.get()));
store
.write_witnesses
.retain(|w| commit_seq_lookup(w.txn).is_none_or(|seq| seq.get() >= safe_gc_seq.get()));
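// An edge survives while either endpoint is uncommitted (`None`) or still
// at/above the horizon; it is pruned only when both endpoints fall below
// `safe_gc_seq`.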
store.dependency_edges.retain(|e| {
let from_seq = commit_seq_lookup(e.from);
let to_seq = commit_seq_lookup(e.to);
from_seq.is_none_or(|s| s.get() >= safe_gc_seq.get())
|| to_seq.is_none_or(|s| s.get() >= safe_gc_seq.get())
});
let pruned = (before_reads - store.read_witnesses.len())
+ (before_writes - store.write_witnesses.len())
+ (before_edges - store.dependency_edges.len());
if pruned > 0 {
info!(
safe_gc_seq = safe_gc_seq.get(),
pruned_reads = before_reads - store.read_witnesses.len(),
pruned_writes = before_writes - store.write_witnesses.len(),
pruned_edges = before_edges - store.dependency_edges.len(),
"witness GC applied"
);
}
pruned
}
}
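/// Outcome of replaying a proof-carrying commit's evidence.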
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ValidationVerdict {
Valid,
Invalid,
Incomplete,
}
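/// A commit marker bundled with the proof and witness evidence needed to
/// re-validate it independently, e.g. on another node.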
#[derive(Debug, Clone)]
pub struct ProofCarryingCommit {
pub marker: CommitMarker,
pub proof: CommitProof,
pub reads: Vec<ReadWitness>,
pub writes: Vec<WriteWitness>,
pub edges: Vec<DependencyEdge>,
}
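/// Validates a proof-carrying commit by replaying its evidence.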
pub trait ProofCarryingValidator {
fn validate(&self, commit: &ProofCarryingCommit) -> ValidationVerdict;
}
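/// Built-in validator: requires the proof and marker to agree on
/// `commit_seq`, optionally requires complete per-edge read/write evidence,
/// and rejects 2- and 3-node dependency cycles.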
#[derive(Debug, Default)]
pub struct DefaultProofValidator {
pub require_complete_evidence: bool,
}
impl ProofCarryingValidator for DefaultProofValidator {
fn validate(&self, commit: &ProofCarryingCommit) -> ValidationVerdict {
if commit.proof.commit_seq != commit.marker.commit_seq {
error!(
proof_seq = commit.proof.commit_seq.get(),
marker_seq = commit.marker.commit_seq.get(),
"proof/marker commit_seq mismatch"
);
return ValidationVerdict::Invalid;
}
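// A complete edge is backed by a read witness from its source transaction
// and a write witness from its target transaction.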
for edge in &commit.proof.edges {
let has_read = commit.reads.iter().any(|r| r.txn == edge.from);
let has_write = commit.writes.iter().any(|w| w.txn == edge.to);
if self.require_complete_evidence && (!has_read || !has_write) {
warn!(
from = edge.from.get(),
to = edge.to.get(),
has_read,
has_write,
"incomplete evidence for dependency edge"
);
return ValidationVerdict::Incomplete;
}
}
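// Replay the edge set looking for 2-node (T1 -> T2 -> T1) and 3-node
// cycles; either is a dangerous structure that invalidates the commit.
// The scan is O(n^2)/O(n^3) in the edge count.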
let edges = &commit.edges;
for (i, e1) in edges.iter().enumerate() {
for e2 in edges.iter().skip(i + 1) {
if e1.to == e2.from && e1.from == e2.to {
error!("dangerous 2-node cycle detected in proof-carrying commit");
return ValidationVerdict::Invalid;
}
if e1.to == e2.from && edges.iter().any(|e3| e3.from == e2.to && e3.to == e1.from) {
error!("dangerous 3-node cycle detected in proof-carrying commit");
return ValidationVerdict::Invalid;
}
}
}
debug!(
commit_seq = commit.proof.commit_seq.get(),
edges = edges.len(),
reads = commit.reads.len(),
writes = commit.writes.len(),
verdict = "valid",
"proof-carrying validation replay"
);
ValidationVerdict::Valid
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::hot_witness_index::ColdWitnessStore;
use fsqlite_types::{ObjectId, PageNumber, WitnessKey};
fn txn(id: u64) -> TxnId {
TxnId::new(id).unwrap()
}
fn page(n: u32) -> PageNumber {
PageNumber::new(n).unwrap()
}
fn key(n: u32) -> WitnessKey {
WitnessKey::for_cell_read(page(n), page(n), &n.to_le_bytes())
}
fn make_proof(commit_seq: u64, edges: Vec<DependencyEdge>) -> CommitProof {
CommitProof {
commit_seq: CommitSeq::new(commit_seq),
edges,
evidence_refs: Vec::new(),
}
}
fn make_marker(commit_seq: u64) -> CommitMarker {
CommitMarker {
commit_seq: CommitSeq::new(commit_seq),
commit_time_unix_ns: 1_000_000,
capsule_object_id: ObjectId::from_bytes([0u8; 16]),
proof_object_id: ObjectId::from_bytes([0u8; 16]),
prev_marker: None,
integrity_hash: [0u8; 16],
}
}
#[test]
fn test_publication_reserve_write_commit() {
let publisher = WitnessPublisher::new();
let t1 = txn(1);
let mut token = publisher.reserve(t1);
assert_eq!(token.phase(), PublicationPhase::Reserved);
assert!(publisher.is_pending(token.id()));
let reads = vec![ReadWitness {
txn: t1,
key: key(1),
}];
let writes = vec![WriteWitness {
txn: t1,
key: key(2),
}];
let edges = vec![DependencyEdge {
from: t1,
to: txn(2),
key_basis: key(1),
observed_by: t1,
}];
publisher
.write(&mut token, reads, writes, edges)
.expect("write should succeed");
assert_eq!(token.phase(), PublicationPhase::Writing);
let proof = make_proof(100, Vec::new());
let committed = publisher
.commit(&mut token, proof)
.expect("commit should succeed");
assert_eq!(committed.reads.len(), 1);
assert_eq!(committed.writes.len(), 1);
assert_eq!(committed.edges.len(), 1);
assert_eq!(committed.txn_id, t1);
let all_committed = publisher.committed_publications();
assert_eq!(all_committed.len(), 1);
assert!(!publisher.is_pending(committed.reservation_id));
}
#[test]
fn test_publication_abort_unreachable() {
let publisher = WitnessPublisher::new();
let t1 = txn(10);
let mut token = publisher.reserve(t1);
let reservation_id = token.id();
publisher
.write(
&mut token,
vec![ReadWitness {
txn: t1,
key: key(5),
}],
Vec::new(),
Vec::new(),
)
.expect("write should succeed");
drop(token);
assert!(publisher.is_aborted(reservation_id));
assert!(!publisher.is_pending(reservation_id));
assert!(publisher.committed_publications().is_empty());
assert_eq!(publisher.aborted_count(), 1);
}
#[test]
fn test_publication_crash_resilient() {
let publisher = WitnessPublisher::new();
let t1 = txn(50);
let mut token = publisher.reserve(t1);
publisher
.write(
&mut token,
vec![ReadWitness {
txn: t1,
key: key(3),
}],
vec![WriteWitness {
txn: t1,
key: key(4),
}],
Vec::new(),
)
.unwrap();
let proof = make_proof(200, Vec::new());
let committed = publisher.commit(&mut token, proof).unwrap();
let serialized_reads = serde_json::to_string(&committed.reads).unwrap();
let serialized_writes = serde_json::to_string(&committed.writes).unwrap();
drop(publisher);
let recovered_reads: Vec<ReadWitness> = serde_json::from_str(&serialized_reads).unwrap();
let recovered_writes: Vec<WriteWitness> = serde_json::from_str(&serialized_writes).unwrap();
assert_eq!(recovered_reads.len(), 1);
assert_eq!(recovered_reads[0].txn, t1);
assert_eq!(recovered_writes.len(), 1);
assert_eq!(recovered_writes[0].txn, t1);
}
#[test]
fn test_commit_marker_discipline() {
let mut marker_store = CommitMarkerStore::new();
let marker1 = make_marker(100);
marker_store.publish(marker1);
assert!(marker_store.is_committed(CommitSeq::new(100)));
assert!(!marker_store.is_committed(CommitSeq::new(200)));
let mut cold = ColdWitnessStore::new();
cold.publish_read_witness(ReadWitness {
txn: txn(2),
key: key(1),
});
assert_eq!(cold.reads_for_txn(txn(2)).len(), 1);
assert!(!marker_store.is_committed(CommitSeq::new(200)));
}
#[test]
fn test_commit_marker_timestamp_resolution() {
let mut marker_store = CommitMarkerStore::new();
let mut marker_10 = make_marker(10);
marker_10.commit_time_unix_ns = 1_000;
marker_store.publish(marker_10);
let mut marker_20 = make_marker(20);
marker_20.commit_time_unix_ns = 2_000;
marker_store.publish(marker_20);
let mut marker_30 = make_marker(30);
marker_30.commit_time_unix_ns = 3_000;
marker_store.publish(marker_30);
assert_eq!(
marker_store.resolve_seq_at_or_before_timestamp(2_500),
Some(CommitSeq::new(20))
);
assert_eq!(
marker_store.resolve_seq_at_or_before_timestamp(3_000),
Some(CommitSeq::new(30))
);
assert_eq!(marker_store.resolve_seq_at_or_before_timestamp(999), None);
}
#[test]
fn test_witness_gc_safe_seq() {
let gc = WitnessGcCoordinator::new(0);
let active_slots = vec![
ActiveSlotSnapshot {
begin_seq: CommitSeq::new(50),
is_concurrent: true,
witness_epoch: 3,
},
ActiveSlotSnapshot {
begin_seq: CommitSeq::new(100),
is_concurrent: true,
witness_epoch: 3,
},
];
let safe_gc_seq = gc.compute_safe_gc_seq(&active_slots);
assert_eq!(
safe_gc_seq,
CommitSeq::new(50),
"safe_gc_seq = min(begin_seq)"
);
let witness_seqs = vec![
CommitSeq::new(10),
CommitSeq::new(30),
CommitSeq::new(50),
CommitSeq::new(80),
];
let eligibility = gc.check_eligibility(safe_gc_seq, &witness_seqs);
assert_eq!(eligibility.eligible_count, 2);
assert_eq!(eligibility.prunable_count, 2);
assert_eq!(eligibility.retained_count, 0);
}
#[test]
fn test_witness_gc_retention_policy() {
let gc = WitnessGcCoordinator::new(5);
let safe_gc_seq = CommitSeq::new(100);
let witness_seqs: Vec<CommitSeq> = (0..10).map(|i| CommitSeq::new(i * 10)).collect();
let eligibility = gc.check_eligibility(safe_gc_seq, &witness_seqs);
assert_eq!(eligibility.eligible_count, 10);
assert_eq!(eligibility.retained_count, 5);
assert_eq!(eligibility.prunable_count, 5);
}
#[test]
fn test_witness_epoch_pinned_at_begin_concurrent() {
let slot = ActiveSlotSnapshot {
begin_seq: CommitSeq::new(1),
is_concurrent: true,
witness_epoch: 5,
};
assert!(slot.is_concurrent, "concurrent mode must be set");
assert_eq!(
slot.witness_epoch, 5,
"witness_epoch must be pinned at BEGIN"
);
let idx = crate::hot_witness_index::HotWitnessIndex::new(4, 64);
let epoch_at_begin = idx.current_epoch();
assert_eq!(epoch_at_begin, 1, "index starts at epoch 1");
}
#[test]
fn test_witness_epoch_not_pinned_for_serialized() {
let slot = ActiveSlotSnapshot {
begin_seq: CommitSeq::new(1),
is_concurrent: false,
witness_epoch: 0,
};
assert!(!slot.is_concurrent);
assert_eq!(
slot.witness_epoch, 0,
"serialized mode must set witness_epoch = 0"
);
}
#[test]
fn test_witness_epoch_advancement_waits_for_active_txns() {
let gc = WitnessGcCoordinator::new(0);
let active_slots = vec![ActiveSlotSnapshot {
begin_seq: CommitSeq::new(1),
is_concurrent: true,
witness_epoch: 4,
}];
assert!(
!gc.is_epoch_advancement_safe(5, &active_slots),
"must NOT advance while txn pinned to old epoch"
);
let active_slots_drained = vec![ActiveSlotSnapshot {
begin_seq: CommitSeq::new(1),
is_concurrent: true,
witness_epoch: 5,
}];
assert!(
gc.is_epoch_advancement_safe(5, &active_slots_drained),
"safe to advance when oldest epoch drained"
);
}
#[test]
fn test_double_buffer_no_writer_starvation() {
let gc = WitnessGcCoordinator::new(0);
let active_slots = vec![
ActiveSlotSnapshot {
begin_seq: CommitSeq::new(1),
is_concurrent: true,
witness_epoch: 5,
},
ActiveSlotSnapshot {
begin_seq: CommitSeq::new(2),
is_concurrent: true,
witness_epoch: 5,
},
ActiveSlotSnapshot {
begin_seq: CommitSeq::new(3),
is_concurrent: true,
witness_epoch: 5,
},
];
assert!(
gc.is_epoch_advancement_safe(5, &active_slots),
"readers at current epoch must not prevent advancement"
);
}
#[test]
fn test_distributed_proof_carrying_commit() {
let validator = DefaultProofValidator {
require_complete_evidence: false,
};
let t1 = txn(1);
let t2 = txn(2);
let edge = DependencyEdge {
from: t1,
to: t2,
key_basis: key(1),
observed_by: t1,
};
let pcc = ProofCarryingCommit {
marker: make_marker(100),
proof: make_proof(100, vec![edge.clone()]),
reads: vec![ReadWitness {
txn: t1,
key: key(1),
}],
writes: vec![WriteWitness {
txn: t2,
key: key(1),
}],
edges: vec![edge],
};
let verdict = validator.validate(&pcc);
assert_eq!(verdict, ValidationVerdict::Valid);
}
#[test]
fn test_distributed_witness_replay() {
let validator = DefaultProofValidator {
require_complete_evidence: true,
};
let t1 = txn(10);
let t2 = txn(20);
let t3 = txn(30);
let edges = vec![
DependencyEdge {
from: t1,
to: t2,
key_basis: key(1),
observed_by: t1,
},
DependencyEdge {
from: t2,
to: t3,
key_basis: key(2),
observed_by: t2,
},
];
let pcc = ProofCarryingCommit {
marker: make_marker(300),
proof: make_proof(300, edges.clone()),
reads: vec![
ReadWitness {
txn: t1,
key: key(1),
},
ReadWitness {
txn: t2,
key: key(2),
},
],
writes: vec![
WriteWitness {
txn: t2,
key: key(1),
},
WriteWitness {
txn: t3,
key: key(2),
},
],
edges,
};
assert_eq!(validator.validate(&pcc), ValidationVerdict::Valid);
}
#[test]
fn test_witness_visible_across_processes() {
let idx = Arc::new(crate::hot_witness_index::HotWitnessIndex::new(16, 256));
let config = crate::witness_hierarchy::WitnessHierarchyConfigV1::default();
let epoch = idx.current_epoch();
let key_a = WitnessKey::for_cell_read(page(5), page(5), b"shared_key");
let rks = crate::witness_hierarchy::derive_range_keys(&key_a, &config);
idx.register_read(0, epoch, &rks);
let readers = crate::hot_witness_index::bitset_to_slot_ids(&idx.candidate_readers(&rks));
assert!(
readers.contains(&0),
"Process B must see Process A's read registration"
);
}
#[test]
fn test_witness_cross_process_rw_detection() {
let idx = Arc::new(crate::hot_witness_index::HotWitnessIndex::new(16, 256));
let config = crate::witness_hierarchy::WitnessHierarchyConfigV1::default();
let epoch = idx.current_epoch();
let shared_key = WitnessKey::for_cell_read(page(10), page(10), b"account=100");
let rks = crate::witness_hierarchy::derive_range_keys(&shared_key, &config);
idx.register_read(0, epoch, &rks);
idx.register_write(1, epoch, &rks);
let readers = crate::hot_witness_index::bitset_to_slot_ids(&idx.candidate_readers(&rks));
let writers = crate::hot_witness_index::bitset_to_slot_ids(&idx.candidate_writers(&rks));
assert!(
readers.contains(&0) && writers.contains(&1),
"cross-process rw-antidependency must be detected"
);
}
#[test]
fn test_publication_explicit_abort() {
let publisher = WitnessPublisher::new();
let t1 = txn(99);
let mut token = publisher.reserve(t1);
let rid = token.id();
publisher
.write(
&mut token,
vec![ReadWitness {
txn: t1,
key: key(1),
}],
Vec::new(),
Vec::new(),
)
.unwrap();
publisher.abort(&mut token);
assert_eq!(token.phase(), PublicationPhase::Aborted);
assert!(publisher.is_aborted(rid));
assert!(publisher.committed_publications().is_empty());
drop(token);
assert_eq!(publisher.aborted_count(), 1);
}
#[test]
fn test_witness_gc_apply_to_cold_store() {
let gc = WitnessGcCoordinator::new(0);
let mut store = ColdWitnessStore::new();
store.publish_read_witness(ReadWitness {
txn: txn(1),
key: key(1),
});
store.publish_write_witness(WriteWitness {
txn: txn(1),
key: key(2),
});
store.publish_read_witness(ReadWitness {
txn: txn(2),
key: key(3),
});
store.publish_read_witness(ReadWitness {
txn: txn(3),
key: key(4),
});
let lookup = |id: TxnId| -> Option<CommitSeq> {
match id.get() {
1 => Some(CommitSeq::new(10)),
2 => Some(CommitSeq::new(50)),
3 => Some(CommitSeq::new(100)),
_ => None,
}
};
let pruned = gc.apply_gc(&mut store, CommitSeq::new(50), &lookup);
assert_eq!(pruned, 2, "should prune 1 read + 1 write for txn 1");
assert_eq!(store.read_witnesses.len(), 2);
assert_eq!(store.write_witnesses.len(), 0);
}
#[test]
fn test_proof_validation_rejects_mismatch() {
let validator = DefaultProofValidator {
require_complete_evidence: false,
};
let pcc = ProofCarryingCommit {
marker: make_marker(100),
proof: make_proof(200, Vec::new()),
reads: Vec::new(),
writes: Vec::new(),
edges: Vec::new(),
};
assert_eq!(
validator.validate(&pcc),
ValidationVerdict::Invalid,
"must reject mismatched commit_seq"
);
}
#[test]
fn test_proof_validation_detects_incomplete_evidence() {
let validator = DefaultProofValidator {
require_complete_evidence: true,
};
let edge = DependencyEdge {
from: txn(1),
to: txn(2),
key_basis: key(1),
observed_by: txn(1),
};
let pcc = ProofCarryingCommit {
marker: make_marker(100),
proof: make_proof(100, vec![edge.clone()]),
reads: Vec::new(),
writes: Vec::new(),
edges: vec![edge],
};
assert_eq!(
validator.validate(&pcc),
ValidationVerdict::Incomplete,
"must detect incomplete evidence"
);
}
}