use fsqlite_types::{PageNumber, TxnEpoch, TxnId, TxnToken, WitnessKey};
use tracing::{debug, info};
#[derive(Debug, Clone, Default)]
pub struct WitnessSet {
pub reads: Vec<WitnessKey>,
pub writes: Vec<WitnessKey>,
pub token: Option<TxnToken>,
}
impl WitnessSet {
#[must_use]
pub fn new(token: TxnToken) -> Self {
Self {
reads: Vec::new(),
writes: Vec::new(),
token: Some(token),
}
}
pub fn register_point_read(&mut self, btree_root: PageNumber, canonical_key_bytes: &[u8]) {
let key = WitnessKey::for_cell_read(btree_root, btree_root, canonical_key_bytes);
debug!(
btree_root = btree_root.get(),
witness_kind = "cell",
op_kind = "point_read",
"witness key registered"
);
self.reads.push(key);
}
pub fn register_range_scan(&mut self, leaf_pages: &[PageNumber]) {
let keys = WitnessKey::for_range_scan(leaf_pages);
for key in &keys {
if let WitnessKey::Page(pgno) = key {
debug!(
page = pgno.get(),
witness_kind = "page",
op_kind = "range_scan",
"witness key registered"
);
}
}
self.reads.extend(keys);
}
pub fn register_point_write(
&mut self,
btree_root: PageNumber,
canonical_key_bytes: &[u8],
leaf_pgno: PageNumber,
) {
let (cell, page) = WitnessKey::for_point_write(btree_root, canonical_key_bytes, leaf_pgno);
debug!(
btree_root = btree_root.get(),
leaf_page = leaf_pgno.get(),
witness_kind = "cell+page",
op_kind = "point_write",
"witness key registered"
);
self.writes.push(cell);
self.writes.push(page);
}
pub fn publish_summary(&self) {
if let Some(ref token) = self.token {
info!(
txn_id = token.id.get(),
txn_epoch = token.epoch.get(),
read_count = self.reads.len(),
write_count = self.writes.len(),
plane = "hot",
"witness publication summary"
);
}
}
#[must_use]
pub fn overlaps_write(&self, read_key: &WitnessKey) -> bool {
self.writes
.iter()
.any(|w| witness_keys_overlap(w, read_key))
}
}
#[must_use]
pub fn witness_keys_overlap(a: &WitnessKey, b: &WitnessKey) -> bool {
match (a, b) {
(WitnessKey::Page(pa), WitnessKey::Page(pb) | WitnessKey::ByteRange { page: pb, .. })
| (WitnessKey::ByteRange { page: pb, .. }, WitnessKey::Page(pa)) => pa == pb,
(
WitnessKey::Cell {
btree_root: ra,
tag: ta,
..
},
WitnessKey::Cell {
btree_root: rb,
tag: tb,
..
},
) => ra == rb && ta == tb,
(
WitnessKey::KeyRange { btree_root: ra, .. },
WitnessKey::KeyRange { btree_root: rb, .. },
) => ra == rb,
(
WitnessKey::KeyRange {
btree_root: range_root,
..
},
WitnessKey::Cell {
btree_root: cell_root,
..
},
)
| (
WitnessKey::Cell {
btree_root: cell_root,
..
},
WitnessKey::KeyRange {
btree_root: range_root,
..
},
) => range_root == cell_root,
(
WitnessKey::KeyRange {
btree_root: range_root,
..
},
WitnessKey::ByteRange { page, .. },
)
| (
WitnessKey::ByteRange { page, .. },
WitnessKey::KeyRange {
btree_root: range_root,
..
},
) => range_root == page,
(
WitnessKey::Page(p),
WitnessKey::Cell { btree_root, .. } | WitnessKey::KeyRange { btree_root, .. },
)
| (
WitnessKey::Cell { btree_root, .. } | WitnessKey::KeyRange { btree_root, .. },
WitnessKey::Page(p),
) => {
p == btree_root
|| matches!(
a,
WitnessKey::Cell {
leaf_page: lp,
..
} if *lp == *p
)
|| matches!(
b,
WitnessKey::Cell {
leaf_page: lp,
..
} if *lp == *p
)
}
(
WitnessKey::ByteRange {
page: pa,
start: sa,
len: la,
},
WitnessKey::ByteRange {
page: pb,
start: sb,
len: lb,
},
) => pa == pb && *sa < sb.saturating_add(*lb) && *sb < sa.saturating_add(*la),
_ => true,
}
}
#[must_use]
pub fn validate_txn_token(candidate: &TxnToken, slot_id: TxnId, slot_epoch: TxnEpoch) -> bool {
candidate.id == slot_id && candidate.epoch == slot_epoch
}
#[cfg(test)]
mod tests {
use super::*;
fn page(n: u32) -> PageNumber {
PageNumber::new(n).unwrap()
}
fn token(id: u64, epoch: u32) -> TxnToken {
TxnToken::new(TxnId::new(id).unwrap(), TxnEpoch::new(epoch))
}
#[test]
fn test_witness_key_from_table_read() {
let btree_root = page(2);
let key_bytes = b"user_id=42";
let witness = WitnessKey::for_cell_read(btree_root, btree_root, key_bytes);
assert!(witness.is_cell());
if let WitnessKey::Cell {
btree_root: root,
leaf_page: lp,
tag,
} = &witness
{
assert_eq!(*root, btree_root);
assert_eq!(*lp, btree_root);
assert_ne!(*tag, 0, "tag must be non-zero for valid key bytes");
}
}
#[test]
fn test_witness_key_from_index_read() {
let leaf_pages = vec![page(10), page(11), page(12)];
let witnesses = WitnessKey::for_range_scan(&leaf_pages);
assert_eq!(witnesses.len(), 3);
for (i, w) in witnesses.iter().enumerate() {
assert!(w.is_page(), "range scan witnesses must be Page-level");
if let WitnessKey::Page(p) = w {
assert_eq!(p.get(), 10 + u32::try_from(i).unwrap());
}
}
}
#[test]
fn test_witness_key_from_write() {
let btree_root = page(2);
let key_bytes = b"user_id=42";
let leaf_pgno = page(15);
let (cell, page_w) = WitnessKey::for_point_write(btree_root, key_bytes, leaf_pgno);
assert!(cell.is_cell(), "write cell witness must be Cell variant");
assert!(page_w.is_page(), "write page witness must be Page variant");
let read_witness = WitnessKey::for_cell_read(btree_root, leaf_pgno, key_bytes);
assert_eq!(
cell, read_witness,
"write cell must match read cell for same key"
);
}
#[test]
fn test_witness_key_deterministic() {
let btree_root = page(5);
let key_bytes = b"email=alice@example.com";
let tag1 = WitnessKey::cell_tag(btree_root, key_bytes);
let tag2 = WitnessKey::cell_tag(btree_root, key_bytes);
assert_eq!(tag1, tag2, "cell_tag must be deterministic across calls");
let tag3 = WitnessKey::cell_tag(btree_root, b"email=bob@example.com");
assert_ne!(
tag1, tag3,
"different key bytes should produce different tags"
);
let tag4 = WitnessKey::cell_tag(page(6), key_bytes);
assert_ne!(
tag1, tag4,
"different btree_root should produce different tags"
);
}
#[test]
fn test_txn_token_epoch_prevents_stale() {
let original = token(42, 1);
assert!(validate_txn_token(
&original,
TxnId::new(42).unwrap(),
TxnEpoch::new(1)
));
assert!(!validate_txn_token(
&original,
TxnId::new(42).unwrap(),
TxnEpoch::new(2)
));
assert!(!validate_txn_token(
&original,
TxnId::new(99).unwrap(),
TxnEpoch::new(1)
));
}
#[test]
fn test_phantom_protection_leaf_pages() {
let mut ws = WitnessSet::new(token(1, 1));
let leaves = vec![page(100), page(101), page(102), page(103), page(104)];
ws.register_range_scan(&leaves);
assert_eq!(ws.reads.len(), 5, "must register one witness per leaf page");
for (i, key) in ws.reads.iter().enumerate() {
assert!(
matches!(key, WitnessKey::Page(p) if p.get() == 100 + u32::try_from(i).unwrap()),
"each witness must be Page(leaf_pgno)"
);
}
}
#[test]
fn test_cell_tag_deterministic_cross_process() {
let btree_root = page(3);
let key_bytes = b"account_number=9876543210";
let tag_a = WitnessKey::cell_tag(btree_root, key_bytes);
let tag_b = WitnessKey::cell_tag(btree_root, key_bytes);
assert_eq!(
tag_a, tag_b,
"cell_tag must be identical across independent processes for same inputs"
);
let tag_different_root = WitnessKey::cell_tag(page(4), key_bytes);
assert_ne!(tag_a, tag_different_root);
}
#[test]
fn prop_witness_key_collision_bounded() {
let btree_root = page(1);
let mut tags = std::collections::HashSet::new();
for i in 0..10_000_u32 {
let key_bytes = i.to_le_bytes();
let tag = WitnessKey::cell_tag(btree_root, &key_bytes);
assert!(
tags.insert(tag),
"collision at i={i}: tag={tag:#x} (birthday paradox threshold not exceeded)"
);
}
let tag_root1 = WitnessKey::cell_tag(page(1), b"key");
let tag_root2 = WitnessKey::cell_tag(page(2), b"key");
assert_ne!(
tag_root1, tag_root2,
"different roots must produce different tags"
);
}
#[test]
fn test_witness_overlap_detection() {
let btree_root = page(2);
let key_bytes = b"pk=1";
let mut writer_ws = WitnessSet::new(token(2, 1));
writer_ws.register_point_write(btree_root, key_bytes, page(10));
let read_key = WitnessKey::for_cell_read(btree_root, btree_root, key_bytes);
assert!(
writer_ws.overlaps_write(&read_key),
"must detect overlap between reader and writer on same cell"
);
let other_key = WitnessKey::for_cell_read(btree_root, btree_root, b"pk=2");
assert!(
!writer_ws.overlaps_write(&other_key),
"must not detect overlap for different key"
);
}
#[test]
fn test_keyrange_phantom_protection() {
let key_range = WitnessKey::KeyRange {
btree_root: page(100),
lo: b"aa".to_vec(),
hi: b"zz".to_vec(),
};
let same_leaf_write = WitnessKey::Page(page(100));
let other_leaf_write = WitnessKey::Page(page(101));
assert!(
witness_keys_overlap(&key_range, &same_leaf_write),
"range witness must overlap writes on the same btree root page"
);
assert!(
!witness_keys_overlap(&key_range, &other_leaf_write),
"range witness should not overlap writes from other roots"
);
let same_tree_other_bounds = WitnessKey::KeyRange {
btree_root: page(100),
lo: b"m".to_vec(),
hi: b"n".to_vec(),
};
assert!(
witness_keys_overlap(&key_range, &same_tree_other_bounds),
"same-tree key ranges conservatively overlap to preserve phantom safety"
);
}
#[test]
fn test_witness_set_summary() {
let mut ws = WitnessSet::new(token(42, 3));
ws.register_point_read(page(2), b"key1");
ws.register_point_write(page(2), b"key2", page(10));
ws.register_range_scan(&[page(20), page(21)]);
assert_eq!(ws.reads.len(), 3);
assert_eq!(ws.writes.len(), 2);
ws.publish_summary();
}
}