Skip to main content

gam_runtime/warm_start/
store.rs

//! Filesystem store for warm-start entries.
//!
//! Each entry is a `(<runid>.json, <runid>.bin)` pair inside a per-key
//! directory. Writes go through a temp-file → fsync → rename sequence (the
//! `.bin` is promoted before the `.json`) so a crash mid-write never leaves a
//! half-written entry visible to readers. Per-entry SHA-256 checksums catch
//! any residual corruption.
//!
//! See [`crate::warm_start`] for the public API summary.
9
10use crate::warm_start::key::Fingerprint;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::fs;
15use std::io::{self, Write as _};
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex, OnceLock};
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
/// On-disk schema version, recorded in every entry's meta JSON. Bump on
/// incompatible format changes; entries written under another version are
/// then treated like corrupt entries by the read path (skipped and cleaned
/// up) and are also reclaimed by eviction.
pub(crate) const SCHEMA_VERSION: u32 = 1;

/// Default disk budget for the whole warm-start store root: exactly 1 GiB.
/// When the total of meta + payload bytes exceeds it, `evict_overflow`
/// removes the oldest entries (by recorded write time) until it fits again.
pub(crate) const DEFAULT_SIZE_BUDGET_BYTES: u64 = 1024 * 1024 * 1024;

/// Default TTL in seconds (30 days).
///
/// Age is measured from each entry's recorded *write* time, not from its last
/// read: lookups do not refresh it, so even a frequently read entry is dropped
/// once it is this old.
pub(crate) const DEFAULT_TTL_SECS: u64 = 60 * 60 * 24 * 30;
30
31#[derive(Debug, thiserror::Error)]
32pub enum StoreError {
33    #[error("io: {0}")]
34    Io(#[from] io::Error),
35    #[error("json: {0}")]
36    Json(#[from] serde_json::Error),
37}
38
39/// Entry returned from [`WarmStartStore::lookup`].
40#[derive(Debug, Clone)]
41pub struct WarmStartEntry {
42    pub payload: Vec<u8>,
43    pub objective: Option<f64>,
44    pub iteration: Option<u64>,
45    pub written_unix_secs: u64,
46    pub kind: EntryKind,
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50pub enum EntryKind {
51    /// Mid-fit checkpoint — fit was alive when written.
52    Checkpoint,
53    /// End-of-fit — fit terminated successfully.
54    Final,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
58struct OnDiskMeta {
59    schema_version: u32,
60    written_unix_secs: u64,
61    /// Nanosecond component of the write timestamp. Used to break ties in
62    /// LRU eviction so entries written within the same second don't sort
63    /// arbitrarily.
64    #[serde(default)]
65    written_nanos: u32,
66    objective: Option<f64>,
67    iteration: Option<u64>,
68    kind: EntryKind,
69    checksum_hex: String,
70    payload_bytes: u64,
71}
72
/// Tuning knobs for a [`WarmStartStore`].
#[derive(Clone, Debug)]
pub struct StoreOptions {
    /// Upper bound on meta + payload bytes kept under the store root. Past
    /// it, eviction drops the oldest entries first.
    pub size_budget_bytes: u64,
    /// Maximum entry age, measured from the recorded write time. The lookup
    /// cache path treats a zero duration as "never expires".
    pub ttl: Duration,
}
78
79impl Default for StoreOptions {
80    fn default() -> Self {
81        Self {
82            size_budget_bytes: DEFAULT_SIZE_BUDGET_BYTES,
83            ttl: Duration::from_secs(DEFAULT_TTL_SECS),
84        }
85    }
86}
87
88#[derive(Debug)]
89pub struct WarmStartStore {
90    root: PathBuf,
91    opts: StoreOptions,
92    /// Per-store metadata index. It is populated lazily and shared by clones so
93    /// checkpoint-heavy sessions do not repeatedly open every metadata JSON.
94    index: Arc<Mutex<MetadataIndex>>,
95    /// Approximate sum of bytes written under `root`. Used to throttle the
96    /// full directory-scanning eviction in [`Self::save_overwrite`] — see
97    /// `EVICT_EVERY_N_SAVES`. The counter resyncs to ground truth after every
98    /// triggered sweep. Shared across clones (`Arc`) so the eviction throttle
99    /// survives the per-operation store reuse in
100    /// `solver::persistent_warm_start::persistent_store` — otherwise every
101    /// fit reset the counter and ran a full eviction walk on its first save
102    /// (gam#1114).
103    byte_total: Arc<AtomicU64>,
104    /// Monotonically increasing save counter, shared across clones. Used
105    /// together with `byte_total` to throttle the eviction directory walk.
106    save_counter: Arc<AtomicU64>,
107    /// Root-directory mtime observed at the last completed eviction sweep,
108    /// shared across clones. When a throttled sweep fires while the store is
109    /// comfortably *under* the size budget, the only work left for it is a
110    /// TTL/byte resync over every key dir — an N-dir `read_dir` + `stat` walk
111    /// that, with thousands of fingerprint dirs in a long CI run, dominates
112    /// the per-32-save sweep even after the per-dir listing cache lands (the
113    /// residual #1114 walk). The root dir's mtime is bumped by the OS whenever
114    /// a key dir is created or removed under it, so an unchanged root mtime
115    /// means no key dir was added/dropped since our last sweep; combined with
116    /// a comfortably-under-budget byte total, the size-eviction walk is then a
117    /// guaranteed no-op and is skipped. TTL expiry of *existing* entries does
118    /// not change the root mtime, but it is already performed lazily on every
119    /// `lookup_with` and on the next root-changing save, so skipping it here is
120    /// behaviour-neutral (no entry the gate skips could be returned stale).
121    last_evict_root_mtime: Arc<Mutex<Option<SystemTime>>>,
122    /// Per-store test-only monotonic time offset (nanoseconds) added to every
123    /// `*_now` reading. Always zero in production. Tests mutate it through
124    /// [`Self::test_advance_time`] to simulate elapsed time without
125    /// `thread::sleep`. Lives on the store rather than as a process-wide
126    /// static so parallel tests with their own stores cannot pollute each
127    /// other's clocks — a global clock made `cargo test` non-deterministic
128    /// (gam test infra: one test's +1.5s TTL advance was bumping another
129    /// test's just-saved entry past its 1s TTL on immediate lookup).
130    test_time_offset_ns: AtomicU64,
131}
132
133impl Clone for WarmStartStore {
134    fn clone(&self) -> Self {
135        Self {
136            root: self.root.clone(),
137            opts: self.opts.clone(),
138            index: Arc::clone(&self.index),
139            // Throttle counters are shared across clones so the eviction
140            // directory walk stays throttled to every Nth save across the
141            // whole process, even though `persistent_store` hands out a fresh
142            // clone per save/lookup.
143            byte_total: Arc::clone(&self.byte_total),
144            save_counter: Arc::clone(&self.save_counter),
145            last_evict_root_mtime: Arc::clone(&self.last_evict_root_mtime),
146            test_time_offset_ns: AtomicU64::new(self.test_time_offset_ns.load(Ordering::Relaxed)),
147        }
148    }
149}
150
151impl WarmStartStore {
152    /// Open (or create) a store rooted at `root`.
153    pub fn open(root: PathBuf, opts: StoreOptions) -> Result<Self, StoreError> {
154        fs::create_dir_all(&root)?;
155        Ok(Self {
156            root,
157            opts,
158            index: Arc::new(Mutex::new(MetadataIndex::default())),
159            byte_total: Arc::new(AtomicU64::new(0)),
160            save_counter: Arc::new(AtomicU64::new(0)),
161            last_evict_root_mtime: Arc::new(Mutex::new(None)),
162            test_time_offset_ns: AtomicU64::new(0),
163        })
164    }
165
166    pub fn root(&self) -> &Path {
167        &self.root
168    }
169
170    pub fn options(&self) -> &StoreOptions {
171        &self.opts
172    }
173
174    fn key_dir(&self, key: &Fingerprint) -> PathBuf {
175        self.root.join(key.to_hex())
176    }
177
178    /// Look up the best entry for `key`, or `None` if no valid entry exists.
179    ///
180    /// Selection: lowest `objective` first; ties prefer [`EntryKind::Final`]
181    /// over [`EntryKind::Checkpoint`], then latest `written_unix_secs`. If
182    /// every candidate has `objective = None`, picks the latest write.
183    /// Corrupt or schema-mismatched candidates are silently cleaned up and
184    /// skipped.
185    pub fn lookup(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
186        self.lookup_with(key, LookupMode::Best)
187    }
188
189    /// Look up the newest valid entry for `key`, or `None` if no valid entry
190    /// exists.
191    ///
192    /// Unlike [`Self::lookup`], this deliberately ignores objective values.
193    /// Use this for near-match seed namespaces where entries may come from
194    /// different folds, diseases, or row sets, and objective magnitudes are
195    /// not comparable. Exact-key resume should keep using [`Self::lookup`].
196    pub fn lookup_latest(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
197        self.lookup_with(key, LookupMode::Latest)
198    }
199
200    fn lookup_with(
201        &self,
202        key: &Fingerprint,
203        mode: LookupMode,
204    ) -> Result<Option<WarmStartEntry>, StoreError> {
205        let dir = self.key_dir(key);
206        if !dir.exists() {
207            // A stale in-memory cache entry could outlive its directory if
208            // another process evicted us. Drop it so we don't return data
209            // for a key whose backing files are gone.
210            lookup_cache_invalidate(&LookupCacheKey { fp: *key, mode });
211            self.metadata_index_remove_key(key);
212            return Ok(None);
213        }
214        // Fast path: if the same (key, mode) was looked up before and the
215        // chosen meta file's mtime is unchanged, return the cached entry
216        // without re-reading any JSON or re-checksumming the .bin payload.
217        // A separate writer (this process or another) bumps mtime on
218        // rename → mismatch → we fall through to the slow path. The TTL
219        // cutoff is also re-checked here against `nanos_now()` so a hot
220        // poll loop cannot keep returning an expired entry between eviction
221        // sweeps (eviction is throttled via `EVICT_EVERY_N_SAVES`).
222        let cache_key = LookupCacheKey { fp: *key, mode };
223        let now_nanos = self.nanos_now();
224        if let Some(hit) = lookup_cache_get(&cache_key) {
225            if let Ok(md) = fs::metadata(&hit.meta_path)
226                && md.modified().ok() == Some(hit.meta_mtime)
227            {
228                let expired = self.opts.ttl.as_nanos() > 0
229                    && now_nanos.saturating_sub(hit.write_nanos) >= self.opts.ttl.as_nanos();
230                if !expired {
231                    return Ok(Some(hit.entry));
232                }
233                lookup_cache_invalidate(&cache_key);
234                let bin = hit.meta_path.with_extension("bin");
235                fs::remove_file(&hit.meta_path).ok();
236                fs::remove_file(&bin).ok();
237                // Removing the entry stales any cached directory listing.
238                self.metadata_index_remove(&hit.meta_path);
239                return Ok(None);
240            }
241            lookup_cache_invalidate(&cache_key);
242        }
243        // Resolve all valid entries for this key directory. `scan_key_dir`
244        // serves the listing from the per-store directory cache when the dir's
245        // mtime is unchanged since the last scan (no re-`read_dir`, no per-file
246        // `stat`, no JSON re-parse), and drops TTL-expired / corrupt entries in
247        // passing — exactly the syscall storm #1114 traced.
248        let mut best: Option<(OnDiskMeta, PathBuf)> = None;
249        for scanned in self.scan_key_dir(&dir, now_nanos) {
250            let take = match best {
251                None => true,
252                Some((ref cur, _)) => mode.better(&scanned.meta, cur),
253            };
254            if take {
255                best = Some((scanned.meta, scanned.meta_path));
256            }
257        }
258        let (meta, meta_path) = match best {
259            Some(b) => b,
260            None => {
261                lookup_cache_invalidate(&cache_key);
262                return Ok(None);
263            }
264        };
265        let bin_path = meta_path.with_extension("bin");
266        let payload = match fs::read(&bin_path) {
267            Ok(v) => v,
268            Err(_) => return Ok(None),
269        };
270        // Validate checksum
271        if checksum_hex(&payload) != meta.checksum_hex {
272            fs::remove_file(&meta_path).ok();
273            fs::remove_file(&bin_path).ok();
274            lookup_cache_invalidate(&cache_key);
275            self.metadata_index_remove(&meta_path);
276            return Ok(None);
277        }
278        let entry = WarmStartEntry {
279            payload,
280            objective: meta.objective,
281            iteration: meta.iteration,
282            written_unix_secs: meta.written_unix_secs,
283            kind: meta.kind,
284        };
285        // Record (meta_path, mtime) → entry so subsequent identical lookups
286        // short-circuit until the meta file's mtime changes. The full
287        // nanosecond write timestamp is cached alongside so the fast path
288        // can re-apply the TTL cutoff without re-reading the JSON.
289        if let Ok(md) = fs::metadata(&meta_path)
290            && let Ok(mtime) = md.modified()
291        {
292            let write_nanos =
293                (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
294            lookup_cache_insert(
295                cache_key,
296                CachedLookup {
297                    meta_path: meta_path.clone(),
298                    meta_mtime: mtime,
299                    write_nanos,
300                    entry: entry.clone(),
301                },
302            );
303        }
304        Ok(Some(entry))
305    }
306
307    /// Save a new entry with a fresh run-id. Returns the run-id (caller may
308    /// hand it to [`Self::save_overwrite`] for periodic in-place updates).
309    pub fn save(
310        &self,
311        key: &Fingerprint,
312        payload: &[u8],
313        objective: Option<f64>,
314        iteration: Option<u64>,
315        kind: EntryKind,
316    ) -> Result<String, StoreError> {
317        let run_id = self.fresh_run_id();
318        self.save_overwrite(key, &run_id, payload, objective, iteration, kind)?;
319        Ok(run_id)
320    }
321
322    /// Save under a specific run-id (overwrites an existing entry with the
323    /// same id atomically).
324    pub fn save_overwrite(
325        &self,
326        key: &Fingerprint,
327        run_id: &str,
328        payload: &[u8],
329        objective: Option<f64>,
330        iteration: Option<u64>,
331        kind: EntryKind,
332    ) -> Result<(), StoreError> {
333        // Any new write under this key may change which entry wins both
334        // `LookupMode::Best` and `LookupMode::Latest`, so drop both cached
335        // rows before touching disk. A pure save_overwrite of the same
336        // run_id would also bump mtime and self-invalidate, but a save()
337        // with a fresh run_id leaves the old meta file unchanged — only
338        // explicit invalidation catches that.
339        lookup_cache_invalidate(&LookupCacheKey {
340            fp: *key,
341            mode: LookupMode::Best,
342        });
343        lookup_cache_invalidate(&LookupCacheKey {
344            fp: *key,
345            mode: LookupMode::Latest,
346        });
347        let dir = self.key_dir(key);
348        let pid = std::process::id();
349        // 1. Compute checksum from payload.
350        let checksum = checksum_hex(payload);
351        let objective_finite = objective.filter(|o| o.is_finite());
352        // The meta's `written_unix_secs`/`written_nanos` are captured INSIDE the
353        // write loop — just before the meta_tmp is written, AFTER the bin write
354        // has completed. The stored timestamp drives the TTL contract: an
355        // entry's clock should start ticking from when the entry becomes
356        // (nearly) visible to lookups, not from `save_overwrite`'s entry. On
357        // slow disks the bin write + fsync + rename can take longer than the
358        // entire TTL window itself (the warm-start test fixture pins TTL=1s
359        // while the ext4-backed CI image takes >1s on small writes), so an
360        // up-front stamp causes the entry to be classified as expired the
361        // moment `save_overwrite` returns. Pushing the stamp past the bin
362        // fsync removes that systemic drift from the cost of writing the
363        // entry — only the meta fsync + final rename + dir fsync still
364        // elapse between the stamp and the entry becoming visible.
365
366        // 3. Write both temp files and atomically rename them into place. The
367        //    whole "ensure dir → write temps → rename" sequence is retried once
368        //    as a unit on `ErrorKind::NotFound`, because a concurrent process'
369        //    `evict_overflow` can `remove_dir` this key dir the instant it
370        //    observes it empty (store.rs `evict_overflow`, "Sweep now-empty key
371        //    dirs"). That removal races every write step here: it can vanish the
372        //    dir after `create_dir_all` but before a temp `File::create`, or
373        //    take the dir *and our just-written temps with it* before the
374        //    rename, surfacing as `io: No such file or directory (os error 2)`
375        //    under parallel CV / bootstrap fitting (gam#868). Retrying the
376        //    sequence (not an individual step) is the only correct response: a
377        //    bare rename retry can't recover once the source temp was swept with
378        //    the dir, so we recreate the dir and rewrite the temps from the
379        //    in-memory `payload` / `meta_json` we still hold. A single retry is
380        //    sufficient — the eviction window is one `remove_dir` syscall wide —
381        //    and a second genuine `NotFound` is propagated as before.
382        let nonce = self.nanos_now();
383        let bin_final = dir.join(format!("{run_id}.bin"));
384        let meta_final = dir.join(format!("{run_id}.json"));
385        let mut attempt = 0u8;
386        let build_meta_json = |secs: u64, subsec_nanos: u32| -> Result<Vec<u8>, StoreError> {
387            let meta = OnDiskMeta {
388                schema_version: SCHEMA_VERSION,
389                written_unix_secs: secs,
390                written_nanos: subsec_nanos,
391                objective: objective_finite,
392                iteration,
393                kind,
394                checksum_hex: checksum.clone(),
395                payload_bytes: payload.len() as u64,
396            };
397            Ok(serde_json::to_vec_pretty(&meta)?)
398        };
399        loop {
400            let bin_tmp = dir.join(format!("{run_id}.bin.tmp.{pid}.{nonce}.{attempt}"));
401            let meta_tmp = dir.join(format!("{run_id}.json.tmp.{pid}.{nonce}.{attempt}"));
402            let stamp_fn = || self.unix_now_parts();
403            let build_meta_for_io = |secs: u64, subsec_nanos: u32| -> io::Result<Vec<u8>> {
404                build_meta_json(secs, subsec_nanos)
405                    .map_err(|e| io::Error::other(format!("meta build: {e:?}")))
406            };
407            match write_and_promote_entry(&EntryWrite {
408                dir: &dir,
409                bin_tmp: &bin_tmp,
410                meta_tmp: &meta_tmp,
411                payload,
412                bin_final: &bin_final,
413                meta_final: &meta_final,
414                stamp_fn: &stamp_fn,
415                build_meta_json: &build_meta_for_io,
416            }) {
417                Ok(()) => break,
418                Err(e) if e.kind() == io::ErrorKind::NotFound && attempt == 0 => {
419                    // A sibling process' eviction removed the key dir mid-write.
420                    // Clean up any partial temps, then retry the whole sequence
421                    // once after recreating the dir inside `write_and_promote_entry`.
422                    fs::remove_file(&bin_tmp).ok();
423                    fs::remove_file(&meta_tmp).ok();
424                    attempt += 1;
425                    continue;
426                }
427                Err(e) => {
428                    fs::remove_file(&bin_tmp).ok();
429                    fs::remove_file(&meta_tmp).ok();
430                    fs::remove_file(&bin_final).ok();
431                    return Err(StoreError::Io(e));
432                }
433            }
434        }
435        // Fsync the containing directory so the rename itself is durable
436        // across a power loss / hard crash. fs::File::sync_all on the
437        // payload only guarantees the file content reaches disk; without
438        // also fsyncing the directory inode, the *rename* (which is what
439        // makes the entry visible to lookups) can be lost. Best-effort on
440        // platforms where opening a directory for fsync is not supported.
441        if let Ok(d) = fs::File::open(&dir) {
442            d.sync_all().ok();
443        }
444        self.metadata_index_upsert(&meta_final, &bin_final).ok();
445        // 5. Best-effort eviction; failure here is non-fatal. Throttle the
446        // full directory scan: maintain a process-wide approximate byte
447        // total and only run eviction when the total may have exceeded the
448        // budget, when the per-save counter wraps `EVICT_EVERY_N_SAVES` as
449        // a drift-resync trigger, or on the very first save (so a fresh
450        // process inheriting a populated store root sweeps once). The
451        // counter is best-effort: it can drift relative to disk truth
452        // because other processes may write/evict, but every triggered
453        // sweep resyncs it to ground truth.
454        let approx_added = payload.len() as u64 + APPROX_META_BYTES;
455        let prev_total = self.byte_total.fetch_add(approx_added, Ordering::Relaxed);
456        let new_total = prev_total + approx_added;
457        let n = self.save_counter.fetch_add(1, Ordering::Relaxed);
458        let over_budget = new_total > self.opts.size_budget_bytes;
459        if n == 0 || over_budget || n.is_multiple_of(EVICT_EVERY_N_SAVES) {
460            self.evict_overflow().ok();
461        }
462        Ok(())
463    }
464
465    /// Drop entries older than TTL, then evict by recorded write-time
466    /// ascending until total bytes ≤ `opts.size_budget_bytes`. Idempotent;
467    /// safe under concurrent processes (worst case some entries are
468    /// double-removed, which is a no-op).
469    ///
470    /// Sort key is the `(written_unix_secs, written_nanos)` recorded in
471    /// each entry's meta, not the filesystem mtime — at second-resolution
472    /// mtime, batches of writes within the same second would sort
473    /// arbitrarily and could evict the most recent entry.
474    pub fn evict_overflow(&self) -> Result<(), StoreError> {
475        // Root-mtime short-circuit. A throttled sweep that fires while the
476        // approximate byte total is comfortably under budget has no size
477        // eviction to do; its only residual work is the TTL/byte resync walk
478        // over every key dir. The root mtime is bumped whenever a key dir is
479        // created/removed beneath it, so if it is unchanged since our last
480        // completed sweep AND we are under budget, no key dir was added or
481        // dropped and the size-eviction walk is provably a no-op — skip the
482        // N-dir `read_dir`+`stat` storm. (TTL expiry of existing entries does
483        // not move the root mtime, but it is already enforced lazily on every
484        // `lookup_with` and on the next root-changing save, so the gate cannot
485        // surface a stale entry.) This trims the residual #1114 walk in long
486        // refit-heavy CI runs where thousands of fingerprint dirs accumulate.
487        let current_root_mtime = fs::metadata(&self.root)
488            .ok()
489            .and_then(|m| m.modified().ok());
490        if self.byte_total.load(Ordering::Relaxed) <= self.opts.size_budget_bytes
491            && let Some(now_mtime) = current_root_mtime
492            && let Ok(last) = self.last_evict_root_mtime.lock()
493            && *last == Some(now_mtime)
494        {
495            return Ok(());
496        }
497        let read_dir = match fs::read_dir(&self.root) {
498            Ok(rd) => rd,
499            Err(_) => return Ok(()),
500        };
501        // Collect (meta_path, bin_path, total_bytes, write_nanos_since_epoch).
502        let mut all: Vec<(PathBuf, PathBuf, u64, u128)> = Vec::new();
503        let now_nanos = self.nanos_now();
504        for key_dir_entry in read_dir {
505            let key_dir = match key_dir_entry {
506                Ok(e) => e.path(),
507                Err(_) => continue,
508            };
509            if !key_dir.is_dir() {
510                continue;
511            }
512            // `scan_key_dir` reuses the per-store directory-listing cache when
513            // the key dir's mtime is unchanged, so an unchanged dir costs a
514            // single `stat` rather than a `read_dir` + per-file `stat` + JSON
515            // read of every entry. It also sweeps foreign tmp files and drops
516            // corrupt / TTL-expired entries, mirroring the old inline pass.
517            let scanned = self.scan_key_dir(&key_dir, now_nanos);
518            for entry in &scanned {
519                let write_nanos = (entry.meta.written_unix_secs as u128) * 1_000_000_000u128
520                    + entry.meta.written_nanos as u128;
521                let total_bytes = entry.meta_len + entry.bin_len;
522                all.push((
523                    entry.meta_path.clone(),
524                    entry.bin_path.clone(),
525                    total_bytes,
526                    write_nanos,
527                ));
528            }
529            // Sweep now-empty key dirs.
530            if scanned.is_empty()
531                && fs::read_dir(&key_dir)
532                    .map(|mut it| it.next().is_none())
533                    .unwrap_or(false)
534            {
535                fs::remove_dir(&key_dir).ok();
536                if let Ok(mut index) = self.index.lock() {
537                    index.by_key_dir.remove(&key_dir);
538                }
539            }
540        }
541        let total: u64 = all.iter().map(|e| e.2).sum();
542        if total <= self.opts.size_budget_bytes {
543            // Resync the approximate byte counter even when no eviction was
544            // needed. Otherwise the in-memory `byte_total` only grows (it
545            // never observes deletions made by sibling processes or
546            // expiration sweeps), so after enough saves `new_total` exceeds
547            // the budget on every call and triggers a full directory walk
548            // on every save instead of every Nth save.
549            self.byte_total.store(total, Ordering::Relaxed);
550            // Record the root mtime observed by this completed under-budget
551            // sweep so a subsequent throttled sweep can short-circuit while
552            // the root is unchanged. Re-read after the walk: any key dir the
553            // walk removed (empty-dir sweep above) bumps the root mtime, and
554            // capturing the post-walk value keeps the gate from skipping a
555            // genuinely-changed root on the next call.
556            if let (Ok(mut last), Some(m)) = (
557                self.last_evict_root_mtime.lock(),
558                fs::metadata(&self.root)
559                    .ok()
560                    .and_then(|m| m.modified().ok()),
561            ) {
562                *last = Some(m);
563            }
564            return Ok(());
565        }
566        all.sort_by_key(|e| e.3);
567        let mut remaining = total;
568        for (meta, bin, bytes, _) in all.into_iter() {
569            if remaining <= self.opts.size_budget_bytes {
570                break;
571            }
572            fs::remove_file(&meta).ok();
573            fs::remove_file(&bin).ok();
574            self.metadata_index_remove(&meta);
575            remaining = remaining.saturating_sub(bytes);
576        }
577        // Resync the approximate byte counter to ground truth. Subsequent
578        // saves increment from here until the next sweep.
579        self.byte_total.store(remaining, Ordering::Relaxed);
580        Ok(())
581    }
582}
583
/// Borrowed inputs for one [`write_and_promote_entry`] attempt.
///
/// Holds the key dir, the temp and final paths for both halves of the entry,
/// the payload, and the callbacks that timestamp and serialize the meta JSON.
struct EntryWrite<'a> {
    /// Key directory; (re)created at the start of every attempt.
    dir: &'a Path,
    /// Temp path the payload is written to before promotion.
    bin_tmp: &'a Path,
    /// Temp path the meta JSON is written to before promotion.
    meta_tmp: &'a Path,
    /// Payload bytes for the `.bin` file.
    payload: &'a [u8],
    /// Final `<runid>.bin` path.
    bin_final: &'a Path,
    /// Final `<runid>.json` path.
    meta_final: &'a Path,
    /// Read the current wall clock as `(unix_secs, subsec_nanos)`.
    ///
    /// Called once per attempt, only AFTER the bin write, bin fsync and bin
    /// rename have succeeded. The recorded write time therefore tracks when
    /// the entry actually becomes (nearly) visible, not when
    /// `save_overwrite` was first invoked.
    ///
    /// On slow disks the bin fsync can dominate save latency, and a
    /// pre-write stamp would burn TTL the caller never sees.
    stamp_fn: &'a dyn Fn() -> (u64, u32),
    /// Build the meta JSON from the captured `(secs, subsec_nanos)`. The
    /// closure folds those values into `OnDiskMeta` and serializes it.
    build_meta_json: &'a dyn Fn(u64, u32) -> io::Result<Vec<u8>>,
}

/// Ensure the key dir exists, write the `.bin` and `.json` temp files, and
/// atomically rename both into place.
///
/// Returns the raw `io::Error` (not a `StoreError`) so the caller can branch
/// on `ErrorKind::NotFound` and retry the whole sequence after a concurrent
/// eviction removed the dir mid-write (gam#868).
///
/// Idempotent across a retry. Every path is derived from the caller's stable
/// args and the temps are rewritten from the in-memory payload, so a second
/// pass into a freshly recreated dir produces the same final entry.
///
/// `.bin` is renamed before `.json`, so a meta pointing at a missing bin
/// cannot appear on the happy path. If any step after the `.bin` promotion
/// fails (stamping/serializing the meta, writing its temp, or renaming it),
/// the promoted `.bin` and the meta temp are removed before the error is
/// returned. A failed call therefore never leaves a freshly promoted payload
/// behind without its meta.
fn write_and_promote_entry(w: &EntryWrite<'_>) -> io::Result<()> {
    /// Create `path`, write `bytes` to it, and fsync.
    ///
    /// The fsync is best-effort: its failure is ignored, and the per-entry
    /// checksum is what catches content that did not reach disk intact.
    fn write_synced(path: &Path, bytes: &[u8]) -> io::Result<()> {
        let mut f = fs::File::create(path)?;
        f.write_all(bytes)?;
        f.sync_all().ok();
        Ok(())
    }

    // (Re)create the dir up front. On the first attempt this is the original
    // `create_dir_all`; on a retry it re-establishes a dir that a sibling
    // process' eviction removed.
    fs::create_dir_all(w.dir)?;
    write_synced(w.bin_tmp, w.payload)?;
    // Promote the bin first. A crash between the two renames then leaves an
    // orphan .bin rather than a meta pointing at a missing .bin.
    fs::rename(w.bin_tmp, w.bin_final)?;

    // Stamp the meta only now, after the bin promotion: the latest moment
    // the timestamp can still be inlined into the meta JSON. The remaining
    // gap before the entry is visible is one meta write + fsync, the meta
    // rename, and the caller's directory fsync.
    let promote_meta = || -> io::Result<()> {
        let (secs, subsec_nanos) = (w.stamp_fn)();
        let meta_json = (w.build_meta_json)(secs, subsec_nanos)?;
        write_synced(w.meta_tmp, &meta_json)?;
        fs::rename(w.meta_tmp, w.meta_final)
    };
    if let Err(e) = promote_meta() {
        // Roll back the bin we just promoted so it is not orphaned, then
        // surface the error so the caller can retry or fail.
        fs::remove_file(w.meta_tmp).ok();
        fs::remove_file(w.bin_final).ok();
        return Err(e);
    }
    Ok(())
}
649
/// Per-save estimate of one meta file's size, added to the approximate byte
/// counter that throttles eviction sweeps.
///
/// Pretty-printed metas are expected to land around 250-400 bytes, so this
/// deliberately overestimates. Overestimating only makes the throttle fire a
/// little sooner, never later.
const APPROX_META_BYTES: u64 = 512;
654
/// Ranking policy [`WarmStartStore::lookup_with`] uses to choose among the
/// candidate entries of one key.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
enum LookupMode {
    /// Rank with [`entry_better`]: lowest objective first, with entry kind and
    /// write time as tie-breakers.
    Best,
    /// Rank with [`entry_newer`]: most recent write first; objectives ignored.
    Latest,
}
663
664impl LookupMode {
665    fn better(&self, candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
666        match self {
667            LookupMode::Best => entry_better(candidate, current),
668            LookupMode::Latest => entry_newer(candidate, current),
669        }
670    }
671}
672
673#[derive(Clone, Copy, PartialEq, Eq, Hash)]
674struct LookupCacheKey {
675    fp: Fingerprint,
676    mode: LookupMode,
677}
678
679#[derive(Clone)]
680struct CachedLookup {
681    meta_path: PathBuf,
682    meta_mtime: SystemTime,
683    /// Full-precision nanosecond write timestamp from the on-disk meta,
684    /// kept alongside `entry.written_unix_secs` so the fast path can apply
685    /// the same TTL cutoff as `evict_overflow` without re-reading the JSON.
686    write_nanos: u128,
687    entry: WarmStartEntry,
688}
689
690#[derive(Debug, Default)]
691struct MetadataIndex {
692    by_meta_path: HashMap<PathBuf, IndexedMeta>,
693    /// Per-key-directory cached listing, keyed by the directory's mtime.
694    ///
695    /// A key dir's mtime is bumped by the OS whenever an entry is created,
696    /// renamed, or removed inside it (which is exactly when our entries
697    /// change). So a matching `dir_mtime` means the set of `<runid>.{json,bin}`
698    /// pairs is byte-for-byte what we scanned last time, letting
699    /// [`WarmStartStore::scan_key_dir`] return the cached `Vec<ScannedEntry>`
700    /// without a fresh `read_dir` or any per-file `stat`/JSON read. This is
701    /// what kills the metadata-syscall storm in repeated `lookup_with` /
702    /// `evict_overflow` calls within one fit (gam#1114).
703    by_key_dir: HashMap<PathBuf, ScannedDir>,
704}
705
706#[derive(Debug, Clone)]
707struct IndexedMeta {
708    meta_mtime: SystemTime,
709    meta_len: u64,
710    bin_len: u64,
711    meta: OnDiskMeta,
712}
713
714impl IndexedMeta {
715    fn matches(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
716        meta_md.modified().ok() == Some(self.meta_mtime)
717            && meta_md.len() == self.meta_len
718            && bin_md.len() == self.bin_len
719    }
720}
721
722/// Cached result of scanning one key directory: its mtime at scan time plus
723/// the resolved entries. Reused verbatim while the dir's mtime is unchanged.
724#[derive(Debug, Clone)]
725struct ScannedDir {
726    dir_mtime: SystemTime,
727    entries: Vec<ScannedEntry>,
728}
729
730/// One resolved `(meta, bin)` pair discovered during a key-dir scan. Carries
731/// everything both the lookup ranker and the eviction sweep need so neither
732/// has to re-`stat` or re-read the files when the dir is unchanged.
733#[derive(Debug, Clone)]
734struct ScannedEntry {
735    meta_path: PathBuf,
736    bin_path: PathBuf,
737    meta_len: u64,
738    bin_len: u64,
739    meta_mtime: Option<SystemTime>,
740    bin_mtime: Option<SystemTime>,
741    meta: OnDiskMeta,
742}
743
744impl ScannedEntry {
745    fn matches_files(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
746        meta_md.len() == self.meta_len
747            && bin_md.len() == self.bin_len
748            && meta_md.modified().ok() == self.meta_mtime
749            && bin_md.modified().ok() == self.bin_mtime
750    }
751}
752
/// Whether an entry written at `secs` + `nanos` (since the UNIX epoch) has
/// aged at least `ttl` as of `now_nanos`.
///
/// A zero `ttl` disables expiry. A write stamp later than `now_nanos` counts
/// as age zero. This applies the same cutoff as
/// [`WarmStartStore::evict_overflow`], so `lookup_with` cannot return an entry
/// that the eviction sweep would have dropped.
const fn meta_expired(secs: u64, nanos: u32, ttl: Duration, now_nanos: u128) -> bool {
    const NANOS_PER_SEC: u128 = 1_000_000_000;
    let ttl_nanos = ttl.as_nanos();
    let written_at = secs as u128 * NANOS_PER_SEC + nanos as u128;
    ttl_nanos != 0 && now_nanos.saturating_sub(written_at) >= ttl_nanos
}
765
766/// Process-wide in-memory cache for [`WarmStartStore::lookup_with`]. Hot poll
767/// loops hit the same (key, mode) repeatedly between writes, so caching the
768/// resolved entry behind an mtime check eliminates the per-call directory
769/// walk, JSON parse, and SHA-256 recomputation. Mtime mismatch — including
770/// writes from a sibling process — invalidates the row and falls back to
771/// the full slow path.
772fn lookup_cache() -> &'static Mutex<HashMap<LookupCacheKey, CachedLookup>> {
773    static CACHE: OnceLock<Mutex<HashMap<LookupCacheKey, CachedLookup>>> = OnceLock::new();
774    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
775}
776
/// Maximum number of rows the process-wide lookup cache keeps.
const LOOKUP_CACHE_MAX_ENTRIES: usize = 128;
/// Maximum resident bytes (struct size plus payload capacity) across all rows (256 MiB).
const LOOKUP_CACHE_MAX_BYTES: usize = 256 << 20;
779
780const fn cached_lookup_resident_bytes(value: &CachedLookup) -> usize {
781    std::mem::size_of::<CachedLookup>().saturating_add(value.entry.payload.capacity())
782}
783
784fn lookup_cache_get(key: &LookupCacheKey) -> Option<CachedLookup> {
785    let guard = lookup_cache().lock().ok()?;
786    guard.get(key).cloned()
787}
788
789fn lookup_cache_insert(key: LookupCacheKey, val: CachedLookup) {
790    if let Ok(mut guard) = lookup_cache().lock() {
791        let new_bytes = cached_lookup_resident_bytes(&val);
792        if new_bytes > LOOKUP_CACHE_MAX_BYTES {
793            return;
794        }
795        let mut resident_bytes: usize = guard.values().map(cached_lookup_resident_bytes).sum();
796        if let Some(old) = guard.remove(&key) {
797            resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
798        }
799        while guard.len() >= LOOKUP_CACHE_MAX_ENTRIES
800            || resident_bytes.saturating_add(new_bytes) > LOOKUP_CACHE_MAX_BYTES
801        {
802            let oldest = guard
803                .iter()
804                .min_by_key(|(_, cached)| cached.write_nanos)
805                .map(|(old_key, _)| *old_key);
806            let Some(oldest) = oldest else {
807                break;
808            };
809            if let Some(old) = guard.remove(&oldest) {
810                resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
811            }
812        }
813        guard.insert(key, val);
814    }
815}
816
817fn lookup_cache_invalidate(key: &LookupCacheKey) {
818    if let Ok(mut guard) = lookup_cache().lock() {
819        guard.remove(key);
820    }
821}
822
/// Save cadence for a full [`WarmStartStore::evict_overflow`] sweep: one
/// sweep per this many saves.
///
/// Between sweeps the disk budget may overshoot by up to K-1 payloads, and the
/// next sweep reclaims them. K=32 keeps the amortized sweep cost negligible on
/// hot checkpoint paths while still bounding worst-case disk drift.
const EVICT_EVERY_N_SAVES: u64 = 32;
828
/// Extract the writer PID from a temp-file name.
///
/// Temp names look like `<runid>.bin.tmp.<pid>.<nonce>.<attempt>` or
/// `<runid>.json.tmp.<pid>.<nonce>.<attempt>`. Only the first `.`-delimited
/// token after the first `.tmp.` matters; the retry-attempt suffix is ignored.
/// Returns `None` when there is no `.tmp.` marker or the token is not a
/// decimal `u32`.
fn parse_tmp_pid(name: &str) -> Option<u32> {
    let (_, after_marker) = name.split_once(".tmp.")?;
    let pid_token = after_marker
        .split_once('.')
        .map_or(after_marker, |(head, _)| head);
    pid_token.parse().ok()
}
837
838fn read_meta(path: &Path) -> Result<OnDiskMeta, StoreError> {
839    let bytes = fs::read(path)?;
840    let parsed: OnDiskMeta = serde_json::from_slice(&bytes)?;
841    Ok(parsed)
842}
843
844fn entry_better(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
845    match (candidate.objective, current.objective) {
846        (Some(c), Some(d)) => {
847            if (c - d).abs() < 1e-12 {
848                match (candidate.kind, current.kind) {
849                    (EntryKind::Final, EntryKind::Checkpoint) => true,
850                    (EntryKind::Checkpoint, EntryKind::Final) => false,
851                    _ => entry_newer(candidate, current),
852                }
853            } else {
854                c < d
855            }
856        }
857        (Some(_), None) => true,
858        (None, Some(_)) => false,
859        (None, None) => entry_newer(candidate, current),
860    }
861}
862
863fn entry_newer(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
864    let candidate_stamp = (
865        candidate.written_unix_secs,
866        candidate.written_nanos,
867        candidate_kind_rank(candidate.kind),
868    );
869    let current_stamp = (
870        current.written_unix_secs,
871        current.written_nanos,
872        candidate_kind_rank(current.kind),
873    );
874    candidate_stamp > current_stamp
875}
876
877const fn candidate_kind_rank(kind: EntryKind) -> u8 {
878    match kind {
879        EntryKind::Checkpoint => 0,
880        EntryKind::Final => 1,
881    }
882}
883
884fn checksum_hex(payload: &[u8]) -> String {
885    let mut h = Sha256::new();
886    h.update(payload);
887    let out = h.finalize();
888    let mut s = String::with_capacity(out.len() * 2);
889    for b in out.iter() {
890        use std::fmt::Write;
891        write!(&mut s, "{:02x}", b).expect("writing to String is infallible");
892    }
893    s
894}
895
896impl WarmStartStore {
897    fn read_meta_indexed(
898        &self,
899        path: &Path,
900        meta_md: &fs::Metadata,
901        bin_md: &fs::Metadata,
902    ) -> Result<OnDiskMeta, StoreError> {
903        if let Ok(index) = self.index.lock()
904            && let Some(cached) = index.by_meta_path.get(path)
905            && cached.matches(meta_md, bin_md)
906        {
907            return Ok(cached.meta.clone());
908        }
909
910        let meta = read_meta(path)?;
911        let Some(meta_mtime) = meta_md.modified().ok() else {
912            return Ok(meta);
913        };
914        if let Ok(mut index) = self.index.lock() {
915            index.by_meta_path.insert(
916                path.to_path_buf(),
917                IndexedMeta {
918                    meta_mtime,
919                    meta_len: meta_md.len(),
920                    bin_len: bin_md.len(),
921                    meta: meta.clone(),
922                },
923            );
924        }
925        Ok(meta)
926    }
927
928    fn metadata_index_upsert(&self, meta_path: &Path, bin_path: &Path) -> Result<(), StoreError> {
929        let meta_md = fs::metadata(meta_path)?;
930        let bin_md = fs::metadata(bin_path)?;
931        self.read_meta_indexed(meta_path, &meta_md, &bin_md)?;
932        // A fresh entry just landed in this key dir, so any cached listing for
933        // the dir is stale. Drop it; the next scan rebuilds and re-caches.
934        if let Some(parent) = meta_path.parent()
935            && let Ok(mut index) = self.index.lock()
936        {
937            index.by_key_dir.remove(parent);
938        }
939        Ok(())
940    }
941
942    fn metadata_index_remove(&self, meta_path: &Path) {
943        if let Ok(mut index) = self.index.lock() {
944            index.by_meta_path.remove(meta_path);
945            if let Some(parent) = meta_path.parent() {
946                index.by_key_dir.remove(parent);
947            }
948        }
949    }
950
951    fn metadata_index_remove_key(&self, key: &Fingerprint) {
952        let dir = self.key_dir(key);
953        if let Ok(mut index) = self.index.lock() {
954            index.by_meta_path.retain(|path, _| !path.starts_with(&dir));
955            index.by_key_dir.remove(&dir);
956        }
957    }
958
959    /// Cached listing lookup for one key directory.
960    ///
961    /// Returns the cached `Vec<ScannedEntry>` if the directory's current mtime
962    /// matches the cached scan (no entry added/removed since), otherwise
963    /// `None` so the caller performs a fresh scan via [`Self::scan_key_dir`].
964    ///
965    /// A matching dir mtime guarantees the *set* of files is unchanged, but TTL
966    /// is wall-clock relative, so an entry valid at scan time can expire while
967    /// the listing is still cached. The caller re-applies the TTL cutoff to the
968    /// returned entries; this only proves the file set is stable.
969    fn cached_dir_scan(&self, dir: &Path, dir_md: &fs::Metadata) -> Option<Vec<ScannedEntry>> {
970        let dir_mtime = dir_md.modified().ok()?;
971        let index = self.index.lock().ok()?;
972        let cached = index.by_key_dir.get(dir)?;
973        if cached.dir_mtime != dir_mtime {
974            return None;
975        }
976        for entry in &cached.entries {
977            let meta_md = fs::metadata(&entry.meta_path).ok()?;
978            let bin_md = fs::metadata(&entry.bin_path).ok()?;
979            if !entry.matches_files(&meta_md, &bin_md) {
980                return None;
981            }
982        }
983        Some(cached.entries.clone())
984    }
985
986    fn store_dir_scan(&self, dir: &Path, dir_mtime: SystemTime, entries: &[ScannedEntry]) {
987        if let Ok(mut index) = self.index.lock() {
988            index.by_key_dir.insert(
989                dir.to_path_buf(),
990                ScannedDir {
991                    dir_mtime,
992                    entries: entries.to_vec(),
993                },
994            );
995        }
996    }
997
998    /// Scan one key directory, resolving every valid `(meta, bin)` pair and
999    /// cleaning up corrupt / orphaned / schema-mismatched files in passing.
1000    ///
1001    /// Serves both [`Self::lookup_with`] and [`Self::evict_overflow`]: when the
1002    /// directory's mtime is unchanged since the previous scan it returns the
1003    /// cached listing without a single `read_dir`, `metadata`, or JSON read —
1004    /// the metadata-syscall storm that #1114 traced. A fresh scan re-caches the
1005    /// listing keyed by the dir mtime observed *after* any cleanup, so a later
1006    /// unchanged call hits the cache. (`now_nanos` drives the TTL drop; expired
1007    /// entries are removed and excluded from the result.)
1008    ///
1009    /// `.tmp.*` files belonging to other processes are swept; same-PID temps
1010    /// (in-flight writes from us) are left alone.
1011    fn scan_key_dir(&self, dir: &Path, now_nanos: u128) -> Vec<ScannedEntry> {
1012        let dir_md = match fs::metadata(dir) {
1013            Ok(m) => m,
1014            Err(_) => return Vec::new(),
1015        };
1016        if let Some(cached) = self.cached_dir_scan(dir, &dir_md) {
1017            // The file set is unchanged, but TTL is wall-clock relative: an
1018            // entry valid when scanned may have expired since. Re-apply the
1019            // cutoff against `now_nanos`, removing any that crossed it. If none
1020            // expired we return the cached listing untouched (the fast path);
1021            // otherwise the removals bump the dir mtime, so we drop the stale
1022            // cache and re-cache the survivors keyed by the post-removal mtime.
1023            let any_expired = cached.iter().any(|e| {
1024                meta_expired(
1025                    e.meta.written_unix_secs,
1026                    e.meta.written_nanos,
1027                    self.opts.ttl,
1028                    now_nanos,
1029                )
1030            });
1031            if !any_expired {
1032                return cached;
1033            }
1034            let mut survivors = Vec::with_capacity(cached.len());
1035            for entry in cached {
1036                if meta_expired(
1037                    entry.meta.written_unix_secs,
1038                    entry.meta.written_nanos,
1039                    self.opts.ttl,
1040                    now_nanos,
1041                ) {
1042                    fs::remove_file(&entry.meta_path).ok();
1043                    fs::remove_file(&entry.bin_path).ok();
1044                    self.metadata_index_remove(&entry.meta_path);
1045                } else {
1046                    survivors.push(entry);
1047                }
1048            }
1049            if let Some(mtime) = fs::metadata(dir).ok().and_then(|m| m.modified().ok()) {
1050                self.store_dir_scan(dir, mtime, &survivors);
1051            }
1052            return survivors;
1053        }
1054        let read_dir = match fs::read_dir(dir) {
1055            Ok(rd) => rd,
1056            Err(_) => return Vec::new(),
1057        };
1058        let mut entries = Vec::new();
1059        let mut mutated = false;
1060        for f in read_dir {
1061            let path = match f {
1062                Ok(e) => e.path(),
1063                Err(_) => continue,
1064            };
1065            let name = match path.file_name().and_then(|s| s.to_str()) {
1066                Some(s) => s,
1067                None => continue,
1068            };
1069            if name.contains(".tmp.") {
1070                if let Some(pid) = parse_tmp_pid(name)
1071                    && pid != std::process::id()
1072                {
1073                    fs::remove_file(&path).ok();
1074                    mutated = true;
1075                }
1076                continue;
1077            }
1078            if path.extension().and_then(|s| s.to_str()) != Some("json") {
1079                continue;
1080            }
1081            let meta_md = match fs::metadata(&path) {
1082                Ok(m) => m,
1083                Err(_) => continue,
1084            };
1085            let bin = path.with_extension("bin");
1086            let bin_md = match fs::metadata(&bin) {
1087                Ok(m) => m,
1088                Err(_) => {
1089                    fs::remove_file(&path).ok();
1090                    self.metadata_index_remove(&path);
1091                    mutated = true;
1092                    continue;
1093                }
1094            };
1095            let meta = match self.read_meta_indexed(&path, &meta_md, &bin_md) {
1096                Ok(m) => m,
1097                Err(_) => {
1098                    fs::remove_file(&path).ok();
1099                    fs::remove_file(&bin).ok();
1100                    self.metadata_index_remove(&path);
1101                    mutated = true;
1102                    continue;
1103                }
1104            };
1105            if meta.schema_version != SCHEMA_VERSION {
1106                fs::remove_file(&path).ok();
1107                fs::remove_file(&bin).ok();
1108                self.metadata_index_remove(&path);
1109                mutated = true;
1110                continue;
1111            }
1112            if meta_expired(
1113                meta.written_unix_secs,
1114                meta.written_nanos,
1115                self.opts.ttl,
1116                now_nanos,
1117            ) {
1118                fs::remove_file(&path).ok();
1119                fs::remove_file(&bin).ok();
1120                self.metadata_index_remove(&path);
1121                mutated = true;
1122                continue;
1123            }
1124            entries.push(ScannedEntry {
1125                meta_path: path,
1126                bin_path: bin,
1127                meta_len: meta_md.len(),
1128                bin_len: bin_md.len(),
1129                meta_mtime: meta_md.modified().ok(),
1130                bin_mtime: bin_md.modified().ok(),
1131                meta,
1132            });
1133        }
1134        // Cache keyed by the mtime *after* any cleanup so the next unchanged
1135        // call is a cache hit. If cleanup mutated the dir, re-stat to capture
1136        // the post-mutation mtime; otherwise reuse the mtime we already read.
1137        let final_mtime = if mutated {
1138            fs::metadata(dir).ok().and_then(|m| m.modified().ok())
1139        } else {
1140            dir_md.modified().ok()
1141        };
1142        if let Some(mtime) = final_mtime {
1143            self.store_dir_scan(dir, mtime, &entries);
1144        }
1145        entries
1146    }
1147
1148    fn test_time_offset_ns(&self) -> u64 {
1149        self.test_time_offset_ns.load(Ordering::Relaxed)
1150    }
1151
1152    fn unix_now_parts(&self) -> (u64, u32) {
1153        let base = SystemTime::now()
1154            .duration_since(UNIX_EPOCH)
1155            .map(|d| d.as_nanos())
1156            .unwrap_or(0);
1157        let total = base.saturating_add(u128::from(self.test_time_offset_ns()));
1158        let secs = (total / 1_000_000_000u128) as u64;
1159        let nanos = (total % 1_000_000_000u128) as u32;
1160        (secs, nanos)
1161    }
1162
1163    fn nanos_now(&self) -> u128 {
1164        let base = SystemTime::now()
1165            .duration_since(UNIX_EPOCH)
1166            .map(|d| d.as_nanos())
1167            .unwrap_or(0);
1168        base.saturating_add(u128::from(self.test_time_offset_ns()))
1169    }
1170
1171    fn fresh_run_id(&self) -> String {
1172        let pid = std::process::id();
1173        let nanos = self.nanos_now();
1174        format!("r{pid:x}-{nanos:x}")
1175    }
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180    use super::*;
1181    use crate::warm_start::key::Fingerprinter;
1182
1183    impl WarmStartStore {
1184        /// Advance this store's simulated monotonic clock by `dur`. Only
1185        /// available in tests — production code reads the real wall clock and
1186        /// never mutates the per-store offset.
1187        fn test_advance_time(&self, dur: Duration) {
1188            self.test_time_offset_ns
1189                .fetch_add(dur.as_nanos() as u64, Ordering::Relaxed);
1190        }
1191    }
1192
1193    fn temp_store() -> (tempfile::TempDir, WarmStartStore) {
1194        let dir = tempfile::tempdir().unwrap();
1195        let store = WarmStartStore::open(
1196            dir.path().to_path_buf(),
1197            StoreOptions {
1198                size_budget_bytes: 1024 * 1024,
1199                ttl: Duration::from_secs(60),
1200            },
1201        )
1202        .unwrap();
1203        (dir, store)
1204    }
1205
1206    fn key_for(s: &str) -> Fingerprint {
1207        let mut fp = Fingerprinter::new();
1208        fp.absorb_str(b"test", s);
1209        fp.finalize()
1210    }
1211
1212    #[test]
1213    fn roundtrip_save_then_lookup() {
1214        let (_d, store) = temp_store();
1215        let key = key_for("roundtrip");
1216        let _id = store
1217            .save(
1218                &key,
1219                b"hello-warm",
1220                Some(1.5),
1221                Some(7),
1222                EntryKind::Checkpoint,
1223            )
1224            .unwrap();
1225        let got = store.lookup(&key).unwrap().unwrap();
1226        assert_eq!(got.payload, b"hello-warm");
1227        assert_eq!(got.objective, Some(1.5));
1228        assert_eq!(got.iteration, Some(7));
1229        assert_eq!(got.kind, EntryKind::Checkpoint);
1230    }
1231
1232    #[test]
1233    fn lookup_picks_lowest_objective() {
1234        let (_d, store) = temp_store();
1235        let key = key_for("multi");
1236        store
1237            .save(&key, b"worse", Some(3.0), Some(1), EntryKind::Checkpoint)
1238            .unwrap();
1239        store
1240            .save(&key, b"better", Some(1.0), Some(2), EntryKind::Checkpoint)
1241            .unwrap();
1242        store
1243            .save(&key, b"mid", Some(2.0), Some(3), EntryKind::Checkpoint)
1244            .unwrap();
1245        let got = store.lookup(&key).unwrap().unwrap();
1246        assert_eq!(got.payload, b"better");
1247        assert_eq!(got.objective, Some(1.0));
1248    }
1249
1250    #[test]
1251    fn lookup_latest_ignores_objective_ordering() {
1252        let (_d, store) = temp_store();
1253        let key = key_for("latest-vs-best");
1254        store
1255            .save(&key, b"low-objective", Some(1.0), Some(1), EntryKind::Final)
1256            .unwrap();
1257        store.test_advance_time(Duration::from_millis(2));
1258        store
1259            .save(
1260                &key,
1261                b"newer-higher-objective",
1262                Some(10.0),
1263                Some(2),
1264                EntryKind::Checkpoint,
1265            )
1266            .unwrap();
1267
1268        let best = store.lookup(&key).unwrap().unwrap();
1269        assert_eq!(best.payload, b"low-objective");
1270
1271        let latest = store.lookup_latest(&key).unwrap().unwrap();
1272        assert_eq!(latest.payload, b"newer-higher-objective");
1273        assert_eq!(latest.iteration, Some(2));
1274    }
1275
1276    #[test]
1277    fn tiebreak_final_beats_checkpoint() {
1278        let (_d, store) = temp_store();
1279        let key = key_for("tie");
1280        store
1281            .save(&key, b"ckpt", Some(1.0), None, EntryKind::Checkpoint)
1282            .unwrap();
1283        // Same objective, different kind.
1284        store
1285            .save(&key, b"final", Some(1.0), None, EntryKind::Final)
1286            .unwrap();
1287        let got = store.lookup(&key).unwrap().unwrap();
1288        assert_eq!(got.payload, b"final");
1289        assert_eq!(got.kind, EntryKind::Final);
1290    }
1291
1292    #[test]
1293    fn tiebreak_latest_mtime_when_no_objective() {
1294        let (_d, store) = temp_store();
1295        let key = key_for("latest");
1296        store
1297            .save(&key, b"first", None, None, EntryKind::Checkpoint)
1298            .unwrap();
1299        store.test_advance_time(Duration::from_millis(1_100));
1300        store
1301            .save(&key, b"second", None, None, EntryKind::Checkpoint)
1302            .unwrap();
1303        let got = store.lookup(&key).unwrap().unwrap();
1304        assert_eq!(got.payload, b"second");
1305    }
1306
1307    #[test]
1308    fn corrupt_payload_is_cleaned_up() {
1309        let (_d, store) = temp_store();
1310        let key = key_for("corrupt");
1311        store
1312            .save(&key, b"original", Some(0.0), None, EntryKind::Checkpoint)
1313            .unwrap();
1314        // Tamper with the .bin file.
1315        let dir = store.key_dir(&key);
1316        for entry in fs::read_dir(&dir).unwrap() {
1317            let p = entry.unwrap().path();
1318            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1319                fs::write(&p, b"tampered!").unwrap();
1320            }
1321        }
1322        let got = store.lookup(&key).unwrap();
1323        assert!(got.is_none(), "tampered entry must be rejected");
1324        // The corrupt files should be cleaned up so they don't accumulate.
1325        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1326        assert!(remaining.is_empty(), "corrupt entry should be removed");
1327    }
1328
1329    #[test]
1330    fn corrupt_meta_json_is_cleaned_up() {
1331        let (_d, store) = temp_store();
1332        let key = key_for("badjson");
1333        store
1334            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1335            .unwrap();
1336        let dir = store.key_dir(&key);
1337        for entry in fs::read_dir(&dir).unwrap() {
1338            let p = entry.unwrap().path();
1339            if p.extension().and_then(|s| s.to_str()) == Some("json") {
1340                fs::write(&p, b"{not valid json").unwrap();
1341            }
1342        }
1343        let got = store.lookup(&key).unwrap();
1344        assert!(got.is_none());
1345    }
1346
1347    #[test]
1348    fn schema_mismatched_entry_is_cleaned_up() {
1349        let (_d, store) = temp_store();
1350        let key = key_for("schema");
1351        store
1352            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1353            .unwrap();
1354        let dir = store.key_dir(&key);
1355        for entry in fs::read_dir(&dir).unwrap() {
1356            let p = entry.unwrap().path();
1357            if p.extension().and_then(|s| s.to_str()) == Some("json") {
1358                let raw = fs::read(&p).unwrap();
1359                let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1360                parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1361                fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1362            }
1363        }
1364        assert!(store.lookup(&key).unwrap().is_none());
1365        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1366        assert!(
1367            remaining.is_empty(),
1368            "schema-mismatched entry should be removed"
1369        );
1370    }
1371
1372    #[test]
1373    fn schema_mismatched_entry_is_removed_during_save_eviction_path() {
1374        let dir = tempfile::tempdir().unwrap();
1375        let store = WarmStartStore::open(
1376            dir.path().to_path_buf(),
1377            StoreOptions {
1378                size_budget_bytes: 6 * 1024,
1379                ttl: Duration::from_secs(3600),
1380            },
1381        )
1382        .unwrap();
1383        let stale_key = key_for("schema-size-stale");
1384        store
1385            .save(
1386                &stale_key,
1387                &vec![0u8; 4 * 1024],
1388                None,
1389                None,
1390                EntryKind::Checkpoint,
1391            )
1392            .unwrap();
1393
1394        let stale_dir = store.key_dir(&stale_key);
1395        let mut stale_meta = None;
1396        let mut stale_bin = None;
1397        for entry in fs::read_dir(&stale_dir).unwrap() {
1398            let p = entry.unwrap().path();
1399            match p.extension().and_then(|s| s.to_str()) {
1400                Some("json") => {
1401                    let raw = fs::read(&p).unwrap();
1402                    let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1403                    parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1404                    fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1405                    stale_meta = Some(p);
1406                }
1407                Some("bin") => stale_bin = Some(p),
1408                _ => {}
1409            }
1410        }
1411        let stale_meta = stale_meta.expect("saved entry should have metadata");
1412        let stale_bin = stale_bin.expect("saved entry should have payload");
1413
1414        let fresh_key = key_for("schema-size-fresh");
1415        store
1416            .save(
1417                &fresh_key,
1418                &vec![1u8; 2 * 1024],
1419                None,
1420                None,
1421                EntryKind::Checkpoint,
1422            )
1423            .unwrap();
1424
1425        assert!(
1426            !stale_meta.exists(),
1427            "schema-mismatched metadata should be removed during eviction scan"
1428        );
1429        assert!(
1430            !stale_bin.exists(),
1431            "schema-mismatched payload should be removed during eviction scan"
1432        );
1433
1434        let mut total = 0u64;
1435        for key_dir in fs::read_dir(store.root()).unwrap() {
1436            let key_dir = key_dir.unwrap().path();
1437            if key_dir.is_dir() {
1438                for entry in fs::read_dir(key_dir).unwrap() {
1439                    total += fs::metadata(entry.unwrap().path()).unwrap().len();
1440                }
1441            }
1442        }
1443        assert!(
1444            total <= store.options().size_budget_bytes,
1445            "schema-mismatched bytes must not leak past size accounting (got {total})"
1446        );
1447        assert!(store.lookup(&stale_key).unwrap().is_none());
1448        assert!(store.lookup(&fresh_key).unwrap().is_some());
1449    }
1450
1451    #[test]
1452    fn missing_bin_treated_as_missing() {
1453        let (_d, store) = temp_store();
1454        let key = key_for("nobin");
1455        store
1456            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1457            .unwrap();
1458        let dir = store.key_dir(&key);
1459        for entry in fs::read_dir(&dir).unwrap() {
1460            let p = entry.unwrap().path();
1461            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1462                fs::remove_file(&p).unwrap();
1463            }
1464        }
1465        assert!(store.lookup(&key).unwrap().is_none());
1466    }
1467
1468    #[test]
1469    fn missing_key_returns_none() {
1470        let (_d, store) = temp_store();
1471        let key = key_for("absent");
1472        assert!(store.lookup(&key).unwrap().is_none());
1473    }
1474
1475    #[test]
1476    fn lru_eviction_under_size_budget() {
1477        let dir = tempfile::tempdir().unwrap();
1478        // Tiny budget: 4 KiB. Each entry payload + meta JSON is ~600 B.
1479        let store = WarmStartStore::open(
1480            dir.path().to_path_buf(),
1481            StoreOptions {
1482                size_budget_bytes: 4 * 1024,
1483                ttl: Duration::from_secs(3600),
1484            },
1485        )
1486        .unwrap();
1487        let mut keys = Vec::new();
1488        for i in 0..20 {
1489            let mut fp = Fingerprinter::new();
1490            fp.absorb_u64(b"i", i);
1491            let key = fp.finalize();
1492            keys.push(key);
1493            let payload = vec![0u8; 256];
1494            store
1495                .save(&key, &payload, Some(i as f64), None, EntryKind::Checkpoint)
1496                .unwrap();
1497        }
1498        // Walk the store root and confirm total bytes is bounded.
1499        let mut total = 0u64;
1500        for kd in fs::read_dir(store.root()).unwrap() {
1501            let kd = kd.unwrap().path();
1502            if kd.is_dir() {
1503                for f in fs::read_dir(&kd).unwrap() {
1504                    total += fs::metadata(f.unwrap().path()).unwrap().len();
1505                }
1506            }
1507        }
1508        assert!(
1509            total <= 8 * 1024,
1510            "eviction failed to bound size (got {total})"
1511        );
1512        // Earliest keys must have been evicted; latest survive.
1513        assert!(store.lookup(&keys[0]).unwrap().is_none());
1514        assert!(store.lookup(keys.last().unwrap()).unwrap().is_some());
1515    }
1516
1517    #[test]
1518    fn ttl_drops_old_entries() {
1519        // Expiration is driven by `test_advance_time` (additive simulated time
1520        // on top of the wall clock), so the TTL itself only needs to be larger
1521        // than any plausible save→lookup wall-time on the CI runner. The
1522        // 1-second TTL the original fixture used was tighter than the worst
1523        // ext4 fsync this image sees (see `save_overwrite`'s late-stamp
1524        // comment), so the first `is_some()` check would flake to "expired"
1525        // before any time advance ever ran. 60 s clears that race with margin.
1526        let dir = tempfile::tempdir().unwrap();
1527        let ttl = Duration::from_secs(60);
1528        let store = WarmStartStore::open(
1529            dir.path().to_path_buf(),
1530            StoreOptions {
1531                size_budget_bytes: 1024 * 1024,
1532                ttl,
1533            },
1534        )
1535        .unwrap();
1536        let key = key_for("ttl");
1537        store
1538            .save(&key, b"x", None, None, EntryKind::Checkpoint)
1539            .unwrap();
1540        assert!(store.lookup(&key).unwrap().is_some());
1541        store.test_advance_time(ttl + Duration::from_secs(5));
1542        // Trigger eviction via a save under an unrelated key.
1543        let other = key_for("ttl-other");
1544        store
1545            .save(&other, b"y", None, None, EntryKind::Checkpoint)
1546            .unwrap();
1547        // Original now expired.
1548        assert!(store.lookup(&key).unwrap().is_none());
1549        assert!(store.lookup(&other).unwrap().is_some());
1550    }
1551
1552    #[test]
1553    fn orphan_temp_files_from_dead_processes_are_swept() {
1554        let (_d, store) = temp_store();
1555        let key = key_for("tmp");
1556        let dir = store.key_dir(&key);
1557        fs::create_dir_all(&dir).unwrap();
1558        // Use PID 1 — never the current process, so it counts as "other".
1559        let orphan_other = dir.join("r0-0.json.tmp.1.0");
1560        let mine = dir.join(format!("r0-0.bin.tmp.{}.0", std::process::id()));
1561        fs::write(&orphan_other, b"orphan").unwrap();
1562        fs::write(&mine, b"mine").unwrap();
1563        store.evict_overflow().unwrap();
1564        assert!(!orphan_other.exists(), "other-PID tmp file should be swept");
1565        assert!(mine.exists(), "same-PID tmp file must be left alone");
1566    }
1567
1568    #[test]
1569    fn tmp_filenames_without_pid_are_skipped() {
1570        // Malformed tmp names (no parseable pid) must not crash the sweep.
1571        let (_d, store) = temp_store();
1572        let key = key_for("malformed");
1573        let dir = store.key_dir(&key);
1574        fs::create_dir_all(&dir).unwrap();
1575        let weird = dir.join("garbage.tmp.notapid.suffix");
1576        fs::write(&weird, b"x").unwrap();
1577        // Must not panic.
1578        store.evict_overflow().unwrap();
1579        assert!(weird.exists());
1580    }
1581
1582    #[test]
1583    fn save_overwrite_keeps_single_entry() {
1584        let (_d, store) = temp_store();
1585        let key = key_for("overwrite");
1586        let id = store
1587            .save(&key, b"v1", Some(2.0), Some(1), EntryKind::Checkpoint)
1588            .unwrap();
1589        store
1590            .save_overwrite(&key, &id, b"v2", Some(1.0), Some(2), EntryKind::Checkpoint)
1591            .unwrap();
1592        // Only one (meta, bin) pair on disk.
1593        let dir = store.key_dir(&key);
1594        let files: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1595        assert_eq!(files.len(), 2, "overwrite should not create a new run-id");
1596        let got = store.lookup(&key).unwrap().unwrap();
1597        assert_eq!(got.payload, b"v2");
1598        assert_eq!(got.objective, Some(1.0));
1599    }
1600
1601    #[test]
1602    fn write_and_promote_recreates_dir_removed_before_write() {
1603        // gam#868: a sibling process' eviction can `remove_dir` the key dir the
1604        // instant it observes it empty, racing every write step in `save`. The
1605        // promote helper must recreate the dir rather than failing with ENOENT.
1606        let (_d, store) = temp_store();
1607        let key = key_for("race-recreate");
1608        let dir = store.key_dir(&key);
1609        // Dir does NOT exist yet (simulates eviction having removed it after a
1610        // prior `create_dir_all`). The helper must create it and succeed.
1611        assert!(!dir.exists());
1612        let bin_tmp = dir.join("r0.bin.tmp.1.0.0");
1613        let meta_tmp = dir.join("r0.json.tmp.1.0.0");
1614        let bin_final = dir.join("r0.bin");
1615        let meta_final = dir.join("r0.json");
1616        let stamp_fn = || (0u64, 0u32);
1617        let build_meta_json = |_: u64, _: u32| -> io::Result<Vec<u8>> { Ok(b"{}".to_vec()) };
1618        write_and_promote_entry(&EntryWrite {
1619            dir: &dir,
1620            bin_tmp: &bin_tmp,
1621            meta_tmp: &meta_tmp,
1622            payload: b"payload",
1623            bin_final: &bin_final,
1624            meta_final: &meta_final,
1625            stamp_fn: &stamp_fn,
1626            build_meta_json: &build_meta_json,
1627        })
1628        .expect("promote into a missing dir must recreate it and succeed");
1629        assert!(bin_final.exists() && meta_final.exists());
1630        assert_eq!(fs::read(&bin_final).unwrap(), b"payload");
1631    }
1632
1633    #[test]
1634    fn save_survives_concurrent_eviction_removing_key_dir() {
1635        // gam#868 end-to-end: hammer the same key with concurrent saves while a
1636        // sibling thread repeatedly runs `evict_overflow` (which `remove_dir`s
1637        // now-empty key dirs). Before the atomic-retry fix, a save whose
1638        // `create_dir_all`→write/rename window straddled a `remove_dir` failed
1639        // with `io: No such file or directory (os error 2)`. Every save must now
1640        // succeed; we assert no save returns an error.
1641        use std::sync::Arc;
1642        use std::sync::atomic::AtomicBool;
1643
1644        let dir = tempfile::tempdir().unwrap();
1645        // Zero size budget forces `evict_overflow` to delete entries (and then
1646        // sweep the emptied key dir) on essentially every sweep, maximizing the
1647        // race window.
1648        let store = Arc::new(
1649            WarmStartStore::open(
1650                dir.path().to_path_buf(),
1651                StoreOptions {
1652                    size_budget_bytes: 0,
1653                    ttl: Duration::from_secs(60),
1654                },
1655            )
1656            .unwrap(),
1657        );
1658        let key = key_for("concurrent-evict");
1659        let stop = Arc::new(AtomicBool::new(false));
1660
1661        let evictor = {
1662            let store = Arc::clone(&store);
1663            let stop = Arc::clone(&stop);
1664            std::thread::spawn(move || {
1665                while !stop.load(Ordering::Relaxed) {
1666                    store.evict_overflow().ok();
1667                }
1668            })
1669        };
1670
1671        let writers: Vec<_> = (0..4)
1672            .map(|w| {
1673                let store = Arc::clone(&store);
1674                std::thread::spawn(move || {
1675                    for i in 0..200u32 {
1676                        let payload = format!("w{w}-i{i}");
1677                        store
1678                            .save(
1679                                &key,
1680                                payload.as_bytes(),
1681                                Some(i as f64),
1682                                Some(i as u64),
1683                                EntryKind::Checkpoint,
1684                            )
1685                            .expect("save must not fail with ENOENT under concurrent eviction");
1686                    }
1687                })
1688            })
1689            .collect();
1690
1691        for h in writers {
1692            h.join().unwrap();
1693        }
1694        stop.store(true, Ordering::Relaxed);
1695        evictor.join().unwrap();
1696    }
1697
1698    #[test]
1699    fn keys_are_isolated() {
1700        let (_d, store) = temp_store();
1701        let a = key_for("a");
1702        let b = key_for("b");
1703        store
1704            .save(&a, b"AAA", Some(1.0), None, EntryKind::Final)
1705            .unwrap();
1706        store
1707            .save(&b, b"BBB", Some(1.0), None, EntryKind::Final)
1708            .unwrap();
1709        assert_eq!(store.lookup(&a).unwrap().unwrap().payload, b"AAA");
1710        assert_eq!(store.lookup(&b).unwrap().unwrap().payload, b"BBB");
1711    }
1712}