use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use crate::state::CommittedRec;
/// Fixed-size Bloom filter over string keys.
///
/// `might_contain` may report a key that was never inserted, but it never misses a
/// key that was: `insert` sets exactly the bits that `might_contain` later tests.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub struct Bloom {
/// Bit array packed into 64-bit words.
bits: Vec<u64>,
/// Number of addressable bits; `Bloom::new` sets this to `bits.len() * 64`.
m: usize,
}
impl Default for Bloom {
fn default() -> Self {
Self::new(1)
}
}
/// Number of bit positions derived (and set or tested) per key.
const BLOOM_K: usize = 4;
/// Seed fed to the hasher ahead of the key when computing the first base hash.
const BLOOM_SEED1: u64 = 0xDEAD_BEEF_CAFE_BABEu64;
/// Seed fed to the hasher ahead of the key when computing the second base hash.
const BLOOM_SEED2: u64 = 0x1337_C0DE_F00D_FACEu64;
impl Bloom {
    /// Builds an empty filter budgeted at ten bits per expected key.
    ///
    /// `expected_n` is clamped to at least 1 and the bit count is rounded up to a
    /// whole number of 64-bit words.
    pub fn new(expected_n: usize) -> Self {
        let word_count = (10 * expected_n.max(1)).div_ceil(64);
        Self {
            bits: vec![0u64; word_count],
            m: word_count * 64,
        }
    }

    /// Derives the `BLOOM_K` bit positions for `key`.
    ///
    /// Two seeded `DefaultHasher` digests are combined as `h1 + i * h2` (wrapping
    /// arithmetic) and each result is reduced modulo the filter size.
    fn hashes(&self, key: &str) -> [usize; BLOOM_K] {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        // The seed is hashed before the key, so the two digests are independent.
        let seeded_digest = |seed: u64| -> u64 {
            let mut hasher = DefaultHasher::new();
            seed.hash(&mut hasher);
            key.hash(&mut hasher);
            hasher.finish()
        };
        let base = seeded_digest(BLOOM_SEED1);
        let step = seeded_digest(BLOOM_SEED2);
        let modulus = self.m as u64;

        let mut positions = [0usize; BLOOM_K];
        for (i, slot) in positions.iter_mut().enumerate() {
            let probe = base.wrapping_add((i as u64).wrapping_mul(step));
            *slot = (probe % modulus) as usize;
        }
        positions
    }

    /// Records `key` by setting every bit position derived for it.
    pub fn insert(&mut self, key: &str) {
        for bit in self.hashes(key) {
            let (word, offset) = (bit / 64, bit % 64);
            self.bits[word] |= 1u64 << offset;
        }
    }

    /// Returns `false` only if at least one of the key's bit positions is unset,
    /// i.e. the key was definitely never inserted.
    pub fn might_contain(&self, key: &str) -> bool {
        for bit in self.hashes(key) {
            let mask = 1u64 << (bit % 64);
            if (self.bits[bit / 64] & mask) == 0 {
                return false;
            }
        }
        true
    }
}
/// Length in bytes of the CRC-32 prefix that `encode_run` writes ahead of the payload.
const RUN_CRC_LEN: usize = 4;
/// In-memory description of one run file on disk.
#[derive(Debug, Clone)]
pub struct RunMeta {
/// Location of the run file.
pub path: PathBuf,
/// First and last key of the run; runs are written sorted by key, so this is
/// the inclusive (min, max) range used to skip runs during lookup.
pub key_range: (String, String),
/// Number of key/record pairs stored in the run.
pub len: usize,
}
/// Serializes `pairs` as JSON and frames the payload with a little-endian CRC-32
/// prefix, producing the byte layout that `decode_run` expects.
///
/// # Errors
/// Returns an `io::Error` if JSON serialization fails.
fn encode_run(pairs: &[(String, CommittedRec)]) -> std::io::Result<Vec<u8>> {
    let body = match serde_json::to_vec(pairs) {
        Ok(json) => json,
        Err(e) => return Err(std::io::Error::other(format!("run serialize: {e}"))),
    };
    let checksum = crc32fast::hash(&body).to_le_bytes();

    let mut framed = Vec::with_capacity(RUN_CRC_LEN + body.len());
    framed.extend_from_slice(&checksum);
    framed.extend_from_slice(&body);
    Ok(framed)
}
/// Validates the CRC-32 prefix of a run file image and parses its JSON payload.
///
/// # Errors
/// Returns a description of the problem when the input is shorter than the CRC
/// prefix, when the stored and computed checksums differ, or when the payload is
/// not valid JSON for the expected pair list.
fn decode_run(bytes: &[u8]) -> Result<Vec<(String, CommittedRec)>, String> {
    let (header, body) = match bytes.split_first_chunk::<RUN_CRC_LEN>() {
        Some(parts) => parts,
        None => return Err(format!("run too short: {} bytes", bytes.len())),
    };
    let stored = u32::from_le_bytes(*header);
    let actual = crc32fast::hash(body);
    if stored == actual {
        serde_json::from_slice(body).map_err(|e| format!("run parse: {e}"))
    } else {
        Err(format!(
            "CRC mismatch: stored={stored:#010x} actual={actual:#010x}"
        ))
    }
}
/// File name of run number `n`: the number is zero-padded to at least six digits
/// between the `idem-` prefix and the `.run` suffix.
fn run_name(n: usize) -> String {
    let sequence = n;
    format!("idem-{:06}.run", sequence)
}
/// Opens `dir` and calls `sync_all` on the handle (blocking).
///
/// # Errors
/// Propagates the error from opening the directory or from the sync itself.
fn fsync_dir(dir: &Path) -> std::io::Result<()> {
    let handle = std::fs::File::open(dir)?;
    handle.sync_all()
}
async fn write_run_atomic(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
let tmp = path.with_extension("run.tmp");
tokio::fs::write(&tmp, bytes).await?;
{
let f = std::fs::OpenOptions::new()
.write(true)
.open(&tmp)
.map_err(|e| std::io::Error::other(format!("open tmp for sync: {e}")))?;
f.sync_all()?;
}
tokio::fs::rename(&tmp, path).await?;
let dir_owned = path.parent().unwrap_or(Path::new(".")).to_path_buf();
tokio::task::spawn_blocking(move || fsync_dir(&dir_owned))
.await
.map_err(|e| std::io::Error::other(format!("spawn_blocking join: {e}")))??;
Ok(())
}
/// Two-tier key/record store: an in-memory `hot` map backed by sorted run files on
/// disk, with a Bloom filter in front of the run files.
///
/// Only `hot`, `order` and `cap` are serialized. The fields marked `#[serde(skip)]`
/// are not written out and are filled from their `Default` values on deserialization.
#[derive(serde::Serialize, serde::Deserialize)]
pub struct TieredIdem {
/// Records currently held in memory.
pub hot: HashMap<String, CommittedRec>,
/// Keys of `hot` in first-insertion order; `flush_spill` evicts from the front.
pub order: VecDeque<String>,
/// Size of `hot` above which `needs_flush` reports true.
pub cap: usize,
/// Directory in which run files are created; set by `attach_dir`.
#[serde(skip)]
pub runs_dir: PathBuf,
/// Metadata for the run files currently in use, in creation order.
#[serde(skip)]
pub runs: Vec<RunMeta>,
/// Filter over the keys stored in `runs` (keys in `hot` are not added to it).
#[serde(skip)]
pub bloom: Bloom,
}
/// Compact debug view: prints sizes only, never the stored keys or records.
impl std::fmt::Debug for TieredIdem {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut summary = f.debug_struct("TieredIdem");
        summary.field("hot_len", &self.hot.len());
        summary.field("cap", &self.cap);
        summary.field("runs", &self.runs.len());
        summary.finish()
    }
}
/// Field-by-field deep copy, including the run metadata and the Bloom filter.
impl Clone for TieredIdem {
    fn clone(&self) -> Self {
        let Self {
            hot,
            order,
            cap,
            runs_dir,
            runs,
            bloom,
        } = self;
        Self {
            hot: hot.clone(),
            order: order.clone(),
            cap: *cap,
            runs_dir: runs_dir.clone(),
            runs: runs.clone(),
            bloom: bloom.clone(),
        }
    }
}
impl Default for TieredIdem {
fn default() -> Self {
Self::with_cap(DEFAULT_IDEM_HOT_CAP)
}
}
/// Hot-tier capacity used by `TieredIdem::default()`.
pub const DEFAULT_IDEM_HOT_CAP: usize = 1_000_000;
impl TieredIdem {
/// Creates an empty store with hot capacity `cap`.
///
/// No run directory is attached yet (`runs_dir` is empty) and the Bloom filter is
/// sized for `cap` keys.
pub fn with_cap(cap: usize) -> Self {
    let bloom = Bloom::new(cap);
    Self {
        hot: HashMap::new(),
        order: VecDeque::new(),
        cap,
        runs_dir: PathBuf::new(),
        runs: Vec::new(),
        bloom,
    }
}
pub async fn attach_dir<F, Fut>(&mut self, dir: &Path, rebuild_fn: F) -> std::io::Result<()>
where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = std::io::Result<Vec<(String, CommittedRec)>>>,
{
self.runs_dir = dir.to_path_buf();
let mut run_paths: Vec<(usize, PathBuf)> = Vec::new();
let mut rd = tokio::fs::read_dir(dir).await?;
while let Some(entry) = rd.next_entry().await? {
let name = entry
.file_name()
.into_string()
.map_err(|_| std::io::Error::other("non-UTF-8 run filename"))?;
if let Some(n) = parse_run_name(&name) {
run_paths.push((n, entry.path()));
}
}
run_paths.sort_by_key(|&(n, _)| n);
let mut all_valid = true;
let mut metas: Vec<RunMeta> = Vec::new();
for (_, path) in &run_paths {
match tokio::fs::read(path).await {
Ok(bytes) => match decode_run(&bytes) {
Ok(pairs) if !pairs.is_empty() => {
let min_key = pairs[0].0.clone();
let max_key = pairs[pairs.len() - 1].0.clone();
metas.push(RunMeta {
path: path.clone(),
key_range: (min_key, max_key),
len: pairs.len(),
});
}
Ok(_) => {
}
Err(e) => {
tracing::warn!(?path, %e, "idem run CRC/parse failure — will rebuild");
all_valid = false;
break;
}
},
Err(e) => {
tracing::warn!(?path, %e, "idem run read failure — will rebuild");
all_valid = false;
break;
}
}
}
if !all_valid || run_paths.is_empty() {
tracing::info!(dir = %dir.display(), "rebuilding idem run files from log scan");
let pairs = rebuild_fn().await?;
let spill_pairs: Vec<(String, CommittedRec)> = pairs
.into_iter()
.filter(|(k, _)| !self.hot.contains_key(k))
.collect();
for (_, path) in &run_paths {
let _ = tokio::fs::remove_file(path).await;
}
metas.clear();
if !spill_pairs.is_empty() {
let mut sorted = spill_pairs;
sorted.sort_by(|a, b| a.0.cmp(&b.0));
let n = next_run_n(&metas);
let run_path = dir.join(run_name(n));
match encode_run(&sorted) {
Ok(bytes) => match write_run_atomic(&run_path, &bytes).await {
Ok(()) => {
let min_key = sorted[0].0.clone();
let max_key = sorted[sorted.len() - 1].0.clone();
metas.push(RunMeta {
path: run_path,
key_range: (min_key, max_key),
len: sorted.len(),
});
}
Err(e) => {
tracing::error!(%e, "failed to write rebuilt idem run");
}
},
Err(e) => {
tracing::error!(%e, "failed to encode rebuilt idem run");
}
}
}
}
let mut bloom = Bloom::new(self.cap);
for meta in &metas {
match tokio::fs::read(&meta.path).await {
Ok(bytes) => match decode_run(&bytes) {
Ok(pairs) => {
for (key, _) in &pairs {
bloom.insert(key);
}
}
Err(e) => {
tracing::warn!(path = %meta.path.display(), %e, "idem run decode during bloom rebuild — bloom may be incomplete");
}
},
Err(e) => {
tracing::warn!(path = %meta.path.display(), %e, "idem run read during bloom rebuild — bloom may be incomplete");
}
}
}
self.bloom = bloom;
self.runs = metas;
Ok(())
}
pub fn insert(&mut self, key: String, rec: CommittedRec) {
if !self.hot.contains_key(&key) {
self.order.push_back(key.clone());
}
self.hot.insert(key, rec);
}
pub fn get_hot(&self, key: &str) -> Option<&CommittedRec> {
self.hot.get(key)
}
/// Reports whether the Bloom filter admits that `key` may be stored in a run file.
pub fn bloom_might_contain(&self, key: &str) -> bool {
    let filter = &self.bloom;
    filter.might_contain(key)
}
/// Searches the run files for `key`, newest run first, and returns the first match.
///
/// Runs whose key range excludes `key` are skipped without touching disk. Every
/// other run is read and decoded in full on a blocking thread and binary-searched.
/// Read, decode, and join errors are logged and treated as a miss for that run.
pub async fn lookup_runs(&self, key: &str) -> Option<CommittedRec> {
    #[cfg(test)]
    test_hooks::RUN_READ_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    for meta in self.runs.iter().rev() {
        let (lowest, highest) = (meta.key_range.0.as_str(), meta.key_range.1.as_str());
        let within_range = lowest <= key && key <= highest;
        if !within_range {
            continue;
        }

        let run_file = meta.path.clone();
        let needle = key.to_string();
        let search = move || -> Result<Option<CommittedRec>, String> {
            let bytes = std::fs::read(&run_file).map_err(|e| format!("read: {e}"))?;
            let pairs = decode_run(&bytes)?;
            let hit = pairs
                .binary_search_by(|(k, _)| k.as_str().cmp(needle.as_str()))
                .ok()
                .map(|idx| pairs[idx].1.clone());
            Ok(hit)
        };

        match tokio::task::spawn_blocking(search).await {
            Ok(Ok(Some(rec))) => return Some(rec),
            Ok(Ok(None)) => {}
            Ok(Err(e)) => {
                tracing::warn!(key = %key, error = %e, "idem run lookup error — treating as miss");
            }
            Err(e) => {
                tracing::warn!(key = %key, error = %e, "idem run spawn_blocking panicked — treating as miss");
            }
        }
    }
    None
}
/// Full lookup: the hot tier first, then the run files if the Bloom filter does not
/// rule the key out.
pub async fn get(&self, key: &str) -> Option<CommittedRec> {
    let hot_hit = self.hot.get(key).cloned();
    if hot_hit.is_some() {
        return hot_hit;
    }
    if self.bloom.might_contain(key) {
        self.lookup_runs(key).await
    } else {
        None
    }
}
/// True once the hot tier holds strictly more than `cap` entries.
pub fn needs_flush(&self) -> bool {
    self.cap < self.hot.len()
}
/// Evicts the oldest half of the hot tier into a new sorted run file.
///
/// On success the new run is appended to `runs`, its keys are added to the Bloom
/// filter, and the runs are merged once more than 8 of them exist. If encoding or
/// writing the run fails, the error is logged and the evicted entries are put back
/// into `hot` at the front of the eviction queue in their original order.
pub async fn flush_spill(&mut self) {
    let drain_n = self.hot.len() / 2;
    if drain_n == 0 {
        return;
    }
    let mut spilled: Vec<(String, CommittedRec)> = Vec::with_capacity(drain_n);
    for _ in 0..drain_n {
        if let Some(key) = self.order.pop_front()
            && let Some(rec) = self.hot.remove(&key)
        {
            spilled.push((key, rec));
        }
    }
    if spilled.is_empty() {
        return;
    }

    // Sorting by key below destroys the eviction order, so remember it first to
    // be able to undo the eviction faithfully on failure.
    let evicted_order: Vec<String> = spilled.iter().map(|(k, _)| k.clone()).collect();
    spilled.sort_by(|a, b| a.0.cmp(&b.0));

    let n = next_run_n(&self.runs);
    let run_path = self.runs_dir.join(run_name(n));
    let bytes = match encode_run(&spilled) {
        Ok(b) => b,
        Err(e) => {
            tracing::error!(%e, "failed to encode idem spill run — keeping keys in hot");
            self.restore_spilled(spilled, evicted_order);
            return;
        }
    };
    if let Err(e) = write_run_atomic(&run_path, &bytes).await {
        tracing::error!(%e, "failed to write idem spill run — keeping keys in hot");
        self.restore_spilled(spilled, evicted_order);
        return;
    }

    let min_key = spilled[0].0.clone();
    let max_key = spilled[spilled.len() - 1].0.clone();
    self.runs.push(RunMeta {
        path: run_path,
        key_range: (min_key, max_key),
        len: spilled.len(),
    });
    for (key, _) in &spilled {
        self.bloom.insert(key);
    }
    if self.runs.len() > 8 {
        self.merge_runs().await;
    }
}

/// Undoes an eviction: returns `spilled` to `hot` and re-queues its keys at the
/// front of `order` in `evicted_order` (oldest first).
fn restore_spilled(&mut self, spilled: Vec<(String, CommittedRec)>, evicted_order: Vec<String>) {
    self.hot.extend(spilled);
    // Pushing to the front in reverse leaves the oldest key at the very front.
    for key in evicted_order.into_iter().rev() {
        self.order.push_front(key);
    }
}
/// Compacts all current runs into a single new run file and rebuilds the Bloom
/// filter from the merged keys.
///
/// Nothing happens with fewer than two runs. Any read, decode, encode or write
/// failure is logged and aborts the merge, leaving the existing runs untouched.
/// The old run files are removed only after the merged file has been written.
///
/// NOTE(review): the stable sort plus `dedup_by` keeps the copy from the earliest
/// run in `self.runs` when a key occurs more than once, whereas `lookup_runs`
/// scans newest-first — confirm which copy should win if duplicates can occur.
pub async fn merge_runs(&mut self) {
    if self.runs.len() < 2 {
        return;
    }

    // Load every run, oldest first; bail out on the first failure.
    let mut merged: Vec<(String, CommittedRec)> = Vec::new();
    for meta in &self.runs {
        let bytes = match tokio::fs::read(&meta.path).await {
            Ok(bytes) => bytes,
            Err(e) => {
                tracing::error!(path = %meta.path.display(), %e, "idem merge: run read failed");
                return;
            }
        };
        match decode_run(&bytes) {
            Ok(pairs) => merged.extend(pairs),
            Err(e) => {
                tracing::error!(path = %meta.path.display(), %e, "idem merge: run decode failed");
                return;
            }
        }
    }

    merged.sort_by(|a, b| a.0.cmp(&b.0));
    merged.dedup_by(|later, earlier| later.0 == earlier.0);

    // The merged run gets a number above every existing run, so it never
    // overwrites one of its own inputs.
    let merged_path = self.runs_dir.join(run_name(next_run_n(&self.runs)));
    let bytes = match encode_run(&merged) {
        Ok(b) => b,
        Err(e) => {
            tracing::error!(%e, "idem merge: encode failed");
            return;
        }
    };
    if let Err(e) = write_run_atomic(&merged_path, &bytes).await {
        tracing::error!(%e, "idem merge: write failed");
        return;
    }

    for meta in &self.runs {
        if let Err(e) = tokio::fs::remove_file(&meta.path).await {
            tracing::warn!(path = %meta.path.display(), %e, "idem merge: failed to remove old run");
        }
    }

    self.runs = match (merged.first(), merged.last()) {
        (Some((min_key, _)), Some((max_key, _))) => vec![RunMeta {
            path: merged_path,
            key_range: (min_key.clone(), max_key.clone()),
            len: merged.len(),
        }],
        _ => Vec::new(),
    };

    let mut bloom = Bloom::new(merged.len().max(self.cap));
    for (key, _) in &merged {
        bloom.insert(key);
    }
    self.bloom = bloom;
}
pub fn run_count(&self) -> usize {
self.runs.len()
}
pub fn hot_len(&self) -> usize {
self.hot.len()
}
}
/// Extracts the run number from a file name of the form `idem-<number>.run`.
///
/// Returns `None` when the prefix or suffix is missing or the middle part does not
/// parse as a `usize`.
fn parse_run_name(name: &str) -> Option<usize> {
    let without_prefix = name.strip_prefix("idem-")?;
    let digits = without_prefix.strip_suffix(".run")?;
    digits.parse::<usize>().ok()
}
fn next_run_n(existing: &[RunMeta]) -> usize {
existing
.iter()
.filter_map(|m| {
m.path
.file_name()
.and_then(|n| n.to_str())
.and_then(parse_run_name)
})
.max()
.map(|n| n + 1)
.unwrap_or(0)
}
/// Test-only instrumentation counting how often `lookup_runs` is entered.
///
/// The counter is kept per thread. The test harness runs tests in parallel, each
/// on its own thread, so a process-wide counter lets one test's `reset()` or
/// lookups disturb another test's exact-count assertions. With thread-local
/// storage each test only observes increments made on its own thread.
#[cfg(test)]
pub mod test_hooks {
    use std::cell::Cell;
    use std::sync::atomic::Ordering;

    thread_local! {
        static READS: Cell<usize> = const { Cell::new(0) };
    }

    /// Handle exposing an `AtomicUsize`-like API over the per-thread counter.
    ///
    /// The `Ordering` arguments are accepted for call-site compatibility only; a
    /// thread-local value needs no synchronization.
    #[derive(Debug)]
    pub struct ThreadLocalCounter;

    impl ThreadLocalCounter {
        /// Adds `n` to the calling thread's counter and returns the previous value.
        pub fn fetch_add(&self, n: usize, _order: Ordering) -> usize {
            READS.with(|c| {
                let previous = c.get();
                c.set(previous.wrapping_add(n));
                previous
            })
        }

        /// Reads the calling thread's counter.
        pub fn load(&self, _order: Ordering) -> usize {
            READS.with(Cell::get)
        }

        /// Overwrites the calling thread's counter.
        pub fn store(&self, value: usize, _order: Ordering) {
            READS.with(|c| c.set(value));
        }
    }

    pub static RUN_READ_COUNT: ThreadLocalCounter = ThreadLocalCounter;

    /// Zeroes the calling thread's counter.
    pub fn reset() {
        RUN_READ_COUNT.store(0, Ordering::SeqCst);
    }

    /// Current value of the calling thread's counter.
    pub fn count() -> usize {
        RUN_READ_COUNT.load(Ordering::SeqCst)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use talea_core::types::TxId;
use uuid::Uuid;
/// Builds a record with the given `seq`, a fresh v7 UUID as transaction id and the
/// current UTC time.
fn make_rec(seq: i64) -> CommittedRec {
    let txid = TxId(Uuid::now_v7());
    let at = Utc::now();
    CommittedRec { txid, seq, at }
}
/// Creates a store with hot capacity `cap` attached to `dir`, using a rebuild
/// callback that yields no records.
async fn make_tiered(dir: &Path, cap: usize) -> TieredIdem {
    let mut tiered = TieredIdem::with_cap(cap);
    let attached = tiered.attach_dir(dir, || async { Ok(vec![]) }).await;
    attached.unwrap();
    tiered
}
/// Every inserted key must be reported as possibly present.
#[test]
fn bloom_no_false_negatives() {
    let keys: Vec<String> = (0..500).map(|i| format!("key-{i}")).collect();
    let mut filter = Bloom::new(1000);
    for key in &keys {
        filter.insert(key);
    }
    for key in &keys {
        assert!(filter.might_contain(key));
    }
}
/// With the filter at half of its sized capacity, fewer than 100 of 5000 unseen
/// keys may be reported as present.
#[test]
fn bloom_mostly_no_false_positives_for_unseen_keys() {
    let mut filter = Bloom::new(10000);
    (0..5000).for_each(|i| filter.insert(&format!("present-{i}")));

    let fp = (0..5000)
        .filter(|i| filter.might_contain(&format!("absent-{i}")))
        .count();
    assert!(fp < 100, "too many false positives: {fp}");
}
/// A key that was just inserted is served from the hot tier.
#[tokio::test]
async fn hot_lookup_hits() {
    let scratch = tempfile::tempdir().unwrap();
    let mut store = make_tiered(scratch.path(), 100).await;
    let expected = make_rec(1);
    store.insert(String::from("foo"), expected.clone());

    let found = store.get("foo").await;
    assert_eq!(found, Some(expected));
}
/// Looking up a key in an empty store yields `None`.
#[tokio::test]
async fn hot_miss_returns_none() {
    let scratch = tempfile::tempdir().unwrap();
    let store = make_tiered(scratch.path(), 100).await;

    let found = store.get("nope").await;
    assert_eq!(found, None);
}
// A key evicted from the hot tier must still resolve, via the run files, to the
// exact record it was first stored with.
#[tokio::test]
async fn spilled_key_still_dedups_with_original_committed() {
let dir = tempfile::tempdir().unwrap();
let cap = 4usize;
let mut t = make_tiered(dir.path(), cap).await;
let mut recs: Vec<(String, CommittedRec)> = Vec::new();
// Insert more keys than `cap`, flushing whenever the hot tier overflows.
for i in 0..10 {
let key = format!("idem-key-{i:02}");
let rec = make_rec((i + 1) as i64);
t.insert(key.clone(), rec.clone());
recs.push((key, rec));
if t.needs_flush() {
t.flush_spill().await;
}
}
// The first key inserted is the oldest, so it is evicted by the first flush.
let (key0, rec0) = &recs[0];
assert!(
!t.hot.contains_key(key0.as_str()),
"key0 should have been spilled out of hot"
);
let found = t.get(key0).await;
// All three fields must round-trip through the run file unchanged.
assert_eq!(
found.as_ref().map(|r| r.seq),
Some(rec0.seq),
"spilled key must still be found via run lookup"
);
assert_eq!(
found.as_ref().map(|r| r.txid.clone()),
Some(rec0.txid.clone())
);
assert_eq!(found.as_ref().map(|r| r.at), Some(rec0.at));
}
// `get` must not enter `lookup_runs` for a key the Bloom filter rules out, and
// must enter it for a key the filter admits. Entry is observed through the
// `test_hooks` read counter.
#[tokio::test]
async fn bloom_negative_skips_disk() {
let dir = tempfile::tempdir().unwrap();
let cap = 4usize;
let mut t = make_tiered(dir.path(), cap).await;
let mut spilled_key = String::new();
// Overflow the hot tier so that at least one run (containing key 0) is written.
for i in 0..8 {
let key = format!("spilled-{i}");
if i == 0 {
spilled_key = key.clone();
}
t.insert(key, make_rec(i as i64 + 1));
if t.needs_flush() {
t.flush_spill().await;
}
}
assert!(!t.runs.is_empty(), "must have at least one run");
let fresh_key = "absolutely-fresh-key-xyz";
// Precondition: this particular key is not a false positive in the filter.
assert!(
!t.bloom.might_contain(fresh_key),
"bloom must not contain a key that was never inserted"
);
test_hooks::reset();
let result = t.get(fresh_key).await;
assert_eq!(result, None);
assert_eq!(
test_hooks::count(),
0,
"bloom-negative get must not call lookup_runs (disk skip)"
);
assert!(
t.bloom.might_contain(&spilled_key),
"spilled_key must be in the bloom"
);
test_hooks::reset();
let _ = t.get(&spilled_key).await;
assert!(
test_hooks::count() > 0,
"bloom-positive get must call lookup_runs (counter should increment)"
);
}
// When every run file has been deleted, `attach_dir` must rebuild the runs from
// the records supplied by the rebuild callback so that all keys resolve again.
#[tokio::test]
async fn runs_rebuild_from_log_when_deleted() {
let dir = tempfile::tempdir().unwrap();
let cap = 4usize;
let mut t = make_tiered(dir.path(), cap).await;
let mut recs: Vec<(String, CommittedRec)> = Vec::new();
for i in 0..10 {
let key = format!("rebuild-{i:02}");
let rec = make_rec((i + 1) as i64);
t.insert(key.clone(), rec.clone());
recs.push((key, rec));
if t.needs_flush() {
t.flush_spill().await;
}
}
// Sanity check: some of the keys really live only in run files.
let hot_keys: std::collections::HashSet<String> = t.hot.keys().cloned().collect();
let spilled_keys: Vec<&(String, CommittedRec)> =
recs.iter().filter(|(k, _)| !hot_keys.contains(k)).collect();
assert!(!spilled_keys.is_empty(), "some keys must have been spilled");
// Delete every `.run` file from the directory.
let entries: Vec<_> = std::fs::read_dir(dir.path())
.unwrap()
.filter_map(|e| e.ok())
.filter(|e| {
let name = e.file_name().into_string().unwrap_or_default();
name.ends_with(".run")
})
.collect();
for entry in entries {
std::fs::remove_file(entry.path()).unwrap();
}
let all_recs_clone = recs.clone();
let hot_snap = t.hot.clone();
// Fresh store that keeps the hot tier but has no run metadata and a minimal filter.
let mut t2 = TieredIdem {
hot: hot_snap,
order: t.order.clone(),
cap,
runs_dir: PathBuf::new(),
runs: Vec::new(),
bloom: Bloom::new(1),
};
// The callback returns every record ever inserted, standing in for a log scan.
t2.attach_dir(dir.path(), || {
let all = all_recs_clone.clone();
async move { Ok(all) }
})
.await
.unwrap();
for (key, rec) in &recs {
let found = t2.get(key).await;
assert_eq!(
found.as_ref().map(|r| r.seq),
Some(rec.seq),
"key {key} must be found after rebuild; run files were deleted"
);
}
}
// `merge_runs` must collapse many runs into exactly one without losing any key.
#[tokio::test]
async fn merge_compacts_runs() {
let dir = tempfile::tempdir().unwrap();
let cap = 2usize;
let mut t = make_tiered(dir.path(), cap).await;
let mut recs: Vec<(String, CommittedRec)> = Vec::new();
for i in 0..30 {
let key = format!("merge-{i:03}");
let rec = make_rec((i + 1) as i64);
t.insert(key.clone(), rec.clone());
recs.push((key, rec));
if t.needs_flush() {
// Spill by hand instead of calling `flush_spill`, which would merge
// automatically once more than 8 runs exist; the runs must pile up here.
let drain_n = t.hot.len() / 2;
let mut spilled: Vec<(String, CommittedRec)> = Vec::new();
for _ in 0..drain_n {
if let Some(key) = t.order.pop_front()
&& let Some(rec) = t.hot.remove(&key)
{
spilled.push((key, rec));
}
}
if !spilled.is_empty() {
spilled.sort_by(|a, b| a.0.cmp(&b.0));
let n = next_run_n(&t.runs);
let run_path = t.runs_dir.join(run_name(n));
let bytes = encode_run(&spilled).unwrap();
write_run_atomic(&run_path, &bytes).await.unwrap();
let min_key = spilled[0].0.clone();
let max_key = spilled[spilled.len() - 1].0.clone();
t.runs.push(RunMeta {
path: run_path,
key_range: (min_key, max_key),
len: spilled.len(),
});
for (key, _) in &spilled {
t.bloom.insert(key);
}
}
}
}
assert!(
t.runs.len() > 8,
"must have more than 8 runs before merge: {}",
t.runs.len()
);
t.merge_runs().await;
assert_eq!(t.runs.len(), 1, "after merge there must be exactly 1 run");
// Every key, whether still hot or merged, must resolve to its record.
for (key, rec) in &recs {
let found = t.get(key).await;
assert_eq!(
found.as_ref().map(|r| r.seq),
Some(rec.seq),
"key {key} must still be found after merge"
);
}
}
// Sustained inserts drive `flush_spill` through repeated automatic merges; every
// tracked run file must exist on disk afterwards and the earliest key must still
// resolve.
#[tokio::test]
async fn second_merge_does_not_clobber_output() {
let dir = tempfile::tempdir().unwrap();
let cap = 2usize;
let mut t = make_tiered(dir.path(), cap).await;
let first_key = "first-key-ever";
let first_rec = make_rec(1);
t.insert(first_key.into(), first_rec.clone());
// With `cap` = 2, forty further inserts cause many flushes.
for seq in 2i64..42 {
let key = format!("bulk-{seq}");
t.insert(key, make_rec(seq));
if t.needs_flush() {
t.flush_spill().await;
}
}
assert!(
!t.runs.is_empty(),
"must have at least one run after sustained inserts"
);
// A merge that overwrote or deleted its own output would leave dangling metadata.
for meta in &t.runs {
assert!(
meta.path.exists(),
"every live run file must exist on disk: {:?}",
meta.path
);
}
let found = t.get(first_key).await;
assert_eq!(
found.as_ref().map(|r| r.seq),
Some(first_rec.seq),
"key spilled before first merge must still resolve after subsequent merges"
);
}
// The Bloom filter is not persisted; a fresh `attach_dir` over existing run files
// must rebuild it so that every spilled key is admitted.
#[tokio::test]
async fn bloom_rebuilt_from_runs_on_attach_never_stale() {
let dir = tempfile::tempdir().unwrap();
let cap = 4usize;
let mut t = make_tiered(dir.path(), cap).await;
let mut spilled_keys: Vec<String> = Vec::new();
for i in 0..10 {
let key = format!("stale-test-{i:02}");
t.insert(key.clone(), make_rec(i as i64 + 1));
spilled_keys.push(key);
if t.needs_flush() {
t.flush_spill().await;
}
}
// Narrow the list of inserted keys down to those no longer in the hot tier.
let hot_keys: std::collections::HashSet<String> = t.hot.keys().cloned().collect();
let actually_spilled: Vec<&String> = spilled_keys
.iter()
.filter(|k| !hot_keys.contains(*k))
.collect();
assert!(
!actually_spilled.is_empty(),
"some keys must have been spilled to runs"
);
let bloom_artifact = dir.path().join("idem-bloom.json");
assert!(
!bloom_artifact.exists(),
"idem-bloom.json must not exist — bloom is never persisted"
);
// Brand-new store over the same directory; the callback supplies nothing, so
// the filter can only have been populated from the run files themselves.
let mut t2 = TieredIdem::with_cap(cap);
t2.attach_dir(dir.path(), || async { Ok(vec![]) })
.await
.unwrap();
for key in &actually_spilled {
assert!(
t2.bloom.might_contain(key),
"after fresh attach bloom must contain spilled key {key:?}"
);
}
}
}