1use crate::warm_start::key::Fingerprint;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::fs;
15use std::io::{self, Write as _};
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex, OnceLock};
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
21pub(crate) const SCHEMA_VERSION: u32 = 1;
24
25pub(crate) const DEFAULT_SIZE_BUDGET_BYTES: u64 = 1024 * 1024 * 1024;
27
28pub(crate) const DEFAULT_TTL_SECS: u64 = 60 * 60 * 24 * 30;
30
31#[derive(Debug, thiserror::Error)]
32pub enum StoreError {
33 #[error("io: {0}")]
34 Io(#[from] io::Error),
35 #[error("json: {0}")]
36 Json(#[from] serde_json::Error),
37}
38
39#[derive(Debug, Clone)]
41pub struct WarmStartEntry {
42 pub payload: Vec<u8>,
43 pub objective: Option<f64>,
44 pub iteration: Option<u64>,
45 pub written_unix_secs: u64,
46 pub kind: EntryKind,
47}
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50pub enum EntryKind {
51 Checkpoint,
53 Final,
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
58struct OnDiskMeta {
59 schema_version: u32,
60 written_unix_secs: u64,
61 #[serde(default)]
65 written_nanos: u32,
66 objective: Option<f64>,
67 iteration: Option<u64>,
68 kind: EntryKind,
69 checksum_hex: String,
70 payload_bytes: u64,
71 #[serde(default)]
74 accessed: bool,
75}
76
77#[derive(Debug, Clone)]
78pub struct StoreOptions {
79 pub size_budget_bytes: u64,
80 pub ttl: Duration,
81}
82
83impl Default for StoreOptions {
84 fn default() -> Self {
85 Self {
86 size_budget_bytes: DEFAULT_SIZE_BUDGET_BYTES,
87 ttl: Duration::from_secs(DEFAULT_TTL_SECS),
88 }
89 }
90}
91
92#[derive(Debug)]
93pub struct WarmStartStore {
94 root: PathBuf,
95 opts: StoreOptions,
96 index: Arc<Mutex<MetadataIndex>>,
99 byte_total: Arc<AtomicU64>,
108 save_counter: Arc<AtomicU64>,
111 last_evict_root_mtime: Arc<Mutex<Option<SystemTime>>>,
126 test_time_offset_ns: AtomicU64,
135}
136
137impl Clone for WarmStartStore {
138 fn clone(&self) -> Self {
139 Self {
140 root: self.root.clone(),
141 opts: self.opts.clone(),
142 index: Arc::clone(&self.index),
143 byte_total: Arc::clone(&self.byte_total),
148 save_counter: Arc::clone(&self.save_counter),
149 last_evict_root_mtime: Arc::clone(&self.last_evict_root_mtime),
150 test_time_offset_ns: AtomicU64::new(self.test_time_offset_ns.load(Ordering::Relaxed)),
151 }
152 }
153}
154
155impl WarmStartStore {
156 pub fn open(root: PathBuf, opts: StoreOptions) -> Result<Self, StoreError> {
158 fs::create_dir_all(&root)?;
159 Ok(Self {
160 root,
161 opts,
162 index: Arc::new(Mutex::new(MetadataIndex::default())),
163 byte_total: Arc::new(AtomicU64::new(0)),
164 save_counter: Arc::new(AtomicU64::new(0)),
165 last_evict_root_mtime: Arc::new(Mutex::new(None)),
166 test_time_offset_ns: AtomicU64::new(0),
167 })
168 }
169
170 pub fn root(&self) -> &Path {
171 &self.root
172 }
173
174 pub fn options(&self) -> &StoreOptions {
175 &self.opts
176 }
177
178 fn key_dir(&self, key: &Fingerprint) -> PathBuf {
179 self.root.join(key.to_hex())
180 }
181
182 pub fn lookup(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
190 self.lookup_with(key, LookupMode::Best)
191 }
192
193 pub fn lookup_latest(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
201 self.lookup_with(key, LookupMode::Latest)
202 }
203
204 fn lookup_with(
205 &self,
206 key: &Fingerprint,
207 mode: LookupMode,
208 ) -> Result<Option<WarmStartEntry>, StoreError> {
209 let dir = self.key_dir(key);
210 if !dir.exists() {
211 lookup_cache_invalidate(&LookupCacheKey { fp: *key, mode });
215 self.metadata_index_remove_key(key);
216 return Ok(None);
217 }
218 let cache_key = LookupCacheKey { fp: *key, mode };
227 let now_nanos = self.nanos_now();
228 if let Some(hit) = lookup_cache_get(&cache_key) {
229 if let Ok(md) = fs::metadata(&hit.meta_path)
230 && md.modified().ok() == Some(hit.meta_mtime)
231 {
232 let expired = self.opts.ttl.as_nanos() > 0
233 && now_nanos.saturating_sub(hit.write_nanos) >= self.opts.ttl.as_nanos();
234 if !expired {
235 let entry = self.touch_lookup_hit(&hit.meta_path, hit.entry)?;
236 return Ok(Some(entry));
237 }
238 lookup_cache_invalidate(&cache_key);
239 let bin = hit.meta_path.with_extension("bin");
240 fs::remove_file(&hit.meta_path).ok();
241 fs::remove_file(&bin).ok();
242 self.metadata_index_remove(&hit.meta_path);
244 return Ok(None);
245 }
246 lookup_cache_invalidate(&cache_key);
247 }
248 let mut best: Option<(OnDiskMeta, PathBuf)> = None;
254 for scanned in self.scan_key_dir(&dir, now_nanos) {
255 let take = match best {
256 None => true,
257 Some((ref cur, _)) => mode.better(&scanned.meta, cur),
258 };
259 if take {
260 best = Some((scanned.meta, scanned.meta_path));
261 }
262 }
263 let (meta, meta_path) = match best {
264 Some(b) => b,
265 None => {
266 lookup_cache_invalidate(&cache_key);
267 return Ok(None);
268 }
269 };
270 let bin_path = meta_path.with_extension("bin");
271 let payload = match fs::read(&bin_path) {
272 Ok(v) => v,
273 Err(_) => return Ok(None),
274 };
275 if checksum_hex(&payload) != meta.checksum_hex {
277 fs::remove_file(&meta_path).ok();
278 fs::remove_file(&bin_path).ok();
279 lookup_cache_invalidate(&cache_key);
280 self.metadata_index_remove(&meta_path);
281 return Ok(None);
282 }
283 let entry = WarmStartEntry {
284 payload,
285 objective: meta.objective,
286 iteration: meta.iteration,
287 written_unix_secs: meta.written_unix_secs,
288 kind: meta.kind,
289 };
290 let (meta, entry) = self.touch_lookup_meta(&meta_path, meta, entry)?;
291 if let Ok(md) = fs::metadata(&meta_path)
296 && let Ok(mtime) = md.modified()
297 {
298 let write_nanos =
299 (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
300 lookup_cache_insert(
301 cache_key,
302 CachedLookup {
303 meta_path: meta_path.clone(),
304 meta_mtime: mtime,
305 write_nanos,
306 entry: entry.clone(),
307 },
308 );
309 }
310 Ok(Some(entry))
311 }
312
313 pub fn save(
316 &self,
317 key: &Fingerprint,
318 payload: &[u8],
319 objective: Option<f64>,
320 iteration: Option<u64>,
321 kind: EntryKind,
322 ) -> Result<String, StoreError> {
323 let run_id = self.fresh_run_id();
324 self.save_overwrite(key, &run_id, payload, objective, iteration, kind)?;
325 Ok(run_id)
326 }
327
328 pub fn save_overwrite(
331 &self,
332 key: &Fingerprint,
333 run_id: &str,
334 payload: &[u8],
335 objective: Option<f64>,
336 iteration: Option<u64>,
337 kind: EntryKind,
338 ) -> Result<(), StoreError> {
339 lookup_cache_invalidate(&LookupCacheKey {
346 fp: *key,
347 mode: LookupMode::Best,
348 });
349 lookup_cache_invalidate(&LookupCacheKey {
350 fp: *key,
351 mode: LookupMode::Latest,
352 });
353 let dir = self.key_dir(key);
354 let pid = std::process::id();
355 let checksum = checksum_hex(payload);
357 let objective_finite = objective.filter(|o| o.is_finite());
358 let nonce = self.nanos_now();
389 let bin_final = dir.join(format!("{run_id}.bin"));
390 let meta_final = dir.join(format!("{run_id}.json"));
391 let mut attempt = 0u8;
392 let build_meta_json = |secs: u64, subsec_nanos: u32| -> Result<Vec<u8>, StoreError> {
393 let meta = OnDiskMeta {
394 schema_version: SCHEMA_VERSION,
395 written_unix_secs: secs,
396 written_nanos: subsec_nanos,
397 objective: objective_finite,
398 iteration,
399 kind,
400 checksum_hex: checksum.clone(),
401 payload_bytes: payload.len() as u64,
402 accessed: false,
403 };
404 Ok(serde_json::to_vec_pretty(&meta)?)
405 };
406 loop {
407 let bin_tmp = dir.join(format!("{run_id}.bin.tmp.{pid}.{nonce}.{attempt}"));
408 let meta_tmp = dir.join(format!("{run_id}.json.tmp.{pid}.{nonce}.{attempt}"));
409 let stamp_fn = || self.unix_now_parts();
410 let build_meta_for_io = |secs: u64, subsec_nanos: u32| -> io::Result<Vec<u8>> {
411 build_meta_json(secs, subsec_nanos)
412 .map_err(|e| io::Error::other(format!("meta build: {e:?}")))
413 };
414 match write_and_promote_entry(&EntryWrite {
415 dir: &dir,
416 bin_tmp: &bin_tmp,
417 meta_tmp: &meta_tmp,
418 payload,
419 bin_final: &bin_final,
420 meta_final: &meta_final,
421 stamp_fn: &stamp_fn,
422 build_meta_json: &build_meta_for_io,
423 }) {
424 Ok(()) => break,
425 Err(e) if e.kind() == io::ErrorKind::NotFound && attempt == 0 => {
426 fs::remove_file(&bin_tmp).ok();
430 fs::remove_file(&meta_tmp).ok();
431 attempt += 1;
432 continue;
433 }
434 Err(e) => {
435 fs::remove_file(&bin_tmp).ok();
436 fs::remove_file(&meta_tmp).ok();
437 fs::remove_file(&bin_final).ok();
438 return Err(StoreError::Io(e));
439 }
440 }
441 }
442 if let Ok(d) = fs::File::open(&dir) {
449 d.sync_all().ok();
450 }
451 self.metadata_index_upsert(&meta_final, &bin_final).ok();
452 let approx_added = payload.len() as u64 + APPROX_META_BYTES;
462 self.byte_total.fetch_add(approx_added, Ordering::Relaxed);
463 let n = self.save_counter.fetch_add(1, Ordering::Relaxed);
464 if n == 0 || n.is_multiple_of(EVICT_EVERY_N_SAVES) {
465 self.evict_overflow().ok();
466 }
467 Ok(())
468 }
469
470 pub fn evict_overflow(&self) -> Result<(), StoreError> {
480 let current_root_mtime = fs::metadata(&self.root)
493 .ok()
494 .and_then(|m| m.modified().ok());
495 if self.byte_total.load(Ordering::Relaxed) <= self.opts.size_budget_bytes
496 && let Some(now_mtime) = current_root_mtime
497 && let Ok(last) = self.last_evict_root_mtime.lock()
498 && *last == Some(now_mtime)
499 {
500 return Ok(());
501 }
502 let read_dir = match fs::read_dir(&self.root) {
503 Ok(rd) => rd,
504 Err(_) => return Ok(()),
505 };
506 let mut all: Vec<(PathBuf, PathBuf, u64, u128, bool)> = Vec::new();
508 let now_nanos = self.nanos_now();
509 for key_dir_entry in read_dir {
510 let key_dir = match key_dir_entry {
511 Ok(e) => e.path(),
512 Err(_) => continue,
513 };
514 if !key_dir.is_dir() {
515 continue;
516 }
517 let scanned = self.scan_key_dir(&key_dir, now_nanos);
523 for entry in &scanned {
524 let write_nanos = (entry.meta.written_unix_secs as u128) * 1_000_000_000u128
525 + entry.meta.written_nanos as u128;
526 let total_bytes = entry.meta_len + entry.bin_len;
527 all.push((
528 entry.meta_path.clone(),
529 entry.bin_path.clone(),
530 total_bytes,
531 write_nanos,
532 entry.meta.accessed,
533 ));
534 }
535 if scanned.is_empty()
537 && fs::read_dir(&key_dir)
538 .map(|mut it| it.next().is_none())
539 .unwrap_or(false)
540 {
541 fs::remove_dir(&key_dir).ok();
542 if let Ok(mut index) = self.index.lock() {
543 index.by_key_dir.remove(&key_dir);
544 }
545 }
546 }
547 let total: u64 = all.iter().map(|e| e.2).sum();
548 if total <= self.opts.size_budget_bytes {
549 self.byte_total.store(total, Ordering::Relaxed);
556 if let (Ok(mut last), Some(m)) = (
563 self.last_evict_root_mtime.lock(),
564 fs::metadata(&self.root)
565 .ok()
566 .and_then(|m| m.modified().ok()),
567 ) {
568 *last = Some(m);
569 }
570 return Ok(());
571 }
572 all.sort_by(|a, b| {
573 a.4.cmp(&b.4)
574 .then_with(|| a.3.cmp(&b.3))
575 .then_with(|| a.0.cmp(&b.0))
576 });
577 let mut remaining = total;
578 for (meta, bin, bytes, _, _) in all.into_iter() {
579 if remaining <= self.opts.size_budget_bytes {
580 break;
581 }
582 fs::remove_file(&meta).ok();
583 fs::remove_file(&bin).ok();
584 self.metadata_index_remove(&meta);
585 remaining = remaining.saturating_sub(bytes);
586 }
587 self.byte_total.store(remaining, Ordering::Relaxed);
590 Ok(())
591 }
592}
593
594struct EntryWrite<'a> {
606 dir: &'a Path,
607 bin_tmp: &'a Path,
608 meta_tmp: &'a Path,
609 payload: &'a [u8],
610 bin_final: &'a Path,
611 meta_final: &'a Path,
612 stamp_fn: &'a dyn Fn() -> (u64, u32),
619 build_meta_json: &'a dyn Fn(u64, u32) -> io::Result<Vec<u8>>,
622}
623
624fn write_and_promote_entry(w: &EntryWrite<'_>) -> io::Result<()> {
625 fs::create_dir_all(w.dir)?;
629 {
630 let mut f = fs::File::create(w.bin_tmp)?;
631 f.write_all(w.payload)?;
632 f.sync_all().ok();
633 }
634 fs::rename(w.bin_tmp, w.bin_final)?;
638 let (secs, subsec_nanos) = (w.stamp_fn)();
645 let meta_json = (w.build_meta_json)(secs, subsec_nanos)?;
646 {
647 let mut f = fs::File::create(w.meta_tmp)?;
648 f.write_all(&meta_json)?;
649 f.sync_all().ok();
650 }
651 if let Err(e) = fs::rename(w.meta_tmp, w.meta_final) {
652 fs::remove_file(w.bin_final).ok();
655 return Err(e);
656 }
657 Ok(())
658}
659
660const APPROX_META_BYTES: u64 = 512;
664
665#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
667enum LookupMode {
668 Best,
670 Latest,
672}
673
674impl LookupMode {
675 fn better(&self, candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
676 match self {
677 LookupMode::Best => entry_better(candidate, current),
678 LookupMode::Latest => entry_newer(candidate, current),
679 }
680 }
681}
682
683#[derive(Clone, Copy, PartialEq, Eq, Hash)]
684struct LookupCacheKey {
685 fp: Fingerprint,
686 mode: LookupMode,
687}
688
689#[derive(Clone)]
690struct CachedLookup {
691 meta_path: PathBuf,
692 meta_mtime: SystemTime,
693 write_nanos: u128,
697 entry: WarmStartEntry,
698}
699
700#[derive(Debug, Default)]
701struct MetadataIndex {
702 by_meta_path: HashMap<PathBuf, IndexedMeta>,
703 by_key_dir: HashMap<PathBuf, ScannedDir>,
714}
715
716#[derive(Debug, Clone)]
717struct IndexedMeta {
718 meta_mtime: SystemTime,
719 meta_len: u64,
720 bin_len: u64,
721 meta: OnDiskMeta,
722}
723
724impl IndexedMeta {
725 fn matches(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
726 meta_md.modified().ok() == Some(self.meta_mtime)
727 && meta_md.len() == self.meta_len
728 && bin_md.len() == self.bin_len
729 }
730}
731
732#[derive(Debug, Clone)]
735struct ScannedDir {
736 dir_mtime: SystemTime,
737 entries: Vec<ScannedEntry>,
738}
739
740#[derive(Debug, Clone)]
744struct ScannedEntry {
745 meta_path: PathBuf,
746 bin_path: PathBuf,
747 meta_len: u64,
748 bin_len: u64,
749 meta_mtime: Option<SystemTime>,
750 bin_mtime: Option<SystemTime>,
751 meta: OnDiskMeta,
752}
753
754impl ScannedEntry {
755 fn matches_files(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
756 meta_md.len() == self.meta_len
757 && bin_md.len() == self.bin_len
758 && meta_md.modified().ok() == self.meta_mtime
759 && bin_md.modified().ok() == self.bin_mtime
760 }
761}
762
763const fn meta_expired(secs: u64, nanos: u32, ttl: Duration, now_nanos: u128) -> bool {
768 let ttl_nanos = ttl.as_nanos();
769 if ttl_nanos == 0 {
770 return false;
771 }
772 let write_nanos = (secs as u128) * 1_000_000_000u128 + nanos as u128;
773 now_nanos.saturating_sub(write_nanos) >= ttl_nanos
774}
775
776fn lookup_cache() -> &'static Mutex<HashMap<LookupCacheKey, CachedLookup>> {
783 static CACHE: OnceLock<Mutex<HashMap<LookupCacheKey, CachedLookup>>> = OnceLock::new();
784 CACHE.get_or_init(|| Mutex::new(HashMap::new()))
785}
786
787const LOOKUP_CACHE_MAX_ENTRIES: usize = 128;
788const LOOKUP_CACHE_MAX_BYTES: usize = 256 * 1024 * 1024;
789
790const fn cached_lookup_resident_bytes(value: &CachedLookup) -> usize {
791 std::mem::size_of::<CachedLookup>().saturating_add(value.entry.payload.capacity())
792}
793
794fn lookup_cache_get(key: &LookupCacheKey) -> Option<CachedLookup> {
795 let guard = lookup_cache().lock().ok()?;
796 guard.get(key).cloned()
797}
798
799fn lookup_cache_insert(key: LookupCacheKey, val: CachedLookup) {
800 if let Ok(mut guard) = lookup_cache().lock() {
801 let new_bytes = cached_lookup_resident_bytes(&val);
802 if new_bytes > LOOKUP_CACHE_MAX_BYTES {
803 return;
804 }
805 let mut resident_bytes: usize = guard.values().map(cached_lookup_resident_bytes).sum();
806 if let Some(old) = guard.remove(&key) {
807 resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
808 }
809 while guard.len() >= LOOKUP_CACHE_MAX_ENTRIES
810 || resident_bytes.saturating_add(new_bytes) > LOOKUP_CACHE_MAX_BYTES
811 {
812 let oldest = guard
813 .iter()
814 .min_by_key(|(_, cached)| cached.write_nanos)
815 .map(|(old_key, _)| *old_key);
816 let Some(oldest) = oldest else {
817 break;
818 };
819 if let Some(old) = guard.remove(&oldest) {
820 resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
821 }
822 }
823 guard.insert(key, val);
824 }
825}
826
827fn lookup_cache_invalidate(key: &LookupCacheKey) {
828 if let Ok(mut guard) = lookup_cache().lock() {
829 guard.remove(key);
830 }
831}
832
833const EVICT_EVERY_N_SAVES: u64 = 32;
838
839fn parse_tmp_pid(name: &str) -> Option<u32> {
840 let tail = name.split(".tmp.").nth(1)?;
844 let pid_str = tail.split('.').next()?;
845 pid_str.parse::<u32>().ok()
846}
847
848fn read_meta(path: &Path) -> Result<OnDiskMeta, StoreError> {
849 let bytes = fs::read(path)?;
850 let parsed: OnDiskMeta = serde_json::from_slice(&bytes)?;
851 Ok(parsed)
852}
853
854fn entry_better(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
855 match (candidate.objective, current.objective) {
856 (Some(c), Some(d)) => {
857 if (c - d).abs() < 1e-12 {
858 match (candidate.kind, current.kind) {
859 (EntryKind::Final, EntryKind::Checkpoint) => true,
860 (EntryKind::Checkpoint, EntryKind::Final) => false,
861 _ => entry_newer(candidate, current),
862 }
863 } else {
864 c < d
865 }
866 }
867 (Some(_), None) => true,
868 (None, Some(_)) => false,
869 (None, None) => entry_newer(candidate, current),
870 }
871}
872
873fn entry_newer(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
874 let candidate_stamp = (
875 candidate.written_unix_secs,
876 candidate.written_nanos,
877 candidate_kind_rank(candidate.kind),
878 );
879 let current_stamp = (
880 current.written_unix_secs,
881 current.written_nanos,
882 candidate_kind_rank(current.kind),
883 );
884 candidate_stamp > current_stamp
885}
886
887const fn candidate_kind_rank(kind: EntryKind) -> u8 {
888 match kind {
889 EntryKind::Checkpoint => 0,
890 EntryKind::Final => 1,
891 }
892}
893
894fn checksum_hex(payload: &[u8]) -> String {
895 let mut h = Sha256::new();
896 h.update(payload);
897 let out = h.finalize();
898 let mut s = String::with_capacity(out.len() * 2);
899 for b in out.iter() {
900 use std::fmt::Write;
901 write!(&mut s, "{:02x}", b).expect("writing to String is infallible");
902 }
903 s
904}
905
906impl WarmStartStore {
907 fn touch_lookup_hit(
908 &self,
909 meta_path: &Path,
910 entry: WarmStartEntry,
911 ) -> Result<WarmStartEntry, StoreError> {
912 let meta = read_meta(meta_path)?;
913 let (_meta, entry) = self.touch_lookup_meta(meta_path, meta, entry)?;
914 Ok(entry)
915 }
916
917 fn touch_lookup_meta(
918 &self,
919 meta_path: &Path,
920 mut meta: OnDiskMeta,
921 mut entry: WarmStartEntry,
922 ) -> Result<(OnDiskMeta, WarmStartEntry), StoreError> {
923 let now = self.nanos_now();
924 let old = (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
925 let touched = now.max(old.saturating_add(1));
926 let secs = (touched / 1_000_000_000u128) as u64;
927 let nanos = (touched % 1_000_000_000u128) as u32;
928 meta.written_unix_secs = secs;
929 meta.written_nanos = nanos;
930 meta.accessed = true;
931 let json = serde_json::to_vec_pretty(&meta)?;
932 let tmp = meta_path.with_extension(format!(
933 "json.touch.tmp.{}.{}",
934 std::process::id(),
935 self.nanos_now()
936 ));
937 {
938 let mut f = fs::File::create(&tmp)?;
939 f.write_all(&json)?;
940 f.sync_all()?;
941 }
942 fs::rename(&tmp, meta_path)?;
943 if let Some(dir) = meta_path.parent()
944 && let Ok(d) = fs::File::open(dir)
945 {
946 d.sync_all().ok();
947 }
948 self.metadata_index_remove(meta_path);
949 entry.written_unix_secs = secs;
950 Ok((meta, entry))
951 }
952
953 fn read_meta_indexed(
954 &self,
955 path: &Path,
956 meta_md: &fs::Metadata,
957 bin_md: &fs::Metadata,
958 ) -> Result<OnDiskMeta, StoreError> {
959 if let Ok(index) = self.index.lock()
960 && let Some(cached) = index.by_meta_path.get(path)
961 && cached.matches(meta_md, bin_md)
962 {
963 return Ok(cached.meta.clone());
964 }
965
966 let meta = read_meta(path)?;
967 let Some(meta_mtime) = meta_md.modified().ok() else {
968 return Ok(meta);
969 };
970 if let Ok(mut index) = self.index.lock() {
971 index.by_meta_path.insert(
972 path.to_path_buf(),
973 IndexedMeta {
974 meta_mtime,
975 meta_len: meta_md.len(),
976 bin_len: bin_md.len(),
977 meta: meta.clone(),
978 },
979 );
980 }
981 Ok(meta)
982 }
983
984 fn metadata_index_upsert(&self, meta_path: &Path, bin_path: &Path) -> Result<(), StoreError> {
985 let meta_md = fs::metadata(meta_path)?;
986 let bin_md = fs::metadata(bin_path)?;
987 self.read_meta_indexed(meta_path, &meta_md, &bin_md)?;
988 if let Some(parent) = meta_path.parent()
991 && let Ok(mut index) = self.index.lock()
992 {
993 index.by_key_dir.remove(parent);
994 }
995 Ok(())
996 }
997
998 fn metadata_index_remove(&self, meta_path: &Path) {
999 if let Ok(mut index) = self.index.lock() {
1000 index.by_meta_path.remove(meta_path);
1001 if let Some(parent) = meta_path.parent() {
1002 index.by_key_dir.remove(parent);
1003 }
1004 }
1005 }
1006
1007 fn metadata_index_remove_key(&self, key: &Fingerprint) {
1008 let dir = self.key_dir(key);
1009 if let Ok(mut index) = self.index.lock() {
1010 index.by_meta_path.retain(|path, _| !path.starts_with(&dir));
1011 index.by_key_dir.remove(&dir);
1012 }
1013 }
1014
1015 fn cached_dir_scan(&self, dir: &Path, dir_md: &fs::Metadata) -> Option<Vec<ScannedEntry>> {
1026 let dir_mtime = dir_md.modified().ok()?;
1027 let index = self.index.lock().ok()?;
1028 let cached = index.by_key_dir.get(dir)?;
1029 if cached.dir_mtime != dir_mtime {
1030 return None;
1031 }
1032 for entry in &cached.entries {
1033 let meta_md = fs::metadata(&entry.meta_path).ok()?;
1034 let bin_md = fs::metadata(&entry.bin_path).ok()?;
1035 if !entry.matches_files(&meta_md, &bin_md) {
1036 return None;
1037 }
1038 }
1039 Some(cached.entries.clone())
1040 }
1041
1042 fn store_dir_scan(&self, dir: &Path, dir_mtime: SystemTime, entries: &[ScannedEntry]) {
1043 if let Ok(mut index) = self.index.lock() {
1044 index.by_key_dir.insert(
1045 dir.to_path_buf(),
1046 ScannedDir {
1047 dir_mtime,
1048 entries: entries.to_vec(),
1049 },
1050 );
1051 }
1052 }
1053
1054 fn scan_key_dir(&self, dir: &Path, now_nanos: u128) -> Vec<ScannedEntry> {
1068 let dir_md = match fs::metadata(dir) {
1069 Ok(m) => m,
1070 Err(_) => return Vec::new(),
1071 };
1072 if let Some(cached) = self.cached_dir_scan(dir, &dir_md) {
1073 let any_expired = cached.iter().any(|e| {
1080 meta_expired(
1081 e.meta.written_unix_secs,
1082 e.meta.written_nanos,
1083 self.opts.ttl,
1084 now_nanos,
1085 )
1086 });
1087 if !any_expired {
1088 return cached;
1089 }
1090 let mut survivors = Vec::with_capacity(cached.len());
1091 for entry in cached {
1092 if meta_expired(
1093 entry.meta.written_unix_secs,
1094 entry.meta.written_nanos,
1095 self.opts.ttl,
1096 now_nanos,
1097 ) {
1098 fs::remove_file(&entry.meta_path).ok();
1099 fs::remove_file(&entry.bin_path).ok();
1100 self.metadata_index_remove(&entry.meta_path);
1101 } else {
1102 survivors.push(entry);
1103 }
1104 }
1105 if let Some(mtime) = fs::metadata(dir).ok().and_then(|m| m.modified().ok()) {
1106 self.store_dir_scan(dir, mtime, &survivors);
1107 }
1108 return survivors;
1109 }
1110 let read_dir = match fs::read_dir(dir) {
1111 Ok(rd) => rd,
1112 Err(_) => return Vec::new(),
1113 };
1114 let mut entries = Vec::new();
1115 let mut mutated = false;
1116 for f in read_dir {
1117 let path = match f {
1118 Ok(e) => e.path(),
1119 Err(_) => continue,
1120 };
1121 let name = match path.file_name().and_then(|s| s.to_str()) {
1122 Some(s) => s,
1123 None => continue,
1124 };
1125 if name.contains(".tmp.") {
1126 if let Some(pid) = parse_tmp_pid(name)
1127 && pid != std::process::id()
1128 {
1129 fs::remove_file(&path).ok();
1130 mutated = true;
1131 }
1132 continue;
1133 }
1134 if path.extension().and_then(|s| s.to_str()) != Some("json") {
1135 continue;
1136 }
1137 let meta_md = match fs::metadata(&path) {
1138 Ok(m) => m,
1139 Err(_) => continue,
1140 };
1141 let bin = path.with_extension("bin");
1142 let bin_md = match fs::metadata(&bin) {
1143 Ok(m) => m,
1144 Err(_) => {
1145 fs::remove_file(&path).ok();
1146 self.metadata_index_remove(&path);
1147 mutated = true;
1148 continue;
1149 }
1150 };
1151 let meta = match self.read_meta_indexed(&path, &meta_md, &bin_md) {
1152 Ok(m) => m,
1153 Err(_) => {
1154 fs::remove_file(&path).ok();
1155 fs::remove_file(&bin).ok();
1156 self.metadata_index_remove(&path);
1157 mutated = true;
1158 continue;
1159 }
1160 };
1161 if meta.schema_version != SCHEMA_VERSION {
1162 fs::remove_file(&path).ok();
1163 fs::remove_file(&bin).ok();
1164 self.metadata_index_remove(&path);
1165 mutated = true;
1166 continue;
1167 }
1168 if meta_expired(
1169 meta.written_unix_secs,
1170 meta.written_nanos,
1171 self.opts.ttl,
1172 now_nanos,
1173 ) {
1174 fs::remove_file(&path).ok();
1175 fs::remove_file(&bin).ok();
1176 self.metadata_index_remove(&path);
1177 mutated = true;
1178 continue;
1179 }
1180 entries.push(ScannedEntry {
1181 meta_path: path,
1182 bin_path: bin,
1183 meta_len: meta_md.len(),
1184 bin_len: bin_md.len(),
1185 meta_mtime: meta_md.modified().ok(),
1186 bin_mtime: bin_md.modified().ok(),
1187 meta,
1188 });
1189 }
1190 let final_mtime = if mutated {
1194 fs::metadata(dir).ok().and_then(|m| m.modified().ok())
1195 } else {
1196 dir_md.modified().ok()
1197 };
1198 if let Some(mtime) = final_mtime {
1199 self.store_dir_scan(dir, mtime, &entries);
1200 }
1201 entries
1202 }
1203
1204 fn test_time_offset_ns(&self) -> u64 {
1205 self.test_time_offset_ns.load(Ordering::Relaxed)
1206 }
1207
1208 fn unix_now_parts(&self) -> (u64, u32) {
1209 let base = SystemTime::now()
1210 .duration_since(UNIX_EPOCH)
1211 .map(|d| d.as_nanos())
1212 .unwrap_or(0);
1213 let total = base.saturating_add(u128::from(self.test_time_offset_ns()));
1214 let secs = (total / 1_000_000_000u128) as u64;
1215 let nanos = (total % 1_000_000_000u128) as u32;
1216 (secs, nanos)
1217 }
1218
1219 fn nanos_now(&self) -> u128 {
1220 let base = SystemTime::now()
1221 .duration_since(UNIX_EPOCH)
1222 .map(|d| d.as_nanos())
1223 .unwrap_or(0);
1224 base.saturating_add(u128::from(self.test_time_offset_ns()))
1225 }
1226
1227 fn fresh_run_id(&self) -> String {
1228 let pid = std::process::id();
1229 let nanos = self.nanos_now();
1230 format!("r{pid:x}-{nanos:x}")
1231 }
1232}
1233
1234#[cfg(test)]
1235mod tests {
1236 use super::*;
1237 use crate::warm_start::key::Fingerprinter;
1238
1239 impl WarmStartStore {
1240 fn test_advance_time(&self, dur: Duration) {
1244 self.test_time_offset_ns
1245 .fetch_add(dur.as_nanos() as u64, Ordering::Relaxed);
1246 }
1247 }
1248
1249 fn temp_store() -> (tempfile::TempDir, WarmStartStore) {
1250 let dir = tempfile::tempdir().unwrap();
1251 let store = WarmStartStore::open(
1252 dir.path().to_path_buf(),
1253 StoreOptions {
1254 size_budget_bytes: 1024 * 1024,
1255 ttl: Duration::from_secs(60),
1256 },
1257 )
1258 .unwrap();
1259 (dir, store)
1260 }
1261
1262 fn key_for(s: &str) -> Fingerprint {
1263 let mut fp = Fingerprinter::new();
1264 fp.absorb_str(b"test", s);
1265 fp.finalize()
1266 }
1267
1268 #[test]
1269 fn roundtrip_save_then_lookup() {
1270 let (_d, store) = temp_store();
1271 let key = key_for("roundtrip");
1272 store
1273 .save(
1274 &key,
1275 b"hello-warm",
1276 Some(1.5),
1277 Some(7),
1278 EntryKind::Checkpoint,
1279 )
1280 .unwrap();
1281 let got = store.lookup(&key).unwrap().unwrap();
1282 assert_eq!(got.payload, b"hello-warm");
1283 assert_eq!(got.objective, Some(1.5));
1284 assert_eq!(got.iteration, Some(7));
1285 assert_eq!(got.kind, EntryKind::Checkpoint);
1286 }
1287
1288 #[test]
1289 fn lookup_picks_lowest_objective() {
1290 let (_d, store) = temp_store();
1291 let key = key_for("multi");
1292 store
1293 .save(&key, b"worse", Some(3.0), Some(1), EntryKind::Checkpoint)
1294 .unwrap();
1295 store
1296 .save(&key, b"better", Some(1.0), Some(2), EntryKind::Checkpoint)
1297 .unwrap();
1298 store
1299 .save(&key, b"mid", Some(2.0), Some(3), EntryKind::Checkpoint)
1300 .unwrap();
1301 let got = store.lookup(&key).unwrap().unwrap();
1302 assert_eq!(got.payload, b"better");
1303 assert_eq!(got.objective, Some(1.0));
1304 }
1305
1306 #[test]
1307 fn lookup_latest_ignores_objective_ordering() {
1308 let (_d, store) = temp_store();
1309 let key = key_for("latest-vs-best");
1310 store
1311 .save(&key, b"low-objective", Some(1.0), Some(1), EntryKind::Final)
1312 .unwrap();
1313 store.test_advance_time(Duration::from_millis(2));
1314 store
1315 .save(
1316 &key,
1317 b"newer-higher-objective",
1318 Some(10.0),
1319 Some(2),
1320 EntryKind::Checkpoint,
1321 )
1322 .unwrap();
1323
1324 let best = store.lookup(&key).unwrap().unwrap();
1325 assert_eq!(best.payload, b"low-objective");
1326
1327 let latest = store.lookup_latest(&key).unwrap().unwrap();
1328 assert_eq!(latest.payload, b"newer-higher-objective");
1329 assert_eq!(latest.iteration, Some(2));
1330 }
1331
1332 #[test]
1333 fn tiebreak_final_beats_checkpoint() {
1334 let (_d, store) = temp_store();
1335 let key = key_for("tie");
1336 store
1337 .save(&key, b"ckpt", Some(1.0), None, EntryKind::Checkpoint)
1338 .unwrap();
1339 store
1341 .save(&key, b"final", Some(1.0), None, EntryKind::Final)
1342 .unwrap();
1343 let got = store.lookup(&key).unwrap().unwrap();
1344 assert_eq!(got.payload, b"final");
1345 assert_eq!(got.kind, EntryKind::Final);
1346 }
1347
1348 #[test]
1349 fn tiebreak_latest_mtime_when_no_objective() {
1350 let (_d, store) = temp_store();
1351 let key = key_for("latest");
1352 store
1353 .save(&key, b"first", None, None, EntryKind::Checkpoint)
1354 .unwrap();
1355 store.test_advance_time(Duration::from_millis(1_100));
1356 store
1357 .save(&key, b"second", None, None, EntryKind::Checkpoint)
1358 .unwrap();
1359 let got = store.lookup(&key).unwrap().unwrap();
1360 assert_eq!(got.payload, b"second");
1361 }
1362
1363 #[test]
1364 fn corrupt_payload_is_cleaned_up() {
1365 let (_d, store) = temp_store();
1366 let key = key_for("corrupt");
1367 store
1368 .save(&key, b"original", Some(0.0), None, EntryKind::Checkpoint)
1369 .unwrap();
1370 let dir = store.key_dir(&key);
1372 for entry in fs::read_dir(&dir).unwrap() {
1373 let p = entry.unwrap().path();
1374 if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1375 fs::write(&p, b"tampered!").unwrap();
1376 }
1377 }
1378 let got = store.lookup(&key).unwrap();
1379 assert!(got.is_none(), "tampered entry must be rejected");
1380 let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1382 assert!(remaining.is_empty(), "corrupt entry should be removed");
1383 }
1384
1385 #[test]
1386 fn corrupt_meta_json_is_cleaned_up() {
1387 let (_d, store) = temp_store();
1388 let key = key_for("badjson");
1389 store
1390 .save(&key, b"x", None, None, EntryKind::Checkpoint)
1391 .unwrap();
1392 let dir = store.key_dir(&key);
1393 for entry in fs::read_dir(&dir).unwrap() {
1394 let p = entry.unwrap().path();
1395 if p.extension().and_then(|s| s.to_str()) == Some("json") {
1396 fs::write(&p, b"{not valid json").unwrap();
1397 }
1398 }
1399 let got = store.lookup(&key).unwrap();
1400 assert!(got.is_none());
1401 }
1402
1403 #[test]
1404 fn schema_mismatched_entry_is_cleaned_up() {
1405 let (_d, store) = temp_store();
1406 let key = key_for("schema");
1407 store
1408 .save(&key, b"x", None, None, EntryKind::Checkpoint)
1409 .unwrap();
1410 let dir = store.key_dir(&key);
1411 for entry in fs::read_dir(&dir).unwrap() {
1412 let p = entry.unwrap().path();
1413 if p.extension().and_then(|s| s.to_str()) == Some("json") {
1414 let raw = fs::read(&p).unwrap();
1415 let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1416 parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1417 fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1418 }
1419 }
1420 assert!(store.lookup(&key).unwrap().is_none());
1421 let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1422 assert!(
1423 remaining.is_empty(),
1424 "schema-mismatched entry should be removed"
1425 );
1426 }
1427
1428 #[test]
1429 fn schema_mismatched_entry_is_removed_during_save_eviction_path() {
1430 let dir = tempfile::tempdir().unwrap();
1431 let store = WarmStartStore::open(
1432 dir.path().to_path_buf(),
1433 StoreOptions {
1434 size_budget_bytes: 6 * 1024,
1435 ttl: Duration::from_secs(3600),
1436 },
1437 )
1438 .unwrap();
1439 let stale_key = key_for("schema-size-stale");
1440 store
1441 .save(
1442 &stale_key,
1443 &vec![0u8; 4 * 1024],
1444 None,
1445 None,
1446 EntryKind::Checkpoint,
1447 )
1448 .unwrap();
1449
1450 let stale_dir = store.key_dir(&stale_key);
1451 let mut stale_meta = None;
1452 let mut stale_bin = None;
1453 for entry in fs::read_dir(&stale_dir).unwrap() {
1454 let p = entry.unwrap().path();
1455 match p.extension().and_then(|s| s.to_str()) {
1456 Some("json") => {
1457 let raw = fs::read(&p).unwrap();
1458 let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1459 parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1460 fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1461 stale_meta = Some(p);
1462 }
1463 Some("bin") => stale_bin = Some(p),
1464 _ => {}
1465 }
1466 }
1467 let stale_meta = stale_meta.expect("saved entry should have metadata");
1468 let stale_bin = stale_bin.expect("saved entry should have payload");
1469
1470 let fresh_key = key_for("schema-size-fresh");
1471 store
1472 .save(
1473 &fresh_key,
1474 &vec![1u8; 2 * 1024],
1475 None,
1476 None,
1477 EntryKind::Checkpoint,
1478 )
1479 .unwrap();
1480
1481 assert!(
1482 !stale_meta.exists(),
1483 "schema-mismatched metadata should be removed during eviction scan"
1484 );
1485 assert!(
1486 !stale_bin.exists(),
1487 "schema-mismatched payload should be removed during eviction scan"
1488 );
1489
1490 let mut total = 0u64;
1491 for key_dir in fs::read_dir(store.root()).unwrap() {
1492 let key_dir = key_dir.unwrap().path();
1493 if key_dir.is_dir() {
1494 for entry in fs::read_dir(key_dir).unwrap() {
1495 total += fs::metadata(entry.unwrap().path()).unwrap().len();
1496 }
1497 }
1498 }
1499 assert!(
1500 total <= store.options().size_budget_bytes,
1501 "schema-mismatched bytes must not leak past size accounting (got {total})"
1502 );
1503 assert!(store.lookup(&stale_key).unwrap().is_none());
1504 assert!(store.lookup(&fresh_key).unwrap().is_some());
1505 }
1506
1507 #[test]
1508 fn missing_bin_treated_as_missing() {
1509 let (_d, store) = temp_store();
1510 let key = key_for("nobin");
1511 store
1512 .save(&key, b"x", None, None, EntryKind::Checkpoint)
1513 .unwrap();
1514 let dir = store.key_dir(&key);
1515 for entry in fs::read_dir(&dir).unwrap() {
1516 let p = entry.unwrap().path();
1517 if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1518 fs::remove_file(&p).unwrap();
1519 }
1520 }
1521 assert!(store.lookup(&key).unwrap().is_none());
1522 }
1523
1524 #[test]
1525 fn missing_key_returns_none() {
1526 let (_d, store) = temp_store();
1527 let key = key_for("absent");
1528 assert!(store.lookup(&key).unwrap().is_none());
1529 }
1530
1531 #[test]
1532 fn lru_eviction_under_size_budget() {
1533 let dir = tempfile::tempdir().unwrap();
1534 let store = WarmStartStore::open(
1536 dir.path().to_path_buf(),
1537 StoreOptions {
1538 size_budget_bytes: 4 * 1024,
1539 ttl: Duration::from_secs(3600),
1540 },
1541 )
1542 .unwrap();
1543 let mut keys = Vec::new();
1544 for i in 0..20 {
1545 let mut fp = Fingerprinter::new();
1546 fp.absorb_u64(b"i", i);
1547 let key = fp.finalize();
1548 keys.push(key);
1549 let payload = vec![0u8; 256];
1550 store
1551 .save(&key, &payload, Some(i as f64), None, EntryKind::Checkpoint)
1552 .unwrap();
1553 }
1554 let mut total = 0u64;
1556 for kd in fs::read_dir(store.root()).unwrap() {
1557 let kd = kd.unwrap().path();
1558 if kd.is_dir() {
1559 for f in fs::read_dir(&kd).unwrap() {
1560 total += fs::metadata(f.unwrap().path()).unwrap().len();
1561 }
1562 }
1563 }
1564 assert!(
1565 total <= 8 * 1024,
1566 "eviction failed to bound size (got {total})"
1567 );
1568 assert!(store.lookup(&keys[0]).unwrap().is_none());
1570 assert!(store.lookup(keys.last().unwrap()).unwrap().is_some());
1571 }
1572
1573 #[test]
1574 fn ttl_drops_old_entries() {
1575 let dir = tempfile::tempdir().unwrap();
1583 let ttl = Duration::from_secs(60);
1584 let store = WarmStartStore::open(
1585 dir.path().to_path_buf(),
1586 StoreOptions {
1587 size_budget_bytes: 1024 * 1024,
1588 ttl,
1589 },
1590 )
1591 .unwrap();
1592 let key = key_for("ttl");
1593 store
1594 .save(&key, b"x", None, None, EntryKind::Checkpoint)
1595 .unwrap();
1596 assert!(store.lookup(&key).unwrap().is_some());
1597 store.test_advance_time(ttl + Duration::from_secs(5));
1598 let other = key_for("ttl-other");
1600 store
1601 .save(&other, b"y", None, None, EntryKind::Checkpoint)
1602 .unwrap();
1603 assert!(store.lookup(&key).unwrap().is_none());
1605 assert!(store.lookup(&other).unwrap().is_some());
1606 }
1607
1608 #[test]
1609 fn orphan_temp_files_from_dead_processes_are_swept() {
1610 let (_d, store) = temp_store();
1611 let key = key_for("tmp");
1612 let dir = store.key_dir(&key);
1613 fs::create_dir_all(&dir).unwrap();
1614 let orphan_other = dir.join("r0-0.json.tmp.1.0");
1616 let mine = dir.join(format!("r0-0.bin.tmp.{}.0", std::process::id()));
1617 fs::write(&orphan_other, b"orphan").unwrap();
1618 fs::write(&mine, b"mine").unwrap();
1619 store.evict_overflow().unwrap();
1620 assert!(!orphan_other.exists(), "other-PID tmp file should be swept");
1621 assert!(mine.exists(), "same-PID tmp file must be left alone");
1622 }
1623
1624 #[test]
1625 fn tmp_filenames_without_pid_are_skipped() {
1626 let (_d, store) = temp_store();
1628 let key = key_for("malformed");
1629 let dir = store.key_dir(&key);
1630 fs::create_dir_all(&dir).unwrap();
1631 let weird = dir.join("garbage.tmp.notapid.suffix");
1632 fs::write(&weird, b"x").unwrap();
1633 store.evict_overflow().unwrap();
1635 assert!(weird.exists());
1636 }
1637
1638 #[test]
1639 fn save_overwrite_keeps_single_entry() {
1640 let (_d, store) = temp_store();
1641 let key = key_for("overwrite");
1642 let id = store
1643 .save(&key, b"v1", Some(2.0), Some(1), EntryKind::Checkpoint)
1644 .unwrap();
1645 store
1646 .save_overwrite(&key, &id, b"v2", Some(1.0), Some(2), EntryKind::Checkpoint)
1647 .unwrap();
1648 let dir = store.key_dir(&key);
1650 let files: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1651 assert_eq!(files.len(), 2, "overwrite should not create a new run-id");
1652 let got = store.lookup(&key).unwrap().unwrap();
1653 assert_eq!(got.payload, b"v2");
1654 assert_eq!(got.objective, Some(1.0));
1655 }
1656
1657 #[test]
1658 fn write_and_promote_recreates_dir_removed_before_write() {
1659 let (_d, store) = temp_store();
1663 let key = key_for("race-recreate");
1664 let dir = store.key_dir(&key);
1665 assert!(!dir.exists());
1668 let bin_tmp = dir.join("r0.bin.tmp.1.0.0");
1669 let meta_tmp = dir.join("r0.json.tmp.1.0.0");
1670 let bin_final = dir.join("r0.bin");
1671 let meta_final = dir.join("r0.json");
1672 let stamp_fn = || (0u64, 0u32);
1673 let build_meta_json = |_: u64, _: u32| -> io::Result<Vec<u8>> { Ok(b"{}".to_vec()) };
1674 write_and_promote_entry(&EntryWrite {
1675 dir: &dir,
1676 bin_tmp: &bin_tmp,
1677 meta_tmp: &meta_tmp,
1678 payload: b"payload",
1679 bin_final: &bin_final,
1680 meta_final: &meta_final,
1681 stamp_fn: &stamp_fn,
1682 build_meta_json: &build_meta_json,
1683 })
1684 .expect("promote into a missing dir must recreate it and succeed");
1685 assert!(bin_final.exists() && meta_final.exists());
1686 assert_eq!(fs::read(&bin_final).unwrap(), b"payload");
1687 }
1688
1689 #[test]
1690 fn save_survives_concurrent_eviction_removing_key_dir() {
1691 use std::sync::Arc;
1698 use std::sync::atomic::AtomicBool;
1699
1700 let dir = tempfile::tempdir().unwrap();
1701 let store = Arc::new(
1705 WarmStartStore::open(
1706 dir.path().to_path_buf(),
1707 StoreOptions {
1708 size_budget_bytes: 0,
1709 ttl: Duration::from_secs(60),
1710 },
1711 )
1712 .unwrap(),
1713 );
1714 let key = key_for("concurrent-evict");
1715 let stop = Arc::new(AtomicBool::new(false));
1716
1717 let evictor = {
1718 let store = Arc::clone(&store);
1719 let stop = Arc::clone(&stop);
1720 std::thread::spawn(move || {
1721 while !stop.load(Ordering::Relaxed) {
1722 store.evict_overflow().ok();
1723 }
1724 })
1725 };
1726
1727 let writers: Vec<_> = (0..4)
1728 .map(|w| {
1729 let store = Arc::clone(&store);
1730 std::thread::spawn(move || {
1731 for i in 0..200u32 {
1732 let payload = format!("w{w}-i{i}");
1733 store
1734 .save(
1735 &key,
1736 payload.as_bytes(),
1737 Some(i as f64),
1738 Some(i as u64),
1739 EntryKind::Checkpoint,
1740 )
1741 .expect("save must not fail with ENOENT under concurrent eviction");
1742 }
1743 })
1744 })
1745 .collect();
1746
1747 for h in writers {
1748 h.join().unwrap();
1749 }
1750 stop.store(true, Ordering::Relaxed);
1751 evictor.join().unwrap();
1752 }
1753
1754 #[test]
1755 fn keys_are_isolated() {
1756 let (_d, store) = temp_store();
1757 let a = key_for("a");
1758 let b = key_for("b");
1759 store
1760 .save(&a, b"AAA", Some(1.0), None, EntryKind::Final)
1761 .unwrap();
1762 store
1763 .save(&b, b"BBB", Some(1.0), None, EntryKind::Final)
1764 .unwrap();
1765 assert_eq!(store.lookup(&a).unwrap().unwrap().payload, b"AAA");
1766 assert_eq!(store.lookup(&b).unwrap().unwrap().payload, b"BBB");
1767 }
1768}