Skip to main content

gam_runtime/warm_start/
store.rs

1//! Filesystem store for warm-start entries.
2//!
3//! Each entry is a `(<runid>.json, <runid>.bin)` pair inside a per-key
4//! directory. Writes go through a temp-file → fsync → rename sequence so a
5//! crash mid-write never leaves a half-written entry visible to readers.
6//! Per-entry SHA-256 checksums catch any residual corruption.
7//!
8//! See [`crate::warm_start`] for the public API summary.
9
10use crate::warm_start::key::Fingerprint;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::fs;
15use std::io::{self, Write as _};
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex, OnceLock};
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
/// Version tag stamped into every entry's on-disk metadata. Increment it on
/// any incompatible format change; entries carrying an older version are then
/// ignored at read time and evicted on the next save.
pub(crate) const SCHEMA_VERSION: u32 = 1;

/// Default disk budget for the whole warm-start store root (1 GiB).
pub(crate) const DEFAULT_SIZE_BUDGET_BYTES: u64 = 1 << 30;

/// Default TTL in seconds (30 days) — entries untouched for this long are dropped.
pub(crate) const DEFAULT_TTL_SECS: u64 = 30 * 24 * 60 * 60;
30
/// Errors surfaced by [`WarmStartStore`] operations.
///
/// Both variants are built through `#[from]`, so `?` converts the underlying
/// error types directly.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// A filesystem operation failed; wraps the underlying [`io::Error`].
    #[error("io: {0}")]
    Io(#[from] io::Error),
    /// A `serde_json` error, e.g. from serializing entry metadata.
    #[error("json: {0}")]
    Json(#[from] serde_json::Error),
}
38
/// Entry returned from [`WarmStartStore::lookup`].
#[derive(Debug, Clone)]
pub struct WarmStartEntry {
    /// Raw bytes of the entry's `.bin` file.
    pub payload: Vec<u8>,
    /// Objective value recorded with the entry, if any. Non-finite values
    /// passed to `save_overwrite` are stored as `None`.
    pub objective: Option<f64>,
    /// Iteration counter recorded with the entry, if the writer supplied one.
    pub iteration: Option<u64>,
    /// Whole-second part of the write timestamp stored in the entry's metadata.
    pub written_unix_secs: u64,
    /// Whether the entry is a mid-fit checkpoint or an end-of-fit result.
    pub kind: EntryKind,
}
48
/// Lifecycle stage of the fit that produced an entry. Stored in the entry's
/// on-disk metadata and handed back on [`WarmStartEntry::kind`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryKind {
    /// Mid-fit checkpoint — fit was alive when written.
    Checkpoint,
    /// End-of-fit — fit terminated successfully.
    Final,
}
56
/// Metadata serialized as JSON into `<runid>.json` next to each `<runid>.bin`
/// payload.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OnDiskMeta {
    /// [`SCHEMA_VERSION`] in effect when the entry was written.
    schema_version: u32,
    /// Whole-second part of the write timestamp (Unix epoch).
    written_unix_secs: u64,
    /// Nanosecond component of the write timestamp. Used to break ties in
    /// LRU eviction so entries written within the same second don't sort
    /// arbitrarily.
    #[serde(default)]
    written_nanos: u32,
    /// Objective value supplied by the writer; non-finite values are stored
    /// as `None`.
    objective: Option<f64>,
    /// Iteration counter supplied by the writer, if any.
    iteration: Option<u64>,
    /// Checkpoint vs. final marker supplied by the writer.
    kind: EntryKind,
    /// Hex checksum of the payload; `lookup_with` recomputes it over the
    /// `.bin` contents and discards the entry on mismatch.
    checksum_hex: String,
    /// Length of the payload in bytes at write time.
    payload_bytes: u64,
    /// Set when a lookup has reused this entry. Eviction keeps recently reused
    /// entries behind never-hit writes when a tight budget forces a choice.
    #[serde(default)]
    accessed: bool,
}
76
/// Tunables for a [`WarmStartStore`].
#[derive(Debug, Clone)]
pub struct StoreOptions {
    /// Byte budget for the store root; `evict_overflow` removes entries until
    /// the total is at or below this value.
    pub size_budget_bytes: u64,
    /// Maximum entry age. The cached lookup path skips the expiry check when
    /// this is zero.
    pub ttl: Duration,
}
82
83impl Default for StoreOptions {
84    fn default() -> Self {
85        Self {
86            size_budget_bytes: DEFAULT_SIZE_BUDGET_BYTES,
87            ttl: Duration::from_secs(DEFAULT_TTL_SECS),
88        }
89    }
90}
91
/// Handle to a filesystem-backed warm-start store rooted at one directory.
///
/// The metadata index and the eviction-throttle state are held behind `Arc`s
/// and shared between clones; the test-only clock offset is copied by value
/// when the store is cloned.
#[derive(Debug)]
pub struct WarmStartStore {
    /// Root directory. Each key gets a sub-directory named after the hex form
    /// of its fingerprint.
    root: PathBuf,
    /// Size budget and TTL this store was opened with.
    opts: StoreOptions,
    /// Per-store metadata index. It is populated lazily and shared by clones so
    /// checkpoint-heavy sessions do not repeatedly open every metadata JSON.
    index: Arc<Mutex<MetadataIndex>>,
    /// Approximate sum of bytes written under `root`. Used to throttle the
    /// full directory-scanning eviction in [`Self::save_overwrite`] — see
    /// `EVICT_EVERY_N_SAVES`. The counter resyncs to ground truth after every
    /// triggered sweep. Shared across clones (`Arc`) so the eviction throttle
    /// survives the per-operation store reuse in
    /// `solver::persistent_warm_start::persistent_store` — otherwise every
    /// fit reset the counter and ran a full eviction walk on its first save
    /// (gam#1114).
    byte_total: Arc<AtomicU64>,
    /// Monotonically increasing save counter, shared across clones. Used
    /// together with `byte_total` to throttle the eviction directory walk.
    save_counter: Arc<AtomicU64>,
    /// Root-directory mtime observed at the last completed eviction sweep,
    /// shared across clones. When a throttled sweep fires while the store is
    /// comfortably *under* the size budget, the only work left for it is a
    /// TTL/byte resync over every key dir — an N-dir `read_dir` + `stat` walk
    /// that, with thousands of fingerprint dirs in a long CI run, dominates
    /// the per-32-save sweep even after the per-dir listing cache lands (the
    /// residual #1114 walk). The root dir's mtime is bumped by the OS whenever
    /// a key dir is created or removed under it, so an unchanged root mtime
    /// means no key dir was added/dropped since our last sweep; combined with
    /// a comfortably-under-budget byte total, the size-eviction walk is then a
    /// guaranteed no-op and is skipped. TTL expiry of *existing* entries does
    /// not change the root mtime, but it is already performed lazily on every
    /// `lookup_with` and on the next root-changing save, so skipping it here is
    /// behaviour-neutral (no entry the gate skips could be returned stale).
    last_evict_root_mtime: Arc<Mutex<Option<SystemTime>>>,
    /// Per-store test-only monotonic time offset (nanoseconds) added to every
    /// `*_now` reading. Always zero in production. Tests mutate it through
    /// [`Self::test_advance_time`] to simulate elapsed time without
    /// `thread::sleep`. Lives on the store rather than as a process-wide
    /// static so parallel tests with their own stores cannot pollute each
    /// other's clocks — a global clock made `cargo test` non-deterministic
    /// (gam test infra: one test's +1.5s TTL advance was bumping another
    /// test's just-saved entry past its 1s TTL on immediate lookup).
    test_time_offset_ns: AtomicU64,
}
136
137impl Clone for WarmStartStore {
138    fn clone(&self) -> Self {
139        Self {
140            root: self.root.clone(),
141            opts: self.opts.clone(),
142            index: Arc::clone(&self.index),
143            // Throttle counters are shared across clones so the eviction
144            // directory walk stays throttled to every Nth save across the
145            // whole process, even though `persistent_store` hands out a fresh
146            // clone per save/lookup.
147            byte_total: Arc::clone(&self.byte_total),
148            save_counter: Arc::clone(&self.save_counter),
149            last_evict_root_mtime: Arc::clone(&self.last_evict_root_mtime),
150            test_time_offset_ns: AtomicU64::new(self.test_time_offset_ns.load(Ordering::Relaxed)),
151        }
152    }
153}
154
155impl WarmStartStore {
156    /// Open (or create) a store rooted at `root`.
157    pub fn open(root: PathBuf, opts: StoreOptions) -> Result<Self, StoreError> {
158        fs::create_dir_all(&root)?;
159        Ok(Self {
160            root,
161            opts,
162            index: Arc::new(Mutex::new(MetadataIndex::default())),
163            byte_total: Arc::new(AtomicU64::new(0)),
164            save_counter: Arc::new(AtomicU64::new(0)),
165            last_evict_root_mtime: Arc::new(Mutex::new(None)),
166            test_time_offset_ns: AtomicU64::new(0),
167        })
168    }
169
170    pub fn root(&self) -> &Path {
171        &self.root
172    }
173
174    pub fn options(&self) -> &StoreOptions {
175        &self.opts
176    }
177
178    fn key_dir(&self, key: &Fingerprint) -> PathBuf {
179        self.root.join(key.to_hex())
180    }
181
182    /// Look up the best entry for `key`, or `None` if no valid entry exists.
183    ///
184    /// Selection: lowest `objective` first; ties prefer [`EntryKind::Final`]
185    /// over [`EntryKind::Checkpoint`], then latest `written_unix_secs`. If
186    /// every candidate has `objective = None`, picks the latest write.
187    /// Corrupt or schema-mismatched candidates are silently cleaned up and
188    /// skipped.
189    pub fn lookup(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
190        self.lookup_with(key, LookupMode::Best)
191    }
192
193    /// Look up the newest valid entry for `key`, or `None` if no valid entry
194    /// exists.
195    ///
196    /// Unlike [`Self::lookup`], this deliberately ignores objective values.
197    /// Use this for near-match seed namespaces where entries may come from
198    /// different folds, diseases, or row sets, and objective magnitudes are
199    /// not comparable. Exact-key resume should keep using [`Self::lookup`].
200    pub fn lookup_latest(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
201        self.lookup_with(key, LookupMode::Latest)
202    }
203
204    fn lookup_with(
205        &self,
206        key: &Fingerprint,
207        mode: LookupMode,
208    ) -> Result<Option<WarmStartEntry>, StoreError> {
209        let dir = self.key_dir(key);
210        if !dir.exists() {
211            // A stale in-memory cache entry could outlive its directory if
212            // another process evicted us. Drop it so we don't return data
213            // for a key whose backing files are gone.
214            lookup_cache_invalidate(&LookupCacheKey { fp: *key, mode });
215            self.metadata_index_remove_key(key);
216            return Ok(None);
217        }
218        // Fast path: if the same (key, mode) was looked up before and the
219        // chosen meta file's mtime is unchanged, return the cached entry
220        // without re-reading any JSON or re-checksumming the .bin payload.
221        // A separate writer (this process or another) bumps mtime on
222        // rename → mismatch → we fall through to the slow path. The TTL
223        // cutoff is also re-checked here against `nanos_now()` so a hot
224        // poll loop cannot keep returning an expired entry between eviction
225        // sweeps (eviction is throttled via `EVICT_EVERY_N_SAVES`).
226        let cache_key = LookupCacheKey { fp: *key, mode };
227        let now_nanos = self.nanos_now();
228        if let Some(hit) = lookup_cache_get(&cache_key) {
229            if let Ok(md) = fs::metadata(&hit.meta_path)
230                && md.modified().ok() == Some(hit.meta_mtime)
231            {
232                let expired = self.opts.ttl.as_nanos() > 0
233                    && now_nanos.saturating_sub(hit.write_nanos) >= self.opts.ttl.as_nanos();
234                if !expired {
235                    let entry = self.touch_lookup_hit(&hit.meta_path, hit.entry)?;
236                    return Ok(Some(entry));
237                }
238                lookup_cache_invalidate(&cache_key);
239                let bin = hit.meta_path.with_extension("bin");
240                fs::remove_file(&hit.meta_path).ok();
241                fs::remove_file(&bin).ok();
242                // Removing the entry stales any cached directory listing.
243                self.metadata_index_remove(&hit.meta_path);
244                return Ok(None);
245            }
246            lookup_cache_invalidate(&cache_key);
247        }
248        // Resolve all valid entries for this key directory. `scan_key_dir`
249        // serves the listing from the per-store directory cache when the dir's
250        // mtime is unchanged since the last scan (no re-`read_dir`, no per-file
251        // `stat`, no JSON re-parse), and drops TTL-expired / corrupt entries in
252        // passing — exactly the syscall storm #1114 traced.
253        let mut best: Option<(OnDiskMeta, PathBuf)> = None;
254        for scanned in self.scan_key_dir(&dir, now_nanos) {
255            let take = match best {
256                None => true,
257                Some((ref cur, _)) => mode.better(&scanned.meta, cur),
258            };
259            if take {
260                best = Some((scanned.meta, scanned.meta_path));
261            }
262        }
263        let (meta, meta_path) = match best {
264            Some(b) => b,
265            None => {
266                lookup_cache_invalidate(&cache_key);
267                return Ok(None);
268            }
269        };
270        let bin_path = meta_path.with_extension("bin");
271        let payload = match fs::read(&bin_path) {
272            Ok(v) => v,
273            Err(_) => return Ok(None),
274        };
275        // Validate checksum
276        if checksum_hex(&payload) != meta.checksum_hex {
277            fs::remove_file(&meta_path).ok();
278            fs::remove_file(&bin_path).ok();
279            lookup_cache_invalidate(&cache_key);
280            self.metadata_index_remove(&meta_path);
281            return Ok(None);
282        }
283        let entry = WarmStartEntry {
284            payload,
285            objective: meta.objective,
286            iteration: meta.iteration,
287            written_unix_secs: meta.written_unix_secs,
288            kind: meta.kind,
289        };
290        let (meta, entry) = self.touch_lookup_meta(&meta_path, meta, entry)?;
291        // Record (meta_path, mtime) → entry so subsequent identical lookups
292        // short-circuit until the meta file's mtime changes. The full
293        // nanosecond write timestamp is cached alongside so the fast path
294        // can re-apply the TTL cutoff without re-reading the JSON.
295        if let Ok(md) = fs::metadata(&meta_path)
296            && let Ok(mtime) = md.modified()
297        {
298            let write_nanos =
299                (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
300            lookup_cache_insert(
301                cache_key,
302                CachedLookup {
303                    meta_path: meta_path.clone(),
304                    meta_mtime: mtime,
305                    write_nanos,
306                    entry: entry.clone(),
307                },
308            );
309        }
310        Ok(Some(entry))
311    }
312
313    /// Save a new entry with a fresh run-id. Returns the run-id (caller may
314    /// hand it to [`Self::save_overwrite`] for periodic in-place updates).
315    pub fn save(
316        &self,
317        key: &Fingerprint,
318        payload: &[u8],
319        objective: Option<f64>,
320        iteration: Option<u64>,
321        kind: EntryKind,
322    ) -> Result<String, StoreError> {
323        let run_id = self.fresh_run_id();
324        self.save_overwrite(key, &run_id, payload, objective, iteration, kind)?;
325        Ok(run_id)
326    }
327
    /// Save under a specific run-id (overwrites an existing entry with the
    /// same id atomically).
    ///
    /// The payload is written to `<run_id>.bin` and its metadata to
    /// `<run_id>.json` inside the key directory, each via a temp file that is
    /// renamed into place. A non-finite `objective` is recorded as `None`.
    /// After a successful write the metadata index is updated, the approximate
    /// byte counter is bumped, and an eviction sweep runs on the first save
    /// and on every `EVICT_EVERY_N_SAVES`-th save thereafter.
    ///
    /// # Errors
    /// Returns [`StoreError::Io`] when the write/rename sequence fails (after
    /// one retry on `ErrorKind::NotFound`); a metadata serialization failure
    /// is wrapped into that I/O error as well. Failures of the directory
    /// fsync, the index update, and the eviction sweep are ignored.
    pub fn save_overwrite(
        &self,
        key: &Fingerprint,
        run_id: &str,
        payload: &[u8],
        objective: Option<f64>,
        iteration: Option<u64>,
        kind: EntryKind,
    ) -> Result<(), StoreError> {
        // Any new write under this key may change which entry wins both
        // `LookupMode::Best` and `LookupMode::Latest`, so drop both cached
        // rows before touching disk. A pure save_overwrite of the same
        // run_id would also bump mtime and self-invalidate, but a save()
        // with a fresh run_id leaves the old meta file unchanged — only
        // explicit invalidation catches that.
        lookup_cache_invalidate(&LookupCacheKey {
            fp: *key,
            mode: LookupMode::Best,
        });
        lookup_cache_invalidate(&LookupCacheKey {
            fp: *key,
            mode: LookupMode::Latest,
        });
        let dir = self.key_dir(key);
        let pid = std::process::id();
        // 1. Compute the checksum from the payload and drop a non-finite
        //    objective.
        let checksum = checksum_hex(payload);
        let objective_finite = objective.filter(|o| o.is_finite());
        // The meta's `written_unix_secs`/`written_nanos` are captured INSIDE the
        // write loop — just before the meta_tmp is written, AFTER the bin write
        // has completed. The stored timestamp drives the TTL contract: an
        // entry's clock should start ticking from when the entry becomes
        // (nearly) visible to lookups, not from when `save_overwrite` was
        // entered. On slow disks the bin write + fsync + rename can take longer
        // than the entire TTL window itself (the warm-start test fixture pins
        // TTL=1s while the ext4-backed CI image takes >1s on small writes), so
        // an up-front stamp causes the entry to be classified as expired the
        // moment `save_overwrite` returns. Pushing the stamp past the bin
        // fsync removes that systemic drift from the cost of writing the
        // entry — only the meta fsync + final rename + dir fsync still
        // elapse between the stamp and the entry becoming visible.

        // 2. Write both temp files and atomically rename them into place. The
        //    whole "ensure dir → write temps → rename" sequence is retried once
        //    as a unit on `ErrorKind::NotFound`, because a concurrent process'
        //    `evict_overflow` can `remove_dir` this key dir the instant it
        //    observes it empty (store.rs `evict_overflow`, "Sweep now-empty key
        //    dirs"). That removal races every write step here: it can vanish the
        //    dir after `create_dir_all` but before a temp `File::create`, or
        //    take the dir *and our just-written temps with it* before the
        //    rename, surfacing as `io: No such file or directory (os error 2)`
        //    under parallel CV / bootstrap fitting (gam#868). Retrying the
        //    sequence (not an individual step) is the only correct response: a
        //    bare rename retry can't recover once the source temp was swept with
        //    the dir, so we recreate the dir and rewrite the temps from the
        //    in-memory `payload` (the meta JSON is rebuilt, with a fresh
        //    timestamp, on each attempt). A single retry is sufficient — the
        //    eviction window is one `remove_dir` syscall wide — and a second
        //    genuine `NotFound` is propagated as before.
        // The nonce, pid, and attempt number go into the temp file names.
        let nonce = self.nanos_now();
        let bin_final = dir.join(format!("{run_id}.bin"));
        let meta_final = dir.join(format!("{run_id}.json"));
        let mut attempt = 0u8;
        // Serializes the meta for a given write timestamp; invoked from inside
        // `write_and_promote_entry` once the bin has been promoted.
        let build_meta_json = |secs: u64, subsec_nanos: u32| -> Result<Vec<u8>, StoreError> {
            let meta = OnDiskMeta {
                schema_version: SCHEMA_VERSION,
                written_unix_secs: secs,
                written_nanos: subsec_nanos,
                objective: objective_finite,
                iteration,
                kind,
                checksum_hex: checksum.clone(),
                payload_bytes: payload.len() as u64,
                accessed: false,
            };
            Ok(serde_json::to_vec_pretty(&meta)?)
        };
        loop {
            let bin_tmp = dir.join(format!("{run_id}.bin.tmp.{pid}.{nonce}.{attempt}"));
            let meta_tmp = dir.join(format!("{run_id}.json.tmp.{pid}.{nonce}.{attempt}"));
            let stamp_fn = || self.unix_now_parts();
            // `write_and_promote_entry` speaks `io::Result`, so a serialization
            // failure is folded into an `io::Error` here.
            let build_meta_for_io = |secs: u64, subsec_nanos: u32| -> io::Result<Vec<u8>> {
                build_meta_json(secs, subsec_nanos)
                    .map_err(|e| io::Error::other(format!("meta build: {e:?}")))
            };
            match write_and_promote_entry(&EntryWrite {
                dir: &dir,
                bin_tmp: &bin_tmp,
                meta_tmp: &meta_tmp,
                payload,
                bin_final: &bin_final,
                meta_final: &meta_final,
                stamp_fn: &stamp_fn,
                build_meta_json: &build_meta_for_io,
            }) {
                Ok(()) => break,
                Err(e) if e.kind() == io::ErrorKind::NotFound && attempt == 0 => {
                    // A sibling process' eviction removed the key dir mid-write.
                    // Clean up any partial temps, then retry the whole sequence
                    // once after recreating the dir inside `write_and_promote_entry`.
                    fs::remove_file(&bin_tmp).ok();
                    fs::remove_file(&meta_tmp).ok();
                    attempt += 1;
                    continue;
                }
                Err(e) => {
                    // Final failure: remove the temps and any promoted bin so no
                    // bin is left without a matching meta.
                    // NOTE(review): when overwriting an existing run-id this
                    // also removes the previous `.bin` even if the failure
                    // happened before the new bin was promoted — confirm that
                    // losing the old entry in that case is intended.
                    fs::remove_file(&bin_tmp).ok();
                    fs::remove_file(&meta_tmp).ok();
                    fs::remove_file(&bin_final).ok();
                    return Err(StoreError::Io(e));
                }
            }
        }
        // 3. Fsync the containing directory so the rename itself is durable
        // across a power loss / hard crash. fs::File::sync_all on the
        // payload only guarantees the file content reaches disk; without
        // also fsyncing the directory inode, the *rename* (which is what
        // makes the entry visible to lookups) can be lost. Best-effort on
        // platforms where opening a directory for fsync is not supported.
        if let Ok(d) = fs::File::open(&dir) {
            d.sync_all().ok();
        }
        self.metadata_index_upsert(&meta_final, &bin_final).ok();
        // 4. Best-effort eviction; failure here is non-fatal. Throttle the
        // full directory scan: maintain an approximate byte total (shared
        // across clones of this store) and only run eviction when the
        // per-save counter reaches a multiple of `EVICT_EVERY_N_SAVES` as a
        // drift-resync trigger, or on the very first save (so a fresh process
        // inheriting a populated store root sweeps once). The counter is
        // best-effort: it can drift relative to disk truth because other
        // processes may write/evict, but every triggered sweep resyncs it to
        // ground truth.
        let approx_added = payload.len() as u64 + APPROX_META_BYTES;
        self.byte_total.fetch_add(approx_added, Ordering::Relaxed);
        // `fetch_add` returns the previous value, so `n == 0` is the first save.
        let n = self.save_counter.fetch_add(1, Ordering::Relaxed);
        if n == 0 || n.is_multiple_of(EVICT_EVERY_N_SAVES) {
            self.evict_overflow().ok();
        }
        Ok(())
    }
469
    /// Drop entries older than TTL, then evict until total bytes ≤
    /// `opts.size_budget_bytes`. Idempotent; safe under concurrent processes
    /// (worst case some entries are double-removed, which is a no-op).
    ///
    /// Eviction order: entries never reused by a lookup (`accessed == false`)
    /// go first, then by recorded write time ascending, then by meta path as
    /// a final tie-break.
    ///
    /// The time component of the sort key is the
    /// `(written_unix_secs, written_nanos)` recorded in each entry's meta, not
    /// the filesystem mtime — at second-resolution mtime, batches of writes
    /// within the same second would sort arbitrarily and could evict the most
    /// recent entry.
    ///
    /// The whole sweep is skipped when the approximate byte total is within
    /// budget and the root directory's mtime is unchanged since the last
    /// completed under-budget sweep.
    ///
    /// # Errors
    /// The visible body never returns `Err`; every filesystem failure is
    /// treated as best-effort and ignored.
    pub fn evict_overflow(&self) -> Result<(), StoreError> {
        // Root-mtime short-circuit. A throttled sweep that fires while the
        // approximate byte total is comfortably under budget has no size
        // eviction to do; its only residual work is the TTL/byte resync walk
        // over every key dir. The root mtime is bumped whenever a key dir is
        // created/removed beneath it, so if it is unchanged since our last
        // completed sweep AND we are under budget, no key dir was added or
        // dropped and the size-eviction walk is provably a no-op — skip the
        // N-dir `read_dir`+`stat` storm. (TTL expiry of existing entries does
        // not move the root mtime, but it is already enforced lazily on every
        // `lookup_with` and on the next root-changing save, so the gate cannot
        // surface a stale entry.) This trims the residual #1114 walk in long
        // refit-heavy CI runs where thousands of fingerprint dirs accumulate.
        let current_root_mtime = fs::metadata(&self.root)
            .ok()
            .and_then(|m| m.modified().ok());
        if self.byte_total.load(Ordering::Relaxed) <= self.opts.size_budget_bytes
            && let Some(now_mtime) = current_root_mtime
            && let Ok(last) = self.last_evict_root_mtime.lock()
            && *last == Some(now_mtime)
        {
            return Ok(());
        }
        // An unreadable root is treated as "nothing to evict".
        let read_dir = match fs::read_dir(&self.root) {
            Ok(rd) => rd,
            Err(_) => return Ok(()),
        };
        // Collect (meta_path, bin_path, total_bytes, write_nanos_since_epoch, accessed).
        let mut all: Vec<(PathBuf, PathBuf, u64, u128, bool)> = Vec::new();
        let now_nanos = self.nanos_now();
        for key_dir_entry in read_dir {
            let key_dir = match key_dir_entry {
                Ok(e) => e.path(),
                Err(_) => continue,
            };
            // Only key directories hold entries; skip anything else in the root.
            if !key_dir.is_dir() {
                continue;
            }
            // `scan_key_dir` reuses the per-store directory-listing cache when
            // the key dir's mtime is unchanged, so an unchanged dir costs a
            // single `stat` rather than a `read_dir` + per-file `stat` + JSON
            // read of every entry. It also sweeps foreign tmp files and drops
            // corrupt / TTL-expired entries, mirroring the old inline pass.
            let scanned = self.scan_key_dir(&key_dir, now_nanos);
            for entry in &scanned {
                let write_nanos = (entry.meta.written_unix_secs as u128) * 1_000_000_000u128
                    + entry.meta.written_nanos as u128;
                let total_bytes = entry.meta_len + entry.bin_len;
                all.push((
                    entry.meta_path.clone(),
                    entry.bin_path.clone(),
                    total_bytes,
                    write_nanos,
                    entry.meta.accessed,
                ));
            }
            // Sweep now-empty key dirs. The directory is re-listed so that a
            // dir holding only files the scan did not report is left alone.
            if scanned.is_empty()
                && fs::read_dir(&key_dir)
                    .map(|mut it| it.next().is_none())
                    .unwrap_or(false)
            {
                fs::remove_dir(&key_dir).ok();
                if let Ok(mut index) = self.index.lock() {
                    index.by_key_dir.remove(&key_dir);
                }
            }
        }
        let total: u64 = all.iter().map(|e| e.2).sum();
        if total <= self.opts.size_budget_bytes {
            // Resync the approximate byte counter even when no eviction was
            // needed. Otherwise the in-memory `byte_total` only grows (it
            // never observes deletions made by sibling processes or
            // expiration sweeps), so after enough saves it would exceed the
            // budget on every call and defeat the short-circuit at the top
            // of this function.
            self.byte_total.store(total, Ordering::Relaxed);
            // Record the root mtime observed by this completed under-budget
            // sweep so a subsequent throttled sweep can short-circuit while
            // the root is unchanged. Re-read after the walk: any key dir the
            // walk removed (empty-dir sweep above) bumps the root mtime, and
            // capturing the post-walk value keeps the gate from skipping a
            // genuinely-changed root on the next call.
            if let (Ok(mut last), Some(m)) = (
                self.last_evict_root_mtime.lock(),
                fs::metadata(&self.root)
                    .ok()
                    .and_then(|m| m.modified().ok()),
            ) {
                *last = Some(m);
            }
            return Ok(());
        }
        // Over budget: order candidates so never-accessed entries come first
        // (`false < true`), then oldest write first, then by meta path for a
        // deterministic tie-break.
        all.sort_by(|a, b| {
            a.4.cmp(&b.4)
                .then_with(|| a.3.cmp(&b.3))
                .then_with(|| a.0.cmp(&b.0))
        });
        let mut remaining = total;
        for (meta, bin, bytes, _, _) in all.into_iter() {
            if remaining <= self.opts.size_budget_bytes {
                break;
            }
            // Removal is best-effort; the entry's bytes are subtracted from
            // the running total either way.
            fs::remove_file(&meta).ok();
            fs::remove_file(&bin).ok();
            self.metadata_index_remove(&meta);
            remaining = remaining.saturating_sub(bytes);
        }
        // Resync the approximate byte counter to ground truth. Subsequent
        // saves increment from here until the next sweep.
        self.byte_total.store(remaining, Ordering::Relaxed);
        Ok(())
    }
592}
593
/// Borrowed arguments for [`write_and_promote_entry`]: the key directory, the
/// temp and final paths of the `.bin` / `.json` pair, the payload bytes, and
/// two callbacks used to timestamp and serialize the metadata.
struct EntryWrite<'a> {
    /// Key directory that holds the entry; created if missing.
    dir: &'a Path,
    /// Temp path the payload is written to before promotion.
    bin_tmp: &'a Path,
    /// Temp path the meta JSON is written to before promotion.
    meta_tmp: &'a Path,
    /// Payload bytes to persist.
    payload: &'a [u8],
    /// Final `.bin` path the payload temp is renamed to.
    bin_final: &'a Path,
    /// Final `.json` path the meta temp is renamed to.
    meta_final: &'a Path,
    /// Read the current wall clock as `(unix_secs, subsec_nanos)`. Called
    /// AFTER the bin write and bin fsync complete (and after the bin rename)
    /// so the recorded write time tracks when the entry actually becomes
    /// (nearly) visible, not when `save_overwrite` was first invoked. On
    /// slow disks the bin fsync can dominate save latency and a pre-write
    /// stamp would burn TTL the caller never sees.
    stamp_fn: &'a dyn Fn() -> (u64, u32),
    /// Build the meta JSON given the captured `(secs, subsec_nanos)`. The
    /// closure folds those values into `OnDiskMeta` and serializes it.
    build_meta_json: &'a dyn Fn(u64, u32) -> io::Result<Vec<u8>>,
}

/// Ensure the key dir exists, write the `.bin` and `.json` temp files, and
/// atomically rename both into place. Returns the raw `io::Error` (not a
/// `StoreError`) so the caller can branch on `ErrorKind::NotFound` to retry the
/// whole sequence after a concurrent eviction removed the dir mid-write
/// (gam#868). Idempotent across a retry: every path is derived from the caller's
/// stable args and the temps are rewritten from the in-memory payload, so a
/// second pass into a freshly recreated dir produces the same final entry
/// (apart from the write timestamp, which is re-read on each pass).
///
/// `.bin` is renamed before `.json` so a meta-pointing-to-missing-bin window is
/// impossible on the happy path; a reader that catches `.bin`-missing treats the
/// entry as corrupt and cleans it up.
///
/// Fsync failures on either temp file are ignored (best-effort durability).
fn write_and_promote_entry(w: &EntryWrite<'_>) -> io::Result<()> {
    // Recreate the dir up front: on the first attempt this is the original
    // `create_dir_all`; on a retry it re-establishes the dir a sibling
    // process' eviction removed.
    fs::create_dir_all(w.dir)?;
    {
        // Scoped so the file handle is closed before the rename below.
        let mut f = fs::File::create(w.bin_tmp)?;
        f.write_all(w.payload)?;
        f.sync_all().ok();
    }
    // Promote the bin first so a crash between the two renames leaves an
    // orphan .bin (cleaned up by `evict_overflow`) rather than a meta
    // pointing at a missing .bin (which the reader would mark corrupt).
    fs::rename(w.bin_tmp, w.bin_final)?;
    // Stamp the meta AFTER the bin promotion. This is the latest moment the
    // timestamp can still be inlined into the meta JSON. The remaining gap
    // before the entry is visible to lookups is one meta write+fsync + the
    // meta rename + the caller's directory fsync — all bounded, so TTL is
    // measured from a near-visible moment instead of from the moment
    // `save_overwrite` was entered.
    let (secs, subsec_nanos) = (w.stamp_fn)();
    let meta_json = (w.build_meta_json)(secs, subsec_nanos)?;
    {
        // Scoped so the file handle is closed before the rename below.
        let mut f = fs::File::create(w.meta_tmp)?;
        f.write_all(&meta_json)?;
        f.sync_all().ok();
    }
    if let Err(e) = fs::rename(w.meta_tmp, w.meta_final) {
        // Roll back the bin we just promoted to avoid orphaning it, then
        // surface the error so the caller can retry or fail.
        fs::remove_file(w.bin_final).ok();
        return Err(e);
    }
    Ok(())
}
659
/// Conservative per-entry meta-JSON size estimate, in bytes, used by the
/// throttled save counter. Real meta files run ~250-400 bytes after
/// pretty-printing; overestimating just means the throttle fires slightly
/// earlier, never later.
const APPROX_META_BYTES: u64 = 512;
664
/// How [`WarmStartStore::lookup_with`] ranks candidate entries.
///
/// Each variant selects one comparator; see `LookupMode::better`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum LookupMode {
    /// Lowest objective wins; ties and missing objectives are settled by
    /// [`entry_better`].
    Best,
    /// Newest write wins, as ordered by [`entry_newer`]; objectives ignored.
    Latest,
}
673
674impl LookupMode {
675    fn better(&self, candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
676        match self {
677            LookupMode::Best => entry_better(candidate, current),
678            LookupMode::Latest => entry_newer(candidate, current),
679        }
680    }
681}
682
/// Key of the process-wide lookup cache: one row per `(fingerprint, mode)`
/// pair, so results resolved under different ranking modes for the same
/// fingerprint are cached independently.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct LookupCacheKey {
    /// Fingerprint of the warm-start key the lookup was made for.
    fp: Fingerprint,
    /// Ranking mode the cached result was resolved under.
    mode: LookupMode,
}
688
/// One resolved lookup result held in the process-wide lookup cache.
#[derive(Clone)]
struct CachedLookup {
    /// Path of the meta JSON the cached entry was resolved from.
    meta_path: PathBuf,
    /// Meta file mtime recorded with the row; per the `lookup_cache` docs a
    /// mismatch invalidates the row.
    meta_mtime: SystemTime,
    /// Full-precision nanosecond write timestamp from the on-disk meta,
    /// kept alongside `entry.written_unix_secs` so the fast path can apply
    /// the same TTL cutoff as `evict_overflow` without re-reading the JSON.
    /// Also the eviction order of the cache: `lookup_cache_insert` drops the
    /// row with the smallest `write_nanos` first.
    write_nanos: u128,
    /// The resolved entry; its payload capacity is what the cache's byte
    /// budget accounts for.
    entry: WarmStartEntry,
}
699
/// In-memory index over on-disk metadata, accessed through the store's
/// `index` mutex.
#[derive(Debug, Default)]
struct MetadataIndex {
    /// Parsed meta per meta-file path. A row is only reused when the files
    /// still match the recorded mtime/lengths (see `IndexedMeta::matches`).
    by_meta_path: HashMap<PathBuf, IndexedMeta>,
    /// Per-key-directory cached listing, keyed by the directory's mtime.
    ///
    /// A key dir's mtime is bumped by the OS whenever an entry is created,
    /// renamed, or removed inside it (which is exactly when our entries
    /// change). A matching `dir_mtime` therefore lets
    /// [`WarmStartStore::scan_key_dir`] reuse the cached `Vec<ScannedEntry>`
    /// without a fresh `read_dir` or any JSON read. The cached pairs are
    /// still re-`stat`ed before reuse (see `WarmStartStore::cached_dir_scan`),
    /// which catches an in-place rewrite that leaves the dir mtime alone.
    /// This is what cuts down the metadata-syscall storm in repeated
    /// `lookup_with` / `evict_overflow` calls within one fit (gam#1114).
    by_key_dir: HashMap<PathBuf, ScannedDir>,
}
715
/// Parsed meta cached together with the file facts it was read under, so a
/// later `stat` can tell whether the cached parse is still usable.
#[derive(Debug, Clone)]
struct IndexedMeta {
    /// Meta file mtime taken from the metadata supplied when the row was
    /// inserted.
    meta_mtime: SystemTime,
    /// Meta file length in bytes at insertion.
    meta_len: u64,
    /// Length in bytes of the sibling `.bin` at insertion.
    bin_len: u64,
    /// The parsed on-disk meta.
    meta: OnDiskMeta,
}
723
724impl IndexedMeta {
725    fn matches(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
726        meta_md.modified().ok() == Some(self.meta_mtime)
727            && meta_md.len() == self.meta_len
728            && bin_md.len() == self.bin_len
729    }
730}
731
/// Cached result of scanning one key directory: its mtime at scan time plus
/// the resolved entries. Reused while the dir's mtime is unchanged and every
/// cached pair still matches its files (see `ScannedEntry::matches_files`).
#[derive(Debug, Clone)]
struct ScannedDir {
    /// Directory mtime the listing was cached under.
    dir_mtime: SystemTime,
    /// The `(meta, bin)` pairs resolved by that scan.
    entries: Vec<ScannedEntry>,
}
739
/// One resolved `(meta, bin)` pair discovered during a key-dir scan. Carries
/// everything both the lookup ranker and the eviction sweep need so neither
/// has to re-read the meta JSON when the dir is unchanged. The recorded
/// lengths and mtimes are what `matches_files` compares against a fresh
/// `stat` before a cached listing is reused.
#[derive(Debug, Clone)]
struct ScannedEntry {
    /// Path of the `<runid>.json` meta file.
    meta_path: PathBuf,
    /// Path of the sibling `<runid>.bin` payload file.
    bin_path: PathBuf,
    /// Meta file length in bytes at scan time.
    meta_len: u64,
    /// Payload file length in bytes at scan time.
    bin_len: u64,
    /// Meta file mtime at scan time; `None` when it could not be read.
    meta_mtime: Option<SystemTime>,
    /// Payload file mtime at scan time; `None` when it could not be read.
    bin_mtime: Option<SystemTime>,
    /// The parsed on-disk meta.
    meta: OnDiskMeta,
}
753
754impl ScannedEntry {
755    fn matches_files(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
756        meta_md.len() == self.meta_len
757            && bin_md.len() == self.bin_len
758            && meta_md.modified().ok() == self.meta_mtime
759            && bin_md.modified().ok() == self.bin_mtime
760    }
761}
762
/// Decide whether an entry written at (`secs`, `nanos`) has outlived `ttl`
/// as of `now_nanos` (nanoseconds since the Unix epoch).
///
/// A zero `ttl` disables expiry. A write stamp in the future counts as age
/// zero. Mirrors the cutoff in [`WarmStartStore::evict_overflow`] so
/// `lookup_with` cannot return an entry that the eviction sweep would have
/// dropped.
const fn meta_expired(secs: u64, nanos: u32, ttl: Duration, now_nanos: u128) -> bool {
    let budget = ttl.as_nanos();
    let written_at = (secs as u128) * 1_000_000_000u128 + nanos as u128;
    let age = now_nanos.saturating_sub(written_at);
    budget != 0 && age >= budget
}
775
776/// Process-wide in-memory cache for [`WarmStartStore::lookup_with`]. Hot poll
777/// loops hit the same (key, mode) repeatedly between writes, so caching the
778/// resolved entry behind an mtime check eliminates the per-call directory
779/// walk, JSON parse, and SHA-256 recomputation. Mtime mismatch — including
780/// writes from a sibling process — invalidates the row and falls back to
781/// the full slow path.
782fn lookup_cache() -> &'static Mutex<HashMap<LookupCacheKey, CachedLookup>> {
783    static CACHE: OnceLock<Mutex<HashMap<LookupCacheKey, CachedLookup>>> = OnceLock::new();
784    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
785}
786
/// Maximum number of rows kept in the process-wide lookup cache.
const LOOKUP_CACHE_MAX_ENTRIES: usize = 128;
/// Byte budget (256 MiB) for the lookup cache, measured per row with
/// `cached_lookup_resident_bytes`.
const LOOKUP_CACHE_MAX_BYTES: usize = 256 * 1024 * 1024;
789
790const fn cached_lookup_resident_bytes(value: &CachedLookup) -> usize {
791    std::mem::size_of::<CachedLookup>().saturating_add(value.entry.payload.capacity())
792}
793
794fn lookup_cache_get(key: &LookupCacheKey) -> Option<CachedLookup> {
795    let guard = lookup_cache().lock().ok()?;
796    guard.get(key).cloned()
797}
798
799fn lookup_cache_insert(key: LookupCacheKey, val: CachedLookup) {
800    if let Ok(mut guard) = lookup_cache().lock() {
801        let new_bytes = cached_lookup_resident_bytes(&val);
802        if new_bytes > LOOKUP_CACHE_MAX_BYTES {
803            return;
804        }
805        let mut resident_bytes: usize = guard.values().map(cached_lookup_resident_bytes).sum();
806        if let Some(old) = guard.remove(&key) {
807            resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
808        }
809        while guard.len() >= LOOKUP_CACHE_MAX_ENTRIES
810            || resident_bytes.saturating_add(new_bytes) > LOOKUP_CACHE_MAX_BYTES
811        {
812            let oldest = guard
813                .iter()
814                .min_by_key(|(_, cached)| cached.write_nanos)
815                .map(|(old_key, _)| *old_key);
816            let Some(oldest) = oldest else {
817                break;
818            };
819            if let Some(old) = guard.remove(&oldest) {
820                resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
821            }
822        }
823        guard.insert(key, val);
824    }
825}
826
827fn lookup_cache_invalidate(key: &LookupCacheKey) {
828    if let Ok(mut guard) = lookup_cache().lock() {
829        guard.remove(key);
830    }
831}
832
/// Run a full [`WarmStartStore::evict_overflow`] sweep every Nth save. The
/// budget can briefly overshoot by N-1 payloads, which the next sweep
/// reclaims. N=32 keeps the amortized cost negligible on hot checkpoint
/// paths while still bounding worst-case disk drift.
const EVICT_EVERY_N_SAVES: u64 = 32;
838
/// Extract the writer's pid from a temp-file name.
///
/// Names look like `<runid>.bin.tmp.<pid>.<nonce>.<attempt>` or
/// `<runid>.json.tmp.<pid>.<nonce>.<attempt>`; only the first dot-separated
/// token after the first `.tmp.` matters. Returns `None` when the marker is
/// absent or that token is not a valid `u32`.
fn parse_tmp_pid(name: &str) -> Option<u32> {
    let (_, after_marker) = name.split_once(".tmp.")?;
    let pid_token = match after_marker.find('.') {
        Some(dot) => &after_marker[..dot],
        None => after_marker,
    };
    pid_token.parse().ok()
}
847
848fn read_meta(path: &Path) -> Result<OnDiskMeta, StoreError> {
849    let bytes = fs::read(path)?;
850    let parsed: OnDiskMeta = serde_json::from_slice(&bytes)?;
851    Ok(parsed)
852}
853
854fn entry_better(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
855    match (candidate.objective, current.objective) {
856        (Some(c), Some(d)) => {
857            if (c - d).abs() < 1e-12 {
858                match (candidate.kind, current.kind) {
859                    (EntryKind::Final, EntryKind::Checkpoint) => true,
860                    (EntryKind::Checkpoint, EntryKind::Final) => false,
861                    _ => entry_newer(candidate, current),
862                }
863            } else {
864                c < d
865            }
866        }
867        (Some(_), None) => true,
868        (None, Some(_)) => false,
869        (None, None) => entry_newer(candidate, current),
870    }
871}
872
873fn entry_newer(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
874    let candidate_stamp = (
875        candidate.written_unix_secs,
876        candidate.written_nanos,
877        candidate_kind_rank(candidate.kind),
878    );
879    let current_stamp = (
880        current.written_unix_secs,
881        current.written_nanos,
882        candidate_kind_rank(current.kind),
883    );
884    candidate_stamp > current_stamp
885}
886
887const fn candidate_kind_rank(kind: EntryKind) -> u8 {
888    match kind {
889        EntryKind::Checkpoint => 0,
890        EntryKind::Final => 1,
891    }
892}
893
894fn checksum_hex(payload: &[u8]) -> String {
895    let mut h = Sha256::new();
896    h.update(payload);
897    let out = h.finalize();
898    let mut s = String::with_capacity(out.len() * 2);
899    for b in out.iter() {
900        use std::fmt::Write;
901        write!(&mut s, "{:02x}", b).expect("writing to String is infallible");
902    }
903    s
904}
905
906impl WarmStartStore {
907    fn touch_lookup_hit(
908        &self,
909        meta_path: &Path,
910        entry: WarmStartEntry,
911    ) -> Result<WarmStartEntry, StoreError> {
912        let meta = read_meta(meta_path)?;
913        let (_meta, entry) = self.touch_lookup_meta(meta_path, meta, entry)?;
914        Ok(entry)
915    }
916
917    fn touch_lookup_meta(
918        &self,
919        meta_path: &Path,
920        mut meta: OnDiskMeta,
921        mut entry: WarmStartEntry,
922    ) -> Result<(OnDiskMeta, WarmStartEntry), StoreError> {
923        let now = self.nanos_now();
924        let old = (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
925        let touched = now.max(old.saturating_add(1));
926        let secs = (touched / 1_000_000_000u128) as u64;
927        let nanos = (touched % 1_000_000_000u128) as u32;
928        meta.written_unix_secs = secs;
929        meta.written_nanos = nanos;
930        meta.accessed = true;
931        let json = serde_json::to_vec_pretty(&meta)?;
932        let tmp = meta_path.with_extension(format!(
933            "json.touch.tmp.{}.{}",
934            std::process::id(),
935            self.nanos_now()
936        ));
937        {
938            let mut f = fs::File::create(&tmp)?;
939            f.write_all(&json)?;
940            f.sync_all()?;
941        }
942        fs::rename(&tmp, meta_path)?;
943        if let Some(dir) = meta_path.parent()
944            && let Ok(d) = fs::File::open(dir)
945        {
946            d.sync_all().ok();
947        }
948        self.metadata_index_remove(meta_path);
949        entry.written_unix_secs = secs;
950        Ok((meta, entry))
951    }
952
953    fn read_meta_indexed(
954        &self,
955        path: &Path,
956        meta_md: &fs::Metadata,
957        bin_md: &fs::Metadata,
958    ) -> Result<OnDiskMeta, StoreError> {
959        if let Ok(index) = self.index.lock()
960            && let Some(cached) = index.by_meta_path.get(path)
961            && cached.matches(meta_md, bin_md)
962        {
963            return Ok(cached.meta.clone());
964        }
965
966        let meta = read_meta(path)?;
967        let Some(meta_mtime) = meta_md.modified().ok() else {
968            return Ok(meta);
969        };
970        if let Ok(mut index) = self.index.lock() {
971            index.by_meta_path.insert(
972                path.to_path_buf(),
973                IndexedMeta {
974                    meta_mtime,
975                    meta_len: meta_md.len(),
976                    bin_len: bin_md.len(),
977                    meta: meta.clone(),
978                },
979            );
980        }
981        Ok(meta)
982    }
983
984    fn metadata_index_upsert(&self, meta_path: &Path, bin_path: &Path) -> Result<(), StoreError> {
985        let meta_md = fs::metadata(meta_path)?;
986        let bin_md = fs::metadata(bin_path)?;
987        self.read_meta_indexed(meta_path, &meta_md, &bin_md)?;
988        // A fresh entry just landed in this key dir, so any cached listing for
989        // the dir is stale. Drop it; the next scan rebuilds and re-caches.
990        if let Some(parent) = meta_path.parent()
991            && let Ok(mut index) = self.index.lock()
992        {
993            index.by_key_dir.remove(parent);
994        }
995        Ok(())
996    }
997
998    fn metadata_index_remove(&self, meta_path: &Path) {
999        if let Ok(mut index) = self.index.lock() {
1000            index.by_meta_path.remove(meta_path);
1001            if let Some(parent) = meta_path.parent() {
1002                index.by_key_dir.remove(parent);
1003            }
1004        }
1005    }
1006
1007    fn metadata_index_remove_key(&self, key: &Fingerprint) {
1008        let dir = self.key_dir(key);
1009        if let Ok(mut index) = self.index.lock() {
1010            index.by_meta_path.retain(|path, _| !path.starts_with(&dir));
1011            index.by_key_dir.remove(&dir);
1012        }
1013    }
1014
1015    /// Cached listing lookup for one key directory.
1016    ///
1017    /// Returns the cached `Vec<ScannedEntry>` if the directory's current mtime
1018    /// matches the cached scan (no entry added/removed since), otherwise
1019    /// `None` so the caller performs a fresh scan via [`Self::scan_key_dir`].
1020    ///
1021    /// A matching dir mtime guarantees the *set* of files is unchanged, but TTL
1022    /// is wall-clock relative, so an entry valid at scan time can expire while
1023    /// the listing is still cached. The caller re-applies the TTL cutoff to the
1024    /// returned entries; this only proves the file set is stable.
1025    fn cached_dir_scan(&self, dir: &Path, dir_md: &fs::Metadata) -> Option<Vec<ScannedEntry>> {
1026        let dir_mtime = dir_md.modified().ok()?;
1027        let index = self.index.lock().ok()?;
1028        let cached = index.by_key_dir.get(dir)?;
1029        if cached.dir_mtime != dir_mtime {
1030            return None;
1031        }
1032        for entry in &cached.entries {
1033            let meta_md = fs::metadata(&entry.meta_path).ok()?;
1034            let bin_md = fs::metadata(&entry.bin_path).ok()?;
1035            if !entry.matches_files(&meta_md, &bin_md) {
1036                return None;
1037            }
1038        }
1039        Some(cached.entries.clone())
1040    }
1041
1042    fn store_dir_scan(&self, dir: &Path, dir_mtime: SystemTime, entries: &[ScannedEntry]) {
1043        if let Ok(mut index) = self.index.lock() {
1044            index.by_key_dir.insert(
1045                dir.to_path_buf(),
1046                ScannedDir {
1047                    dir_mtime,
1048                    entries: entries.to_vec(),
1049                },
1050            );
1051        }
1052    }
1053
    /// Scan one key directory, resolving every valid `(meta, bin)` pair and
    /// cleaning up corrupt / schema-mismatched / expired entries and metas
    /// whose `.bin` is missing in passing.
    ///
    /// Serves both [`Self::lookup_with`] and [`Self::evict_overflow`]. The
    /// directory is always `stat`ed once. When its mtime is unchanged since
    /// the previous scan and every cached pair still matches its files (see
    /// [`Self::cached_dir_scan`]), the cached listing is reused without a
    /// `read_dir` or any JSON read — the metadata-syscall storm that #1114
    /// traced. A fresh scan re-caches the listing keyed by the dir mtime
    /// observed *after* any cleanup, so a later unchanged call hits the
    /// cache. (`now_nanos` drives the TTL drop; expired entries are removed
    /// and excluded from the result.)
    ///
    /// `.tmp.*` files belonging to other processes are swept; same-PID temps
    /// (in-flight writes from us) are left alone, as are temp names whose pid
    /// cannot be parsed.
    ///
    /// Returns an empty list when the directory cannot be `stat`ed or read.
    fn scan_key_dir(&self, dir: &Path, now_nanos: u128) -> Vec<ScannedEntry> {
        let dir_md = match fs::metadata(dir) {
            Ok(m) => m,
            Err(_) => return Vec::new(),
        };
        if let Some(cached) = self.cached_dir_scan(dir, &dir_md) {
            // The file set is unchanged, but TTL is wall-clock relative: an
            // entry valid when scanned may have expired since. Re-apply the
            // cutoff against `now_nanos`, removing any that crossed it. If none
            // expired we return the cached listing untouched (the fast path);
            // otherwise the removals bump the dir mtime, so we drop the stale
            // cache and re-cache the survivors keyed by the post-removal mtime.
            let any_expired = cached.iter().any(|e| {
                meta_expired(
                    e.meta.written_unix_secs,
                    e.meta.written_nanos,
                    self.opts.ttl,
                    now_nanos,
                )
            });
            if !any_expired {
                return cached;
            }
            let mut survivors = Vec::with_capacity(cached.len());
            for entry in cached {
                if meta_expired(
                    entry.meta.written_unix_secs,
                    entry.meta.written_nanos,
                    self.opts.ttl,
                    now_nanos,
                ) {
                    // Expired: delete both halves (best effort) and forget
                    // the index rows for this meta and its key dir.
                    fs::remove_file(&entry.meta_path).ok();
                    fs::remove_file(&entry.bin_path).ok();
                    self.metadata_index_remove(&entry.meta_path);
                } else {
                    survivors.push(entry);
                }
            }
            // Re-cache only if the post-removal dir mtime can be read.
            if let Some(mtime) = fs::metadata(dir).ok().and_then(|m| m.modified().ok()) {
                self.store_dir_scan(dir, mtime, &survivors);
            }
            return survivors;
        }
        // Cache miss: walk the directory from scratch.
        let read_dir = match fs::read_dir(dir) {
            Ok(rd) => rd,
            Err(_) => return Vec::new(),
        };
        let mut entries = Vec::new();
        // Tracks whether cleanup removed anything, i.e. whether the dir mtime
        // read above may be out of date by the end of the walk.
        let mut mutated = false;
        for f in read_dir {
            let path = match f {
                Ok(e) => e.path(),
                Err(_) => continue,
            };
            // Non-UTF-8 file names are skipped.
            let name = match path.file_name().and_then(|s| s.to_str()) {
                Some(s) => s,
                None => continue,
            };
            if name.contains(".tmp.") {
                // Sweep temp files left by other processes; never touch our
                // own, which may belong to a write still in flight.
                if let Some(pid) = parse_tmp_pid(name)
                    && pid != std::process::id()
                {
                    fs::remove_file(&path).ok();
                    mutated = true;
                }
                continue;
            }
            // Entries are discovered through their meta file; everything
            // else (including `.bin` files) is skipped here.
            if path.extension().and_then(|s| s.to_str()) != Some("json") {
                continue;
            }
            let meta_md = match fs::metadata(&path) {
                Ok(m) => m,
                Err(_) => continue,
            };
            let bin = path.with_extension("bin");
            let bin_md = match fs::metadata(&bin) {
                Ok(m) => m,
                Err(_) => {
                    // Meta without a readable `.bin`: drop the meta.
                    fs::remove_file(&path).ok();
                    self.metadata_index_remove(&path);
                    mutated = true;
                    continue;
                }
            };
            let meta = match self.read_meta_indexed(&path, &meta_md, &bin_md) {
                Ok(m) => m,
                Err(_) => {
                    // Unreadable or unparsable meta: drop the whole entry.
                    fs::remove_file(&path).ok();
                    fs::remove_file(&bin).ok();
                    self.metadata_index_remove(&path);
                    mutated = true;
                    continue;
                }
            };
            if meta.schema_version != SCHEMA_VERSION {
                // Written under a different on-disk schema: drop the entry.
                fs::remove_file(&path).ok();
                fs::remove_file(&bin).ok();
                self.metadata_index_remove(&path);
                mutated = true;
                continue;
            }
            if meta_expired(
                meta.written_unix_secs,
                meta.written_nanos,
                self.opts.ttl,
                now_nanos,
            ) {
                // Past its TTL: drop the entry.
                fs::remove_file(&path).ok();
                fs::remove_file(&bin).ok();
                self.metadata_index_remove(&path);
                mutated = true;
                continue;
            }
            entries.push(ScannedEntry {
                meta_path: path,
                bin_path: bin,
                meta_len: meta_md.len(),
                bin_len: bin_md.len(),
                meta_mtime: meta_md.modified().ok(),
                bin_mtime: bin_md.modified().ok(),
                meta,
            });
        }
        // Cache keyed by the mtime *after* any cleanup so the next unchanged
        // call is a cache hit. If cleanup mutated the dir, re-stat to capture
        // the post-mutation mtime; otherwise reuse the mtime we already read.
        let final_mtime = if mutated {
            fs::metadata(dir).ok().and_then(|m| m.modified().ok())
        } else {
            dir_md.modified().ok()
        };
        if let Some(mtime) = final_mtime {
            self.store_dir_scan(dir, mtime, &entries);
        }
        entries
    }
1203
1204    fn test_time_offset_ns(&self) -> u64 {
1205        self.test_time_offset_ns.load(Ordering::Relaxed)
1206    }
1207
1208    fn unix_now_parts(&self) -> (u64, u32) {
1209        let base = SystemTime::now()
1210            .duration_since(UNIX_EPOCH)
1211            .map(|d| d.as_nanos())
1212            .unwrap_or(0);
1213        let total = base.saturating_add(u128::from(self.test_time_offset_ns()));
1214        let secs = (total / 1_000_000_000u128) as u64;
1215        let nanos = (total % 1_000_000_000u128) as u32;
1216        (secs, nanos)
1217    }
1218
1219    fn nanos_now(&self) -> u128 {
1220        let base = SystemTime::now()
1221            .duration_since(UNIX_EPOCH)
1222            .map(|d| d.as_nanos())
1223            .unwrap_or(0);
1224        base.saturating_add(u128::from(self.test_time_offset_ns()))
1225    }
1226
1227    fn fresh_run_id(&self) -> String {
1228        let pid = std::process::id();
1229        let nanos = self.nanos_now();
1230        format!("r{pid:x}-{nanos:x}")
1231    }
1232}
1233
1234#[cfg(test)]
1235mod tests {
1236    use super::*;
1237    use crate::warm_start::key::Fingerprinter;
1238
1239    impl WarmStartStore {
1240        /// Advance this store's simulated monotonic clock by `dur`. Only
1241        /// available in tests — production code reads the real wall clock and
1242        /// never mutates the per-store offset.
1243        fn test_advance_time(&self, dur: Duration) {
1244            self.test_time_offset_ns
1245                .fetch_add(dur.as_nanos() as u64, Ordering::Relaxed);
1246        }
1247    }
1248
1249    fn temp_store() -> (tempfile::TempDir, WarmStartStore) {
1250        let dir = tempfile::tempdir().unwrap();
1251        let store = WarmStartStore::open(
1252            dir.path().to_path_buf(),
1253            StoreOptions {
1254                size_budget_bytes: 1024 * 1024,
1255                ttl: Duration::from_secs(60),
1256            },
1257        )
1258        .unwrap();
1259        (dir, store)
1260    }
1261
1262    fn key_for(s: &str) -> Fingerprint {
1263        let mut fp = Fingerprinter::new();
1264        fp.absorb_str(b"test", s);
1265        fp.finalize()
1266    }
1267
1268    #[test]
1269    fn roundtrip_save_then_lookup() {
1270        let (_d, store) = temp_store();
1271        let key = key_for("roundtrip");
1272        store
1273            .save(
1274                &key,
1275                b"hello-warm",
1276                Some(1.5),
1277                Some(7),
1278                EntryKind::Checkpoint,
1279            )
1280            .unwrap();
1281        let got = store.lookup(&key).unwrap().unwrap();
1282        assert_eq!(got.payload, b"hello-warm");
1283        assert_eq!(got.objective, Some(1.5));
1284        assert_eq!(got.iteration, Some(7));
1285        assert_eq!(got.kind, EntryKind::Checkpoint);
1286    }
1287
1288    #[test]
1289    fn lookup_picks_lowest_objective() {
1290        let (_d, store) = temp_store();
1291        let key = key_for("multi");
1292        store
1293            .save(&key, b"worse", Some(3.0), Some(1), EntryKind::Checkpoint)
1294            .unwrap();
1295        store
1296            .save(&key, b"better", Some(1.0), Some(2), EntryKind::Checkpoint)
1297            .unwrap();
1298        store
1299            .save(&key, b"mid", Some(2.0), Some(3), EntryKind::Checkpoint)
1300            .unwrap();
1301        let got = store.lookup(&key).unwrap().unwrap();
1302        assert_eq!(got.payload, b"better");
1303        assert_eq!(got.objective, Some(1.0));
1304    }
1305
1306    #[test]
1307    fn lookup_latest_ignores_objective_ordering() {
1308        let (_d, store) = temp_store();
1309        let key = key_for("latest-vs-best");
1310        store
1311            .save(&key, b"low-objective", Some(1.0), Some(1), EntryKind::Final)
1312            .unwrap();
1313        store.test_advance_time(Duration::from_millis(2));
1314        store
1315            .save(
1316                &key,
1317                b"newer-higher-objective",
1318                Some(10.0),
1319                Some(2),
1320                EntryKind::Checkpoint,
1321            )
1322            .unwrap();
1323
1324        let best = store.lookup(&key).unwrap().unwrap();
1325        assert_eq!(best.payload, b"low-objective");
1326
1327        let latest = store.lookup_latest(&key).unwrap().unwrap();
1328        assert_eq!(latest.payload, b"newer-higher-objective");
1329        assert_eq!(latest.iteration, Some(2));
1330    }
1331
1332    #[test]
1333    fn tiebreak_final_beats_checkpoint() {
1334        let (_d, store) = temp_store();
1335        let key = key_for("tie");
1336        store
1337            .save(&key, b"ckpt", Some(1.0), None, EntryKind::Checkpoint)
1338            .unwrap();
1339        // Same objective, different kind.
1340        store
1341            .save(&key, b"final", Some(1.0), None, EntryKind::Final)
1342            .unwrap();
1343        let got = store.lookup(&key).unwrap().unwrap();
1344        assert_eq!(got.payload, b"final");
1345        assert_eq!(got.kind, EntryKind::Final);
1346    }
1347
1348    #[test]
1349    fn tiebreak_latest_mtime_when_no_objective() {
1350        let (_d, store) = temp_store();
1351        let key = key_for("latest");
1352        store
1353            .save(&key, b"first", None, None, EntryKind::Checkpoint)
1354            .unwrap();
1355        store.test_advance_time(Duration::from_millis(1_100));
1356        store
1357            .save(&key, b"second", None, None, EntryKind::Checkpoint)
1358            .unwrap();
1359        let got = store.lookup(&key).unwrap().unwrap();
1360        assert_eq!(got.payload, b"second");
1361    }
1362
1363    #[test]
1364    fn corrupt_payload_is_cleaned_up() {
1365        let (_d, store) = temp_store();
1366        let key = key_for("corrupt");
1367        store
1368            .save(&key, b"original", Some(0.0), None, EntryKind::Checkpoint)
1369            .unwrap();
1370        // Tamper with the .bin file.
1371        let dir = store.key_dir(&key);
1372        for entry in fs::read_dir(&dir).unwrap() {
1373            let p = entry.unwrap().path();
1374            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1375                fs::write(&p, b"tampered!").unwrap();
1376            }
1377        }
1378        let got = store.lookup(&key).unwrap();
1379        assert!(got.is_none(), "tampered entry must be rejected");
1380        // The corrupt files should be cleaned up so they don't accumulate.
1381        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1382        assert!(remaining.is_empty(), "corrupt entry should be removed");
1383    }
1384
1385    #[test]
1386    fn corrupt_meta_json_is_cleaned_up() {
1387        let (_d, store) = temp_store();
1388        let key = key_for("badjson");
1389        store
1390            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1391            .unwrap();
1392        let dir = store.key_dir(&key);
1393        for entry in fs::read_dir(&dir).unwrap() {
1394            let p = entry.unwrap().path();
1395            if p.extension().and_then(|s| s.to_str()) == Some("json") {
1396                fs::write(&p, b"{not valid json").unwrap();
1397            }
1398        }
1399        let got = store.lookup(&key).unwrap();
1400        assert!(got.is_none());
1401    }
1402
1403    #[test]
1404    fn schema_mismatched_entry_is_cleaned_up() {
1405        let (_d, store) = temp_store();
1406        let key = key_for("schema");
1407        store
1408            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1409            .unwrap();
1410        let dir = store.key_dir(&key);
1411        for entry in fs::read_dir(&dir).unwrap() {
1412            let p = entry.unwrap().path();
1413            if p.extension().and_then(|s| s.to_str()) == Some("json") {
1414                let raw = fs::read(&p).unwrap();
1415                let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1416                parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1417                fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1418            }
1419        }
1420        assert!(store.lookup(&key).unwrap().is_none());
1421        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1422        assert!(
1423            remaining.is_empty(),
1424            "schema-mismatched entry should be removed"
1425        );
1426    }
1427
1428    #[test]
1429    fn schema_mismatched_entry_is_removed_during_save_eviction_path() {
1430        let dir = tempfile::tempdir().unwrap();
1431        let store = WarmStartStore::open(
1432            dir.path().to_path_buf(),
1433            StoreOptions {
1434                size_budget_bytes: 6 * 1024,
1435                ttl: Duration::from_secs(3600),
1436            },
1437        )
1438        .unwrap();
1439        let stale_key = key_for("schema-size-stale");
1440        store
1441            .save(
1442                &stale_key,
1443                &vec![0u8; 4 * 1024],
1444                None,
1445                None,
1446                EntryKind::Checkpoint,
1447            )
1448            .unwrap();
1449
1450        let stale_dir = store.key_dir(&stale_key);
1451        let mut stale_meta = None;
1452        let mut stale_bin = None;
1453        for entry in fs::read_dir(&stale_dir).unwrap() {
1454            let p = entry.unwrap().path();
1455            match p.extension().and_then(|s| s.to_str()) {
1456                Some("json") => {
1457                    let raw = fs::read(&p).unwrap();
1458                    let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1459                    parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1460                    fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1461                    stale_meta = Some(p);
1462                }
1463                Some("bin") => stale_bin = Some(p),
1464                _ => {}
1465            }
1466        }
1467        let stale_meta = stale_meta.expect("saved entry should have metadata");
1468        let stale_bin = stale_bin.expect("saved entry should have payload");
1469
1470        let fresh_key = key_for("schema-size-fresh");
1471        store
1472            .save(
1473                &fresh_key,
1474                &vec![1u8; 2 * 1024],
1475                None,
1476                None,
1477                EntryKind::Checkpoint,
1478            )
1479            .unwrap();
1480
1481        assert!(
1482            !stale_meta.exists(),
1483            "schema-mismatched metadata should be removed during eviction scan"
1484        );
1485        assert!(
1486            !stale_bin.exists(),
1487            "schema-mismatched payload should be removed during eviction scan"
1488        );
1489
1490        let mut total = 0u64;
1491        for key_dir in fs::read_dir(store.root()).unwrap() {
1492            let key_dir = key_dir.unwrap().path();
1493            if key_dir.is_dir() {
1494                for entry in fs::read_dir(key_dir).unwrap() {
1495                    total += fs::metadata(entry.unwrap().path()).unwrap().len();
1496                }
1497            }
1498        }
1499        assert!(
1500            total <= store.options().size_budget_bytes,
1501            "schema-mismatched bytes must not leak past size accounting (got {total})"
1502        );
1503        assert!(store.lookup(&stale_key).unwrap().is_none());
1504        assert!(store.lookup(&fresh_key).unwrap().is_some());
1505    }
1506
1507    #[test]
1508    fn missing_bin_treated_as_missing() {
1509        let (_d, store) = temp_store();
1510        let key = key_for("nobin");
1511        store
1512            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1513            .unwrap();
1514        let dir = store.key_dir(&key);
1515        for entry in fs::read_dir(&dir).unwrap() {
1516            let p = entry.unwrap().path();
1517            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1518                fs::remove_file(&p).unwrap();
1519            }
1520        }
1521        assert!(store.lookup(&key).unwrap().is_none());
1522    }
1523
1524    #[test]
1525    fn missing_key_returns_none() {
1526        let (_d, store) = temp_store();
1527        let key = key_for("absent");
1528        assert!(store.lookup(&key).unwrap().is_none());
1529    }
1530
1531    #[test]
1532    fn lru_eviction_under_size_budget() {
1533        let dir = tempfile::tempdir().unwrap();
1534        // Tiny budget: 4 KiB. Each entry payload + meta JSON is ~600 B.
1535        let store = WarmStartStore::open(
1536            dir.path().to_path_buf(),
1537            StoreOptions {
1538                size_budget_bytes: 4 * 1024,
1539                ttl: Duration::from_secs(3600),
1540            },
1541        )
1542        .unwrap();
1543        let mut keys = Vec::new();
1544        for i in 0..20 {
1545            let mut fp = Fingerprinter::new();
1546            fp.absorb_u64(b"i", i);
1547            let key = fp.finalize();
1548            keys.push(key);
1549            let payload = vec![0u8; 256];
1550            store
1551                .save(&key, &payload, Some(i as f64), None, EntryKind::Checkpoint)
1552                .unwrap();
1553        }
1554        // Walk the store root and confirm total bytes is bounded.
1555        let mut total = 0u64;
1556        for kd in fs::read_dir(store.root()).unwrap() {
1557            let kd = kd.unwrap().path();
1558            if kd.is_dir() {
1559                for f in fs::read_dir(&kd).unwrap() {
1560                    total += fs::metadata(f.unwrap().path()).unwrap().len();
1561                }
1562            }
1563        }
1564        assert!(
1565            total <= 8 * 1024,
1566            "eviction failed to bound size (got {total})"
1567        );
1568        // Earliest keys must have been evicted; latest survive.
1569        assert!(store.lookup(&keys[0]).unwrap().is_none());
1570        assert!(store.lookup(keys.last().unwrap()).unwrap().is_some());
1571    }
1572
1573    #[test]
1574    fn ttl_drops_old_entries() {
1575        // Expiration is driven by `test_advance_time` (additive simulated time
1576        // on top of the wall clock), so the TTL itself only needs to be larger
1577        // than any plausible save→lookup wall-time on the CI runner. The
1578        // 1-second TTL the original fixture used was tighter than the worst
1579        // ext4 fsync this image sees (see `save_overwrite`'s late-stamp
1580        // comment), so the first `is_some()` check would flake to "expired"
1581        // before any time advance ever ran. 60 s clears that race with margin.
1582        let dir = tempfile::tempdir().unwrap();
1583        let ttl = Duration::from_secs(60);
1584        let store = WarmStartStore::open(
1585            dir.path().to_path_buf(),
1586            StoreOptions {
1587                size_budget_bytes: 1024 * 1024,
1588                ttl,
1589            },
1590        )
1591        .unwrap();
1592        let key = key_for("ttl");
1593        store
1594            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1595            .unwrap();
1596        assert!(store.lookup(&key).unwrap().is_some());
1597        store.test_advance_time(ttl + Duration::from_secs(5));
1598        // Trigger eviction via a save under an unrelated key.
1599        let other = key_for("ttl-other");
1600        store
1601            .save(&other, b"y", None, None, EntryKind::Checkpoint)
1602            .unwrap();
1603        // Original now expired.
1604        assert!(store.lookup(&key).unwrap().is_none());
1605        assert!(store.lookup(&other).unwrap().is_some());
1606    }
1607
1608    #[test]
1609    fn orphan_temp_files_from_dead_processes_are_swept() {
1610        let (_d, store) = temp_store();
1611        let key = key_for("tmp");
1612        let dir = store.key_dir(&key);
1613        fs::create_dir_all(&dir).unwrap();
1614        // Use PID 1 — never the current process, so it counts as "other".
1615        let orphan_other = dir.join("r0-0.json.tmp.1.0");
1616        let mine = dir.join(format!("r0-0.bin.tmp.{}.0", std::process::id()));
1617        fs::write(&orphan_other, b"orphan").unwrap();
1618        fs::write(&mine, b"mine").unwrap();
1619        store.evict_overflow().unwrap();
1620        assert!(!orphan_other.exists(), "other-PID tmp file should be swept");
1621        assert!(mine.exists(), "same-PID tmp file must be left alone");
1622    }
1623
1624    #[test]
1625    fn tmp_filenames_without_pid_are_skipped() {
1626        // Malformed tmp names (no parseable pid) must not crash the sweep.
1627        let (_d, store) = temp_store();
1628        let key = key_for("malformed");
1629        let dir = store.key_dir(&key);
1630        fs::create_dir_all(&dir).unwrap();
1631        let weird = dir.join("garbage.tmp.notapid.suffix");
1632        fs::write(&weird, b"x").unwrap();
1633        // Must not panic.
1634        store.evict_overflow().unwrap();
1635        assert!(weird.exists());
1636    }
1637
1638    #[test]
1639    fn save_overwrite_keeps_single_entry() {
1640        let (_d, store) = temp_store();
1641        let key = key_for("overwrite");
1642        let id = store
1643            .save(&key, b"v1", Some(2.0), Some(1), EntryKind::Checkpoint)
1644            .unwrap();
1645        store
1646            .save_overwrite(&key, &id, b"v2", Some(1.0), Some(2), EntryKind::Checkpoint)
1647            .unwrap();
1648        // Only one (meta, bin) pair on disk.
1649        let dir = store.key_dir(&key);
1650        let files: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1651        assert_eq!(files.len(), 2, "overwrite should not create a new run-id");
1652        let got = store.lookup(&key).unwrap().unwrap();
1653        assert_eq!(got.payload, b"v2");
1654        assert_eq!(got.objective, Some(1.0));
1655    }
1656
1657    #[test]
1658    fn write_and_promote_recreates_dir_removed_before_write() {
1659        // gam#868: a sibling process' eviction can `remove_dir` the key dir the
1660        // instant it observes it empty, racing every write step in `save`. The
1661        // promote helper must recreate the dir rather than failing with ENOENT.
1662        let (_d, store) = temp_store();
1663        let key = key_for("race-recreate");
1664        let dir = store.key_dir(&key);
1665        // Dir does NOT exist yet (simulates eviction having removed it after a
1666        // prior `create_dir_all`). The helper must create it and succeed.
1667        assert!(!dir.exists());
1668        let bin_tmp = dir.join("r0.bin.tmp.1.0.0");
1669        let meta_tmp = dir.join("r0.json.tmp.1.0.0");
1670        let bin_final = dir.join("r0.bin");
1671        let meta_final = dir.join("r0.json");
1672        let stamp_fn = || (0u64, 0u32);
1673        let build_meta_json = |_: u64, _: u32| -> io::Result<Vec<u8>> { Ok(b"{}".to_vec()) };
1674        write_and_promote_entry(&EntryWrite {
1675            dir: &dir,
1676            bin_tmp: &bin_tmp,
1677            meta_tmp: &meta_tmp,
1678            payload: b"payload",
1679            bin_final: &bin_final,
1680            meta_final: &meta_final,
1681            stamp_fn: &stamp_fn,
1682            build_meta_json: &build_meta_json,
1683        })
1684        .expect("promote into a missing dir must recreate it and succeed");
1685        assert!(bin_final.exists() && meta_final.exists());
1686        assert_eq!(fs::read(&bin_final).unwrap(), b"payload");
1687    }
1688
1689    #[test]
1690    fn save_survives_concurrent_eviction_removing_key_dir() {
1691        // gam#868 end-to-end: hammer the same key with concurrent saves while a
1692        // sibling thread repeatedly runs `evict_overflow` (which `remove_dir`s
1693        // now-empty key dirs). Before the atomic-retry fix, a save whose
1694        // `create_dir_all`→write/rename window straddled a `remove_dir` failed
1695        // with `io: No such file or directory (os error 2)`. Every save must now
1696        // succeed; we assert no save returns an error.
1697        use std::sync::Arc;
1698        use std::sync::atomic::AtomicBool;
1699
1700        let dir = tempfile::tempdir().unwrap();
1701        // Zero size budget forces `evict_overflow` to delete entries (and then
1702        // sweep the emptied key dir) on essentially every sweep, maximizing the
1703        // race window.
1704        let store = Arc::new(
1705            WarmStartStore::open(
1706                dir.path().to_path_buf(),
1707                StoreOptions {
1708                    size_budget_bytes: 0,
1709                    ttl: Duration::from_secs(60),
1710                },
1711            )
1712            .unwrap(),
1713        );
1714        let key = key_for("concurrent-evict");
1715        let stop = Arc::new(AtomicBool::new(false));
1716
1717        let evictor = {
1718            let store = Arc::clone(&store);
1719            let stop = Arc::clone(&stop);
1720            std::thread::spawn(move || {
1721                while !stop.load(Ordering::Relaxed) {
1722                    store.evict_overflow().ok();
1723                }
1724            })
1725        };
1726
1727        let writers: Vec<_> = (0..4)
1728            .map(|w| {
1729                let store = Arc::clone(&store);
1730                std::thread::spawn(move || {
1731                    for i in 0..200u32 {
1732                        let payload = format!("w{w}-i{i}");
1733                        store
1734                            .save(
1735                                &key,
1736                                payload.as_bytes(),
1737                                Some(i as f64),
1738                                Some(i as u64),
1739                                EntryKind::Checkpoint,
1740                            )
1741                            .expect("save must not fail with ENOENT under concurrent eviction");
1742                    }
1743                })
1744            })
1745            .collect();
1746
1747        for h in writers {
1748            h.join().unwrap();
1749        }
1750        stop.store(true, Ordering::Relaxed);
1751        evictor.join().unwrap();
1752    }
1753
1754    #[test]
1755    fn keys_are_isolated() {
1756        let (_d, store) = temp_store();
1757        let a = key_for("a");
1758        let b = key_for("b");
1759        store
1760            .save(&a, b"AAA", Some(1.0), None, EntryKind::Final)
1761            .unwrap();
1762        store
1763            .save(&b, b"BBB", Some(1.0), None, EntryKind::Final)
1764            .unwrap();
1765        assert_eq!(store.lookup(&a).unwrap().unwrap().payload, b"AAA");
1766        assert_eq!(store.lookup(&b).unwrap().unwrap().payload, b"BBB");
1767    }
1768}