use std::collections::BTreeSet;
use fsqlite_types::{CommitSeq, ObjectId, RangeKey, TxnToken, WitnessKey};
use tracing::debug;
use crate::hot_witness_index::{HotWitnessIndex, bitset_to_slot_ids};
/// A summary of a set of witness keys, queried through [`KeySummary::may_overlap`].
///
/// Each variant is a different representation of the key set.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KeySummary {
/// Explicit list of keys; a query key matches by equality.
ExactKeys(Vec<WitnessKey>),
/// Key hashes as computed by `witness_hierarchy::witness_key_hash`.
///
/// `may_overlap` looks hashes up with a binary search, so the vector must be
/// sorted ascending for lookups to be reliable.
HashedKeySet(Vec<u64>),
/// Set of page numbers.
PageBitmap(BTreeSet<u32>),
/// Set of `u64` entries whose upper 32 bits hold a page number.
///
/// NOTE(review): the lower 32 bits presumably identify a cell within the page;
/// `may_overlap` never inspects them — confirm the encoding with the producer.
CellBitmap(BTreeSet<u64>),
/// `(page, start, len)` triples describing byte ranges within pages.
ByteRangeList(Vec<(u32, u16, u16)>),
/// A list of sub-summaries; a key may overlap if it may overlap any chunk.
Chunked(Vec<KeySummaryChunk>),
}
impl KeySummary {
    /// Reports whether `key` might belong to the key set this summary describes.
    ///
    /// How the answer is derived depends on the representation:
    ///
    /// * `ExactKeys` — equality against every stored key.
    /// * `HashedKeySet` — binary search for the key's hash; the stored hashes
    ///   must therefore be sorted ascending.
    /// * `PageBitmap` / `CellBitmap` — page-level membership only; for
    ///   `CellBitmap` any entry on the key's page counts as a match.
    /// * `ByteRangeList` — byte-range keys need an intersecting range on the
    ///   same page; every other paged key matches any range on its page.
    /// * `Chunked` — true if any chunk's summary may overlap.
    ///
    /// `WitnessKey::Custom` keys carry no page, so the page-based
    /// representations always answer `true` for them.
    #[must_use]
    pub fn may_overlap(&self, key: &WitnessKey) -> bool {
        match self {
            Self::ExactKeys(keys) => keys.contains(key),
            Self::HashedKeySet(hashes) => {
                let wanted = crate::witness_hierarchy::witness_key_hash(key);
                hashes.binary_search(&wanted).is_ok()
            }
            Self::PageBitmap(pages) => {
                matches!(key, WitnessKey::Custom { .. }) || pages.contains(&page_number_of(key))
            }
            Self::CellBitmap(cells) => {
                matches!(key, WitnessKey::Custom { .. }) || {
                    // Entries for one page occupy the contiguous span that
                    // shares the page number in the upper 32 bits.
                    let first = u64::from(page_number_of(key)) << 32;
                    let last = first | 0xFFFF_FFFF;
                    cells.range(first..=last).next().is_some()
                }
            }
            Self::ByteRangeList(ranges) => match key {
                WitnessKey::Custom { .. } => true,
                WitnessKey::ByteRange { page, start, len } => {
                    let wanted_page = page.get();
                    ranges.iter().any(|&(stored_page, stored_start, stored_len)| {
                        stored_page == wanted_page
                            && byte_ranges_overlap(stored_start, stored_len, *start, *len)
                    })
                }
                WitnessKey::Page(_) | WitnessKey::Cell { .. } | WitnessKey::KeyRange { .. } => {
                    // No byte span to compare: any range on the key's page matches.
                    let wanted_page = page_number_of(key);
                    ranges
                        .iter()
                        .any(|&(stored_page, _, _)| stored_page == wanted_page)
                }
            },
            Self::Chunked(chunks) => chunks
                .iter()
                .map(|chunk| &chunk.summary)
                .any(|inner| inner.may_overlap(key)),
        }
    }

    /// Number of entries stored in the summary; for `Chunked`, the sum over
    /// all chunks.
    #[must_use]
    pub fn len(&self) -> usize {
        match self {
            Self::Chunked(chunks) => chunks.iter().map(|chunk| chunk.summary.len()).sum(),
            Self::ExactKeys(keys) => keys.len(),
            Self::HashedKeySet(hashes) => hashes.len(),
            Self::PageBitmap(pages) => pages.len(),
            Self::CellBitmap(cells) => cells.len(),
            Self::ByteRangeList(ranges) => ranges.len(),
        }
    }

    /// Returns `true` when [`KeySummary::len`] is zero.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let entries = self.len();
        entries == 0
    }
}
/// One element of a [`KeySummary::Chunked`] summary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeySummaryChunk {
/// Range prefix associated with this chunk.
/// NOTE(review): not consulted by `KeySummary::may_overlap`; presumably used by
/// callers to route or order chunks — confirm.
pub range_prefix: u32,
/// The summary of the keys held in this chunk.
pub summary: KeySummary,
}
/// Extracts the page number a witness key is anchored to.
///
/// Page and byte-range keys yield their own page; cell and key-range keys
/// yield their B-tree root page. `Custom` keys have no page and yield `0`.
fn page_number_of(key: &WitnessKey) -> u32 {
    match key {
        WitnessKey::Custom { .. } => 0,
        WitnessKey::ByteRange { page, .. } => page.get(),
        WitnessKey::Page(pgno) => pgno.get(),
        WitnessKey::KeyRange { btree_root, .. } | WitnessKey::Cell { btree_root, .. } => {
            btree_root.get()
        }
    }
}
/// Tests whether the half-open byte ranges `[a_start, a_start + a_len)` and
/// `[b_start, b_start + b_len)` share at least one byte.
///
/// A zero-length range never overlaps anything. End offsets are computed with
/// saturating arithmetic, so a `b` range reaching past `u32::MAX` is clamped
/// instead of wrapping.
#[must_use]
fn byte_ranges_overlap(a_start: u16, a_len: u16, b_start: u32, b_len: u32) -> bool {
    let both_non_empty = a_len != 0 && b_len != 0;
    // Widen the `a` range so both ranges are compared in the same integer type.
    let first_begin = u32::from(a_start);
    let first_end = first_begin.saturating_add(u32::from(a_len));
    let second_end = b_start.saturating_add(b_len);
    both_non_empty && first_begin < second_end && b_start < first_end
}
/// Kind tag carried by an [`EcsWriteWitness`].
///
/// NOTE(review): the names suggest a provisional write intent versus a
/// finalized write; nothing in this file distinguishes their handling — confirm
/// the intended semantics with the emitter.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WriteKind {
/// The write is recorded as an intent.
Intent,
/// The write is recorded as final.
Final,
}
/// Opaque logical timestamp wrapping a raw `u64`.
///
/// Ordering, equality and hashing all follow the wrapped integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct LogicalTime(u64);

impl LogicalTime {
    /// Wraps a raw counter value.
    #[must_use]
    pub const fn new(raw: u64) -> Self {
        Self(raw)
    }

    /// Returns the wrapped raw counter value.
    #[must_use]
    pub const fn get(self) -> u64 {
        let Self(raw) = self;
        raw
    }
}
/// Read witness record: which keys a transaction read within one range bucket.
///
/// `cold_plane_refine` consults `key_summary` to decide whether a candidate
/// reader really touches a queried key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EcsReadWitness {
/// Transaction that performed the reads.
pub txn: TxnToken,
/// NOTE(review): presumably the commit sequence of the transaction's snapshot
/// at begin — confirm.
pub begin_seq: CommitSeq,
/// Level of the range bucket this witness belongs to.
/// NOTE(review): presumably the same `level` as in `RangeKey` — confirm.
pub level: u8,
/// Prefix identifying the range bucket at `level`.
pub range_prefix: u32,
/// Summary of the keys read within the bucket.
pub key_summary: KeySummary,
/// Logical time at which the witness was emitted.
pub emitted_at: LogicalTime,
}
/// Write witness record: which keys a transaction wrote within one range bucket.
///
/// Mirrors [`EcsReadWitness`] with an additional [`WriteKind`] tag.
/// `cold_plane_refine` consults `key_summary` to decide whether a candidate
/// writer really touches a queried key.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EcsWriteWitness {
/// Transaction that performed the writes.
pub txn: TxnToken,
/// NOTE(review): presumably the commit sequence of the transaction's snapshot
/// at begin — confirm.
pub begin_seq: CommitSeq,
/// Level of the range bucket this witness belongs to.
/// NOTE(review): presumably the same `level` as in `RangeKey` — confirm.
pub level: u8,
/// Prefix identifying the range bucket at `level`.
pub range_prefix: u32,
/// Summary of the keys written within the bucket.
pub key_summary: KeySummary,
/// Logical time at which the witness was emitted.
pub emitted_at: LogicalTime,
/// Whether this witness records an intent or a final write.
pub write_kind: WriteKind,
}
/// Mergeable record of a transaction's participation in one range bucket.
///
/// Two deltas for the same transaction, kind, level and range prefix can be
/// combined with [`WitnessDelta::merge`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WitnessDelta {
/// Transaction the delta belongs to.
pub txn: TxnToken,
/// NOTE(review): presumably the commit sequence of the transaction's snapshot
/// at begin — confirm.
pub begin_seq: CommitSeq,
/// Whether the delta describes reads or writes.
pub kind: WitnessDeltaKind,
/// Level of the range bucket.
pub level: u8,
/// Prefix identifying the range bucket at `level`.
pub range_prefix: u32,
/// Participation marker; `merge` always produces `Present`.
pub participation: WitnessParticipation,
/// Optional key-level refinement of the bucket.
pub refinement: Option<KeySummary>,
}
/// Distinguishes read deltas from write deltas in a [`WitnessDelta`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WitnessDeltaKind {
/// The delta records read participation.
Read,
/// The delta records write participation.
Write,
}
/// Participation state carried by a [`WitnessDelta`].
///
/// There is only one state, so participation can be asserted but never
/// retracted through this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WitnessParticipation {
/// The transaction participates in the bucket.
Present,
}
impl WitnessDelta {
    /// Combines `self` with `other`, both describing the same transaction,
    /// kind, level and range prefix (checked with debug assertions only).
    ///
    /// Identity fields, including `begin_seq`, are taken from `self`, and
    /// participation is always `Present`. For the refinement:
    ///
    /// * if only one side has one, it is kept;
    /// * if both have one, the summary with the larger `len()` wins, with
    ///   `self` winning ties — the two summaries are not unioned;
    /// * if neither has one, the result has none.
    #[must_use]
    pub fn merge(self, other: &Self) -> Self {
        debug_assert_eq!(self.txn, other.txn);
        debug_assert_eq!(self.level, other.level);
        debug_assert_eq!(self.range_prefix, other.range_prefix);
        debug_assert_eq!(self.kind, other.kind);

        let Self {
            txn,
            begin_seq,
            kind,
            level,
            range_prefix,
            refinement: mine,
            ..
        } = self;

        let refinement = match (mine, other.refinement.as_ref()) {
            // Only a strictly larger summary from `other` displaces ours.
            (Some(ours), Some(theirs)) if ours.len() < theirs.len() => Some(theirs.clone()),
            (Some(ours), _) => Some(ours),
            (None, theirs) => theirs.cloned(),
        };

        Self {
            txn,
            begin_seq,
            kind,
            level,
            range_prefix,
            participation: WitnessParticipation::Present,
            refinement,
        }
    }
}
/// A dependency edge between two transactions, together with the evidence for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EcsDependencyEdge {
/// Kind of dependency.
pub kind: DependencyEdgeKind,
/// Source transaction of the edge.
/// NOTE(review): for an rw-antidependency this is presumably the reader — confirm.
pub from: TxnToken,
/// Target transaction of the edge.
/// NOTE(review): for an rw-antidependency this is presumably the writer — confirm.
pub to: TxnToken,
/// Range bucket (and optional key refinement) the edge is based on.
pub key_basis: EdgeKeyBasis,
/// Transaction that observed the edge.
pub observed_by: TxnToken,
/// Commit sequence associated with the observation.
pub observation_seq: CommitSeq,
}
/// Kinds of dependency edge; only rw-antidependencies are represented.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DependencyEdgeKind {
/// Read-write antidependency.
RwAntiDependency,
}
/// The range bucket, and optionally the keys within it, that justify an
/// [`EcsDependencyEdge`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EdgeKeyBasis {
/// Level of the range bucket.
pub level: u8,
/// Prefix identifying the range bucket at `level`.
pub range_prefix: u32,
/// Optional key-level refinement of the bucket.
pub refinement: Option<KeySummary>,
}
/// Record of the evidence behind a transaction's commit decision.
///
/// The `Vec<ObjectId>` fields are references to other stored objects.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EcsCommitProof {
/// Transaction that committed.
pub txn: TxnToken,
/// NOTE(review): presumably the commit sequence of the transaction's snapshot
/// at begin — confirm.
pub begin_seq: CommitSeq,
/// Commit sequence assigned to the transaction.
pub commit_seq: CommitSeq,
/// Whether an incoming rw edge was recorded for the transaction.
pub has_in_rw: bool,
/// Whether an outgoing rw edge was recorded for the transaction.
pub has_out_rw: bool,
/// References to the read witness objects.
pub read_witness_refs: Vec<ObjectId>,
/// References to the write witness objects.
pub write_witness_refs: Vec<ObjectId>,
/// References to the index segments used.
pub index_segments_used: Vec<ObjectId>,
/// References to the dependency edges emitted.
pub edges_emitted: Vec<ObjectId>,
/// References to merge witness objects.
pub merge_witnesses: Vec<ObjectId>,
/// Abort policy recorded with the proof.
pub abort_policy: AbortPolicy,
}
/// Policy for choosing which transaction to abort.
///
/// NOTE(review): the variants are not interpreted anywhere in this file; the
/// descriptions below are read off the names and should be confirmed against
/// the code that enforces the policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AbortPolicy {
/// Presumably: abort the pivot transaction of a dangerous structure.
AbortPivot,
/// NOTE(review): meaning of "Dro" is not evident from this file — confirm.
AbortDro,
/// Presumably: abort the youngest transaction involved.
AbortYoungest,
/// A policy defined outside this enum.
Custom,
}
/// Record of a transaction abort and the dependency edges observed for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AbortWitness {
/// Transaction that aborted.
pub txn: TxnToken,
/// NOTE(review): presumably the commit sequence of the transaction's snapshot
/// at begin — confirm.
pub begin_seq: CommitSeq,
/// Commit sequence associated with the abort.
pub abort_seq: CommitSeq,
/// Why the transaction aborted.
pub reason: AbortReason,
/// Dependency edges observed for the aborted transaction.
pub edges_observed: Vec<EcsDependencyEdge>,
}
/// Why a transaction was aborted, as recorded in an [`AbortWitness`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AbortReason {
/// Aborted as an SSI pivot.
SsiPivot,
/// The transaction was cancelled.
Cancelled,
/// Any other reason.
Other,
}
/// Candidate slot ids returned by [`hot_plane_discover`].
#[derive(Debug, Clone)]
pub struct HotPlaneCandidates {
/// Slot ids of candidate readers for the queried range keys.
pub reader_slot_ids: Vec<u32>,
/// Slot ids of candidate writers for the queried range keys.
pub writer_slot_ids: Vec<u32>,
}
/// Queries the hot witness index for reader and writer candidates on
/// `range_keys`.
///
/// The index's candidate bitsets are converted into slot-id lists, and a debug
/// event with the candidate counts is emitted.
#[must_use]
pub fn hot_plane_discover(index: &HotWitnessIndex, range_keys: &[RangeKey]) -> HotPlaneCandidates {
    let readers = index.candidate_readers(range_keys);
    let writers = index.candidate_writers(range_keys);

    let candidates = HotPlaneCandidates {
        reader_slot_ids: bitset_to_slot_ids(&readers),
        writer_slot_ids: bitset_to_slot_ids(&writers),
    };

    debug!(
        bead_id = "bd-1if1",
        range_keys = range_keys.len(),
        reader_candidates = candidates.reader_slot_ids.len(),
        writer_candidates = candidates.writer_slot_ids.len(),
        "hot-plane candidate discovery"
    );

    candidates
}
/// Outcome of [`cold_plane_refine`].
#[derive(Debug, Clone)]
pub struct ColdPlaneRefinementResult {
/// Candidate readers that were kept: their witness may overlap the query
/// key, or no witness was available for them.
pub confirmed_readers: Vec<TxnToken>,
/// Candidate writers that were kept: their witness may overlap the query
/// key, or no witness was available for them.
pub confirmed_writers: Vec<TxnToken>,
/// Number of candidates (readers and writers together) whose witness ruled
/// out an overlap with the query key.
pub false_positives_eliminated: usize,
}
/// Filters candidate readers and writers against their witnesses.
///
/// A candidate is dropped only when a witness is available and its key summary
/// rules out any overlap with `query_key`. A candidate with no witness (`None`)
/// is kept, since nothing is available to rule it out.
///
/// Returns the surviving tokens in input order, plus the total number of
/// candidates dropped. A debug event is emitted when at least one candidate
/// was dropped.
pub fn cold_plane_refine(
    candidate_readers: &[(TxnToken, Option<EcsReadWitness>)],
    candidate_writers: &[(TxnToken, Option<EcsWriteWitness>)],
    query_key: &WitnessKey,
) -> ColdPlaneRefinementResult {
    /// Keeps every candidate whose witness is missing or may overlap, and
    /// counts the rest in `rejected`.
    fn sift<W>(
        candidates: &[(TxnToken, Option<W>)],
        overlaps: impl Fn(&W) -> bool,
        rejected: &mut usize,
    ) -> Vec<TxnToken> {
        let mut kept = Vec::new();
        for (token, witness) in candidates {
            match witness {
                Some(found) if !overlaps(found) => *rejected += 1,
                _ => kept.push(*token),
            }
        }
        kept
    }

    let mut eliminated = 0_usize;
    let confirmed_readers = sift(
        candidate_readers,
        |witness: &EcsReadWitness| witness.key_summary.may_overlap(query_key),
        &mut eliminated,
    );
    let confirmed_writers = sift(
        candidate_writers,
        |witness: &EcsWriteWitness| witness.key_summary.may_overlap(query_key),
        &mut eliminated,
    );

    if eliminated > 0 {
        debug!(
            bead_id = "bd-1if1",
            eliminated,
            confirmed_readers = confirmed_readers.len(),
            confirmed_writers = confirmed_writers.len(),
            "cold-plane refinement eliminated false positives"
        );
    }

    ColdPlaneRefinementResult {
        confirmed_readers,
        confirmed_writers,
        false_positives_eliminated: eliminated,
    }
}
// Unit tests for the witness object types and the hot/cold plane helpers.
#[cfg(test)]
mod tests {
use super::*;
use fsqlite_types::{CommitSeq, PageNumber, TxnEpoch, TxnId, TxnToken};
// Builds a transaction token from a raw id and epoch; panics if `TxnId::new` rejects `id`.
fn test_txn_token(id: u64, epoch: u32) -> TxnToken {
TxnToken::new(TxnId::new(id).unwrap(), TxnEpoch::new(epoch))
}
// Builds a page-level witness key; panics if `PageNumber::new` rejects `pgno`.
fn test_page_key(pgno: u32) -> WitnessKey {
WitnessKey::Page(PageNumber::new(pgno).unwrap())
}
// Builds a range key from its level and hash prefix.
fn test_range_key(level: u8, hash_prefix: u32) -> RangeKey {
RangeKey { level, hash_prefix }
}
// Exercises `len`, `is_empty` and `may_overlap` across the summary representations.
#[test]
fn test_key_summary_canonical_encoding() {
// ExactKeys: membership is by equality.
let exact =
KeySummary::ExactKeys(vec![test_page_key(1), test_page_key(2), test_page_key(3)]);
assert_eq!(exact.len(), 3);
assert!(exact.may_overlap(&test_page_key(2)));
assert!(!exact.may_overlap(&test_page_key(4)));
// HashedKeySet: only size is checked here (hashes are supplied sorted).
let hashed = KeySummary::HashedKeySet(vec![100, 200, 300]);
assert_eq!(hashed.len(), 3);
assert!(!hashed.is_empty());
// PageBitmap: membership is by page number.
let bitmap = KeySummary::PageBitmap(BTreeSet::from([1, 2, 5, 10]));
assert!(bitmap.may_overlap(&test_page_key(5)));
assert!(!bitmap.may_overlap(&test_page_key(6)));
// CellBitmap: page number in the upper 32 bits of each entry.
let cell_bmp = KeySummary::CellBitmap(BTreeSet::from([(0x3_u64 << 32) | 0x2a]));
assert_eq!(cell_bmp.len(), 1);
// ByteRangeList: page keys match any range on the page; byte-range keys need
// an actual intersection.
let ranges = KeySummary::ByteRangeList(vec![(1, 0, 100), (2, 50, 200)]);
assert_eq!(ranges.len(), 2);
assert!(ranges.may_overlap(&test_page_key(1)));
assert!(!ranges.may_overlap(&test_page_key(99)));
assert!(ranges.may_overlap(&WitnessKey::ByteRange {
page: PageNumber::new(2).unwrap(),
start: 100,
len: 20,
}));
assert!(!ranges.may_overlap(&WitnessKey::ByteRange {
page: PageNumber::new(2).unwrap(),
start: 400,
len: 20,
}));
// Chunked: overlap if any chunk overlaps.
let chunked = KeySummary::Chunked(vec![
KeySummaryChunk {
range_prefix: 0,
summary: KeySummary::ExactKeys(vec![test_page_key(1)]),
},
KeySummaryChunk {
range_prefix: 1,
summary: KeySummary::PageBitmap(BTreeSet::from([5])),
},
]);
assert!(chunked.may_overlap(&test_page_key(1)));
assert!(chunked.may_overlap(&test_page_key(5)));
assert!(!chunked.may_overlap(&test_page_key(3)));
}
// Two read witnesses built from identical inputs compare equal.
#[test]
fn test_read_witness_ecs_deterministic() {
let token = test_txn_token(1, 0);
let w1 = EcsReadWitness {
txn: token,
begin_seq: CommitSeq::new(10),
level: 0,
range_prefix: 42,
key_summary: KeySummary::ExactKeys(vec![test_page_key(1), test_page_key(2)]),
emitted_at: LogicalTime::new(100),
};
let w2 = EcsReadWitness {
txn: token,
begin_seq: CommitSeq::new(10),
level: 0,
range_prefix: 42,
key_summary: KeySummary::ExactKeys(vec![test_page_key(1), test_page_key(2)]),
emitted_at: LogicalTime::new(100),
};
assert_eq!(w1, w2, "same inputs must produce identical ECS witnesses");
}
// Write witnesses differing only in `write_kind` keep their respective kinds.
#[test]
fn test_write_witness_kinds() {
let token = test_txn_token(2, 0);
let intent = EcsWriteWitness {
txn: token,
begin_seq: CommitSeq::new(5),
level: 0,
range_prefix: 0,
key_summary: KeySummary::ExactKeys(vec![test_page_key(3)]),
emitted_at: LogicalTime::new(50),
write_kind: WriteKind::Intent,
};
let finalized = EcsWriteWitness {
write_kind: WriteKind::Final,
..intent.clone()
};
assert_ne!(intent.write_kind, finalized.write_kind);
assert_eq!(intent.write_kind, WriteKind::Intent);
assert_eq!(finalized.write_kind, WriteKind::Final);
}
// Merging a delta without refinement with one that has a refinement keeps the refinement.
#[test]
fn test_witness_delta_crdt_merge() {
let token = test_txn_token(3, 1);
let delta_a = WitnessDelta {
txn: token,
begin_seq: CommitSeq::new(7),
kind: WitnessDeltaKind::Read,
level: 0,
range_prefix: 0,
participation: WitnessParticipation::Present,
refinement: None,
};
let delta_b = WitnessDelta {
txn: token,
begin_seq: CommitSeq::new(7),
kind: WitnessDeltaKind::Read,
level: 0,
range_prefix: 0,
participation: WitnessParticipation::Present,
refinement: Some(KeySummary::ExactKeys(vec![test_page_key(1)])),
};
let merged = delta_a.merge(&delta_b);
assert_eq!(merged.participation, WitnessParticipation::Present);
assert!(merged.refinement.is_some());
}
// A dependency edge retains the fields it was constructed with.
#[test]
fn test_dependency_edge_canonical() {
let from = test_txn_token(10, 0);
let to = test_txn_token(20, 0);
let observer = test_txn_token(30, 0);
let edge = EcsDependencyEdge {
kind: DependencyEdgeKind::RwAntiDependency,
from,
to,
key_basis: EdgeKeyBasis {
level: 0,
range_prefix: 42,
refinement: Some(KeySummary::PageBitmap(BTreeSet::from([5]))),
},
observed_by: observer,
observation_seq: CommitSeq::new(100),
};
assert_eq!(edge.kind, DependencyEdgeKind::RwAntiDependency);
assert_eq!(edge.from.id.get(), 10);
assert_eq!(edge.to.id.get(), 20);
assert!(edge.key_basis.refinement.is_some());
}
// A commit proof retains the flags, references and policy it was constructed with.
#[test]
fn test_commit_proof_replay() {
let txn = test_txn_token(5, 0);
let proof = EcsCommitProof {
txn,
begin_seq: CommitSeq::new(10),
commit_seq: CommitSeq::new(15),
has_in_rw: true,
has_out_rw: false,
read_witness_refs: vec![ObjectId::from_bytes([1_u8; 16])],
write_witness_refs: vec![ObjectId::from_bytes([2_u8; 16])],
index_segments_used: vec![],
edges_emitted: vec![ObjectId::from_bytes([3_u8; 16])],
merge_witnesses: vec![],
abort_policy: AbortPolicy::AbortPivot,
};
assert!(proof.has_in_rw);
assert!(!proof.has_out_rw);
assert_eq!(proof.read_witness_refs.len(), 1);
assert_eq!(proof.edges_emitted.len(), 1);
assert_eq!(proof.abort_policy, AbortPolicy::AbortPivot);
}
// A reader registered in the current epoch must show up in hot-plane discovery.
#[test]
fn test_hot_plane_no_false_negatives() {
let index = HotWitnessIndex::new(16, 64);
let epoch = index.current_epoch();
let rk = test_range_key(0, 42);
index.register_read(3, epoch, &[rk]);
let candidates = hot_plane_discover(&index, &[rk]);
assert!(
candidates.reader_slot_ids.contains(&3),
"active reader at slot 3 must be discoverable: got {:?}",
candidates.reader_slot_ids
);
}
// Registers readers under the current and the previous epoch (the same epoch
// when it is 0) and checks the current-epoch reader is found.
// NOTE(review): nothing is asserted about slot 7 — confirm whether the
// previous-epoch reader is expected to be discoverable.
#[test]
fn test_hot_plane_epoch_overlap() {
let index = HotWitnessIndex::new(16, 64);
let epoch = index.current_epoch();
let rk = test_range_key(0, 100);
index.register_read(5, epoch, &[rk]);
let prev_epoch = if epoch > 0 { epoch - 1 } else { epoch };
index.register_read(7, prev_epoch, &[rk]);
let candidates = hot_plane_discover(&index, &[rk]);
assert!(
candidates.reader_slot_ids.contains(&5),
"current-epoch reader must be found"
);
}
// Cold-plane refinement drops a candidate whose witness excludes the query page.
#[test]
fn test_cold_plane_refinement_reduces_fp() {
let token_real = test_txn_token(10, 0);
let token_fp = test_txn_token(20, 0);
let query_key = test_page_key(5);
let real_witness = EcsReadWitness {
txn: token_real,
begin_seq: CommitSeq::new(1),
level: 0,
range_prefix: 0,
key_summary: KeySummary::PageBitmap(BTreeSet::from([5, 6, 7])),
emitted_at: LogicalTime::new(1),
};
let fp_witness = EcsReadWitness {
txn: token_fp,
begin_seq: CommitSeq::new(2),
level: 0,
range_prefix: 0,
key_summary: KeySummary::PageBitmap(BTreeSet::from([10, 11, 12])),
emitted_at: LogicalTime::new(2),
};
let candidates = vec![
(token_real, Some(real_witness)),
(token_fp, Some(fp_witness)),
];
let result = cold_plane_refine(&candidates, &[], &query_key);
assert_eq!(result.confirmed_readers.len(), 1);
assert_eq!(result.confirmed_readers[0], token_real);
assert_eq!(result.false_positives_eliminated, 1);
}
// ExactKeys matches every stored page and none of the pages outside the set.
#[test]
fn prop_key_summary_soundness_no_false_negatives() {
let keys: Vec<WitnessKey> = (1..=100).map(test_page_key).collect();
let summary = KeySummary::ExactKeys(keys);
for i in 1..=100 {
assert!(
summary.may_overlap(&test_page_key(i)),
"ExactKeys must not have false negatives for page {i}"
);
}
for i in 101..=200 {
assert!(
!summary.may_overlap(&test_page_key(i)),
"ExactKeys should not match page {i}"
);
}
}
// An abort witness retains the reason and edge list it was constructed with.
#[test]
fn test_abort_witness_construction() {
let txn = test_txn_token(42, 1);
let aw = AbortWitness {
txn,
begin_seq: CommitSeq::new(10),
abort_seq: CommitSeq::new(15),
reason: AbortReason::SsiPivot,
edges_observed: vec![],
};
assert_eq!(aw.reason, AbortReason::SsiPivot);
assert!(aw.edges_observed.is_empty());
}
// When both deltas carry a refinement, merge keeps the one with more entries.
#[test]
fn test_witness_delta_union_only() {
let token = test_txn_token(50, 0);
let base = WitnessDelta {
txn: token,
begin_seq: CommitSeq::new(1),
kind: WitnessDeltaKind::Write,
level: 1,
range_prefix: 99,
participation: WitnessParticipation::Present,
refinement: Some(KeySummary::ExactKeys(vec![
test_page_key(1),
test_page_key(2),
])),
};
// `..base` only takes the remaining fields, so `base` stays usable below.
let update = WitnessDelta {
refinement: Some(KeySummary::ExactKeys(vec![
test_page_key(1),
test_page_key(2),
test_page_key(3),
])),
..base
};
let merged = base.merge(&update);
assert_eq!(merged.participation, WitnessParticipation::Present);
assert_eq!(merged.refinement.as_ref().unwrap().len(), 3);
}
// PageBitmap over pages 1..=100 matches each of them and rejects page 101.
#[test]
fn test_roaring_bitmap_visibility() {
let visible_pages: BTreeSet<u32> = (1_u32..=100).collect();
let summary = KeySummary::PageBitmap(visible_pages);
for page in 1_u32..=100 {
assert!(
summary.may_overlap(&test_page_key(page)),
"bitmap should include in-flight visible page {page}"
);
}
assert!(
!summary.may_overlap(&test_page_key(101)),
"bitmap should reject out-of-set page"
);
}
}