use crate::cache::key::Fingerprint;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::fs;
use std::io::{self, Write as _};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Version stamped into every metadata file; lookups skip entries carrying any other value.
pub(crate) const SCHEMA_VERSION: u32 = 1;
/// Default on-disk size budget used by `StoreOptions::default()`: 1 GiB.
pub(crate) const DEFAULT_SIZE_BUDGET_BYTES: u64 = 1024 * 1024 * 1024;
/// Default entry time-to-live used by `StoreOptions::default()`: 30 days, in seconds.
pub(crate) const DEFAULT_TTL_SECS: u64 = 60 * 60 * 24 * 30;
/// Errors surfaced by the warm-start store.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
/// A filesystem operation failed.
#[error("io: {0}")]
Io(#[from] io::Error),
/// Metadata could not be serialized to, or parsed from, JSON.
#[error("json: {0}")]
Json(#[from] serde_json::Error),
}
/// An entry handed back by a lookup: the payload plus the metadata recorded at save time.
#[derive(Debug, Clone)]
pub struct CachedEntry {
/// Raw payload bytes read from the entry's `.bin` file (checksum already verified).
pub payload: Vec<u8>,
/// Objective value stored with the entry, if any; lower values win in "best" lookups.
pub objective: Option<f64>,
/// Iteration number stored with the entry, if any.
pub iteration: Option<u64>,
/// Whole seconds since the Unix epoch at which the entry was written.
pub written_unix_secs: u64,
/// Whether the entry was saved as a checkpoint or as a final result.
pub kind: EntryKind,
}
/// Classification recorded with each saved entry.
///
/// When two entries are otherwise tied, `Final` outranks `Checkpoint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryKind {
/// Entry saved with the lower tie-break rank.
Checkpoint,
/// Entry saved with the higher tie-break rank.
Final,
}
/// JSON sidecar written next to each payload (`<run_id>.json` beside `<run_id>.bin`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OnDiskMeta {
/// Schema version the entry was written with; compared against `SCHEMA_VERSION` on lookup.
schema_version: u32,
/// Whole seconds since the Unix epoch at write time.
written_unix_secs: u64,
/// Sub-second part of the write time in nanoseconds; treated as 0 when absent from the JSON.
#[serde(default)]
written_nanos: u32,
/// Objective value, stored only when it was finite at save time.
objective: Option<f64>,
/// Iteration number supplied by the caller, if any.
iteration: Option<u64>,
/// Checkpoint/final classification supplied by the caller.
kind: EntryKind,
/// Lowercase hex SHA-256 of the payload, verified on lookup.
checksum_hex: String,
/// Payload length in bytes as recorded at save time.
payload_bytes: u64,
}
/// Tunables for a [`WarmStartStore`].
#[derive(Debug, Clone)]
pub struct StoreOptions {
/// Total bytes (metadata + payload files) the store may hold before the oldest entries
/// are evicted.
pub size_budget_bytes: u64,
/// Maximum age of an entry; a zero duration disables expiry.
pub ttl: Duration,
}
impl Default for StoreOptions {
    /// Builds the stock configuration from `DEFAULT_SIZE_BUDGET_BYTES` and
    /// `DEFAULT_TTL_SECS`.
    fn default() -> Self {
        let ttl = Duration::from_secs(DEFAULT_TTL_SECS);
        let size_budget_bytes = DEFAULT_SIZE_BUDGET_BYTES;
        Self {
            size_budget_bytes,
            ttl,
        }
    }
}
/// Directory-backed store of warm-start payloads, one sub-directory per fingerprint.
#[derive(Debug)]
pub struct WarmStartStore {
/// Root directory; each key lives in `root/<fingerprint hex>/`.
root: PathBuf,
/// Size budget and TTL applied by lookups and eviction.
opts: StoreOptions,
/// Running estimate of bytes on disk; bumped on save and reset to the measured total
/// whenever eviction runs.
byte_total: AtomicU64,
/// Number of saves performed through this instance; drives periodic eviction.
save_counter: AtomicU64,
}
impl Clone for WarmStartStore {
    /// Copies the root and options, snapshots the current byte estimate, and starts the
    /// clone's save counter from zero.
    fn clone(&self) -> Self {
        let Self {
            root,
            opts,
            byte_total,
            save_counter: _,
        } = self;
        let carried_total = byte_total.load(Ordering::Relaxed);
        Self {
            root: root.clone(),
            opts: opts.clone(),
            byte_total: AtomicU64::new(carried_total),
            save_counter: AtomicU64::new(0),
        }
    }
}
impl WarmStartStore {
/// Opens a store rooted at `root`, creating the directory (and any parents) if needed.
///
/// Both the byte estimate and the save counter start at zero.
///
/// # Errors
///
/// Returns [`StoreError::Io`] when the root directory cannot be created.
pub fn open(root: PathBuf, opts: StoreOptions) -> Result<Self, StoreError> {
    fs::create_dir_all(root.as_path())?;
    let store = Self {
        root,
        opts,
        byte_total: AtomicU64::default(),
        save_counter: AtomicU64::default(),
    };
    Ok(store)
}
/// Root directory this store reads from and writes to.
pub fn root(&self) -> &Path {
    self.root.as_path()
}
pub fn options(&self) -> &StoreOptions {
&self.opts
}
/// Directory holding every entry for `key`: the root joined with the key's hex form.
fn key_dir(&self, key: &Fingerprint) -> PathBuf {
    let mut path = self.root.clone();
    path.push(key.to_hex());
    path
}
/// Returns the best entry stored for `key`, or `None` when nothing usable exists.
///
/// "Best" prefers entries that carry an objective, then the lowest objective; ties fall
/// back to `Final` over `Checkpoint` and then to the most recent write.
///
/// # Errors
///
/// Returns [`StoreError::Io`] if the key directory cannot be listed.
pub fn lookup(&self, key: &Fingerprint) -> Result<Option<CachedEntry>, StoreError> {
    let mode = LookupMode::Best;
    self.lookup_with(key, mode)
}
/// Returns the most recently written entry for `key`, regardless of objective, or `None`
/// when nothing usable exists.
///
/// # Errors
///
/// Returns [`StoreError::Io`] if the key directory cannot be listed.
pub fn lookup_latest(&self, key: &Fingerprint) -> Result<Option<CachedEntry>, StoreError> {
    let mode = LookupMode::Latest;
    self.lookup_with(key, mode)
}
/// Resolves the entry for `key` selected by `mode`, consulting the process-wide lookup
/// cache before falling back to a scan of the key directory.
///
/// Returns `Ok(None)` when the key directory is missing, when no usable entry survives the
/// scan, or when the selected payload cannot be read or fails its checksum (in which case
/// the entry is deleted). While scanning, unreadable metadata, metadata without a payload
/// file, and expired entries are removed from disk; entries written with a different
/// schema version are skipped but left in place.
///
/// # Errors
///
/// Returns [`StoreError::Io`] if the key directory cannot be listed.
fn lookup_with(
    &self,
    key: &Fingerprint,
    mode: LookupMode,
) -> Result<Option<CachedEntry>, StoreError> {
    let dir = self.key_dir(key);
    let cache_key = LookupCacheKey { fp: *key, mode };
    if !dir.exists() {
        lookup_cache_invalidate(&cache_key);
        return Ok(None);
    }
    let now_nanos = nanos_now();
    let ttl_nanos = self.opts.ttl.as_nanos();
    if let Some(hit) = lookup_cache_get(&cache_key) {
        // The cache is shared by every store in the process and is keyed only by
        // fingerprint + mode, so a hit may have been recorded by a store rooted elsewhere.
        // Only trust hits whose metadata file lives in *this* store's key directory.
        let ours = hit.meta_path.parent() == Some(dir.as_path());
        let unchanged = ours
            && fs::metadata(&hit.meta_path)
                .and_then(|md| md.modified())
                .is_ok_and(|mtime| mtime == hit.meta_mtime);
        let expired =
            ttl_nanos > 0 && now_nanos.saturating_sub(hit.write_nanos) >= ttl_nanos;
        if unchanged && !expired {
            return Ok(Some(hit.entry));
        }
        // Stale, foreign or expired hit: drop it and rescan. An expired "best" entry may be
        // older than other still-valid entries, so returning `None` here would be wrong;
        // the scan below deletes whatever has expired and picks among the survivors.
        lookup_cache_invalidate(&cache_key);
    }
    let mut best: Option<(OnDiskMeta, PathBuf)> = None;
    for entry in fs::read_dir(&dir)? {
        let Ok(entry) = entry else {
            continue;
        };
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) != Some("json") {
            continue;
        }
        if path
            .file_name()
            .and_then(|s| s.to_str())
            .is_some_and(|n| n.contains(".tmp."))
        {
            continue;
        }
        let bin = path.with_extension("bin");
        let Ok(meta) = read_meta(&path) else {
            // Unreadable metadata makes the payload unreachable; remove both so the
            // payload does not linger as an orphan (eviction only walks `.json` files).
            fs::remove_file(&path).ok();
            fs::remove_file(&bin).ok();
            continue;
        };
        if meta.schema_version != SCHEMA_VERSION {
            continue;
        }
        if !bin.exists() {
            fs::remove_file(&path).ok();
            continue;
        }
        if meta_expired(
            meta.written_unix_secs,
            meta.written_nanos,
            self.opts.ttl,
            now_nanos,
        ) {
            fs::remove_file(&path).ok();
            fs::remove_file(&bin).ok();
            continue;
        }
        let take = match &best {
            None => true,
            Some((cur, _)) => mode.better(&meta, cur),
        };
        if take {
            best = Some((meta, path));
        }
    }
    let Some((meta, meta_path)) = best else {
        lookup_cache_invalidate(&cache_key);
        return Ok(None);
    };
    let bin_path = meta_path.with_extension("bin");
    let Ok(payload) = fs::read(&bin_path) else {
        return Ok(None);
    };
    if checksum_hex(&payload) != meta.checksum_hex {
        // Payload does not match what was recorded at save time: discard the entry.
        fs::remove_file(&meta_path).ok();
        fs::remove_file(&bin_path).ok();
        lookup_cache_invalidate(&cache_key);
        return Ok(None);
    }
    let entry = CachedEntry {
        payload,
        objective: meta.objective,
        iteration: meta.iteration,
        written_unix_secs: meta.written_unix_secs,
        kind: meta.kind,
    };
    // Remember the winner together with the metadata file's mtime so a later lookup can
    // detect that the file was replaced or removed.
    if let Ok(md) = fs::metadata(&meta_path)
        && let Ok(mtime) = md.modified()
    {
        let write_nanos =
            (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
        lookup_cache_insert(
            cache_key,
            CachedLookup {
                meta_path: meta_path.clone(),
                meta_mtime: mtime,
                write_nanos,
                entry: entry.clone(),
            },
        );
    }
    Ok(Some(entry))
}
/// Stores `payload` under `key` as a new entry with a freshly generated run id and
/// returns that id.
///
/// # Errors
///
/// Propagates any error from [`Self::save_overwrite`].
pub fn save(
    &self,
    key: &Fingerprint,
    payload: &[u8],
    objective: Option<f64>,
    iteration: Option<u64>,
    kind: EntryKind,
) -> Result<String, StoreError> {
    let run_id = fresh_run_id();
    self.save_overwrite(key, &run_id, payload, objective, iteration, kind)
        .map(|()| run_id)
}
/// Writes `payload` and its metadata under `key` as entry `run_id`, replacing any existing
/// entry with the same id.
///
/// Both files are first written to temp names carrying this process's PID and a nanosecond
/// nonce, then renamed into place (payload first, metadata second). A non-finite
/// `objective` is stored as `None`. Cached lookup results for `key` are dropped up front.
/// After a successful write the running byte estimate is bumped and eviction runs on the
/// first save, every `EVICT_EVERY_N_SAVES` saves, or whenever the estimate exceeds the size
/// budget; eviction failures are ignored.
///
/// # Errors
///
/// Returns [`StoreError::Io`] when the key directory, a temp file, or a rename fails, and
/// [`StoreError::Json`] when the metadata cannot be serialized. Temp files created by this
/// call are removed on every error path.
pub fn save_overwrite(
    &self,
    key: &Fingerprint,
    run_id: &str,
    payload: &[u8],
    objective: Option<f64>,
    iteration: Option<u64>,
    kind: EntryKind,
) -> Result<(), StoreError> {
    for mode in [LookupMode::Best, LookupMode::Latest] {
        lookup_cache_invalidate(&LookupCacheKey { fp: *key, mode });
    }
    let dir = self.key_dir(key);
    fs::create_dir_all(&dir)?;
    let pid = std::process::id();
    let nonce = nanos_now();
    let bin_tmp = dir.join(format!("{run_id}.bin.tmp.{pid}.{nonce}"));
    let meta_tmp = dir.join(format!("{run_id}.json.tmp.{pid}.{nonce}"));
    let bin_final = dir.join(format!("{run_id}.bin"));
    let meta_final = dir.join(format!("{run_id}.json"));

    // Stage both temp files inside one fallible step so that any failure can clean up
    // after itself. Temp files bearing our own PID are never swept by eviction, so leaving
    // one behind would leak it for the lifetime of the process.
    let stage = || -> Result<(), StoreError> {
        let mut bin_file = fs::File::create(&bin_tmp)?;
        bin_file.write_all(payload)?;
        // Durability is best-effort; a failed fsync does not fail the save.
        bin_file.sync_all().ok();
        drop(bin_file);

        let (secs, subsec_nanos) = unix_now_parts();
        let meta = OnDiskMeta {
            schema_version: SCHEMA_VERSION,
            written_unix_secs: secs,
            written_nanos: subsec_nanos,
            objective: objective.filter(|o| o.is_finite()),
            iteration,
            kind,
            checksum_hex: checksum_hex(payload),
            payload_bytes: payload.len() as u64,
        };
        let json = serde_json::to_vec_pretty(&meta)?;
        let mut meta_file = fs::File::create(&meta_tmp)?;
        meta_file.write_all(&json)?;
        meta_file.sync_all().ok();
        Ok(())
    };
    if let Err(e) = stage() {
        fs::remove_file(&bin_tmp).ok();
        fs::remove_file(&meta_tmp).ok();
        return Err(e);
    }

    if let Err(e) = fs::rename(&bin_tmp, &bin_final) {
        fs::remove_file(&bin_tmp).ok();
        fs::remove_file(&meta_tmp).ok();
        return Err(StoreError::Io(e));
    }
    if let Err(e) = fs::rename(&meta_tmp, &meta_final) {
        // Without metadata the payload is unreachable; remove it rather than orphan it.
        fs::remove_file(&bin_final).ok();
        fs::remove_file(&meta_tmp).ok();
        return Err(StoreError::Io(e));
    }
    // Best-effort flush of the directory entry updates.
    if let Ok(d) = fs::File::open(&dir) {
        d.sync_all().ok();
    }

    let approx_added = (payload.len() as u64).saturating_add(APPROX_META_BYTES);
    let prev_total = self.byte_total.fetch_add(approx_added, Ordering::Relaxed);
    let new_total = prev_total.saturating_add(approx_added);
    let n = self.save_counter.fetch_add(1, Ordering::Relaxed);
    let over_budget = new_total > self.opts.size_budget_bytes;
    if n == 0 || over_budget || n.is_multiple_of(EVICT_EVERY_N_SAVES) {
        self.evict_overflow().ok();
    }
    Ok(())
}
/// Walks every key directory, cleans up debris, and trims the store to its size budget.
///
/// In one pass this:
/// - deletes temp files whose embedded PID differs from the current process's
///   (NOTE(review): whether that other process is still running is not checked);
/// - deletes metadata without a payload, and entries whose metadata cannot be parsed;
/// - deletes entries older than the configured TTL;
/// - removes key directories that end up empty.
///
/// If the surviving entries still exceed the budget, entries are deleted oldest-first (by
/// recorded write time) until the total fits. The byte estimate is then reset to the
/// resulting total. Individual filesystem failures are ignored, and an unreadable root
/// yields `Ok(())`.
pub fn evict_overflow(&self) -> Result<(), StoreError> {
    /// One surviving entry considered for size-based eviction.
    struct Survivor {
        meta: PathBuf,
        bin: PathBuf,
        bytes: u64,
        written: u128,
    }

    let Ok(key_dirs) = fs::read_dir(&self.root) else {
        return Ok(());
    };
    let mut survivors: Vec<Survivor> = Vec::new();
    let now_nanos = nanos_now();
    let own_pid = std::process::id();
    for key_dir_entry in key_dirs {
        let Ok(key_dir_entry) = key_dir_entry else {
            continue;
        };
        let key_dir = key_dir_entry.path();
        if !key_dir.is_dir() {
            continue;
        }
        let Ok(files) = fs::read_dir(&key_dir) else {
            continue;
        };
        for file in files {
            let Ok(file) = file else {
                continue;
            };
            let meta_path = file.path();
            let Some(name) = meta_path.file_name().and_then(|s| s.to_str()) else {
                continue;
            };
            if name.contains(".tmp.") {
                // Leave our own in-flight temp files alone; sweep everyone else's.
                let foreign = parse_tmp_pid(name).is_some_and(|pid| pid != own_pid);
                if foreign {
                    fs::remove_file(&meta_path).ok();
                }
                continue;
            }
            if meta_path.extension().and_then(|s| s.to_str()) != Some("json") {
                continue;
            }
            let Ok(meta_md) = fs::metadata(&meta_path) else {
                continue;
            };
            let bin_path = meta_path.with_extension("bin");
            let Ok(bin_md) = fs::metadata(&bin_path) else {
                fs::remove_file(&meta_path).ok();
                continue;
            };
            let Ok(meta) = read_meta(&meta_path) else {
                fs::remove_file(&meta_path).ok();
                fs::remove_file(&bin_path).ok();
                continue;
            };
            let written =
                (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
            if meta_expired(
                meta.written_unix_secs,
                meta.written_nanos,
                self.opts.ttl,
                now_nanos,
            ) {
                fs::remove_file(&meta_path).ok();
                fs::remove_file(&bin_path).ok();
                continue;
            }
            survivors.push(Survivor {
                bytes: meta_md.len() + bin_md.len(),
                meta: meta_path,
                bin: bin_path,
                written,
            });
        }
        // Drop the key directory if the sweep above left nothing in it.
        if let Ok(mut leftovers) = fs::read_dir(&key_dir)
            && leftovers.next().is_none()
        {
            fs::remove_dir(&key_dir).ok();
        }
    }

    let budget = self.opts.size_budget_bytes;
    let total: u64 = survivors.iter().map(|s| s.bytes).sum();
    let mut remaining = total;
    if total > budget {
        // Oldest writes go first.
        survivors.sort_by_key(|s| s.written);
        for victim in survivors {
            if remaining <= budget {
                break;
            }
            fs::remove_file(&victim.meta).ok();
            fs::remove_file(&victim.bin).ok();
            remaining = remaining.saturating_sub(victim.bytes);
        }
    }
    self.byte_total.store(remaining, Ordering::Relaxed);
    Ok(())
}
}
/// Flat per-entry allowance added to the payload length when a save bumps the running
/// byte estimate (the real metadata size is only measured during eviction).
const APPROX_META_BYTES: u64 = 512;
/// Selection policy used when several entries exist for one key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum LookupMode {
/// Rank candidates with `entry_better` (objective first, then kind, then recency).
Best,
/// Rank candidates with `entry_newer` (write time, then kind).
Latest,
}
impl LookupMode {
fn better(&self, candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
match self {
LookupMode::Best => entry_better(candidate, current),
LookupMode::Latest => entry_newer(candidate, current),
}
}
}
/// Key of the process-wide lookup cache: a fingerprint paired with the lookup mode.
///
/// NOTE(review): the store root is not part of the key, so stores with different roots
/// share cache slots for equal fingerprints.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct LookupCacheKey {
/// Fingerprint the lookup was made for.
fp: Fingerprint,
/// Selection policy the cached result was computed under.
mode: LookupMode,
}
/// A remembered lookup result plus what is needed to revalidate it.
#[derive(Clone)]
struct CachedLookup {
/// Path of the metadata file the result was read from.
meta_path: PathBuf,
/// Modification time of that metadata file when the result was cached.
meta_mtime: SystemTime,
/// Recorded write time of the entry, in nanoseconds since the Unix epoch.
write_nanos: u128,
/// The entry that was returned to the caller.
entry: CachedEntry,
}
/// Reports whether an entry written at `secs` + `nanos` (since the Unix epoch) is at least
/// `ttl` old at `now_nanos`.
///
/// A zero `ttl` disables expiry. Write times in the future count as age zero.
const fn meta_expired(secs: u64, nanos: u32, ttl: Duration, now_nanos: u128) -> bool {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let limit = ttl.as_nanos();
    let written_at = (secs as u128) * NANOS_PER_SEC + nanos as u128;
    let age = now_nanos.saturating_sub(written_at);
    limit != 0 && age >= limit
}
/// Returns the process-wide lookup cache, creating it empty on first use.
fn lookup_cache() -> &'static Mutex<HashMap<LookupCacheKey, CachedLookup>> {
    type Table = HashMap<LookupCacheKey, CachedLookup>;
    static CACHE: OnceLock<Mutex<Table>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(Table::new()))
}
/// Upper bound on the number of results held in the lookup cache.
const LOOKUP_CACHE_MAX_ENTRIES: usize = 128;
/// Upper bound (256 MiB) on the estimated memory held by the lookup cache; a single result
/// larger than this is never cached.
const LOOKUP_CACHE_MAX_BYTES: usize = 256 * 1024 * 1024;
/// Estimates the memory held by one cached result: the struct itself plus the payload
/// buffer's capacity.
const fn cached_lookup_resident_bytes(value: &CachedLookup) -> usize {
    let fixed = std::mem::size_of::<CachedLookup>();
    let payload_heap = value.entry.payload.capacity();
    fixed.saturating_add(payload_heap)
}
/// Returns a clone of the cached result for `key`, or `None` when there is no such result
/// or the cache lock is poisoned.
fn lookup_cache_get(key: &LookupCacheKey) -> Option<CachedLookup> {
    lookup_cache()
        .lock()
        .ok()
        .and_then(|table| table.get(key).cloned())
}
/// Caches `val` under `key`, evicting the results with the oldest recorded write time until
/// both the entry-count and byte limits hold.
///
/// Nothing is cached when `val` alone exceeds `LOOKUP_CACHE_MAX_BYTES` or when the cache
/// lock is poisoned. An existing result for `key` is replaced.
fn lookup_cache_insert(key: LookupCacheKey, val: CachedLookup) {
    let Ok(mut table) = lookup_cache().lock() else {
        return;
    };
    let incoming = cached_lookup_resident_bytes(&val);
    if incoming > LOOKUP_CACHE_MAX_BYTES {
        return;
    }
    let mut resident: usize = table.values().map(cached_lookup_resident_bytes).sum();
    if let Some(replaced) = table.remove(&key) {
        resident = resident.saturating_sub(cached_lookup_resident_bytes(&replaced));
    }
    loop {
        let too_many = table.len() >= LOOKUP_CACHE_MAX_ENTRIES;
        let too_big = resident.saturating_add(incoming) > LOOKUP_CACHE_MAX_BYTES;
        if !(too_many || too_big) {
            break;
        }
        let victim = table
            .iter()
            .min_by_key(|(_, cached)| cached.write_nanos)
            .map(|(victim_key, _)| *victim_key);
        let Some(victim) = victim else {
            break;
        };
        if let Some(dropped) = table.remove(&victim) {
            resident = resident.saturating_sub(cached_lookup_resident_bytes(&dropped));
        }
    }
    table.insert(key, val);
}
/// Drops the cached result for `key`, if any; does nothing when the cache lock is poisoned.
fn lookup_cache_invalidate(key: &LookupCacheKey) {
    let Ok(mut table) = lookup_cache().lock() else {
        return;
    };
    table.remove(key);
}
/// A save triggers an eviction pass whenever the instance's save counter is a multiple of
/// this value (in addition to over-budget saves).
const EVICT_EVERY_N_SAVES: u64 = 32;
/// Extracts the writer's PID from a temp file name of the form
/// `<run_id>.<ext>.tmp.<pid>.<nonce>`.
///
/// The temp suffix is always appended last, so the PID is taken from after the *last*
/// `.tmp.` marker; this keeps parsing correct even when the run id itself contains
/// `.tmp.`. Returns `None` when the marker is absent or the PID segment is not a `u32`.
fn parse_tmp_pid(name: &str) -> Option<u32> {
    let (_, tail) = name.rsplit_once(".tmp.")?;
    let pid_str = tail.split('.').next()?;
    pid_str.parse::<u32>().ok()
}
fn read_meta(path: &Path) -> Result<OnDiskMeta, StoreError> {
let bytes = fs::read(path)?;
let parsed: OnDiskMeta = serde_json::from_slice(&bytes)?;
Ok(parsed)
}
/// Ordering used by "best" lookups: does `candidate` beat `current`?
///
/// An entry with an objective beats one without. Between two objectives the lower wins,
/// unless they differ by less than `1e-12`, in which case `Final` beats `Checkpoint` and
/// equal kinds fall back to recency. Two entries without objectives compare by recency.
fn entry_better(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
    let (c, d) = match (candidate.objective, current.objective) {
        (Some(_), None) => return true,
        (None, Some(_)) => return false,
        (None, None) => return entry_newer(candidate, current),
        (Some(c), Some(d)) => (c, d),
    };
    let tied = (c - d).abs() < 1e-12;
    if !tied {
        return c < d;
    }
    if candidate.kind == current.kind {
        entry_newer(candidate, current)
    } else {
        // Kinds differ, so exactly one of the two is `Final`.
        candidate.kind == EntryKind::Final
    }
}
/// Reports whether `candidate` was written strictly after `current`, comparing seconds,
/// then nanoseconds, then kind rank (so `Final` wins an exact timestamp tie).
fn entry_newer(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
    let stamp = |meta: &OnDiskMeta| {
        (
            meta.written_unix_secs,
            meta.written_nanos,
            candidate_kind_rank(meta.kind),
        )
    };
    stamp(candidate) > stamp(current)
}
/// Tie-break rank of an entry kind: `Checkpoint` is 0, `Final` is 1.
const fn candidate_kind_rank(kind: EntryKind) -> u8 {
    if matches!(kind, EntryKind::Final) {
        1
    } else {
        0
    }
}
fn checksum_hex(payload: &[u8]) -> String {
let mut h = Sha256::new();
h.update(payload);
let out = h.finalize();
let mut s = String::with_capacity(out.len() * 2);
for b in out.iter() {
use std::fmt::Write;
write!(&mut s, "{:02x}", b).expect("writing to String is infallible");
}
s
}
/// Artificial clock skew, in nanoseconds, added to every timestamp this module takes.
/// It stays at zero unless something (the tests in this file) advances it.
static TEST_TIME_OFFSET_NS: AtomicU64 = AtomicU64::new(0);
/// Current value of the artificial clock skew.
fn test_time_offset_ns() -> u64 {
    let skew = &TEST_TIME_OFFSET_NS;
    skew.load(Ordering::Relaxed)
}
/// Current time (system clock plus the test skew) split into whole seconds and leftover
/// nanoseconds since the Unix epoch. A clock before the epoch reads as zero.
fn unix_now_parts() -> (u64, u32) {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let shifted = since_epoch.saturating_add(u128::from(test_time_offset_ns()));
    let whole_secs = (shifted / NANOS_PER_SEC) as u64;
    let leftover = (shifted % NANOS_PER_SEC) as u32;
    (whole_secs, leftover)
}
/// Current time (system clock plus the test skew) in nanoseconds since the Unix epoch.
/// A clock before the epoch reads as zero.
fn nanos_now() -> u128 {
    let skew = u128::from(test_time_offset_ns());
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos());
    since_epoch.saturating_add(skew)
}
fn fresh_run_id() -> String {
let pid = std::process::id();
let nanos = nanos_now();
format!("r{pid:x}-{nanos:x}")
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::key::Fingerprinter;

    /// Opens a store in a fresh temp directory with a 1 MiB budget and a 60 s TTL.
    /// The `TempDir` is returned so the directory outlives the test body.
    fn temp_store() -> (tempfile::TempDir, WarmStartStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStartStore::open(
            dir.path().to_path_buf(),
            StoreOptions {
                size_budget_bytes: 1024 * 1024,
                ttl: Duration::from_secs(60),
            },
        )
        .unwrap();
        (dir, store)
    }

    /// Derives a fingerprint from a test-specific label.
    fn key_for(s: &str) -> Fingerprint {
        let mut fp = Fingerprinter::new();
        fp.absorb_str(b"test", s);
        fp.finalize()
    }

    /// A saved entry comes back with the same payload and metadata.
    #[test]
    fn roundtrip_save_then_lookup() {
        let (_d, store) = temp_store();
        let key = key_for("roundtrip");
        let _id = store
            .save(
                &key,
                b"hello-warm",
                Some(1.5),
                Some(7),
                EntryKind::Checkpoint,
            )
            .unwrap();
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"hello-warm");
        assert_eq!(got.objective, Some(1.5));
        assert_eq!(got.iteration, Some(7));
        assert_eq!(got.kind, EntryKind::Checkpoint);
    }

    /// "Best" lookup returns the entry with the lowest objective.
    #[test]
    fn lookup_picks_lowest_objective() {
        let (_d, store) = temp_store();
        let key = key_for("multi");
        store
            .save(&key, b"worse", Some(3.0), Some(1), EntryKind::Checkpoint)
            .unwrap();
        store
            .save(&key, b"better", Some(1.0), Some(2), EntryKind::Checkpoint)
            .unwrap();
        store
            .save(&key, b"mid", Some(2.0), Some(3), EntryKind::Checkpoint)
            .unwrap();
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"better");
        assert_eq!(got.objective, Some(1.0));
    }

    /// "Latest" lookup follows write time even when an older entry has a lower objective.
    #[test]
    fn lookup_latest_ignores_objective_ordering() {
        let (_d, store) = temp_store();
        let key = key_for("latest-vs-best");
        store
            .save(&key, b"low-objective", Some(1.0), Some(1), EntryKind::Final)
            .unwrap();
        TEST_TIME_OFFSET_NS.fetch_add(2_000_000, Ordering::Relaxed);
        store
            .save(
                &key,
                b"newer-higher-objective",
                Some(10.0),
                Some(2),
                EntryKind::Checkpoint,
            )
            .unwrap();
        let best = store.lookup(&key).unwrap().unwrap();
        assert_eq!(best.payload, b"low-objective");
        let latest = store.lookup_latest(&key).unwrap().unwrap();
        assert_eq!(latest.payload, b"newer-higher-objective");
        assert_eq!(latest.iteration, Some(2));
    }

    /// With equal objectives, a final entry beats a checkpoint.
    #[test]
    fn tiebreak_final_beats_checkpoint() {
        let (_d, store) = temp_store();
        let key = key_for("tie");
        store
            .save(&key, b"ckpt", Some(1.0), None, EntryKind::Checkpoint)
            .unwrap();
        store
            .save(&key, b"final", Some(1.0), None, EntryKind::Final)
            .unwrap();
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"final");
        assert_eq!(got.kind, EntryKind::Final);
    }

    /// Without objectives, the most recently written entry wins.
    #[test]
    fn tiebreak_latest_mtime_when_no_objective() {
        let (_d, store) = temp_store();
        let key = key_for("latest");
        store
            .save(&key, b"first", None, None, EntryKind::Checkpoint)
            .unwrap();
        TEST_TIME_OFFSET_NS.fetch_add(1_100_000_000, Ordering::Relaxed);
        store
            .save(&key, b"second", None, None, EntryKind::Checkpoint)
            .unwrap();
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"second");
    }

    /// A payload that no longer matches its checksum is rejected and deleted.
    #[test]
    fn corrupt_payload_is_cleaned_up() {
        let (_d, store) = temp_store();
        let key = key_for("corrupt");
        store
            .save(&key, b"original", Some(0.0), None, EntryKind::Checkpoint)
            .unwrap();
        let dir = store.key_dir(&key);
        for entry in fs::read_dir(&dir).unwrap() {
            let p = entry.unwrap().path();
            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
                fs::write(&p, b"tampered!").unwrap();
            }
        }
        let got = store.lookup(&key).unwrap();
        assert!(got.is_none(), "tampered entry must be rejected");
        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
        assert!(remaining.is_empty(), "corrupt entry should be removed");
    }

    /// Unparseable metadata makes the entry invisible to lookups.
    #[test]
    fn corrupt_meta_json_is_cleaned_up() {
        let (_d, store) = temp_store();
        let key = key_for("badjson");
        store
            .save(&key, b"x", None, None, EntryKind::Checkpoint)
            .unwrap();
        let dir = store.key_dir(&key);
        for entry in fs::read_dir(&dir).unwrap() {
            let p = entry.unwrap().path();
            if p.extension().and_then(|s| s.to_str()) == Some("json") {
                fs::write(&p, b"{not valid json").unwrap();
            }
        }
        let got = store.lookup(&key).unwrap();
        assert!(got.is_none());
    }

    /// Entries written with a different schema version are not returned.
    #[test]
    fn schema_mismatch_is_ignored() {
        let (_d, store) = temp_store();
        let key = key_for("schema");
        store
            .save(&key, b"x", None, None, EntryKind::Checkpoint)
            .unwrap();
        let dir = store.key_dir(&key);
        for entry in fs::read_dir(&dir).unwrap() {
            let p = entry.unwrap().path();
            if p.extension().and_then(|s| s.to_str()) == Some("json") {
                let raw = fs::read(&p).unwrap();
                let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
                parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
                fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
            }
        }
        assert!(store.lookup(&key).unwrap().is_none());
    }

    /// Metadata whose payload file is gone behaves like a missing entry.
    #[test]
    fn missing_bin_treated_as_missing() {
        let (_d, store) = temp_store();
        let key = key_for("nobin");
        store
            .save(&key, b"x", None, None, EntryKind::Checkpoint)
            .unwrap();
        let dir = store.key_dir(&key);
        for entry in fs::read_dir(&dir).unwrap() {
            let p = entry.unwrap().path();
            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
                fs::remove_file(&p).unwrap();
            }
        }
        assert!(store.lookup(&key).unwrap().is_none());
    }

    /// Looking up a key that was never saved yields `None`.
    #[test]
    fn missing_key_returns_none() {
        let (_d, store) = temp_store();
        let key = key_for("absent");
        assert!(store.lookup(&key).unwrap().is_none());
    }

    /// Saving past the size budget evicts the oldest entries and keeps the newest.
    #[test]
    fn lru_eviction_under_size_budget() {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStartStore::open(
            dir.path().to_path_buf(),
            StoreOptions {
                size_budget_bytes: 4 * 1024,
                ttl: Duration::from_secs(3600),
            },
        )
        .unwrap();
        let mut keys = Vec::new();
        for i in 0..20 {
            let mut fp = Fingerprinter::new();
            fp.absorb_u64(b"i", i);
            let key = fp.finalize();
            keys.push(key);
            let payload = vec![0u8; 256];
            store
                .save(&key, &payload, Some(i as f64), None, EntryKind::Checkpoint)
                .unwrap();
        }
        let mut total = 0u64;
        for kd in fs::read_dir(store.root()).unwrap() {
            let kd = kd.unwrap().path();
            if kd.is_dir() {
                for f in fs::read_dir(&kd).unwrap() {
                    total += fs::metadata(f.unwrap().path()).unwrap().len();
                }
            }
        }
        assert!(
            total <= 8 * 1024,
            "eviction failed to bound size (got {total})"
        );
        assert!(store.lookup(&keys[0]).unwrap().is_none());
        assert!(store.lookup(keys.last().unwrap()).unwrap().is_some());
    }

    /// Entries older than the TTL disappear; entries written afterwards remain.
    #[test]
    fn ttl_drops_old_entries() {
        // The clock offset is a process-wide static that other tests advance concurrently
        // (by roughly 1.1 s in total). The TTL must comfortably exceed that so the entry
        // cannot expire before the first assertion, and the jump below must stay small
        // enough not to expire entries in stores that use the 60 s TTL of `temp_store`.
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStartStore::open(
            dir.path().to_path_buf(),
            StoreOptions {
                size_budget_bytes: 1024 * 1024,
                ttl: Duration::from_secs(10),
            },
        )
        .unwrap();
        let key = key_for("ttl");
        store
            .save(&key, b"x", None, None, EntryKind::Checkpoint)
            .unwrap();
        assert!(store.lookup(&key).unwrap().is_some());
        TEST_TIME_OFFSET_NS.fetch_add(15_000_000_000, Ordering::Relaxed);
        let other = key_for("ttl-other");
        store
            .save(&other, b"y", None, None, EntryKind::Checkpoint)
            .unwrap();
        assert!(store.lookup(&key).unwrap().is_none());
        assert!(store.lookup(&other).unwrap().is_some());
    }

    /// Eviction sweeps temp files tagged with another PID but leaves this process's alone.
    #[test]
    fn orphan_temp_files_from_dead_processes_are_swept() {
        let (_d, store) = temp_store();
        let key = key_for("tmp");
        let dir = store.key_dir(&key);
        fs::create_dir_all(&dir).unwrap();
        let orphan_other = dir.join("r0-0.json.tmp.1.0");
        let mine = dir.join(format!("r0-0.bin.tmp.{}.0", std::process::id()));
        fs::write(&orphan_other, b"orphan").unwrap();
        fs::write(&mine, b"mine").unwrap();
        store.evict_overflow().unwrap();
        assert!(!orphan_other.exists(), "other-PID tmp file should be swept");
        assert!(mine.exists(), "same-PID tmp file must be left alone");
    }

    /// Temp-looking files without a parseable PID are left untouched.
    #[test]
    fn tmp_filenames_without_pid_are_skipped() {
        let (_d, store) = temp_store();
        let key = key_for("malformed");
        let dir = store.key_dir(&key);
        fs::create_dir_all(&dir).unwrap();
        let weird = dir.join("garbage.tmp.notapid.suffix");
        fs::write(&weird, b"x").unwrap();
        store.evict_overflow().unwrap();
        assert!(weird.exists());
    }

    /// Overwriting by run id replaces the entry instead of adding a second one.
    #[test]
    fn save_overwrite_keeps_single_entry() {
        let (_d, store) = temp_store();
        let key = key_for("overwrite");
        let id = store
            .save(&key, b"v1", Some(2.0), Some(1), EntryKind::Checkpoint)
            .unwrap();
        store
            .save_overwrite(&key, &id, b"v2", Some(1.0), Some(2), EntryKind::Checkpoint)
            .unwrap();
        let dir = store.key_dir(&key);
        let files: Vec<_> = fs::read_dir(&dir).unwrap().collect();
        assert_eq!(files.len(), 2, "overwrite should not create a new run-id");
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"v2");
        assert_eq!(got.objective, Some(1.0));
    }

    /// Entries saved under different keys do not leak into each other's lookups.
    #[test]
    fn keys_are_isolated() {
        let (_d, store) = temp_store();
        let a = key_for("a");
        let b = key_for("b");
        store
            .save(&a, b"AAA", Some(1.0), None, EntryKind::Final)
            .unwrap();
        store
            .save(&b, b"BBB", Some(1.0), None, EntryKind::Final)
            .unwrap();
        assert_eq!(store.lookup(&a).unwrap().unwrap().payload, b"AAA");
        assert_eq!(store.lookup(&b).unwrap().unwrap().payload, b"BBB");
    }
}