use crate::store::platform::fs::{read as fs_read, write_file_atomically_with_fs, StoreFs};
use crate::store::{EncodedBytes, ExtensionKey, StoreError};
use dashmap::DashMap;
use std::collections::BTreeMap;
use std::path::Path;
use super::entry::{DiskPos, IndexEntry};
use crate::event::EventKind;
/// Magic bytes that open the persisted idempotency file; written first by
/// `IdempotencyStore::flush` and checked by `read_idemp_file`.
pub(crate) const IDEMP_MAGIC: &[u8; 6] = b"FBATID";
/// On-disk format version, stored right after the magic as a little-endian `u16`.
pub(crate) const IDEMP_VERSION: u16 = 1;
/// File name of the persisted idempotency store inside the data directory.
pub(crate) const IDEMP_FILENAME: &str = "index.idemp";
/// Header size in bytes: 6 (magic) + 2 (version) + 4 (CRC32 of the body).
const HEADER_LEN: usize = 6 + 2 + 4;
/// `keep_sequences` used by `IdempotencyRetention::default()` (16 * 1024 * 1024).
pub(crate) const DEFAULT_KEEP_SEQUENCES: u64 = 16 * 1024 * 1024;
/// `max_keys` used by `IdempotencyRetention::default()` (64 * 1024 * 1024).
pub(crate) const DEFAULT_MAX_KEYS: u64 = 64 * 1024 * 1024;
/// Retention policy for durable idempotency keys.
///
/// Decides which recorded keys `IdempotencyStore::evict` may drop and whether
/// admission of new keys is checked against a key-count cap.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum IdempotencyRetention {
/// Keep every key: no sequence window and no key cap.
Unbounded,
/// Keep keys whose recorded sequence lies within `keep_sequences` of the
/// eviction frontier; older keys are aged out. No key cap.
Window {
/// Width of the retention window, in global sequence numbers.
keep_sequences: u64,
},
/// Sequence window plus a soft cap on the number of stored keys.
Hybrid {
/// Width of the retention window, in global sequence numbers.
keep_sequences: u64,
/// Soft cap on stored keys, consulted when new keys are admitted and
/// when evicting.
max_keys: u64,
},
}
impl Default for IdempotencyRetention {
    /// Returns the `Hybrid` policy built from `DEFAULT_KEEP_SEQUENCES` and
    /// `DEFAULT_MAX_KEYS`.
    fn default() -> Self {
        let keep_sequences = DEFAULT_KEEP_SEQUENCES;
        let max_keys = DEFAULT_MAX_KEYS;
        Self::Hybrid {
            keep_sequences,
            max_keys,
        }
    }
}
impl IdempotencyRetention {
    /// Width of the retention window in global sequences, or `None` when the
    /// policy has no window (`Unbounded`).
    pub(crate) fn keep_sequences(self) -> Option<u64> {
        match self {
            Self::Hybrid {
                keep_sequences: window,
                ..
            }
            | Self::Window {
                keep_sequences: window,
            } => Some(window),
            Self::Unbounded => None,
        }
    }

    /// Soft cap on the number of stored keys, or `None` when the policy has
    /// no cap (`Unbounded` and `Window`).
    pub(crate) fn max_keys(self) -> Option<u64> {
        match self {
            Self::Hybrid { max_keys: cap, .. } => Some(cap),
            Self::Window { .. } | Self::Unbounded => None,
        }
    }
}
/// What admission does when new keys would push the store past its key cap
/// even after an eviction pass.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum OverflowPolicy {
/// Admit the new key(s) anyway (the default).
#[default]
Warn,
/// Log a warning and refuse with `StoreError::IdempotencyOverflowFailClosed`.
FailClosed,
/// Handled by admission exactly like `FailClosed`, except that the logged
/// `backpressure_note` field is `true`.
Backpressure,
}
/// One durable idempotency record: the key plus the data captured from the
/// index entry of the event it refers to. This is the unit serialized into the
/// persisted idempotency file.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub(crate) struct IdempEntry {
/// Idempotency key; also the key under which the store's map holds this entry.
pub(crate) key: u128,
/// Id of the event this record refers to; passed to the liveness check in
/// `IdempotencyStore::mark_evicted`.
pub(crate) event_id: u128,
/// Global sequence copied from the index entry.
pub(crate) global_sequence: u64,
/// Segment-id component of the event's `DiskPos`.
pub(crate) disk_pos_segment: u64,
/// Offset component of the event's `DiskPos`.
pub(crate) disk_pos_offset: u64,
/// Length component of the event's `DiskPos`.
pub(crate) disk_pos_length: u32,
/// The index entry's `hash_chain.event_hash`.
pub(crate) content_hash: [u8; 32],
/// The index entry's `hash_chain.prev_hash`.
pub(crate) prev_hash: [u8; 32],
/// Entity part of the event's coordinate.
pub(crate) entity: String,
/// Scope part of the event's coordinate.
pub(crate) scope: String,
/// Kind of the event.
pub(crate) kind: EventKind,
/// Sequence compared against the retention window by `IdempotencyStore::evict`.
pub(crate) recorded_global_sequence: u64,
/// Set by `IdempotencyStore::mark_evicted` once the event is no longer live.
/// Falls back to `false` when the field is absent during deserialization.
#[serde(default)]
pub(crate) event_evicted: bool,
/// Receipt extensions cloned from the index entry.
pub(crate) receipt_extensions: BTreeMap<ExtensionKey, EncodedBytes>,
}
impl IdempEntry {
    /// Builds the durable idempotency record for an indexed event.
    ///
    /// The event id is used as the idempotency key as well.
    /// `recorded_global_sequence` is stored as given, and a freshly built
    /// record is never marked as evicted.
    pub(crate) fn from_index_entry(entry: &IndexEntry, recorded_global_sequence: u64) -> Self {
        let event_id = entry.event_id;
        let position = &entry.disk_pos;
        let chain = &entry.hash_chain;
        let coord = &entry.coord;
        Self {
            key: event_id,
            event_id,
            global_sequence: entry.global_sequence,
            disk_pos_segment: position.segment_id(),
            disk_pos_offset: position.offset(),
            disk_pos_length: position.length(),
            content_hash: chain.event_hash,
            prev_hash: chain.prev_hash,
            entity: coord.entity().to_owned(),
            scope: coord.scope().to_owned(),
            kind: entry.kind,
            recorded_global_sequence,
            event_evicted: false,
            receipt_extensions: entry.receipt_extensions.clone(),
        }
    }

    /// Reassembles the `DiskPos` from its three stored components.
    pub(crate) fn disk_pos(&self) -> DiskPos {
        let (segment, offset, length) = (
            self.disk_pos_segment,
            self.disk_pos_offset,
            self.disk_pos_length,
        );
        DiskPos::new(segment, offset, length)
    }

    /// Test-only accessor for the `event_evicted` flag.
    #[cfg(test)]
    pub(crate) fn is_event_evicted(&self) -> bool {
        self.event_evicted
    }
}
/// Summary of a single `IdempotencyStore::evict` pass.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) struct EvictionReport {
/// Keys selected for removal because their recorded sequence was below the
/// window floor.
pub(crate) aged_out: u64,
/// Out-of-window keys selected for removal by the cap-trimming step.
pub(crate) cap_trimmed_out_of_window: u64,
/// Set when the store is over the cap because of within-window keys, which
/// the eviction pass leaves in place.
pub(crate) within_window_exceeds_cap: bool,
/// Number of keys in the store when the pass finished.
pub(crate) remaining: u64,
}
/// In-memory table of idempotency records, with a retention policy for
/// eviction and an overflow policy for admission at the key cap.
pub(crate) struct IdempotencyStore {
// Records keyed by `IdempEntry::key`.
map: DashMap<u128, IdempEntry>,
// Which keys `evict` may drop, and the optional key cap.
retention: IdempotencyRetention,
// What admission does when the cap cannot be met.
overflow: OverflowPolicy,
}
impl IdempotencyStore {
/// Creates an empty store governed by `retention` and `overflow`.
pub(crate) fn new(retention: IdempotencyRetention, overflow: OverflowPolicy) -> Self {
    let map = DashMap::new();
    Self {
        map,
        retention,
        overflow,
    }
}
/// Number of keys currently held in the store.
pub(crate) fn len(&self) -> usize {
self.map.len()
}
/// Returns a clone of the record stored under `key`, or `None` when the key
/// is unknown.
pub(crate) fn get(&self, key: u128) -> Option<IdempEntry> {
    let found = self.map.get(&key)?;
    Some(found.value().clone())
}
/// Checks whether a single keyed append may proceed.
///
/// A key that is already stored is always admitted. A new key is admitted
/// subject to the key cap and the overflow policy.
pub(crate) fn admit_new_key(&self, key: u128, frontier: u64) -> Result<(), StoreError> {
    if self.map.contains_key(&key) {
        Ok(())
    } else {
        self.admit_unique_new_count(1, frontier)
    }
}
/// Checks whether a batch of keyed appends may proceed as a unit.
///
/// Keys already stored are ignored. Among the remaining (new) keys, a
/// repeated key rejects the batch with `StoreError::IdempotencyPartialBatch`;
/// otherwise the number of distinct new keys is checked against the key cap
/// and the overflow policy.
pub(crate) fn admit_new_keys(
    &self,
    keys: impl Iterator<Item = u128>,
    frontier: u64,
) -> Result<(), StoreError> {
    let mut fresh = std::collections::HashSet::<u128>::new();
    for candidate in keys.filter(|key| !self.map.contains_key(key)) {
        if fresh.insert(candidate) {
            continue;
        }
        return Err(StoreError::IdempotencyPartialBatch {
            reason: "duplicate idempotency key in batch".into(),
        });
    }
    // Every accepted candidate was inserted exactly once, so the set size is
    // the number of distinct new keys.
    let unique_new = u64::try_from(fresh.len()).unwrap_or(u64::MAX);
    self.admit_unique_new_count(unique_new, frontier)
}
/// Decides whether `unique_new` additional keys fit under the key cap.
///
/// Always succeeds when the retention policy has no cap or when there is
/// nothing new to admit. If the keys do not fit, one eviction pass at
/// `frontier` is run and the check repeated. If they still do not fit, the
/// overflow policy decides: `Warn` admits anyway, while `FailClosed` and
/// `Backpressure` log a warning and return
/// `StoreError::IdempotencyOverflowFailClosed`.
fn admit_unique_new_count(&self, unique_new: u64, frontier: u64) -> Result<(), StoreError> {
    let Some(max_keys) = self.retention.max_keys() else {
        return Ok(());
    };
    if unique_new == 0 {
        return Ok(());
    }
    let fits = |current: u64| current.saturating_add(unique_new) <= max_keys;
    if fits(self.map.len() as u64) {
        return Ok(());
    }
    // Over the cap: try to make room, then look again.
    self.evict(frontier);
    let len = self.map.len() as u64;
    if fits(len) {
        return Ok(());
    }
    let backpressure_note = match self.overflow {
        OverflowPolicy::Warn => return Ok(()),
        OverflowPolicy::FailClosed => false,
        OverflowPolicy::Backpressure => true,
    };
    tracing::warn!(
        target: "batpak::idemp",
        len,
        max_keys,
        unique_new,
        backpressure_note,
        "durable idempotency store at soft cap; refusing new keyed append(s) (fail-closed)"
    );
    Err(StoreError::IdempotencyOverflowFailClosed { len, max_keys })
}
/// Stores `entry` under its own key, replacing any record already there.
pub(crate) fn record(&self, entry: IdempEntry) {
    let key = entry.key;
    self.map.insert(key, entry);
}
/// Flags every record whose event `is_live` reports as no longer live.
///
/// Records stay in the store and only `event_evicted` is written. Records
/// already flagged are skipped without consulting `is_live`.
pub(crate) fn mark_evicted(&self, is_live: impl Fn(u128) -> bool) {
    for mut slot in self.map.iter_mut() {
        if slot.event_evicted {
            continue;
        }
        if !is_live(slot.event_id) {
            slot.event_evicted = true;
        }
    }
}
/// Replaces the store's contents with `entries`, each filed under its own
/// key. When several entries share a key, the last one wins.
pub(crate) fn restore(&self, entries: Vec<IdempEntry>) {
    self.map.clear();
    entries.into_iter().for_each(|entry| {
        let key = entry.key;
        self.map.insert(key, entry);
    });
}
/// Returns clones of all stored records, in the map's iteration order.
pub(crate) fn snapshot(&self) -> Vec<IdempEntry> {
    let mut copies = Vec::new();
    for slot in self.map.iter() {
        copies.push(slot.value().clone());
    }
    copies
}
/// Applies the retention policy at `frontier` and reports what was removed.
///
/// With no retention window (`Unbounded`) nothing is removed. Otherwise the
/// window floor is `frontier - keep_sequences` (saturating at zero), and
/// every key recorded below the floor is aged out.
///
/// When a key cap is configured and the store is still over it afterwards:
/// * if the within-window keys alone number more than the cap, they are all
///   kept, a warning is logged and `within_window_exceeds_cap` is set;
/// * otherwise (within-window keys fit the cap, including filling it
///   exactly) only out-of-window keys are trimmed, at most as many as the
///   store is over the cap.
///
/// Within-window keys are never removed by this method.
pub(crate) fn evict(&self, frontier: u64) -> EvictionReport {
    let mut report = EvictionReport::default();
    let window_floor = match self.retention.keep_sequences() {
        None => {
            report.remaining = self.map.len() as u64;
            return report;
        }
        Some(keep) => frontier.saturating_sub(keep),
    };

    // Collect first, remove afterwards: the removal must not run while the
    // iterator still holds references into the map.
    let aged: Vec<u128> = self
        .map
        .iter()
        .filter(|r| r.value().recorded_global_sequence < window_floor)
        .map(|r| *r.key())
        .collect();
    for key in &aged {
        self.map.remove(key);
    }
    report.aged_out = aged.len() as u64;

    if let Some(max_keys) = self.retention.max_keys() {
        let len = self.map.len() as u64;
        if len > max_keys {
            let within_window = self
                .map
                .iter()
                .filter(|r| r.value().recorded_global_sequence >= window_floor)
                .count() as u64;
            // Strictly greater: when within-window keys exactly fill the
            // cap, the excess is out-of-window by definition and is trimmed
            // below instead of being reported as a within-window overflow.
            if within_window > max_keys {
                report.within_window_exceeds_cap = true;
                tracing::warn!(
                    target: "batpak::idemp",
                    len,
                    max_keys,
                    within_window,
                    "durable idempotency store exceeds soft cap from within-window keys \
                     alone (key-rate spike); window wins, correctness preserved, store \
                     temporarily over cap (bounded by rate x window)"
                );
            } else {
                // Only out-of-window keys are eligible, and only as many as
                // needed to get back down to the cap.
                let trim_target = len.saturating_sub(max_keys);
                let out_of_window: Vec<u128> = self
                    .map
                    .iter()
                    .filter(|r| r.value().recorded_global_sequence < window_floor)
                    .map(|r| *r.key())
                    .take(usize::try_from(trim_target).unwrap_or(usize::MAX))
                    .collect();
                for key in &out_of_window {
                    self.map.remove(key);
                }
                report.cap_trimmed_out_of_window = out_of_window.len() as u64;
            }
        }
    }

    report.remaining = self.map.len() as u64;
    report
}
/// Persists a snapshot of the store to `IDEMP_FILENAME` inside `data_dir`.
///
/// File layout: magic, little-endian version, little-endian CRC32 of the
/// encoded body, then the body. The write is delegated to
/// `write_file_atomically_with_fs`.
///
/// Returns an error when the snapshot cannot be encoded or when the write
/// fails.
pub(crate) fn flush(&self, data_dir: &Path, fs: &dyn StoreFs) -> Result<(), StoreError> {
    let entries = self.snapshot();
    let body = crate::encoding::to_bytes(&entries)
        .map_err(|cause| StoreError::ser_msg(&format!("encode idemp store: {cause}")))?;
    let final_path = data_dir.join(IDEMP_FILENAME);
    let version_bytes = IDEMP_VERSION.to_le_bytes();
    let crc_bytes = crc32fast::hash(&body).to_le_bytes();
    // Written in exactly this order; the loader relies on it.
    let sections: [&[u8]; 4] = [IDEMP_MAGIC, &version_bytes, &crc_bytes, &body];
    write_file_atomically_with_fs(
        data_dir,
        &final_path,
        "idempotency-store",
        |file| {
            use std::io::Write;
            for section in sections {
                file.write_all(section).map_err(StoreError::Io)?;
            }
            Ok(())
        },
        fs,
    )?;
    tracing::debug!(
        target: "batpak::idemp",
        count = entries.len(),
        "flushed durable idempotency store"
    );
    Ok(())
}
}
/// Outcome of trying to load the persisted idempotency file.
pub(crate) enum IdempLoad {
/// The file was present, passed all header checks and decoded into these entries.
Loaded(Vec<IdempEntry>),
/// No file exists at the expected path.
Missing,
/// The file could not be read, or was present but failed a header, CRC or
/// decode check.
Invalid {
/// Human-readable description of what was wrong.
reason: String,
},
}
pub(crate) fn read_idemp_file(data_dir: &Path) -> Result<IdempLoad, StoreError> {
let path = data_dir.join(IDEMP_FILENAME);
let raw = match fs_read(&path) {
Ok(bytes) => bytes,
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
return Ok(IdempLoad::Missing)
}
Err(error) => {
tracing::warn!(
target: "batpak::idemp",
path = %path.display(),
error = %error,
"failed to read idempotency store file"
);
return Ok(IdempLoad::Invalid {
reason: format!("read failed: {error}"),
});
}
};
if raw.len() < HEADER_LEN {
tracing::warn!(
target: "batpak::idemp",
path = %path.display(),
len = raw.len(),
"idempotency store file too short for a valid header — ignoring"
);
return Ok(IdempLoad::Invalid {
reason: format!("file too short: {} bytes", raw.len()),
});
}
if &raw[..6] != IDEMP_MAGIC.as_ref() {
tracing::warn!(
target: "batpak::idemp",
path = %path.display(),
"idempotency store file has wrong magic bytes — ignoring"
);
return Ok(IdempLoad::Invalid {
reason: "wrong magic bytes".to_owned(),
});
}
let version = u16::from_le_bytes([raw[6], raw[7]]);
if version > IDEMP_VERSION {
return Err(StoreError::IdempotencyFutureVersion {
stored: version,
current: IDEMP_VERSION,
});
}
if version != IDEMP_VERSION {
tracing::warn!(
target: "batpak::idemp",
path = %path.display(),
version,
current = IDEMP_VERSION,
"idempotency store file declares an unsupported version — ignoring"
);
return Ok(IdempLoad::Invalid {
reason: format!("unsupported version: {version}"),
});
}
let stored_crc = u32::from_le_bytes([raw[8], raw[9], raw[10], raw[11]]);
let body = &raw[HEADER_LEN..];
let computed_crc = crc32fast::hash(body);
if stored_crc != computed_crc {
tracing::warn!(
target: "batpak::idemp",
path = %path.display(),
stored = stored_crc,
computed = computed_crc,
"idempotency store CRC mismatch — file is corrupt, ignoring"
);
return Ok(IdempLoad::Invalid {
reason: format!("crc mismatch: stored {stored_crc}, computed {computed_crc}"),
});
}
match crate::encoding::from_bytes::<Vec<IdempEntry>>(body) {
Ok(entries) => Ok(IdempLoad::Loaded(entries)),
Err(error) => {
tracing::warn!(
target: "batpak::idemp",
path = %path.display(),
error = %error,
"idempotency store body failed to decode — ignoring"
);
Ok(IdempLoad::Invalid {
reason: format!("decode failed: {error}"),
})
}
}
}
// Unit tests for eviction, admission, eviction marking and the on-disk round trip.
#[cfg(test)]
mod tests {
use super::*;
// Builds a minimal entry whose key and event id are both `key`, recorded at
// `recorded_global_sequence`; every other field is a fixed placeholder.
fn entry(key: u128, recorded_global_sequence: u64) -> IdempEntry {
IdempEntry {
key,
event_id: key,
global_sequence: recorded_global_sequence,
disk_pos_segment: 0,
disk_pos_offset: 0,
disk_pos_length: 0,
content_hash: [0u8; 32],
prev_hash: [0u8; 32],
entity: "e".to_owned(),
scope: "s".to_owned(),
kind: EventKind::custom(0xB, 1),
recorded_global_sequence,
event_evicted: false,
receipt_extensions: BTreeMap::new(),
}
}
// Ten within-window keys against a cap of three: none may be evicted, and the
// report must flag the within-window overflow.
#[test]
fn cap_never_evicts_within_window_keys_even_under_residual_pigeonhole() {
let store = IdempotencyStore::new(
IdempotencyRetention::Hybrid {
keep_sequences: 100,
max_keys: 3,
},
OverflowPolicy::Warn,
);
for i in 0..10u128 {
let seq = u64::try_from(i).expect("loop index 0..10 fits u64");
store.record(entry(i, 90 + seq));
}
let report = store.evict(100);
assert!(report.within_window_exceeds_cap, "residual pigeonhole");
assert_eq!(report.aged_out, 0);
assert_eq!(report.cap_trimmed_out_of_window, 0);
assert_eq!(report.remaining, 10, "all within-window keys survive");
assert!((0..10u128).all(|i| store.get(i).is_some()));
}
// Window of 10 at frontier 100 gives a floor of 90: keys 0..5 age out and
// keys 95..100 stay.
#[test]
fn window_aging_trims_only_out_of_window_keys() {
let store = IdempotencyStore::new(
IdempotencyRetention::Window { keep_sequences: 10 },
OverflowPolicy::Warn,
);
for i in 0..5u128 {
store.record(entry(
i,
u64::try_from(i).expect("loop index 0..5 fits u64"),
));
}
for i in 95..100u128 {
store.record(entry(
i,
u64::try_from(i).expect("loop index 95..100 fits u64"),
));
}
let report = store.evict(100);
assert_eq!(report.aged_out, 5, "five out-of-window keys aged out");
assert_eq!(report.remaining, 5, "five within-window keys remain");
for i in 0..5u128 {
assert!(store.get(i).is_none(), "aged-out key {i} is gone");
}
assert!((95..100u128).all(|i| store.get(i).is_some()), "kept");
}
// `Unbounded` retention removes nothing, however far the frontier has moved.
#[test]
fn unbounded_never_evicts() {
let store = IdempotencyStore::new(IdempotencyRetention::Unbounded, OverflowPolicy::Warn);
for i in 0..50u128 {
store.record(entry(
i,
u64::try_from(i).expect("loop index 0..50 fits u64"),
));
}
let report = store.evict(1_000_000);
assert_eq!(report.aged_out, 0);
assert_eq!(report.cap_trimmed_out_of_window, 0);
assert_eq!(report.remaining, 50);
}
// Helper: a `Hybrid` store that refuses new keys once the cap cannot be met.
fn fail_closed(keep_sequences: u64, max_keys: u64) -> IdempotencyStore {
IdempotencyStore::new(
IdempotencyRetention::Hybrid {
keep_sequences,
max_keys,
},
OverflowPolicy::FailClosed,
)
}
// At the cap, a known key is still admitted while a new key is refused.
#[test]
fn admit_new_key_fail_closed_refuses_only_new_keys_over_cap() {
let store = fail_closed(1000, 2);
store.record(entry(1, 1));
store.record(entry(2, 2));
assert!(store.admit_new_key(1, 2).is_ok());
assert!(matches!(
store.admit_new_key(99, 2),
Err(StoreError::IdempotencyOverflowFailClosed { .. })
));
}
// Admission runs an eviction pass first, so stale keys make room for a new one.
#[test]
fn admit_new_key_ages_out_of_window_before_fail_closing() {
let store = fail_closed(10, 2);
store.record(entry(1, 1));
store.record(entry(2, 2));
assert!(
store.admit_new_key(99, 1000).is_ok(),
"PROPERTY: out-of-window keys must age out before a fresh key is refused"
);
assert_eq!(store.len(), 0, "stale keys were aged out by admission");
}
// Batch admission: duplicate new keys are rejected, and the cap is checked
// against the whole batch rather than key by key.
#[test]
fn admit_new_keys_validates_the_batch_as_a_unit() {
let dup_store = fail_closed(1000, 1000);
let dup_err = dup_store
.admit_new_keys([7u128, 7u128].into_iter(), 0)
.expect_err("PROPERTY: two identical new keys in a batch must be rejected");
assert!(
matches!(dup_err, StoreError::IdempotencyPartialBatch { .. }),
"duplicate new key must surface IdempotencyPartialBatch, got {dup_err:?}"
);
let store = fail_closed(1000, 3);
store.record(entry(1, 1));
store.record(entry(2, 2));
let err = store
.admit_new_keys([10u128, 11u128].into_iter(), 2)
.expect_err("PROPERTY: a unique-new batch exceeding the cap must be rejected");
assert!(
matches!(err, StoreError::IdempotencyOverflowFailClosed { .. }),
"over-cap unique batch must surface IdempotencyOverflowFailClosed, got {err:?}"
);
assert!(store.admit_new_keys([10u128].into_iter(), 2).is_ok());
}
// Only the entry whose event is no longer live gets flagged, and its other
// fields are left untouched.
#[test]
fn mark_evicted_flags_only_dropped_events_and_preserves_tuple() {
let store = IdempotencyStore::new(IdempotencyRetention::Unbounded, OverflowPolicy::Warn);
let mut live = entry(1, 1);
live.disk_pos_segment = 7;
live.global_sequence = 1;
let mut dropped = entry(2, 2);
dropped.disk_pos_segment = 9;
dropped.global_sequence = 2;
dropped.content_hash = [0xAB; 32];
store.record(live);
store.record(dropped);
store.mark_evicted(|event_id| event_id == 1);
let one = store.get(1).expect("event 1 was recorded and is live");
assert!(!one.is_event_evicted() && !one.event_evicted);
assert_eq!(one.disk_pos_segment, 7);
let two = store.get(2).expect("event 2 was recorded");
assert!(two.is_event_evicted() && two.event_evicted);
assert_eq!(two.disk_pos_segment, 9, "disk_pos unchanged after eviction");
assert_eq!(two.global_sequence, 2, "sequence preserved");
assert_eq!(two.content_hash, [0xAB; 32], "content hash preserved");
}
// A flushed store can be read back, yielding the same number of entries.
#[test]
fn flush_then_read_roundtrips() {
let dir = tempfile::tempdir().expect("create temp dir");
let store = IdempotencyStore::new(IdempotencyRetention::default(), OverflowPolicy::Warn);
store.record(entry(7, 7));
store.record(entry(8, 8));
store
.flush(dir.path(), &crate::store::platform::fs::RealFs)
.expect("flush idempotency store to disk");
let loaded = read_idemp_file(dir.path()).expect("read back the flushed idempotency file");
assert!(matches!(&loaded, IdempLoad::Loaded(e) if e.len() == 2));
}
// Pins the on-disk magic and version constants so an accidental change is caught.
#[test]
fn future_version_header_constants_are_stable() {
assert_eq!(IDEMP_MAGIC, b"FBATID");
assert_eq!(IDEMP_VERSION, 1);
}
}