Skip to main content

gam_runtime/warm_start/
store.rs

1//! Filesystem store for warm-start entries.
2//!
3//! Each entry is a `(<runid>.json, <runid>.bin)` pair inside a per-key
4//! directory. Writes go through a temp-file → fsync → rename sequence so a
5//! crash mid-write never leaves a half-written entry visible to readers.
6//! Per-entry SHA-256 checksums catch any residual corruption.
7//!
8//! See [`crate::warm_start`] for the public API summary.
9
10use crate::warm_start::key::Fingerprint;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::fs;
15use std::io::{self, Write as _};
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex, OnceLock};
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
/// On-disk schema version. Bump on incompatible format changes; old entries
/// are then ignored at read time and evicted on the next save.
pub(crate) const SCHEMA_VERSION: u32 = 1;

/// Default disk budget for the whole warm-start store root: 1024³ bytes
/// (1 GiB).
pub(crate) const DEFAULT_SIZE_BUDGET_BYTES: u64 = 1024 * 1024 * 1024;

/// Default TTL in seconds (30 days) — entries untouched for this long are
/// dropped.
pub(crate) const DEFAULT_TTL_SECS: u64 = 60 * 60 * 24 * 30;
30
/// Errors surfaced by [`WarmStartStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Filesystem failure; converted from [`io::Error`] by `?`.
    #[error("io: {0}")]
    Io(#[from] io::Error),
    /// Metadata JSON (de)serialization failure; converted from
    /// [`serde_json::Error`] by `?`.
    #[error("json: {0}")]
    Json(#[from] serde_json::Error),
}
38
/// Entry returned from [`WarmStartStore::lookup`].
#[derive(Debug, Clone)]
pub struct WarmStartEntry {
    /// Raw payload bytes read from the entry's `.bin` file.
    pub payload: Vec<u8>,
    /// Objective value recorded with the entry, if any. `save_overwrite`
    /// stores `None` for non-finite objectives.
    pub objective: Option<f64>,
    /// Iteration number recorded by the writer, if any.
    pub iteration: Option<u64>,
    /// Creation stamp (whole seconds since the unix epoch), copied from the
    /// entry's metadata.
    pub written_unix_secs: u64,
    /// Whether the entry is a mid-fit checkpoint or an end-of-fit result.
    pub kind: EntryKind,
}
48
/// Stage of the fit that produced an entry. Persisted in the entry's on-disk
/// metadata and returned to callers on [`WarmStartEntry`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryKind {
    /// Mid-fit checkpoint — fit was alive when written.
    Checkpoint,
    /// End-of-fit — fit terminated successfully.
    Final,
}
56
/// Metadata stored as pretty-printed JSON in `<runid>.json`, next to the
/// `<runid>.bin` payload it describes.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OnDiskMeta {
    /// Schema version the entry was written with (`SCHEMA_VERSION` at save
    /// time).
    schema_version: u32,
    /// Creation stamp: whole seconds since the unix epoch.
    written_unix_secs: u64,
    /// Nanosecond component of the write timestamp. Used to break ties in
    /// LRU eviction so entries written within the same second don't sort
    /// arbitrarily.
    #[serde(default)]
    written_nanos: u32,
    /// Objective value supplied by the writer; `save_overwrite` records `None`
    /// for non-finite values.
    objective: Option<f64>,
    /// Iteration number supplied by the writer, if any.
    iteration: Option<u64>,
    /// Checkpoint vs. final entry.
    kind: EntryKind,
    /// Hex checksum of the payload as computed by `checksum_hex` at save
    /// time; lookups recompute it and discard the entry on mismatch.
    checksum_hex: String,
    /// Payload length in bytes at save time.
    payload_bytes: u64,
    /// Set when a lookup has reused this entry. When the size budget forces
    /// eviction, entries with `accessed == false` are removed before any
    /// entry that has been reused.
    #[serde(default)]
    accessed: bool,
    /// Last-access timestamp (unix seconds + nanos). Distinct from the
    /// immutable `written_*` creation stamp: a lookup that reuses this entry
    /// bumps the access stamp (refreshing its TTL so hot entries survive)
    /// WITHOUT touching `written_*`. Keeping the two separate is required for
    /// correctness — `lookup_latest`/`entry_newer` order by the immutable
    /// creation stamp, so if a read moved `written_*` forward the merely
    /// *read* entry would masquerade as the most-recently-*written* one. Zero
    /// (the serde default for entries written before this field existed, and
    /// for never-reused entries) means "no access newer than creation": TTL
    /// then falls back to `written_*`.
    #[serde(default)]
    accessed_unix_secs: u64,
    /// Nanosecond component of the last-access timestamp.
    #[serde(default)]
    accessed_nanos: u32,
}
90
91/// Effective activity timestamp (nanoseconds since the unix epoch): the more
92/// recent of the immutable creation stamp and the last-access stamp. TTL
93/// expiry is measured from this so a reused entry stays alive, while ordering
94/// (`entry_newer`) keys on `written_*` alone.
95fn meta_activity_nanos(meta: &OnDiskMeta) -> u128 {
96    let written =
97        (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
98    let accessed =
99        (meta.accessed_unix_secs as u128) * 1_000_000_000u128 + meta.accessed_nanos as u128;
100    written.max(accessed)
101}
102
/// Tunables for a [`WarmStartStore`].
#[derive(Debug, Clone)]
pub struct StoreOptions {
    /// Disk budget, in bytes, that `evict_overflow` enforces across the whole
    /// store root.
    pub size_budget_bytes: u64,
    /// Time-to-live measured from an entry's latest activity. The cached
    /// lookup path treats a zero duration as "never expires".
    pub ttl: Duration,
}
108
109impl Default for StoreOptions {
110    fn default() -> Self {
111        Self {
112            size_budget_bytes: DEFAULT_SIZE_BUDGET_BYTES,
113            ttl: Duration::from_secs(DEFAULT_TTL_SECS),
114        }
115    }
116}
117
/// Handle to a filesystem-backed warm-start store rooted at `root`.
///
/// Clones share the metadata index and the eviction-throttle state through
/// `Arc`s; only the test clock offset is copied by value (see the `Clone`
/// impl).
#[derive(Debug)]
pub struct WarmStartStore {
    /// Store root; each key gets its own sub-directory beneath it.
    root: PathBuf,
    /// Size budget and TTL this handle enforces.
    opts: StoreOptions,
    /// Per-store metadata index. It is populated lazily and shared by clones so
    /// checkpoint-heavy sessions do not repeatedly open every metadata JSON.
    index: Arc<Mutex<MetadataIndex>>,
    /// Approximate sum of bytes written under `root`. Used to throttle the
    /// full directory-scanning eviction in [`Self::save_overwrite`] — see
    /// `EVICT_EVERY_N_SAVES`. The counter resyncs to ground truth after every
    /// triggered sweep. Shared across clones (`Arc`) so the eviction throttle
    /// survives the per-operation store reuse in
    /// `solver::persistent_warm_start::persistent_store` — otherwise every
    /// fit reset the counter and ran a full eviction walk on its first save
    /// (gam#1114).
    byte_total: Arc<AtomicU64>,
    /// Monotonically increasing save counter, shared across clones. Used
    /// together with `byte_total` to throttle the eviction directory walk.
    save_counter: Arc<AtomicU64>,
    /// Root-directory mtime observed at the last completed eviction sweep,
    /// shared across clones. When a throttled sweep fires while the store is
    /// comfortably *under* the size budget, the only work left for it is a
    /// TTL/byte resync over every key dir — an N-dir `read_dir` + `stat` walk
    /// that, with thousands of fingerprint dirs in a long CI run, dominates
    /// the per-32-save sweep even after the per-dir listing cache lands (the
    /// residual #1114 walk). The root dir's mtime is bumped by the OS whenever
    /// a key dir is created or removed under it, so an unchanged root mtime
    /// means no key dir was added/dropped since our last sweep; combined with
    /// a comfortably-under-budget byte total, the size-eviction walk is then a
    /// guaranteed no-op and is skipped. TTL expiry of *existing* entries does
    /// not change the root mtime, but it is already performed lazily on every
    /// `lookup_with` and on the next root-changing save, so skipping it here is
    /// behaviour-neutral (no entry the gate skips could be returned stale).
    last_evict_root_mtime: Arc<Mutex<Option<SystemTime>>>,
    /// Per-store test-only monotonic time offset (nanoseconds) added to every
    /// `*_now` reading. Always zero in production. Tests mutate it through
    /// [`Self::test_advance_time`] to simulate elapsed time without
    /// `thread::sleep`. Lives on the store rather than as a process-wide
    /// static so parallel tests with their own stores cannot pollute each
    /// other's clocks — a global clock made `cargo test` non-deterministic
    /// (gam test infra: one test's +1.5s TTL advance was bumping another
    /// test's just-saved entry past its 1s TTL on immediate lookup).
    test_time_offset_ns: AtomicU64,
}
162
163impl Clone for WarmStartStore {
164    fn clone(&self) -> Self {
165        Self {
166            root: self.root.clone(),
167            opts: self.opts.clone(),
168            index: Arc::clone(&self.index),
169            // Throttle counters are shared across clones so the eviction
170            // directory walk stays throttled to every Nth save across the
171            // whole process, even though `persistent_store` hands out a fresh
172            // clone per save/lookup.
173            byte_total: Arc::clone(&self.byte_total),
174            save_counter: Arc::clone(&self.save_counter),
175            last_evict_root_mtime: Arc::clone(&self.last_evict_root_mtime),
176            test_time_offset_ns: AtomicU64::new(self.test_time_offset_ns.load(Ordering::Relaxed)),
177        }
178    }
179}
180
181impl WarmStartStore {
182    /// Open (or create) a store rooted at `root`.
183    pub fn open(root: PathBuf, opts: StoreOptions) -> Result<Self, StoreError> {
184        fs::create_dir_all(&root)?;
185        Ok(Self {
186            root,
187            opts,
188            index: Arc::new(Mutex::new(MetadataIndex::default())),
189            byte_total: Arc::new(AtomicU64::new(0)),
190            save_counter: Arc::new(AtomicU64::new(0)),
191            last_evict_root_mtime: Arc::new(Mutex::new(None)),
192            test_time_offset_ns: AtomicU64::new(0),
193        })
194    }
195
    /// Root directory this store was opened on.
    pub fn root(&self) -> &Path {
        &self.root
    }
199
    /// Options (size budget and TTL) this store was opened with.
    pub fn options(&self) -> &StoreOptions {
        &self.opts
    }
203
    /// Directory holding the entries for `key`: the store root joined with
    /// the key's hex form.
    fn key_dir(&self, key: &Fingerprint) -> PathBuf {
        self.root.join(key.to_hex())
    }
207
    /// Look up the best entry for `key`, or `None` if no valid entry exists.
    ///
    /// Selection: lowest `objective` first; ties prefer [`EntryKind::Final`]
    /// over [`EntryKind::Checkpoint`], then latest `written_unix_secs`. If
    /// every candidate has `objective = None`, picks the latest write.
    /// Corrupt or schema-mismatched candidates are silently cleaned up and
    /// skipped.
    ///
    /// # Errors
    ///
    /// Propagates any [`StoreError`] raised by the internal touch step that
    /// runs on the chosen entry. A missing key directory, an unreadable
    /// payload, or a checksum mismatch yields `Ok(None)` rather than an error.
    pub fn lookup(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
        self.lookup_with(key, LookupMode::Best)
    }
218
    /// Look up the newest valid entry for `key`, or `None` if no valid entry
    /// exists.
    ///
    /// Unlike [`Self::lookup`], this deliberately ignores objective values.
    /// Use this for near-match seed namespaces where entries may come from
    /// different folds, diseases, or row sets, and objective magnitudes are
    /// not comparable. Exact-key resume should keep using [`Self::lookup`].
    ///
    /// # Errors
    ///
    /// Propagates any [`StoreError`] raised by the internal touch step that
    /// runs on the chosen entry. A missing key directory, an unreadable
    /// payload, or a checksum mismatch yields `Ok(None)` rather than an error.
    pub fn lookup_latest(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
        self.lookup_with(key, LookupMode::Latest)
    }
229
230    fn lookup_with(
231        &self,
232        key: &Fingerprint,
233        mode: LookupMode,
234    ) -> Result<Option<WarmStartEntry>, StoreError> {
235        let dir = self.key_dir(key);
236        if !dir.exists() {
237            // A stale in-memory cache entry could outlive its directory if
238            // another process evicted us. Drop it so we don't return data
239            // for a key whose backing files are gone.
240            lookup_cache_invalidate(&LookupCacheKey { fp: *key, mode });
241            self.metadata_index_remove_key(key);
242            return Ok(None);
243        }
244        // Fast path: if the same (key, mode) was looked up before and the
245        // chosen meta file's mtime is unchanged, return the cached entry
246        // without re-reading any JSON or re-checksumming the .bin payload.
247        // A separate writer (this process or another) bumps mtime on
248        // rename → mismatch → we fall through to the slow path. The TTL
249        // cutoff is also re-checked here against `nanos_now()` so a hot
250        // poll loop cannot keep returning an expired entry between eviction
251        // sweeps (eviction is throttled via `EVICT_EVERY_N_SAVES`).
252        let cache_key = LookupCacheKey { fp: *key, mode };
253        let now_nanos = self.nanos_now();
254        if let Some(hit) = lookup_cache_get(&cache_key) {
255            if let Ok(md) = fs::metadata(&hit.meta_path)
256                && md.modified().ok() == Some(hit.meta_mtime)
257            {
258                let expired = self.opts.ttl.as_nanos() > 0
259                    && now_nanos.saturating_sub(hit.write_nanos) >= self.opts.ttl.as_nanos();
260                if !expired {
261                    let entry = self.touch_lookup_hit(&hit.meta_path, hit.entry)?;
262                    return Ok(Some(entry));
263                }
264                lookup_cache_invalidate(&cache_key);
265                let bin = hit.meta_path.with_extension("bin");
266                fs::remove_file(&hit.meta_path).ok();
267                fs::remove_file(&bin).ok();
268                // Removing the entry stales any cached directory listing.
269                self.metadata_index_remove(&hit.meta_path);
270                return Ok(None);
271            }
272            lookup_cache_invalidate(&cache_key);
273        }
274        // Resolve all valid entries for this key directory. `scan_key_dir`
275        // serves the listing from the per-store directory cache when the dir's
276        // mtime is unchanged since the last scan (no re-`read_dir`, no per-file
277        // `stat`, no JSON re-parse), and drops TTL-expired / corrupt entries in
278        // passing — exactly the syscall storm #1114 traced.
279        let mut best: Option<(OnDiskMeta, PathBuf)> = None;
280        for scanned in self.scan_key_dir(&dir, now_nanos) {
281            let take = match best {
282                None => true,
283                Some((ref cur, _)) => mode.better(&scanned.meta, cur),
284            };
285            if take {
286                best = Some((scanned.meta, scanned.meta_path));
287            }
288        }
289        let (meta, meta_path) = match best {
290            Some(b) => b,
291            None => {
292                lookup_cache_invalidate(&cache_key);
293                return Ok(None);
294            }
295        };
296        let bin_path = meta_path.with_extension("bin");
297        let payload = match fs::read(&bin_path) {
298            Ok(v) => v,
299            Err(_) => return Ok(None),
300        };
301        // Validate checksum
302        if checksum_hex(&payload) != meta.checksum_hex {
303            fs::remove_file(&meta_path).ok();
304            fs::remove_file(&bin_path).ok();
305            lookup_cache_invalidate(&cache_key);
306            self.metadata_index_remove(&meta_path);
307            return Ok(None);
308        }
309        let entry = WarmStartEntry {
310            payload,
311            objective: meta.objective,
312            iteration: meta.iteration,
313            written_unix_secs: meta.written_unix_secs,
314            kind: meta.kind,
315        };
316        let (meta, entry) = self.touch_lookup_meta(&meta_path, meta, entry)?;
317        // Record (meta_path, mtime) → entry so subsequent identical lookups
318        // short-circuit until the meta file's mtime changes. The effective
319        // activity stamp (post-touch, so it reflects this very access) is
320        // cached alongside so the fast path can re-apply the TTL cutoff without
321        // re-reading the JSON.
322        if let Ok(md) = fs::metadata(&meta_path)
323            && let Ok(mtime) = md.modified()
324        {
325            let write_nanos = meta_activity_nanos(&meta);
326            lookup_cache_insert(
327                cache_key,
328                CachedLookup {
329                    meta_path: meta_path.clone(),
330                    meta_mtime: mtime,
331                    write_nanos,
332                    entry: entry.clone(),
333                },
334            );
335        }
336        Ok(Some(entry))
337    }
338
339    /// Save a new entry with a fresh run-id. Returns the run-id (caller may
340    /// hand it to [`Self::save_overwrite`] for periodic in-place updates).
341    pub fn save(
342        &self,
343        key: &Fingerprint,
344        payload: &[u8],
345        objective: Option<f64>,
346        iteration: Option<u64>,
347        kind: EntryKind,
348    ) -> Result<String, StoreError> {
349        let run_id = self.fresh_run_id();
350        self.save_overwrite(key, &run_id, payload, objective, iteration, kind)?;
351        Ok(run_id)
352    }
353
354    /// Save under a specific run-id (overwrites an existing entry with the
355    /// same id atomically).
356    pub fn save_overwrite(
357        &self,
358        key: &Fingerprint,
359        run_id: &str,
360        payload: &[u8],
361        objective: Option<f64>,
362        iteration: Option<u64>,
363        kind: EntryKind,
364    ) -> Result<(), StoreError> {
365        // Any new write under this key may change which entry wins both
366        // `LookupMode::Best` and `LookupMode::Latest`, so drop both cached
367        // rows before touching disk. A pure save_overwrite of the same
368        // run_id would also bump mtime and self-invalidate, but a save()
369        // with a fresh run_id leaves the old meta file unchanged — only
370        // explicit invalidation catches that.
371        lookup_cache_invalidate(&LookupCacheKey {
372            fp: *key,
373            mode: LookupMode::Best,
374        });
375        lookup_cache_invalidate(&LookupCacheKey {
376            fp: *key,
377            mode: LookupMode::Latest,
378        });
379        let dir = self.key_dir(key);
380        let pid = std::process::id();
381        // 1. Compute checksum from payload.
382        let checksum = checksum_hex(payload);
383        let objective_finite = objective.filter(|o| o.is_finite());
384        // The meta's `written_unix_secs`/`written_nanos` are captured INSIDE the
385        // write loop — just before the meta_tmp is written, AFTER the bin write
386        // has completed. The stored timestamp drives the TTL contract: an
387        // entry's clock should start ticking from when the entry becomes
388        // (nearly) visible to lookups, not from `save_overwrite`'s entry. On
389        // slow disks the bin write + fsync + rename can take longer than the
390        // entire TTL window itself (the warm-start test fixture pins TTL=1s
391        // while the ext4-backed CI image takes >1s on small writes), so an
392        // up-front stamp causes the entry to be classified as expired the
393        // moment `save_overwrite` returns. Pushing the stamp past the bin
394        // fsync removes that systemic drift from the cost of writing the
395        // entry — only the meta fsync + final rename + dir fsync still
396        // elapse between the stamp and the entry becoming visible.
397
398        // 3. Write both temp files and atomically rename them into place. The
399        //    whole "ensure dir → write temps → rename" sequence is retried once
400        //    as a unit on `ErrorKind::NotFound`, because a concurrent process'
401        //    `evict_overflow` can `remove_dir` this key dir the instant it
402        //    observes it empty (store.rs `evict_overflow`, "Sweep now-empty key
403        //    dirs"). That removal races every write step here: it can vanish the
404        //    dir after `create_dir_all` but before a temp `File::create`, or
405        //    take the dir *and our just-written temps with it* before the
406        //    rename, surfacing as `io: No such file or directory (os error 2)`
407        //    under parallel CV / bootstrap fitting (gam#868). Retrying the
408        //    sequence (not an individual step) is the only correct response: a
409        //    bare rename retry can't recover once the source temp was swept with
410        //    the dir, so we recreate the dir and rewrite the temps from the
411        //    in-memory `payload` / `meta_json` we still hold. A single retry is
412        //    sufficient — the eviction window is one `remove_dir` syscall wide —
413        //    and a second genuine `NotFound` is propagated as before.
414        let nonce = self.nanos_now();
415        let bin_final = dir.join(format!("{run_id}.bin"));
416        let meta_final = dir.join(format!("{run_id}.json"));
417        let mut attempt = 0u8;
418        let build_meta_json = |secs: u64, subsec_nanos: u32| -> Result<Vec<u8>, StoreError> {
419            let meta = OnDiskMeta {
420                schema_version: SCHEMA_VERSION,
421                written_unix_secs: secs,
422                written_nanos: subsec_nanos,
423                objective: objective_finite,
424                iteration,
425                kind,
426                checksum_hex: checksum.clone(),
427                payload_bytes: payload.len() as u64,
428                accessed: false,
429                accessed_unix_secs: 0,
430                accessed_nanos: 0,
431            };
432            Ok(serde_json::to_vec_pretty(&meta)?)
433        };
434        loop {
435            let bin_tmp = dir.join(format!("{run_id}.bin.tmp.{pid}.{nonce}.{attempt}"));
436            let meta_tmp = dir.join(format!("{run_id}.json.tmp.{pid}.{nonce}.{attempt}"));
437            let stamp_fn = || self.unix_now_parts();
438            let build_meta_for_io = |secs: u64, subsec_nanos: u32| -> io::Result<Vec<u8>> {
439                build_meta_json(secs, subsec_nanos)
440                    .map_err(|e| io::Error::other(format!("meta build: {e:?}")))
441            };
442            match write_and_promote_entry(&EntryWrite {
443                dir: &dir,
444                bin_tmp: &bin_tmp,
445                meta_tmp: &meta_tmp,
446                payload,
447                bin_final: &bin_final,
448                meta_final: &meta_final,
449                stamp_fn: &stamp_fn,
450                build_meta_json: &build_meta_for_io,
451            }) {
452                Ok(()) => break,
453                Err(e) if e.kind() == io::ErrorKind::NotFound && attempt == 0 => {
454                    // A sibling process' eviction removed the key dir mid-write.
455                    // Clean up any partial temps, then retry the whole sequence
456                    // once after recreating the dir inside `write_and_promote_entry`.
457                    fs::remove_file(&bin_tmp).ok();
458                    fs::remove_file(&meta_tmp).ok();
459                    attempt += 1;
460                    continue;
461                }
462                Err(e) => {
463                    fs::remove_file(&bin_tmp).ok();
464                    fs::remove_file(&meta_tmp).ok();
465                    fs::remove_file(&bin_final).ok();
466                    return Err(StoreError::Io(e));
467                }
468            }
469        }
470        // Fsync the containing directory so the rename itself is durable
471        // across a power loss / hard crash. fs::File::sync_all on the
472        // payload only guarantees the file content reaches disk; without
473        // also fsyncing the directory inode, the *rename* (which is what
474        // makes the entry visible to lookups) can be lost. Best-effort on
475        // platforms where opening a directory for fsync is not supported.
476        if let Ok(d) = fs::File::open(&dir) {
477            d.sync_all().ok();
478        }
479        self.metadata_index_upsert(&meta_final, &bin_final).ok();
480        // 5. Best-effort eviction; failure here is non-fatal. Throttle the
481        // full directory scan: maintain a process-wide approximate byte
482        // total and only run eviction when the per-save counter wraps
483        // `EVICT_EVERY_N_SAVES` as a drift-resync trigger, or on the very
484        // first save (so a fresh process inheriting a populated store root
485        // sweeps once). The
486        // counter is best-effort: it can drift relative to disk truth
487        // because other processes may write/evict, but every triggered
488        // sweep resyncs it to ground truth.
489        //
490        // The counter throttle alone does NOT bound the store: a burst of up
491        // to `EVICT_EVERY_N_SAVES - 1` saves between two counter-triggered
492        // sweeps can push the footprint arbitrarily far past the budget (e.g.
493        // 31 payloads under a budget that fits a handful). Bound it by also
494        // sweeping whenever the approximate byte total already exceeds the
495        // budget — a single cheap atomic load, so the common under-budget path
496        // still walks the directory only every Nth save, while an over-budget
497        // total forces the very next save to reclaim it. The eviction resyncs
498        // `byte_total` to ground truth, so this fires once per crossing rather
499        // than on every subsequent save.
500        let approx_added = payload.len() as u64 + APPROX_META_BYTES;
501        let new_total = self.byte_total.fetch_add(approx_added, Ordering::Relaxed) + approx_added;
502        let n = self.save_counter.fetch_add(1, Ordering::Relaxed);
503        if n == 0
504            || n.is_multiple_of(EVICT_EVERY_N_SAVES)
505            || new_total > self.opts.size_budget_bytes
506        {
507            self.evict_overflow().ok();
508        }
509        Ok(())
510    }
511
512    /// Drop entries older than TTL, then evict by recorded write-time
513    /// ascending until total bytes ≤ `opts.size_budget_bytes`. Idempotent;
514    /// safe under concurrent processes (worst case some entries are
515    /// double-removed, which is a no-op).
516    ///
517    /// Sort key is the `(written_unix_secs, written_nanos)` recorded in
518    /// each entry's meta, not the filesystem mtime — at second-resolution
519    /// mtime, batches of writes within the same second would sort
520    /// arbitrarily and could evict the most recent entry.
521    pub fn evict_overflow(&self) -> Result<(), StoreError> {
522        // Root-mtime short-circuit. A throttled sweep that fires while the
523        // approximate byte total is comfortably under budget has no size
524        // eviction to do; its only residual work is the TTL/byte resync walk
525        // over every key dir. The root mtime is bumped whenever a key dir is
526        // created/removed beneath it, so if it is unchanged since our last
527        // completed sweep AND we are under budget, no key dir was added or
528        // dropped and the size-eviction walk is provably a no-op — skip the
529        // N-dir `read_dir`+`stat` storm. (TTL expiry of existing entries does
530        // not move the root mtime, but it is already enforced lazily on every
531        // `lookup_with` and on the next root-changing save, so the gate cannot
532        // surface a stale entry.) This trims the residual #1114 walk in long
533        // refit-heavy CI runs where thousands of fingerprint dirs accumulate.
534        let current_root_mtime = fs::metadata(&self.root)
535            .ok()
536            .and_then(|m| m.modified().ok());
537        if self.byte_total.load(Ordering::Relaxed) <= self.opts.size_budget_bytes
538            && let Some(now_mtime) = current_root_mtime
539            && let Ok(last) = self.last_evict_root_mtime.lock()
540            && *last == Some(now_mtime)
541        {
542            return Ok(());
543        }
544        let read_dir = match fs::read_dir(&self.root) {
545            Ok(rd) => rd,
546            Err(_) => return Ok(()),
547        };
548        // Collect (meta_path, bin_path, total_bytes, write_nanos_since_epoch, accessed).
549        let mut all: Vec<(PathBuf, PathBuf, u64, u128, bool)> = Vec::new();
550        let now_nanos = self.nanos_now();
551        for key_dir_entry in read_dir {
552            let key_dir = match key_dir_entry {
553                Ok(e) => e.path(),
554                Err(_) => continue,
555            };
556            if !key_dir.is_dir() {
557                continue;
558            }
559            // `scan_key_dir` reuses the per-store directory-listing cache when
560            // the key dir's mtime is unchanged, so an unchanged dir costs a
561            // single `stat` rather than a `read_dir` + per-file `stat` + JSON
562            // read of every entry. It also sweeps foreign tmp files and drops
563            // corrupt / TTL-expired entries, mirroring the old inline pass.
564            let scanned = self.scan_key_dir(&key_dir, now_nanos);
565            for entry in &scanned {
566                let write_nanos = (entry.meta.written_unix_secs as u128) * 1_000_000_000u128
567                    + entry.meta.written_nanos as u128;
568                let total_bytes = entry.meta_len + entry.bin_len;
569                all.push((
570                    entry.meta_path.clone(),
571                    entry.bin_path.clone(),
572                    total_bytes,
573                    write_nanos,
574                    entry.meta.accessed,
575                ));
576            }
577            // Sweep now-empty key dirs.
578            if scanned.is_empty()
579                && fs::read_dir(&key_dir)
580                    .map(|mut it| it.next().is_none())
581                    .unwrap_or(false)
582            {
583                fs::remove_dir(&key_dir).ok();
584                if let Ok(mut index) = self.index.lock() {
585                    index.by_key_dir.remove(&key_dir);
586                }
587            }
588        }
589        let total: u64 = all.iter().map(|e| e.2).sum();
590        if total <= self.opts.size_budget_bytes {
591            // Resync the approximate byte counter even when no eviction was
592            // needed. Otherwise the in-memory `byte_total` only grows (it
593            // never observes deletions made by sibling processes or
594            // expiration sweeps), so after enough saves `new_total` exceeds
595            // the budget on every call and triggers a full directory walk
596            // on every save instead of every Nth save.
597            self.byte_total.store(total, Ordering::Relaxed);
598            // Record the root mtime observed by this completed under-budget
599            // sweep so a subsequent throttled sweep can short-circuit while
600            // the root is unchanged. Re-read after the walk: any key dir the
601            // walk removed (empty-dir sweep above) bumps the root mtime, and
602            // capturing the post-walk value keeps the gate from skipping a
603            // genuinely-changed root on the next call.
604            if let (Ok(mut last), Some(m)) = (
605                self.last_evict_root_mtime.lock(),
606                fs::metadata(&self.root)
607                    .ok()
608                    .and_then(|m| m.modified().ok()),
609            ) {
610                *last = Some(m);
611            }
612            return Ok(());
613        }
614        all.sort_by(|a, b| {
615            a.4.cmp(&b.4)
616                .then_with(|| a.3.cmp(&b.3))
617                .then_with(|| a.0.cmp(&b.0))
618        });
619        let mut remaining = total;
620        for (meta, bin, bytes, _, _) in all.into_iter() {
621            if remaining <= self.opts.size_budget_bytes {
622                break;
623            }
624            fs::remove_file(&meta).ok();
625            fs::remove_file(&bin).ok();
626            self.metadata_index_remove(&meta);
627            remaining = remaining.saturating_sub(bytes);
628        }
629        // Resync the approximate byte counter to ground truth. Subsequent
630        // saves increment from here until the next sweep.
631        self.byte_total.store(remaining, Ordering::Relaxed);
632        Ok(())
633    }
634}
635
/// Borrowed inputs for `write_and_promote_entry`, bundled so the caller can
/// re-run the whole write sequence with a fresh pair of temp paths.
///
/// `write_and_promote_entry` ensures the key dir exists, writes the `.bin` and
/// `.json` temp files, and atomically renames both into place. It returns the
/// raw `io::Error` (not a `StoreError`) so the caller can branch on
/// `ErrorKind::NotFound` to retry the whole sequence after a concurrent
/// eviction removed the dir mid-write (gam#868). Idempotent across a retry:
/// every path is derived from the caller's stable args and the temps are
/// rewritten from the in-memory payload, so a second pass into a freshly
/// recreated dir produces the same final entry.
///
/// `.bin` is renamed before `.json` so a meta-pointing-to-missing-bin window is
/// impossible on the happy path; a reader that catches `.bin`-missing treats the
/// entry as corrupt and cleans it up.
struct EntryWrite<'a> {
    /// Key directory that will hold the entry; (re)created before writing.
    dir: &'a Path,
    /// Temp path the payload is written to before promotion.
    bin_tmp: &'a Path,
    /// Temp path for the meta JSON before promotion.
    meta_tmp: &'a Path,
    /// Payload bytes to persist.
    payload: &'a [u8],
    /// Final `<runid>.bin` path.
    bin_final: &'a Path,
    /// Final `<runid>.json` path.
    meta_final: &'a Path,
    /// Read the current wall clock as `(unix_secs, subsec_nanos)`. Called
    /// AFTER the bin write and bin fsync complete (and after the bin rename)
    /// so the recorded write time tracks when the entry actually becomes
    /// (nearly) visible, not when `save_overwrite` was first invoked. On
    /// slow disks the bin fsync can dominate save latency and a pre-write
    /// stamp would burn TTL the caller never sees.
    stamp_fn: &'a dyn Fn() -> (u64, u32),
    /// Build the meta JSON given the captured `(secs, subsec_nanos)`. The
    /// closure folds those values into `OnDiskMeta` and serializes it.
    build_meta_json: &'a dyn Fn(u64, u32) -> io::Result<Vec<u8>>,
}
665
666fn write_and_promote_entry(w: &EntryWrite<'_>) -> io::Result<()> {
667    // Recreate the dir up front: on the first attempt this is the original
668    // `create_dir_all`; on a retry it re-establishes the dir a sibling
669    // process' eviction removed.
670    fs::create_dir_all(w.dir)?;
671    {
672        let mut f = fs::File::create(w.bin_tmp)?;
673        f.write_all(w.payload)?;
674        f.sync_all().ok();
675    }
676    // Promote the bin first so a crash between the two renames leaves an
677    // orphan .bin (cleaned up by `evict_overflow`) rather than a meta
678    // pointing at a missing .bin (which the reader would mark corrupt).
679    fs::rename(w.bin_tmp, w.bin_final)?;
680    // Stamp the meta AFTER the bin promotion. This is the latest moment the
681    // timestamp can still be inlined into the meta JSON. The remaining gap
682    // before the entry is visible to lookups is one meta write+fsync + the
683    // meta rename + the caller's directory fsync — all bounded, so TTL is
684    // measured from a near-visible moment instead of from the entry to
685    // `save_overwrite`.
686    let (secs, subsec_nanos) = (w.stamp_fn)();
687    let meta_json = (w.build_meta_json)(secs, subsec_nanos)?;
688    {
689        let mut f = fs::File::create(w.meta_tmp)?;
690        f.write_all(&meta_json)?;
691        f.sync_all().ok();
692    }
693    if let Err(e) = fs::rename(w.meta_tmp, w.meta_final) {
694        // Roll back the bin we just promoted to avoid orphaning it, then
695        // surface the error so the caller can retry or fail.
696        fs::remove_file(w.bin_final).ok();
697        return Err(e);
698    }
699    Ok(())
700}
701
/// Conservative per-entry estimate, in bytes, of the meta JSON size, used by
/// the throttled save counter in place of measuring the real file. Real meta
/// files run ~250-400 bytes after pretty-printing; overestimating just means
/// the throttle fires slightly earlier, never later.
const APPROX_META_BYTES: u64 = 512;
706
707/// How [`WarmStartStore::lookup_with`] ranks candidate entries.
708#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
709enum LookupMode {
710    /// Lowest objective wins; ties to [`entry_better`].
711    Best,
712    /// Newest write wins; objectives ignored.
713    Latest,
714}
715
716impl LookupMode {
717    fn better(&self, candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
718        match self {
719            LookupMode::Best => entry_better(candidate, current),
720            LookupMode::Latest => entry_newer(candidate, current),
721        }
722    }
723}
724
/// Key of the process-wide lookup cache: one row per fingerprint and ranking
/// mode, so `Best` and `Latest` results for the same key are cached separately.
///
/// NOTE(review): the key carries no store root while the cache is a single
/// process-wide map, so two stores opened on different roots would share rows
/// for equal fingerprints — confirm the lookup fast path guards against that.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct LookupCacheKey {
    /// Fingerprint of the warm-start key being looked up.
    fp: Fingerprint,
    /// Ranking mode the cached result was resolved under.
    mode: LookupMode,
}
730
/// One row of the process-wide lookup cache: a resolved entry plus the file
/// facts recorded alongside it.
#[derive(Clone)]
struct CachedLookup {
    /// Path of the meta file for the cached entry.
    meta_path: PathBuf,
    /// Meta-file mtime stored with the row. NOTE(review): presumably compared
    /// against the file's current mtime to validate the row — confirm in
    /// `lookup_with`.
    meta_mtime: SystemTime,
    /// Full-precision nanosecond write timestamp from the on-disk meta,
    /// kept alongside `entry.written_unix_secs` so the fast path can apply
    /// the same TTL cutoff as `evict_overflow` without re-reading the JSON.
    /// Also the ordering key `lookup_cache_insert` uses to pick which row to
    /// evict (smallest value first).
    write_nanos: u128,
    /// The entry handed back to callers on a cache hit.
    entry: WarmStartEntry,
}
741
/// In-memory index of parsed meta files and per-directory listings, used to
/// avoid re-reading and re-parsing unchanged files.
#[derive(Debug, Default)]
struct MetadataIndex {
    /// Parsed meta keyed by meta-file path; a row is reused only while the
    /// file facts recorded in [`IndexedMeta`] still match.
    by_meta_path: HashMap<PathBuf, IndexedMeta>,
    /// Per-key-directory cached listing, keyed by the directory's mtime.
    ///
    /// A key dir's mtime is bumped by the OS whenever an entry is created,
    /// renamed, or removed inside it (which is exactly when our entries
    /// change). So a matching `dir_mtime` means the set of `<runid>.{json,bin}`
    /// pairs is what we scanned last time, letting
    /// [`WarmStartStore::scan_key_dir`] return the cached `Vec<ScannedEntry>`
    /// without a fresh `read_dir` or any JSON read. The cached entries' files
    /// are still re-`stat`ed by `cached_dir_scan` to confirm their sizes and
    /// mtimes are unchanged. This is what cuts the metadata-syscall storm in
    /// repeated `lookup_with` / `evict_overflow` calls within one fit
    /// (gam#1114).
    by_key_dir: HashMap<PathBuf, ScannedDir>,
}
757
758#[derive(Debug, Clone)]
759struct IndexedMeta {
760    meta_mtime: SystemTime,
761    meta_len: u64,
762    bin_len: u64,
763    meta: OnDiskMeta,
764}
765
766impl IndexedMeta {
767    fn matches(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
768        meta_md.modified().ok() == Some(self.meta_mtime)
769            && meta_md.len() == self.meta_len
770            && bin_md.len() == self.bin_len
771    }
772}
773
/// Cached result of scanning one key directory: its mtime at scan time plus
/// the resolved entries. Reused while the dir's mtime is unchanged and every
/// entry's files still match (checked by `cached_dir_scan`).
#[derive(Debug, Clone)]
struct ScannedDir {
    /// Directory mtime the listing was stored under.
    dir_mtime: SystemTime,
    /// Resolved `(meta, bin)` pairs found by that scan.
    entries: Vec<ScannedEntry>,
}
781
782/// One resolved `(meta, bin)` pair discovered during a key-dir scan. Carries
783/// everything both the lookup ranker and the eviction sweep need so neither
784/// has to re-`stat` or re-read the files when the dir is unchanged.
785#[derive(Debug, Clone)]
786struct ScannedEntry {
787    meta_path: PathBuf,
788    bin_path: PathBuf,
789    meta_len: u64,
790    bin_len: u64,
791    meta_mtime: Option<SystemTime>,
792    bin_mtime: Option<SystemTime>,
793    meta: OnDiskMeta,
794}
795
796impl ScannedEntry {
797    fn matches_files(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
798        meta_md.len() == self.meta_len
799            && bin_md.len() == self.bin_len
800            && meta_md.modified().ok() == self.meta_mtime
801            && bin_md.modified().ok() == self.bin_mtime
802    }
803}
804
/// TTL expiry test shared by the scan and lookup paths.
///
/// `activity_nanos` is the entry's effective activity stamp
/// (`meta_activity_nanos`: the more recent of creation and last access), so a
/// reused entry's TTL restarts from its last lookup rather than its creation.
///
/// Returns `true` once at least `ttl` has elapsed between `activity_nanos` and
/// `now_nanos`. A zero `ttl` disables expiry (always `false`), and an activity
/// stamp in the future counts as zero elapsed time.
const fn meta_expired(activity_nanos: u128, ttl: Duration, now_nanos: u128) -> bool {
    let limit = ttl.as_nanos();
    let idle = now_nanos.saturating_sub(activity_nanos);
    limit != 0 && idle >= limit
}
819
820/// Process-wide in-memory cache for [`WarmStartStore::lookup_with`]. Hot poll
821/// loops hit the same (key, mode) repeatedly between writes, so caching the
822/// resolved entry behind an mtime check eliminates the per-call directory
823/// walk, JSON parse, and SHA-256 recomputation. Mtime mismatch — including
824/// writes from a sibling process — invalidates the row and falls back to
825/// the full slow path.
826fn lookup_cache() -> &'static Mutex<HashMap<LookupCacheKey, CachedLookup>> {
827    static CACHE: OnceLock<Mutex<HashMap<LookupCacheKey, CachedLookup>>> = OnceLock::new();
828    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
829}
830
/// Maximum number of rows kept in the process-wide lookup cache; an insert
/// evicts rows until there is room for the new one.
const LOOKUP_CACHE_MAX_ENTRIES: usize = 128;
/// Maximum total resident bytes (as measured by `cached_lookup_resident_bytes`)
/// across all cached rows — 256 MiB. A single row larger than this is never
/// cached.
const LOOKUP_CACHE_MAX_BYTES: usize = 256 * 1024 * 1024;
833
834const fn cached_lookup_resident_bytes(value: &CachedLookup) -> usize {
835    std::mem::size_of::<CachedLookup>().saturating_add(value.entry.payload.capacity())
836}
837
838fn lookup_cache_get(key: &LookupCacheKey) -> Option<CachedLookup> {
839    let guard = lookup_cache().lock().ok()?;
840    guard.get(key).cloned()
841}
842
843fn lookup_cache_insert(key: LookupCacheKey, val: CachedLookup) {
844    if let Ok(mut guard) = lookup_cache().lock() {
845        let new_bytes = cached_lookup_resident_bytes(&val);
846        if new_bytes > LOOKUP_CACHE_MAX_BYTES {
847            return;
848        }
849        let mut resident_bytes: usize = guard.values().map(cached_lookup_resident_bytes).sum();
850        if let Some(old) = guard.remove(&key) {
851            resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
852        }
853        while guard.len() >= LOOKUP_CACHE_MAX_ENTRIES
854            || resident_bytes.saturating_add(new_bytes) > LOOKUP_CACHE_MAX_BYTES
855        {
856            let oldest = guard
857                .iter()
858                .min_by_key(|(_, cached)| cached.write_nanos)
859                .map(|(old_key, _)| *old_key);
860            let Some(oldest) = oldest else {
861                break;
862            };
863            if let Some(old) = guard.remove(&oldest) {
864                resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
865            }
866        }
867        guard.insert(key, val);
868    }
869}
870
871fn lookup_cache_invalidate(key: &LookupCacheKey) {
872    if let Ok(mut guard) = lookup_cache().lock() {
873        guard.remove(key);
874    }
875}
876
/// Run a full [`WarmStartStore::evict_overflow`] sweep every Nth save. The
/// budget can briefly overshoot by N-1 payloads, which the next sweep
/// reclaims. N=32 keeps the amortized cost negligible on hot checkpoint
/// paths while still bounding worst-case disk drift.
const EVICT_EVERY_N_SAVES: u64 = 32;
882
/// Extract the writer's process id from a temp-file name.
///
/// Names look like `<runid>.bin.tmp.<pid>.<nonce>.<attempt>` or
/// `<runid>.json.tmp.<pid>.<nonce>.<attempt>`; only the first dot-separated
/// token after the first `.tmp.` matters. Returns `None` when the name has no
/// `.tmp.` marker or that token is not a valid `u32`.
fn parse_tmp_pid(name: &str) -> Option<u32> {
    let (_, after_marker) = name.split_once(".tmp.")?;
    let pid_token = match after_marker.split_once('.') {
        Some((token, _)) => token,
        None => after_marker,
    };
    pid_token.parse().ok()
}
891
892fn read_meta(path: &Path) -> Result<OnDiskMeta, StoreError> {
893    let bytes = fs::read(path)?;
894    let parsed: OnDiskMeta = serde_json::from_slice(&bytes)?;
895    Ok(parsed)
896}
897
898fn entry_better(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
899    match (candidate.objective, current.objective) {
900        (Some(c), Some(d)) => {
901            if (c - d).abs() < 1e-12 {
902                match (candidate.kind, current.kind) {
903                    (EntryKind::Final, EntryKind::Checkpoint) => true,
904                    (EntryKind::Checkpoint, EntryKind::Final) => false,
905                    _ => entry_newer(candidate, current),
906                }
907            } else {
908                c < d
909            }
910        }
911        (Some(_), None) => true,
912        (None, Some(_)) => false,
913        (None, None) => entry_newer(candidate, current),
914    }
915}
916
917fn entry_newer(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
918    let candidate_stamp = (
919        candidate.written_unix_secs,
920        candidate.written_nanos,
921        candidate_kind_rank(candidate.kind),
922    );
923    let current_stamp = (
924        current.written_unix_secs,
925        current.written_nanos,
926        candidate_kind_rank(current.kind),
927    );
928    candidate_stamp > current_stamp
929}
930
931const fn candidate_kind_rank(kind: EntryKind) -> u8 {
932    match kind {
933        EntryKind::Checkpoint => 0,
934        EntryKind::Final => 1,
935    }
936}
937
938fn checksum_hex(payload: &[u8]) -> String {
939    let mut h = Sha256::new();
940    h.update(payload);
941    let out = h.finalize();
942    let mut s = String::with_capacity(out.len() * 2);
943    for b in out.iter() {
944        use std::fmt::Write;
945        write!(&mut s, "{:02x}", b).expect("writing to String is infallible");
946    }
947    s
948}
949
950impl WarmStartStore {
951    fn touch_lookup_hit(
952        &self,
953        meta_path: &Path,
954        entry: WarmStartEntry,
955    ) -> Result<WarmStartEntry, StoreError> {
956        let meta = read_meta(meta_path)?;
957        let (_meta, entry) = self.touch_lookup_meta(meta_path, meta, entry)?;
958        Ok(entry)
959    }
960
961    fn touch_lookup_meta(
962        &self,
963        meta_path: &Path,
964        mut meta: OnDiskMeta,
965        entry: WarmStartEntry,
966    ) -> Result<(OnDiskMeta, WarmStartEntry), StoreError> {
967        let now = self.nanos_now();
968        // Refresh the ACCESS stamp (TTL clock), never the creation stamp: the
969        // creation stamp is the ordering key for `lookup_latest`, so bumping it
970        // on a read would make a merely-read entry win "latest" over a strictly
971        // newer write. Advance strictly past the previous access stamp so a
972        // second touch inside the same nanosecond still moves forward.
973        let old_access =
974            (meta.accessed_unix_secs as u128) * 1_000_000_000u128 + meta.accessed_nanos as u128;
975        let touched = now.max(old_access.saturating_add(1));
976        meta.accessed_unix_secs = (touched / 1_000_000_000u128) as u64;
977        meta.accessed_nanos = (touched % 1_000_000_000u128) as u32;
978        meta.accessed = true;
979        let json = serde_json::to_vec_pretty(&meta)?;
980        let tmp = meta_path.with_extension(format!(
981            "json.touch.tmp.{}.{}",
982            std::process::id(),
983            self.nanos_now()
984        ));
985        {
986            let mut f = fs::File::create(&tmp)?;
987            f.write_all(&json)?;
988            f.sync_all()?;
989        }
990        fs::rename(&tmp, meta_path)?;
991        if let Some(dir) = meta_path.parent()
992            && let Ok(d) = fs::File::open(dir)
993        {
994            d.sync_all().ok();
995        }
996        self.metadata_index_remove(meta_path);
997        // `entry.written_unix_secs` intentionally keeps the immutable creation
998        // stamp — the touch above only advanced the access clock.
999        Ok((meta, entry))
1000    }
1001
1002    fn read_meta_indexed(
1003        &self,
1004        path: &Path,
1005        meta_md: &fs::Metadata,
1006        bin_md: &fs::Metadata,
1007    ) -> Result<OnDiskMeta, StoreError> {
1008        if let Ok(index) = self.index.lock()
1009            && let Some(cached) = index.by_meta_path.get(path)
1010            && cached.matches(meta_md, bin_md)
1011        {
1012            return Ok(cached.meta.clone());
1013        }
1014
1015        let meta = read_meta(path)?;
1016        let Some(meta_mtime) = meta_md.modified().ok() else {
1017            return Ok(meta);
1018        };
1019        if let Ok(mut index) = self.index.lock() {
1020            index.by_meta_path.insert(
1021                path.to_path_buf(),
1022                IndexedMeta {
1023                    meta_mtime,
1024                    meta_len: meta_md.len(),
1025                    bin_len: bin_md.len(),
1026                    meta: meta.clone(),
1027                },
1028            );
1029        }
1030        Ok(meta)
1031    }
1032
1033    fn metadata_index_upsert(&self, meta_path: &Path, bin_path: &Path) -> Result<(), StoreError> {
1034        let meta_md = fs::metadata(meta_path)?;
1035        let bin_md = fs::metadata(bin_path)?;
1036        self.read_meta_indexed(meta_path, &meta_md, &bin_md)?;
1037        // A fresh entry just landed in this key dir, so any cached listing for
1038        // the dir is stale. Drop it; the next scan rebuilds and re-caches.
1039        if let Some(parent) = meta_path.parent()
1040            && let Ok(mut index) = self.index.lock()
1041        {
1042            index.by_key_dir.remove(parent);
1043        }
1044        Ok(())
1045    }
1046
1047    fn metadata_index_remove(&self, meta_path: &Path) {
1048        if let Ok(mut index) = self.index.lock() {
1049            index.by_meta_path.remove(meta_path);
1050            if let Some(parent) = meta_path.parent() {
1051                index.by_key_dir.remove(parent);
1052            }
1053        }
1054    }
1055
1056    fn metadata_index_remove_key(&self, key: &Fingerprint) {
1057        let dir = self.key_dir(key);
1058        if let Ok(mut index) = self.index.lock() {
1059            index.by_meta_path.retain(|path, _| !path.starts_with(&dir));
1060            index.by_key_dir.remove(&dir);
1061        }
1062    }
1063
1064    /// Cached listing lookup for one key directory.
1065    ///
1066    /// Returns the cached `Vec<ScannedEntry>` if the directory's current mtime
1067    /// matches the cached scan (no entry added/removed since), otherwise
1068    /// `None` so the caller performs a fresh scan via [`Self::scan_key_dir`].
1069    ///
1070    /// A matching dir mtime guarantees the *set* of files is unchanged, but TTL
1071    /// is wall-clock relative, so an entry valid at scan time can expire while
1072    /// the listing is still cached. The caller re-applies the TTL cutoff to the
1073    /// returned entries; this only proves the file set is stable.
1074    fn cached_dir_scan(&self, dir: &Path, dir_md: &fs::Metadata) -> Option<Vec<ScannedEntry>> {
1075        let dir_mtime = dir_md.modified().ok()?;
1076        let index = self.index.lock().ok()?;
1077        let cached = index.by_key_dir.get(dir)?;
1078        if cached.dir_mtime != dir_mtime {
1079            return None;
1080        }
1081        for entry in &cached.entries {
1082            let meta_md = fs::metadata(&entry.meta_path).ok()?;
1083            let bin_md = fs::metadata(&entry.bin_path).ok()?;
1084            if !entry.matches_files(&meta_md, &bin_md) {
1085                return None;
1086            }
1087        }
1088        Some(cached.entries.clone())
1089    }
1090
1091    fn store_dir_scan(&self, dir: &Path, dir_mtime: SystemTime, entries: &[ScannedEntry]) {
1092        if let Ok(mut index) = self.index.lock() {
1093            index.by_key_dir.insert(
1094                dir.to_path_buf(),
1095                ScannedDir {
1096                    dir_mtime,
1097                    entries: entries.to_vec(),
1098                },
1099            );
1100        }
1101    }
1102
    /// Scan one key directory, resolving every valid `(meta, bin)` pair and
    /// cleaning up corrupt / orphaned / schema-mismatched files in passing.
    ///
    /// Serves both [`Self::lookup_with`] and [`Self::evict_overflow`]. When the
    /// cached listing is still valid (see [`Self::cached_dir_scan`]) it is
    /// returned without a `read_dir` or any JSON read; validating it still
    /// costs one `stat` of the directory plus a `stat` of each cached entry's
    /// meta and bin files. This is the fast path for the metadata-syscall storm
    /// that #1114 traced. A fresh scan re-caches the listing keyed by the dir
    /// mtime observed *after* any cleanup, so a later unchanged call hits the
    /// cache.
    ///
    /// `now_nanos` drives the TTL drop on both paths: expired entries are
    /// deleted from disk and excluded from the result.
    ///
    /// `.tmp.*` files belonging to other processes are swept; same-PID temps
    /// (in-flight writes from us) are left alone.
    ///
    /// Returns an empty vector when the directory cannot be `stat`ed or read.
    /// All file removals are best effort — their errors are ignored.
    fn scan_key_dir(&self, dir: &Path, now_nanos: u128) -> Vec<ScannedEntry> {
        let dir_md = match fs::metadata(dir) {
            Ok(m) => m,
            Err(_) => return Vec::new(),
        };
        if let Some(cached) = self.cached_dir_scan(dir, &dir_md) {
            // The file set is unchanged, but TTL is wall-clock relative: an
            // entry valid when scanned may have expired since. Re-apply the
            // cutoff against `now_nanos`, removing any that crossed it. If none
            // expired we return the cached listing untouched (the fast path);
            // otherwise the removals bump the dir mtime, so we drop the stale
            // cache and re-cache the survivors keyed by the post-removal mtime.
            let any_expired = cached
                .iter()
                .any(|e| meta_expired(meta_activity_nanos(&e.meta), self.opts.ttl, now_nanos));
            if !any_expired {
                return cached;
            }
            let mut survivors = Vec::with_capacity(cached.len());
            for entry in cached {
                if meta_expired(meta_activity_nanos(&entry.meta), self.opts.ttl, now_nanos) {
                    fs::remove_file(&entry.meta_path).ok();
                    fs::remove_file(&entry.bin_path).ok();
                    // Also drops the cached listing for this dir.
                    self.metadata_index_remove(&entry.meta_path);
                } else {
                    survivors.push(entry);
                }
            }
            // If the post-removal mtime cannot be read, nothing is re-cached
            // and the next call falls through to a fresh scan.
            if let Some(mtime) = fs::metadata(dir).ok().and_then(|m| m.modified().ok()) {
                self.store_dir_scan(dir, mtime, &survivors);
            }
            return survivors;
        }
        let read_dir = match fs::read_dir(dir) {
            Ok(rd) => rd,
            Err(_) => return Vec::new(),
        };
        let mut entries = Vec::new();
        // Set whenever cleanup deletes a file, i.e. whenever the dir mtime read
        // above is no longer the current one.
        let mut mutated = false;
        for f in read_dir {
            let path = match f {
                Ok(e) => e.path(),
                Err(_) => continue,
            };
            // Names that are not valid UTF-8 are skipped entirely.
            let name = match path.file_name().and_then(|s| s.to_str()) {
                Some(s) => s,
                None => continue,
            };
            if name.contains(".tmp.") {
                // Sweep temps whose PID token parses and is not ours; temps
                // with an unparseable PID are left in place.
                if let Some(pid) = parse_tmp_pid(name)
                    && pid != std::process::id()
                {
                    fs::remove_file(&path).ok();
                    mutated = true;
                }
                continue;
            }
            // Entries are discovered through their meta file; the bin path is
            // derived from it below.
            if path.extension().and_then(|s| s.to_str()) != Some("json") {
                continue;
            }
            let meta_md = match fs::metadata(&path) {
                Ok(m) => m,
                Err(_) => continue,
            };
            let bin = path.with_extension("bin");
            let bin_md = match fs::metadata(&bin) {
                Ok(m) => m,
                Err(_) => {
                    // Meta without a readable bin: drop the orphaned meta.
                    fs::remove_file(&path).ok();
                    self.metadata_index_remove(&path);
                    mutated = true;
                    continue;
                }
            };
            let meta = match self.read_meta_indexed(&path, &meta_md, &bin_md) {
                Ok(m) => m,
                Err(_) => {
                    // Unreadable or unparseable meta: drop the whole entry.
                    fs::remove_file(&path).ok();
                    fs::remove_file(&bin).ok();
                    self.metadata_index_remove(&path);
                    mutated = true;
                    continue;
                }
            };
            if meta.schema_version != SCHEMA_VERSION {
                // Written under a different on-disk schema: drop the entry.
                fs::remove_file(&path).ok();
                fs::remove_file(&bin).ok();
                self.metadata_index_remove(&path);
                mutated = true;
                continue;
            }
            if meta_expired(meta_activity_nanos(&meta), self.opts.ttl, now_nanos) {
                // Past its TTL: drop the entry.
                fs::remove_file(&path).ok();
                fs::remove_file(&bin).ok();
                self.metadata_index_remove(&path);
                mutated = true;
                continue;
            }
            entries.push(ScannedEntry {
                meta_path: path,
                bin_path: bin,
                meta_len: meta_md.len(),
                bin_len: bin_md.len(),
                meta_mtime: meta_md.modified().ok(),
                bin_mtime: bin_md.modified().ok(),
                meta,
            });
        }
        // Cache keyed by the mtime *after* any cleanup so the next unchanged
        // call is a cache hit. If cleanup mutated the dir, re-stat to capture
        // the post-mutation mtime; otherwise reuse the mtime we already read.
        let final_mtime = if mutated {
            fs::metadata(dir).ok().and_then(|m| m.modified().ok())
        } else {
            dir_md.modified().ok()
        };
        if let Some(mtime) = final_mtime {
            self.store_dir_scan(dir, mtime, &entries);
        }
        entries
    }
1237
1238    fn test_time_offset_ns(&self) -> u64 {
1239        self.test_time_offset_ns.load(Ordering::Relaxed)
1240    }
1241
1242    fn unix_now_parts(&self) -> (u64, u32) {
1243        let base = SystemTime::now()
1244            .duration_since(UNIX_EPOCH)
1245            .map(|d| d.as_nanos())
1246            .unwrap_or(0);
1247        let total = base.saturating_add(u128::from(self.test_time_offset_ns()));
1248        let secs = (total / 1_000_000_000u128) as u64;
1249        let nanos = (total % 1_000_000_000u128) as u32;
1250        (secs, nanos)
1251    }
1252
1253    fn nanos_now(&self) -> u128 {
1254        let base = SystemTime::now()
1255            .duration_since(UNIX_EPOCH)
1256            .map(|d| d.as_nanos())
1257            .unwrap_or(0);
1258        base.saturating_add(u128::from(self.test_time_offset_ns()))
1259    }
1260
1261    fn fresh_run_id(&self) -> String {
1262        let pid = std::process::id();
1263        let nanos = self.nanos_now();
1264        format!("r{pid:x}-{nanos:x}")
1265    }
1266}
1267
1268#[cfg(test)]
1269mod tests {
1270    use super::*;
1271    use crate::warm_start::key::Fingerprinter;
1272
1273    impl WarmStartStore {
1274        /// Advance this store's simulated monotonic clock by `dur`. Only
1275        /// available in tests — production code reads the real wall clock and
1276        /// never mutates the per-store offset.
1277        fn test_advance_time(&self, dur: Duration) {
1278            self.test_time_offset_ns
1279                .fetch_add(dur.as_nanos() as u64, Ordering::Relaxed);
1280        }
1281    }
1282
1283    fn temp_store() -> (tempfile::TempDir, WarmStartStore) {
1284        let dir = tempfile::tempdir().unwrap();
1285        let store = WarmStartStore::open(
1286            dir.path().to_path_buf(),
1287            StoreOptions {
1288                size_budget_bytes: 1024 * 1024,
1289                ttl: Duration::from_secs(60),
1290            },
1291        )
1292        .unwrap();
1293        (dir, store)
1294    }
1295
1296    fn key_for(s: &str) -> Fingerprint {
1297        let mut fp = Fingerprinter::new();
1298        fp.absorb_str(b"test", s);
1299        fp.finalize()
1300    }
1301
1302    #[test]
1303    fn roundtrip_save_then_lookup() {
1304        let (_d, store) = temp_store();
1305        let key = key_for("roundtrip");
1306        store
1307            .save(
1308                &key,
1309                b"hello-warm",
1310                Some(1.5),
1311                Some(7),
1312                EntryKind::Checkpoint,
1313            )
1314            .unwrap();
1315        let got = store.lookup(&key).unwrap().unwrap();
1316        assert_eq!(got.payload, b"hello-warm");
1317        assert_eq!(got.objective, Some(1.5));
1318        assert_eq!(got.iteration, Some(7));
1319        assert_eq!(got.kind, EntryKind::Checkpoint);
1320    }
1321
1322    #[test]
1323    fn lookup_picks_lowest_objective() {
1324        let (_d, store) = temp_store();
1325        let key = key_for("multi");
1326        store
1327            .save(&key, b"worse", Some(3.0), Some(1), EntryKind::Checkpoint)
1328            .unwrap();
1329        store
1330            .save(&key, b"better", Some(1.0), Some(2), EntryKind::Checkpoint)
1331            .unwrap();
1332        store
1333            .save(&key, b"mid", Some(2.0), Some(3), EntryKind::Checkpoint)
1334            .unwrap();
1335        let got = store.lookup(&key).unwrap().unwrap();
1336        assert_eq!(got.payload, b"better");
1337        assert_eq!(got.objective, Some(1.0));
1338    }
1339
1340    #[test]
1341    fn lookup_latest_ignores_objective_ordering() {
1342        let (_d, store) = temp_store();
1343        let key = key_for("latest-vs-best");
1344        store
1345            .save(&key, b"low-objective", Some(1.0), Some(1), EntryKind::Final)
1346            .unwrap();
1347        store.test_advance_time(Duration::from_millis(2));
1348        store
1349            .save(
1350                &key,
1351                b"newer-higher-objective",
1352                Some(10.0),
1353                Some(2),
1354                EntryKind::Checkpoint,
1355            )
1356            .unwrap();
1357
1358        let best = store.lookup(&key).unwrap().unwrap();
1359        assert_eq!(best.payload, b"low-objective");
1360
1361        let latest = store.lookup_latest(&key).unwrap().unwrap();
1362        assert_eq!(latest.payload, b"newer-higher-objective");
1363        assert_eq!(latest.iteration, Some(2));
1364    }
1365
1366    #[test]
1367    fn tiebreak_final_beats_checkpoint() {
1368        let (_d, store) = temp_store();
1369        let key = key_for("tie");
1370        store
1371            .save(&key, b"ckpt", Some(1.0), None, EntryKind::Checkpoint)
1372            .unwrap();
1373        // Same objective, different kind.
1374        store
1375            .save(&key, b"final", Some(1.0), None, EntryKind::Final)
1376            .unwrap();
1377        let got = store.lookup(&key).unwrap().unwrap();
1378        assert_eq!(got.payload, b"final");
1379        assert_eq!(got.kind, EntryKind::Final);
1380    }
1381
1382    #[test]
1383    fn tiebreak_latest_mtime_when_no_objective() {
1384        let (_d, store) = temp_store();
1385        let key = key_for("latest");
1386        store
1387            .save(&key, b"first", None, None, EntryKind::Checkpoint)
1388            .unwrap();
1389        store.test_advance_time(Duration::from_millis(1_100));
1390        store
1391            .save(&key, b"second", None, None, EntryKind::Checkpoint)
1392            .unwrap();
1393        let got = store.lookup(&key).unwrap().unwrap();
1394        assert_eq!(got.payload, b"second");
1395    }
1396
1397    #[test]
1398    fn corrupt_payload_is_cleaned_up() {
1399        let (_d, store) = temp_store();
1400        let key = key_for("corrupt");
1401        store
1402            .save(&key, b"original", Some(0.0), None, EntryKind::Checkpoint)
1403            .unwrap();
1404        // Tamper with the .bin file.
1405        let dir = store.key_dir(&key);
1406        for entry in fs::read_dir(&dir).unwrap() {
1407            let p = entry.unwrap().path();
1408            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1409                fs::write(&p, b"tampered!").unwrap();
1410            }
1411        }
1412        let got = store.lookup(&key).unwrap();
1413        assert!(got.is_none(), "tampered entry must be rejected");
1414        // The corrupt files should be cleaned up so they don't accumulate.
1415        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1416        assert!(remaining.is_empty(), "corrupt entry should be removed");
1417    }
1418
1419    #[test]
1420    fn corrupt_meta_json_is_cleaned_up() {
1421        let (_d, store) = temp_store();
1422        let key = key_for("badjson");
1423        store
1424            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1425            .unwrap();
1426        let dir = store.key_dir(&key);
1427        for entry in fs::read_dir(&dir).unwrap() {
1428            let p = entry.unwrap().path();
1429            if p.extension().and_then(|s| s.to_str()) == Some("json") {
1430                fs::write(&p, b"{not valid json").unwrap();
1431            }
1432        }
1433        let got = store.lookup(&key).unwrap();
1434        assert!(got.is_none());
1435    }
1436
1437    #[test]
1438    fn schema_mismatched_entry_is_cleaned_up() {
1439        let (_d, store) = temp_store();
1440        let key = key_for("schema");
1441        store
1442            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1443            .unwrap();
1444        let dir = store.key_dir(&key);
1445        for entry in fs::read_dir(&dir).unwrap() {
1446            let p = entry.unwrap().path();
1447            if p.extension().and_then(|s| s.to_str()) == Some("json") {
1448                let raw = fs::read(&p).unwrap();
1449                let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1450                parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1451                fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1452            }
1453        }
1454        assert!(store.lookup(&key).unwrap().is_none());
1455        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1456        assert!(
1457            remaining.is_empty(),
1458            "schema-mismatched entry should be removed"
1459        );
1460    }
1461
1462    #[test]
1463    fn schema_mismatched_entry_is_removed_during_save_eviction_path() {
1464        let dir = tempfile::tempdir().unwrap();
1465        let store = WarmStartStore::open(
1466            dir.path().to_path_buf(),
1467            StoreOptions {
1468                size_budget_bytes: 6 * 1024,
1469                ttl: Duration::from_secs(3600),
1470            },
1471        )
1472        .unwrap();
1473        let stale_key = key_for("schema-size-stale");
1474        store
1475            .save(
1476                &stale_key,
1477                &vec![0u8; 4 * 1024],
1478                None,
1479                None,
1480                EntryKind::Checkpoint,
1481            )
1482            .unwrap();
1483
1484        let stale_dir = store.key_dir(&stale_key);
1485        let mut stale_meta = None;
1486        let mut stale_bin = None;
1487        for entry in fs::read_dir(&stale_dir).unwrap() {
1488            let p = entry.unwrap().path();
1489            match p.extension().and_then(|s| s.to_str()) {
1490                Some("json") => {
1491                    let raw = fs::read(&p).unwrap();
1492                    let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1493                    parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1494                    fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1495                    stale_meta = Some(p);
1496                }
1497                Some("bin") => stale_bin = Some(p),
1498                _ => {}
1499            }
1500        }
1501        let stale_meta = stale_meta.expect("saved entry should have metadata");
1502        let stale_bin = stale_bin.expect("saved entry should have payload");
1503
1504        let fresh_key = key_for("schema-size-fresh");
1505        store
1506            .save(
1507                &fresh_key,
1508                &vec![1u8; 2 * 1024],
1509                None,
1510                None,
1511                EntryKind::Checkpoint,
1512            )
1513            .unwrap();
1514
1515        assert!(
1516            !stale_meta.exists(),
1517            "schema-mismatched metadata should be removed during eviction scan"
1518        );
1519        assert!(
1520            !stale_bin.exists(),
1521            "schema-mismatched payload should be removed during eviction scan"
1522        );
1523
1524        let mut total = 0u64;
1525        for key_dir in fs::read_dir(store.root()).unwrap() {
1526            let key_dir = key_dir.unwrap().path();
1527            if key_dir.is_dir() {
1528                for entry in fs::read_dir(key_dir).unwrap() {
1529                    total += fs::metadata(entry.unwrap().path()).unwrap().len();
1530                }
1531            }
1532        }
1533        assert!(
1534            total <= store.options().size_budget_bytes,
1535            "schema-mismatched bytes must not leak past size accounting (got {total})"
1536        );
1537        assert!(store.lookup(&stale_key).unwrap().is_none());
1538        assert!(store.lookup(&fresh_key).unwrap().is_some());
1539    }
1540
1541    #[test]
1542    fn missing_bin_treated_as_missing() {
1543        let (_d, store) = temp_store();
1544        let key = key_for("nobin");
1545        store
1546            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1547            .unwrap();
1548        let dir = store.key_dir(&key);
1549        for entry in fs::read_dir(&dir).unwrap() {
1550            let p = entry.unwrap().path();
1551            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1552                fs::remove_file(&p).unwrap();
1553            }
1554        }
1555        assert!(store.lookup(&key).unwrap().is_none());
1556    }
1557
1558    #[test]
1559    fn missing_key_returns_none() {
1560        let (_d, store) = temp_store();
1561        let key = key_for("absent");
1562        assert!(store.lookup(&key).unwrap().is_none());
1563    }
1564
1565    #[test]
1566    fn lru_eviction_under_size_budget() {
1567        let dir = tempfile::tempdir().unwrap();
1568        // Tiny budget: 4 KiB. Each entry payload + meta JSON is ~600 B.
1569        let store = WarmStartStore::open(
1570            dir.path().to_path_buf(),
1571            StoreOptions {
1572                size_budget_bytes: 4 * 1024,
1573                ttl: Duration::from_secs(3600),
1574            },
1575        )
1576        .unwrap();
1577        let mut keys = Vec::new();
1578        for i in 0..20 {
1579            let mut fp = Fingerprinter::new();
1580            fp.absorb_u64(b"i", i);
1581            let key = fp.finalize();
1582            keys.push(key);
1583            let payload = vec![0u8; 256];
1584            store
1585                .save(&key, &payload, Some(i as f64), None, EntryKind::Checkpoint)
1586                .unwrap();
1587        }
1588        // Walk the store root and confirm total bytes is bounded.
1589        let mut total = 0u64;
1590        for kd in fs::read_dir(store.root()).unwrap() {
1591            let kd = kd.unwrap().path();
1592            if kd.is_dir() {
1593                for f in fs::read_dir(&kd).unwrap() {
1594                    total += fs::metadata(f.unwrap().path()).unwrap().len();
1595                }
1596            }
1597        }
1598        assert!(
1599            total <= 8 * 1024,
1600            "eviction failed to bound size (got {total})"
1601        );
1602        // Earliest keys must have been evicted; latest survive.
1603        assert!(store.lookup(&keys[0]).unwrap().is_none());
1604        assert!(store.lookup(keys.last().unwrap()).unwrap().is_some());
1605    }
1606
1607    #[test]
1608    fn ttl_drops_old_entries() {
1609        // Expiration is driven by `test_advance_time` (additive simulated time
1610        // on top of the wall clock), so the TTL itself only needs to be larger
1611        // than any plausible save→lookup wall-time on the CI runner. The
1612        // 1-second TTL the original fixture used was tighter than the worst
1613        // ext4 fsync this image sees (see `save_overwrite`'s late-stamp
1614        // comment), so the first `is_some()` check would flake to "expired"
1615        // before any time advance ever ran. 60 s clears that race with margin.
1616        let dir = tempfile::tempdir().unwrap();
1617        let ttl = Duration::from_secs(60);
1618        let store = WarmStartStore::open(
1619            dir.path().to_path_buf(),
1620            StoreOptions {
1621                size_budget_bytes: 1024 * 1024,
1622                ttl,
1623            },
1624        )
1625        .unwrap();
1626        let key = key_for("ttl");
1627        store
1628            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1629            .unwrap();
1630        assert!(store.lookup(&key).unwrap().is_some());
1631        store.test_advance_time(ttl + Duration::from_secs(5));
1632        // Trigger eviction via a save under an unrelated key.
1633        let other = key_for("ttl-other");
1634        store
1635            .save(&other, b"y", None, None, EntryKind::Checkpoint)
1636            .unwrap();
1637        // Original now expired.
1638        assert!(store.lookup(&key).unwrap().is_none());
1639        assert!(store.lookup(&other).unwrap().is_some());
1640    }
1641
1642    #[test]
1643    fn orphan_temp_files_from_dead_processes_are_swept() {
1644        let (_d, store) = temp_store();
1645        let key = key_for("tmp");
1646        let dir = store.key_dir(&key);
1647        fs::create_dir_all(&dir).unwrap();
1648        // Use PID 1 — never the current process, so it counts as "other".
1649        let orphan_other = dir.join("r0-0.json.tmp.1.0");
1650        let mine = dir.join(format!("r0-0.bin.tmp.{}.0", std::process::id()));
1651        fs::write(&orphan_other, b"orphan").unwrap();
1652        fs::write(&mine, b"mine").unwrap();
1653        store.evict_overflow().unwrap();
1654        assert!(!orphan_other.exists(), "other-PID tmp file should be swept");
1655        assert!(mine.exists(), "same-PID tmp file must be left alone");
1656    }
1657
1658    #[test]
1659    fn tmp_filenames_without_pid_are_skipped() {
1660        // Malformed tmp names (no parseable pid) must not crash the sweep.
1661        let (_d, store) = temp_store();
1662        let key = key_for("malformed");
1663        let dir = store.key_dir(&key);
1664        fs::create_dir_all(&dir).unwrap();
1665        let weird = dir.join("garbage.tmp.notapid.suffix");
1666        fs::write(&weird, b"x").unwrap();
1667        // Must not panic.
1668        store.evict_overflow().unwrap();
1669        assert!(weird.exists());
1670    }
1671
1672    #[test]
1673    fn save_overwrite_keeps_single_entry() {
1674        let (_d, store) = temp_store();
1675        let key = key_for("overwrite");
1676        let id = store
1677            .save(&key, b"v1", Some(2.0), Some(1), EntryKind::Checkpoint)
1678            .unwrap();
1679        store
1680            .save_overwrite(&key, &id, b"v2", Some(1.0), Some(2), EntryKind::Checkpoint)
1681            .unwrap();
1682        // Only one (meta, bin) pair on disk.
1683        let dir = store.key_dir(&key);
1684        let files: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1685        assert_eq!(files.len(), 2, "overwrite should not create a new run-id");
1686        let got = store.lookup(&key).unwrap().unwrap();
1687        assert_eq!(got.payload, b"v2");
1688        assert_eq!(got.objective, Some(1.0));
1689    }
1690
1691    #[test]
1692    fn write_and_promote_recreates_dir_removed_before_write() {
1693        // gam#868: a sibling process' eviction can `remove_dir` the key dir the
1694        // instant it observes it empty, racing every write step in `save`. The
1695        // promote helper must recreate the dir rather than failing with ENOENT.
1696        let (_d, store) = temp_store();
1697        let key = key_for("race-recreate");
1698        let dir = store.key_dir(&key);
1699        // Dir does NOT exist yet (simulates eviction having removed it after a
1700        // prior `create_dir_all`). The helper must create it and succeed.
1701        assert!(!dir.exists());
1702        let bin_tmp = dir.join("r0.bin.tmp.1.0.0");
1703        let meta_tmp = dir.join("r0.json.tmp.1.0.0");
1704        let bin_final = dir.join("r0.bin");
1705        let meta_final = dir.join("r0.json");
1706        let stamp_fn = || (0u64, 0u32);
1707        let build_meta_json = |_: u64, _: u32| -> io::Result<Vec<u8>> { Ok(b"{}".to_vec()) };
1708        write_and_promote_entry(&EntryWrite {
1709            dir: &dir,
1710            bin_tmp: &bin_tmp,
1711            meta_tmp: &meta_tmp,
1712            payload: b"payload",
1713            bin_final: &bin_final,
1714            meta_final: &meta_final,
1715            stamp_fn: &stamp_fn,
1716            build_meta_json: &build_meta_json,
1717        })
1718        .expect("promote into a missing dir must recreate it and succeed");
1719        assert!(bin_final.exists() && meta_final.exists());
1720        assert_eq!(fs::read(&bin_final).unwrap(), b"payload");
1721    }
1722
1723    #[test]
1724    fn save_survives_concurrent_eviction_removing_key_dir() {
1725        // gam#868 end-to-end: hammer the same key with concurrent saves while a
1726        // sibling thread repeatedly runs `evict_overflow` (which `remove_dir`s
1727        // now-empty key dirs). Before the atomic-retry fix, a save whose
1728        // `create_dir_all`→write/rename window straddled a `remove_dir` failed
1729        // with `io: No such file or directory (os error 2)`. Every save must now
1730        // succeed; we assert no save returns an error.
1731        use std::sync::Arc;
1732        use std::sync::atomic::AtomicBool;
1733
1734        let dir = tempfile::tempdir().unwrap();
1735        // Zero size budget forces `evict_overflow` to delete entries (and then
1736        // sweep the emptied key dir) on essentially every sweep, maximizing the
1737        // race window.
1738        let store = Arc::new(
1739            WarmStartStore::open(
1740                dir.path().to_path_buf(),
1741                StoreOptions {
1742                    size_budget_bytes: 0,
1743                    ttl: Duration::from_secs(60),
1744                },
1745            )
1746            .unwrap(),
1747        );
1748        let key = key_for("concurrent-evict");
1749        let stop = Arc::new(AtomicBool::new(false));
1750
1751        let evictor = {
1752            let store = Arc::clone(&store);
1753            let stop = Arc::clone(&stop);
1754            std::thread::spawn(move || {
1755                while !stop.load(Ordering::Relaxed) {
1756                    store.evict_overflow().ok();
1757                }
1758            })
1759        };
1760
1761        let writers: Vec<_> = (0..4)
1762            .map(|w| {
1763                let store = Arc::clone(&store);
1764                std::thread::spawn(move || {
1765                    for i in 0..200u32 {
1766                        let payload = format!("w{w}-i{i}");
1767                        store
1768                            .save(
1769                                &key,
1770                                payload.as_bytes(),
1771                                Some(i as f64),
1772                                Some(i as u64),
1773                                EntryKind::Checkpoint,
1774                            )
1775                            .expect("save must not fail with ENOENT under concurrent eviction");
1776                    }
1777                })
1778            })
1779            .collect();
1780
1781        for h in writers {
1782            h.join().unwrap();
1783        }
1784        stop.store(true, Ordering::Relaxed);
1785        evictor.join().unwrap();
1786    }
1787
1788    #[test]
1789    fn keys_are_isolated() {
1790        let (_d, store) = temp_store();
1791        let a = key_for("a");
1792        let b = key_for("b");
1793        store
1794            .save(&a, b"AAA", Some(1.0), None, EntryKind::Final)
1795            .unwrap();
1796        store
1797            .save(&b, b"BBB", Some(1.0), None, EntryKind::Final)
1798            .unwrap();
1799        assert_eq!(store.lookup(&a).unwrap().unwrap().payload, b"AAA");
1800        assert_eq!(store.lookup(&b).unwrap().unwrap().payload, b"BBB");
1801    }
1802}