use super::types::{OutputId, OutputKV, OUTPUT_ID_DELETED};
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
/// Checkpoint height used by `MemoryRun::merge` to decide whether a delete may
/// garbage-collect an older add for the same key: the pair is dropped only when the
/// delete's height is `<=` this value. Starts at `i32::MAX`, i.e. no fence (every such
/// pair is eligible for collection).
static CHECKPOINT_GC_FENCE: AtomicI32 = AtomicI32::new(i32::MAX);
/// Publishes a new GC fence (Release store) and logs it at debug level.
///
/// After this call, merges that observe the new value keep both the delete and the
/// older add whenever the delete lies above `checkpoint_height`.
pub fn set_gc_fence(checkpoint_height: i32) {
CHECKPOINT_GC_FENCE.store(checkpoint_height, Ordering::Release);
tracing::debug!(
"IBD engine GC fence set to {} — cross-checkpoint GC disabled for Delete > {}",
checkpoint_height,
checkpoint_height
);
}
/// Returns the current GC fence (Acquire load); `i32::MAX` if never set.
pub fn gc_fence_snapshot() -> i32 {
CHECKPOINT_GC_FENCE.load(Ordering::Acquire)
}
/// Prefix directory over a key-sorted entry slice.
///
/// Entries whose leading `prefix_bits` key bits equal `p` live in the half-open index
/// range `buckets[p]..buckets[p + 1]` (see `lookup_range`).
#[derive(Debug, Clone)]
pub struct Directory {
// Bucket boundary offsets into the entry slice: one slot per bucket plus a final
// end-of-data slot.
buckets: Vec<u32>,
// Number of leading key bits used to select a bucket.
prefix_bits: u32,
}
impl Directory {
    /// Directory for an empty run: one prefix bit means two buckets and therefore three
    /// boundary slots, all zero, so every lookup yields the empty range `(0, 0)`.
    fn empty() -> Self {
        Self {
            buckets: vec![0; 3],
            prefix_bits: 1,
        }
    }

    /// Chooses the number of leading key bits used for `n` entries: 4 bits for small
    /// runs, otherwise enough bits that the bucket count is the smallest power of two
    /// above `n / 85`, clamped to `4..=16` bits.
    fn prefix_bits_for(n: usize) -> u32 {
        if n <= 128 {
            4
        } else {
            let ratio = (n / 85).max(1);
            (usize::BITS - ratio.leading_zeros()).clamp(4, 16)
        }
    }

    /// Records `idx` as the start offset of every bucket from `*next_bucket` through
    /// `prefix` (inclusive), advancing `*next_bucket` past `prefix`.
    ///
    /// Buckets skipped over (no entries with that prefix) get the same start as the
    /// next populated bucket, which makes their range empty.
    #[inline]
    fn open_buckets(bucket_start: &mut [u32], next_bucket: &mut usize, prefix: usize, idx: usize) {
        while *next_bucket <= prefix {
            // NOTE(review): truncates if a run ever exceeds u32::MAX entries.
            bucket_start[*next_bucket] = idx as u32;
            *next_bucket += 1;
        }
    }

    /// Sets every boundary not yet opened (trailing empty buckets plus the final
    /// end-of-data slot) to `count`.
    #[inline]
    fn close_buckets(bucket_start: &mut [u32], next_bucket: usize, count: usize) {
        bucket_start[next_bucket..].fill(count as u32);
    }

    /// Builds a directory over `entries`.
    ///
    /// `entries` must be sorted by key: the fill only moves forward through buckets, so
    /// a decreasing prefix would produce wrong ranges.
    pub fn build(entries: &[OutputKV]) -> Self {
        if entries.is_empty() {
            return Self::empty();
        }
        let prefix_bits = Self::prefix_bits_for(entries.len());
        let mut bucket_start = vec![0u32; (1usize << prefix_bits) + 1];
        let mut next_bucket = 0usize;
        for (i, kv) in entries.iter().enumerate() {
            let prefix = key_prefix(&kv.key, prefix_bits) as usize;
            Self::open_buckets(&mut bucket_start, &mut next_bucket, prefix, i);
        }
        Self::close_buckets(&mut bucket_start, next_bucket, entries.len());
        Self {
            buckets: bucket_start,
            prefix_bits,
        }
    }

    /// Returns the half-open range `(lo, hi)` of entry indices whose key prefix matches
    /// `key`'s. The range may be empty (`lo == hi`).
    #[inline]
    pub fn lookup_range(&self, key: &[u8; 36]) -> (usize, usize) {
        let prefix = key_prefix(key, self.prefix_bits) as usize;
        let lo = self.buckets[prefix] as usize;
        let hi = self.buckets[prefix + 1] as usize;
        (lo, hi)
    }

    /// Builds a directory by streaming `entry_count` key-sorted entries from `reader`
    /// without materializing them.
    ///
    /// `entry_count` sizes the directory and closes the trailing buckets. It is assumed
    /// to equal the number of entries `reader` yields — TODO confirm with the segment
    /// format; a mismatch would produce ranges past the real data.
    ///
    /// # Errors
    /// Propagates any error returned by `reader.advance()`.
    pub(super) fn build_streaming(
        reader: &mut super::disk_segment::SegmentReader,
        entry_count: usize,
    ) -> anyhow::Result<Self> {
        if entry_count == 0 {
            return Ok(Self::empty());
        }
        let prefix_bits = Self::prefix_bits_for(entry_count);
        let mut bucket_start = vec![0u32; (1usize << prefix_bits) + 1];
        let mut next_bucket = 0usize;
        let mut i = 0usize;
        while let Some(kv) = reader.advance()? {
            let prefix = key_prefix(&kv.key, prefix_bits) as usize;
            Self::open_buckets(&mut bucket_start, &mut next_bucket, prefix, i);
            i += 1;
        }
        Self::close_buckets(&mut bucket_start, next_bucket, entry_count);
        Ok(Self {
            buckets: bucket_start,
            prefix_bits,
        })
    }
}
/// Returns the top `bits` bits of the key's first four bytes (read big-endian), i.e. a
/// value in `0..2^bits`.
///
/// `bits == 0` yields 0 (a single bucket) instead of overflowing the shift.
///
/// # Panics
/// In debug builds, if `bits > 32`.
#[inline]
fn key_prefix(key: &[u8; 36], bits: u32) -> u32 {
    debug_assert!(bits <= 32, "key_prefix: at most 32 prefix bits, got {bits}");
    let raw = u32::from_be_bytes([key[0], key[1], key[2], key[3]]);
    // A shift by 32 (bits == 0) overflows a u32; `checked_shr` reports it as None and a
    // zero-bit prefix is always 0.
    raw.checked_shr(32 - bits).unwrap_or(0)
}
/// Blocked Bloom filter over 36-byte keys.
///
/// Each key hashes to a single block of `BLOOM_WORDS_PER_BLOCK` u64 words (512 bits),
/// and all of its probe bits are set within that block.
#[derive(Debug, Clone)]
pub struct BloomFilter {
// `num_blocks * BLOOM_WORDS_PER_BLOCK` words, block-major.
data: Vec<u64>,
// Number of 512-bit blocks; always at least 1 for filters built in this file.
num_blocks: usize,
}
/// Number of u64 words in one 512-bit Bloom block.
const BLOOM_WORDS_PER_BLOCK: usize = 512 / u64::BITS as usize;
impl BloomFilter {
    const GOLDEN_RATIO_64: u64 = 0x9e3779b97f4a7c15;

    /// Builds a filter sized for `entries` (about 12 bits per entry, at least one block)
    /// and inserts every entry's key.
    pub fn build(entries: &[OutputKV]) -> Self {
        let mut filter = Self::new_for_capacity(entries.len());
        for kv in entries {
            filter.insert(&kv.key);
        }
        filter
    }

    /// Creates an empty filter sized for `capacity` keys (about 12 bits per key, rounded
    /// up to whole 512-bit blocks, minimum one block).
    pub fn new_for_capacity(capacity: usize) -> Self {
        let num_blocks = Self::blocks_for(capacity);
        Self {
            data: vec![0u64; num_blocks * BLOOM_WORDS_PER_BLOCK],
            num_blocks,
        }
    }

    /// Number of 512-bit blocks needed for `capacity` keys at 12 bits per key.
    fn blocks_for(capacity: usize) -> usize {
        (capacity.max(1) * 12).div_ceil(512).max(1)
    }

    /// Sets the seven probe bits for `key`.
    #[inline]
    pub fn insert(&mut self, key: &[u8; 36]) {
        let (base, word_bits) = self.block_of(key);
        for (word, mask) in Self::probes(word_bits) {
            self.data[base + word] |= mask;
        }
    }

    /// Returns `false` if `key` was definitely never inserted; `true` means "maybe".
    #[inline]
    pub fn may_contain(&self, key: &[u8; 36]) -> bool {
        let (base, word_bits) = self.block_of(key);
        Self::probes(word_bits).all(|(word, mask)| self.data[base + word] & mask != 0)
    }

    /// Returns the first word index of `key`'s block and the hash bits that pick its probes.
    #[inline]
    fn block_of(&self, key: &[u8; 36]) -> (usize, u64) {
        let (block_idx, word_bits) = Self::hash_key(key, self.num_blocks);
        (block_idx * BLOOM_WORDS_PER_BLOCK, word_bits)
    }

    /// Yields the seven `(word offset within block, bit mask)` probes encoded in
    /// `word_bits`: each probe consumes 9 bits, selecting one of the block's 512 bits.
    #[inline]
    fn probes(word_bits: u64) -> impl Iterator<Item = (usize, u64)> {
        (0..7usize).map(move |probe| {
            let bit = ((word_bits >> (probe * 9)) & 0x1FF) as usize;
            ((bit / 64) % BLOOM_WORDS_PER_BLOCK, 1u64 << (bit % 64))
        })
    }

    /// Hashes a key into `(block index in 0..num_blocks, probe bits)`.
    ///
    /// The 36 bytes are folded into one accumulator from four little-endian u64 lanes
    /// plus a u32 tail. Two murmur3-style finalizations of that accumulator give the
    /// block selector and the probe bits.
    #[inline]
    fn hash_key(key: &[u8; 36], num_blocks: usize) -> (usize, u64) {
        #[inline]
        fn lane(key: &[u8; 36], at: usize) -> u64 {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(&key[at..at + 8]);
            u64::from_le_bytes(buf)
        }

        #[inline]
        fn fmix64(mut x: u64) -> u64 {
            x ^= x >> 33;
            x = x.wrapping_mul(0xff51afd7ed558ccd);
            x ^= x >> 33;
            x = x.wrapping_mul(0xc4ceb9fe1a85ec53);
            x ^= x >> 33;
            x
        }

        let tail = u64::from(u32::from_le_bytes([key[32], key[33], key[34], key[35]]));
        let acc = lane(key, 0)
            .wrapping_add(lane(key, 8).rotate_left(17))
            .wrapping_add(lane(key, 16).rotate_right(11))
            .wrapping_add(lane(key, 24).rotate_left(29))
            .wrapping_add(tail.wrapping_mul(Self::GOLDEN_RATIO_64));

        let block_idx = (fmix64(acc) % num_blocks as u64) as usize;
        let probe_bits = fmix64(acc.wrapping_add(Self::GOLDEN_RATIO_64));
        (block_idx, probe_bits)
    }
}
/// Tally of lookup outcomes.
///
/// NOTE(review): not used by any code visible in this file; presumably filled by a
/// batch-query caller elsewhere — confirm the exact meaning of each counter there.
#[derive(Debug, Default, Clone)]
pub struct QueryResult {
// Presumably: keys that resolved to a live output id.
pub resolved: usize,
// Presumably: keys that resolved to a delete tombstone (`OUTPUT_ID_DELETED`).
pub deleted: usize,
// Presumably: keys not found at all.
pub absent: usize,
}
/// A sorted in-memory run of add/delete entries with a prefix directory and a Bloom
/// filter for point lookups.
#[derive(Debug, Clone)]
pub struct MemoryRun {
// Entries sorted by `OutputKV`'s ordering (key first, as the directory requires).
pub(super) entries: Vec<OutputKV>,
// (min height, max height) over `entries`; (i32::MAX, i32::MIN) when empty.
pub(super) height_range: (i32, i32),
// Maps a key prefix to its index range in `entries`.
pub(super) directory: Directory,
// Fast negative check before touching `entries`.
pub(super) filter: BloomFilter,
// Only mutable runs may be appended to or erased from (checked by debug_asserts).
pub(super) is_mutable: bool,
}
impl MemoryRun {
pub fn build(mut entries: Vec<OutputKV>) -> Self {
entries.sort_unstable();
let height_range = height_range_of(&entries);
let directory = Directory::build(&entries);
let filter = BloomFilter::build(&entries);
Self {
entries,
height_range,
directory,
filter,
is_mutable: false,
}
}
pub fn build_presorted(entries: Vec<OutputKV>) -> Self {
debug_assert!(
entries.windows(2).all(|w| w[0] <= w[1]),
"build_presorted: entries must be sorted"
);
let height_range = height_range_of(&entries);
let directory = Directory::build(&entries);
let filter = BloomFilter::build(&entries);
Self {
entries,
height_range,
directory,
filter,
is_mutable: false,
}
}
pub fn new_mutable() -> Self {
Self {
entries: Vec::new(),
height_range: (i32::MAX, i32::MIN),
directory: Directory::build(&[]),
filter: BloomFilter::build(&[]),
is_mutable: true,
}
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn height_range(&self) -> (i32, i32) {
self.height_range
}
pub fn append_and_rebuild(&mut self, new_entries: &[OutputKV]) {
debug_assert!(self.is_mutable, "cannot append to frozen run");
self.entries.extend_from_slice(new_entries);
self.entries.sort_unstable();
self.height_range = height_range_of(&self.entries);
self.directory = Directory::build(&self.entries);
self.filter = BloomFilter::build(&self.entries);
}
pub fn freeze(&mut self) {
self.is_mutable = false;
}
#[inline]
pub fn lookup_key(&self, key: &[u8; 36], since: i32, before: i32) -> Option<OutputId> {
if self.height_range.1 < since || self.height_range.0 >= before {
return None;
}
if !self.filter.may_contain(key) {
return None;
}
let (lo, hi) = self.directory.lookup_range(key);
if lo >= hi {
return None;
}
let slice = &self.entries[lo..hi];
let pos = slice.partition_point(|e| e.key < *key);
let mut result: Option<OutputId> = None;
let mut i = pos;
while i < slice.len() {
let e = &slice[i];
if e.key != *key {
break;
}
if e.height < since || e.height >= before {
i += 1;
continue;
}
if e.is_add() {
let next = slice.get(i + 1);
if let Some(n) = next {
if n.key == *key && n.height == e.height && n.is_delete() {
i += 2; continue;
}
}
result = Some(e.id);
break;
} else if e.is_delete() {
result = Some(OUTPUT_ID_DELETED);
break;
}
i += 1;
}
result
}
pub fn batch_lookup(&self, keys: &[[u8; 36]], ids: &mut [OutputId], since: i32, before: i32) {
debug_assert_eq!(keys.len(), ids.len());
#[cfg(feature = "rayon")]
{
use rayon::prelude::*;
keys.par_iter()
.zip(ids.par_iter_mut())
.for_each(|(key, id)| {
if *id == OutputId::MAX {
if let Some(found) = self.lookup_key(key, since, before) {
*id = found;
}
}
});
}
#[cfg(not(feature = "rayon"))]
{
for (key, id) in keys.iter().zip(ids.iter_mut()) {
if *id == OutputId::MAX {
if let Some(found) = self.lookup_key(key, since, before) {
*id = found; }
}
}
}
}
pub fn merge(inputs: &[Arc<MemoryRun>]) -> Self {
use std::cmp::Reverse;
use std::collections::BinaryHeap;
if inputs.is_empty() {
return Self::build(vec![]);
}
let total: usize = inputs.iter().map(|r| r.entries.len()).sum();
let mut merged: Vec<OutputKV> = Vec::with_capacity(total);
#[derive(PartialEq, Eq)]
struct HeapItem {
entry: OutputKV,
run_idx: usize,
entry_idx: usize,
}
impl Ord for HeapItem {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.entry.cmp(&self.entry)
}
}
impl PartialOrd for HeapItem {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
let mut heap = BinaryHeap::with_capacity(inputs.len());
for (ri, run) in inputs.iter().enumerate() {
if let Some(e) = run.entries.first() {
heap.push(HeapItem {
entry: *e,
run_idx: ri,
entry_idx: 0,
});
}
}
while let Some(HeapItem {
entry,
run_idx,
entry_idx,
}) = heap.pop()
{
let next_idx = entry_idx + 1;
if let Some(next) = inputs[run_idx].entries.get(next_idx) {
heap.push(HeapItem {
entry: *next,
run_idx,
entry_idx: next_idx,
});
}
merged.push(entry);
}
let mut gc: Vec<OutputKV> = Vec::with_capacity(merged.len());
let mut pending_del: Option<OutputKV> = None;
for e in merged {
if let Some(d) = pending_del {
if d.key != e.key {
gc.push(d); pending_del = None;
} else {
pending_del = Some(d);
}
}
if e.is_delete() {
if let Some(last) = gc.last() {
if last.key == e.key && last.height == e.height && last.is_add() {
gc.pop();
continue;
}
}
pending_del = Some(e);
} else {
if let Some(d) = pending_del.take() {
if d.key == e.key && d.height > e.height {
let fence = CHECKPOINT_GC_FENCE.load(Ordering::Acquire);
if d.height <= fence {
continue; }
gc.push(d);
gc.push(e);
continue;
}
gc.push(d);
gc.push(e);
} else {
gc.push(e); }
}
}
if let Some(d) = pending_del {
gc.push(d); }
Self::build_presorted(gc)
}
pub fn erase_since(&mut self, since: i32) {
debug_assert!(self.is_mutable, "erase_since on frozen run");
self.entries.retain(|e| e.height < since);
self.height_range = height_range_of(&self.entries);
self.directory = Directory::build(&self.entries);
self.filter = BloomFilter::build(&self.entries);
}
}
/// Returns `(min height, max height)` over `entries` in a single pass, or
/// `(i32::MAX, i32::MIN)` (an empty range) when there are none.
fn height_range_of(entries: &[OutputKV]) -> (i32, i32) {
    entries
        .iter()
        .fold((i32::MAX, i32::MIN), |(lowest, highest), kv| {
            (lowest.min(kv.height), highest.max(kv.height))
        })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// 36-byte key whose first byte is `n` and whose remaining bytes are zero.
    fn make_key(n: u8) -> [u8; 36] {
        let mut key = [0u8; 36];
        key[0] = n;
        key
    }

    /// 36-byte key whose first four bytes hold `i` big-endian, so keys sort like `i`.
    fn be_key(i: u32) -> [u8; 36] {
        let mut key = [0u8; 36];
        key[..4].copy_from_slice(&i.to_be_bytes());
        key
    }

    /// `n` adds keyed by `be_key(0..n)`, all at height 1, each with id equal to its index.
    fn sequential_adds(n: u32) -> Vec<OutputKV> {
        (0..n)
            .map(|i| OutputKV::new_add(be_key(i), 1, i as u64))
            .collect()
    }

    /// Builds a frozen run and wraps it the way `MemoryRun::merge` expects.
    fn shared(entries: Vec<OutputKV>) -> Arc<MemoryRun> {
        Arc::new(MemoryRun::build(entries))
    }

    #[test]
    fn test_bloom_no_false_negatives() {
        let keys: Vec<[u8; 36]> = (0..100).map(make_key).collect();
        let entries: Vec<OutputKV> = keys
            .iter()
            .map(|&key| OutputKV::new_add(key, 1, 42))
            .collect();
        let bloom = BloomFilter::build(&entries);
        for key in &keys {
            assert!(bloom.may_contain(key), "false negative for key {:?}", key[0]);
        }
    }

    #[test]
    fn test_bloom_fpr_under_2pct() {
        let bloom = BloomFilter::build(&sequential_adds(1000));
        let false_positives = (1000u32..11000u32)
            .filter(|&i| bloom.may_contain(&be_key(i)))
            .count();
        let fpr = false_positives as f64 / 10000.0;
        assert!(fpr < 0.02, "FPR too high: {:.2}%", fpr * 100.0);
    }

    #[test]
    fn test_directory_matches_linear_scan() {
        let run = MemoryRun::build(sequential_adds(500));
        for i in 0u32..500 {
            let key = be_key(i);
            let (lo, hi) = run.directory.lookup_range(&key);
            let found = run.entries[lo..hi].iter().any(|e| e.key == key);
            assert!(found, "directory missed key {i}");
        }
    }

    #[test]
    fn test_lookup_key_basic() {
        let (k1, k2) = (make_key(1), make_key(2));
        let run = MemoryRun::build(vec![
            OutputKV::new_add(k1, 100, 42),
            OutputKV::new_add(k2, 200, 99),
        ]);
        assert_eq!(run.lookup_key(&k1, 0, i32::MAX), Some(42));
        assert_eq!(run.lookup_key(&k2, 0, i32::MAX), Some(99));
        assert_eq!(run.lookup_key(&make_key(3), 0, i32::MAX), None);
    }

    #[test]
    fn test_lookup_height_window() {
        let key = make_key(1);
        let run = MemoryRun::build(vec![OutputKV::new_add(key, 100, 42)]);
        assert_eq!(run.lookup_key(&key, 0, 101), Some(42));
        assert_eq!(run.lookup_key(&key, 101, i32::MAX), None);
    }

    #[test]
    fn test_delete_hides_add() {
        let key = make_key(1);
        let run = MemoryRun::build(vec![
            OutputKV::new_delete(key, 200),
            OutputKV::new_add(key, 100, 42),
        ]);
        assert_eq!(run.lookup_key(&key, 0, i32::MAX), Some(OUTPUT_ID_DELETED));
    }

    #[test]
    fn test_merge_cancellation_same_height() {
        let key = make_key(1);
        let merged = MemoryRun::merge(&[
            shared(vec![OutputKV::new_add(key, 100, 42)]),
            shared(vec![OutputKV::new_delete(key, 100)]),
        ]);
        assert!(
            merged.entries.is_empty(),
            "same-height cancel failed: {:?}",
            merged.entries.len()
        );
    }

    #[test]
    fn test_merge_cancellation_cross_height() {
        let key = make_key(1);
        let merged = MemoryRun::merge(&[
            shared(vec![OutputKV::new_add(key, 100, 42)]),
            shared(vec![OutputKV::new_delete(key, 290_000)]),
        ]);
        assert!(
            merged.entries.is_empty(),
            "cross-height cancel failed: {:?}",
            merged.entries.len()
        );
    }

    #[test]
    fn test_merge_cross_height_with_recreation() {
        let key = make_key(1);
        let merged = MemoryRun::merge(&[shared(vec![
            OutputKV::new_add(key, 100, 11),
            OutputKV::new_delete(key, 200),
            OutputKV::new_add(key, 300, 99),
        ])]);
        assert_eq!(
            merged.entries.len(),
            1,
            "expected 1 live entry, got {:?}",
            merged.entries
        );
        let live = &merged.entries[0];
        assert_eq!(live.height, 300);
        assert_eq!(live.id, 99);
        assert!(live.is_add());
    }

    #[test]
    fn test_merge_dangling_delete_preserved() {
        let key = make_key(1);
        let merged = MemoryRun::merge(&[shared(vec![OutputKV::new_delete(key, 200)])]);
        assert_eq!(merged.entries.len(), 1);
        assert!(merged.entries[0].is_delete());
    }

    #[test]
    fn test_erase_since() {
        let (k1, k2) = (make_key(1), make_key(2));
        let mut run = MemoryRun::new_mutable();
        run.append_and_rebuild(&[OutputKV::new_add(k1, 50, 1), OutputKV::new_add(k2, 100, 2)]);
        run.erase_since(75);
        assert_eq!(run.entries.len(), 1);
        assert_eq!(run.entries[0].key, k1);
    }
}