1use crate::warm_start::key::Fingerprint;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::fs;
15use std::io::{self, Write as _};
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex, OnceLock};
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
/// Version of the on-disk metadata format written into every entry's JSON
/// sidecar; directory scans delete entries whose recorded version differs.
pub(crate) const SCHEMA_VERSION: u32 = 1;

/// Default store size budget: 1 GiB.
pub(crate) const DEFAULT_SIZE_BUDGET_BYTES: u64 = 1024 * 1024 * 1024;

/// Default entry time-to-live: 30 days, expressed in seconds.
pub(crate) const DEFAULT_TTL_SECS: u64 = 60 * 60 * 24 * 30;
30
/// Errors surfaced by [`WarmStartStore`] operations.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// A filesystem operation failed.
    #[error("io: {0}")]
    Io(#[from] io::Error),
    /// Entry metadata could not be serialized to, or parsed from, JSON.
    #[error("json: {0}")]
    Json(#[from] serde_json::Error),
}
38
/// A stored entry as returned to callers by a lookup.
#[derive(Debug, Clone)]
pub struct WarmStartEntry {
    /// Raw payload bytes exactly as they were passed to `save`.
    pub payload: Vec<u8>,
    /// Objective value recorded with the entry, if any; non-finite values are
    /// dropped at save time. "Best" lookups prefer the lowest objective.
    pub objective: Option<f64>,
    /// Caller-supplied iteration counter recorded with the entry, if any.
    pub iteration: Option<u64>,
    /// Whole seconds since the Unix epoch at which the entry was written.
    pub written_unix_secs: u64,
    /// Whether the entry is a checkpoint or a final result.
    pub kind: EntryKind,
}
48
/// Kind of a stored entry. When objectives tie, or write stamps are equal,
/// `Final` is preferred over `Checkpoint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryKind {
    /// An intermediate snapshot.
    Checkpoint,
    /// A finished result.
    Final,
}
56
/// JSON sidecar stored next to each payload (`<run_id>.json` beside
/// `<run_id>.bin`). Fields marked `#[serde(default)]` may be absent in a file
/// and then deserialize to zero / `false`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OnDiskMeta {
    /// Format version; compared against [`SCHEMA_VERSION`] during scans.
    schema_version: u32,
    /// Write time, whole seconds since the Unix epoch.
    written_unix_secs: u64,
    /// Sub-second part of the write time, in nanoseconds.
    #[serde(default)]
    written_nanos: u32,
    /// Objective value recorded at save time (only finite values are stored).
    objective: Option<f64>,
    /// Caller-supplied iteration counter.
    iteration: Option<u64>,
    /// Checkpoint or final result.
    kind: EntryKind,
    /// Lowercase hex SHA-256 of the payload; verified on lookup.
    checksum_hex: String,
    /// Payload length in bytes at save time.
    payload_bytes: u64,
    /// Set to `true` the first time a lookup returns this entry; eviction
    /// removes never-accessed entries before accessed ones.
    #[serde(default)]
    accessed: bool,
    /// Last access time, whole seconds since the Unix epoch (0 = never).
    #[serde(default)]
    accessed_unix_secs: u64,
    /// Sub-second part of the last access time, in nanoseconds.
    #[serde(default)]
    accessed_nanos: u32,
}
90
91fn meta_activity_nanos(meta: &OnDiskMeta) -> u128 {
96 let written =
97 (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
98 let accessed =
99 (meta.accessed_unix_secs as u128) * 1_000_000_000u128 + meta.accessed_nanos as u128;
100 written.max(accessed)
101}
102
/// Tunables for a [`WarmStartStore`].
#[derive(Debug, Clone)]
pub struct StoreOptions {
    /// Total on-disk size (payload + metadata bytes) above which eviction
    /// starts deleting entries.
    pub size_budget_bytes: u64,
    /// Maximum time since an entry's last write or access before it is
    /// treated as expired. A zero duration disables expiry.
    pub ttl: Duration,
}
108
109impl Default for StoreOptions {
110 fn default() -> Self {
111 Self {
112 size_budget_bytes: DEFAULT_SIZE_BUDGET_BYTES,
113 ttl: Duration::from_secs(DEFAULT_TTL_SECS),
114 }
115 }
116}
117
/// Directory-backed store of warm-start entries, one sub-directory per key
/// fingerprint. Clones share the index, byte total, save counter and last
/// eviction stamp through `Arc`s.
#[derive(Debug)]
pub struct WarmStartStore {
    /// Root directory; each key lives in `root/<fingerprint hex>/`.
    root: PathBuf,
    /// Size budget and TTL settings.
    opts: StoreOptions,
    /// In-memory cache of parsed metadata and per-directory scan results.
    index: Arc<Mutex<MetadataIndex>>,
    /// Running estimate of bytes on disk; bumped approximately on save and
    /// replaced with a measured total by eviction passes.
    byte_total: Arc<AtomicU64>,
    /// Number of saves performed; used to trigger periodic eviction.
    save_counter: Arc<AtomicU64>,
    /// Root-directory mtime observed after the last eviction pass that found
    /// the store within budget; lets a later pass exit early if unchanged.
    last_evict_root_mtime: Arc<Mutex<Option<SystemTime>>>,
    /// Nanoseconds added to the wall clock by this handle; only advanced by
    /// the test helper visible in this file. Copied (not shared) on clone.
    test_time_offset_ns: AtomicU64,
}
162
163impl Clone for WarmStartStore {
164 fn clone(&self) -> Self {
165 Self {
166 root: self.root.clone(),
167 opts: self.opts.clone(),
168 index: Arc::clone(&self.index),
169 byte_total: Arc::clone(&self.byte_total),
174 save_counter: Arc::clone(&self.save_counter),
175 last_evict_root_mtime: Arc::clone(&self.last_evict_root_mtime),
176 test_time_offset_ns: AtomicU64::new(self.test_time_offset_ns.load(Ordering::Relaxed)),
177 }
178 }
179}
180
181impl WarmStartStore {
182 pub fn open(root: PathBuf, opts: StoreOptions) -> Result<Self, StoreError> {
184 fs::create_dir_all(&root)?;
185 Ok(Self {
186 root,
187 opts,
188 index: Arc::new(Mutex::new(MetadataIndex::default())),
189 byte_total: Arc::new(AtomicU64::new(0)),
190 save_counter: Arc::new(AtomicU64::new(0)),
191 last_evict_root_mtime: Arc::new(Mutex::new(None)),
192 test_time_offset_ns: AtomicU64::new(0),
193 })
194 }
195
    /// Root directory this store reads from and writes to.
    pub fn root(&self) -> &Path {
        &self.root
    }
199
    /// Options this store was opened with.
    pub fn options(&self) -> &StoreOptions {
        &self.opts
    }
203
    /// Directory holding every entry for `key`: `root/<key as hex>`.
    fn key_dir(&self, key: &Fingerprint) -> PathBuf {
        self.root.join(key.to_hex())
    }
207
    /// Returns the preferred entry for `key` under "best" ordering: lowest
    /// objective first, with ties broken by kind and then by write time.
    /// Returns `Ok(None)` when no valid entry exists.
    pub fn lookup(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
        self.lookup_with(key, LookupMode::Best)
    }
218
    /// Returns the most recently written entry for `key`, ignoring objective
    /// values. Returns `Ok(None)` when no valid entry exists.
    pub fn lookup_latest(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
        self.lookup_with(key, LookupMode::Latest)
    }
229
230 fn lookup_with(
231 &self,
232 key: &Fingerprint,
233 mode: LookupMode,
234 ) -> Result<Option<WarmStartEntry>, StoreError> {
235 let dir = self.key_dir(key);
236 if !dir.exists() {
237 lookup_cache_invalidate(&LookupCacheKey { fp: *key, mode });
241 self.metadata_index_remove_key(key);
242 return Ok(None);
243 }
244 let cache_key = LookupCacheKey { fp: *key, mode };
253 let now_nanos = self.nanos_now();
254 if let Some(hit) = lookup_cache_get(&cache_key) {
255 if let Ok(md) = fs::metadata(&hit.meta_path)
256 && md.modified().ok() == Some(hit.meta_mtime)
257 {
258 let expired = self.opts.ttl.as_nanos() > 0
259 && now_nanos.saturating_sub(hit.write_nanos) >= self.opts.ttl.as_nanos();
260 if !expired {
261 let entry = self.touch_lookup_hit(&hit.meta_path, hit.entry)?;
262 return Ok(Some(entry));
263 }
264 lookup_cache_invalidate(&cache_key);
265 let bin = hit.meta_path.with_extension("bin");
266 fs::remove_file(&hit.meta_path).ok();
267 fs::remove_file(&bin).ok();
268 self.metadata_index_remove(&hit.meta_path);
270 return Ok(None);
271 }
272 lookup_cache_invalidate(&cache_key);
273 }
274 let mut best: Option<(OnDiskMeta, PathBuf)> = None;
280 for scanned in self.scan_key_dir(&dir, now_nanos) {
281 let take = match best {
282 None => true,
283 Some((ref cur, _)) => mode.better(&scanned.meta, cur),
284 };
285 if take {
286 best = Some((scanned.meta, scanned.meta_path));
287 }
288 }
289 let (meta, meta_path) = match best {
290 Some(b) => b,
291 None => {
292 lookup_cache_invalidate(&cache_key);
293 return Ok(None);
294 }
295 };
296 let bin_path = meta_path.with_extension("bin");
297 let payload = match fs::read(&bin_path) {
298 Ok(v) => v,
299 Err(_) => return Ok(None),
300 };
301 if checksum_hex(&payload) != meta.checksum_hex {
303 fs::remove_file(&meta_path).ok();
304 fs::remove_file(&bin_path).ok();
305 lookup_cache_invalidate(&cache_key);
306 self.metadata_index_remove(&meta_path);
307 return Ok(None);
308 }
309 let entry = WarmStartEntry {
310 payload,
311 objective: meta.objective,
312 iteration: meta.iteration,
313 written_unix_secs: meta.written_unix_secs,
314 kind: meta.kind,
315 };
316 let (meta, entry) = self.touch_lookup_meta(&meta_path, meta, entry)?;
317 if let Ok(md) = fs::metadata(&meta_path)
323 && let Ok(mtime) = md.modified()
324 {
325 let write_nanos = meta_activity_nanos(&meta);
326 lookup_cache_insert(
327 cache_key,
328 CachedLookup {
329 meta_path: meta_path.clone(),
330 meta_mtime: mtime,
331 write_nanos,
332 entry: entry.clone(),
333 },
334 );
335 }
336 Ok(Some(entry))
337 }
338
339 pub fn save(
342 &self,
343 key: &Fingerprint,
344 payload: &[u8],
345 objective: Option<f64>,
346 iteration: Option<u64>,
347 kind: EntryKind,
348 ) -> Result<String, StoreError> {
349 let run_id = self.fresh_run_id();
350 self.save_overwrite(key, &run_id, payload, objective, iteration, kind)?;
351 Ok(run_id)
352 }
353
    /// Writes `payload` and its metadata under `key` as `<run_id>.bin` /
    /// `<run_id>.json`, replacing any existing files with that run id.
    ///
    /// Both files are written to uniquely named temp files and renamed into
    /// place (payload first, metadata second). Non-finite objectives are
    /// stored as `None`. After a successful write the in-memory index and the
    /// byte estimate are updated, and eviction runs on the first save, every
    /// `EVICT_EVERY_N_SAVES`-th save, or whenever the estimate exceeds the
    /// budget.
    ///
    /// # Errors
    /// Returns [`StoreError::Io`] if writing or promoting either file fails.
    /// Failures of the follow-up index update and eviction are ignored.
    pub fn save_overwrite(
        &self,
        key: &Fingerprint,
        run_id: &str,
        payload: &[u8],
        objective: Option<f64>,
        iteration: Option<u64>,
        kind: EntryKind,
    ) -> Result<(), StoreError> {
        // A new entry can change the answer of either lookup mode.
        lookup_cache_invalidate(&LookupCacheKey {
            fp: *key,
            mode: LookupMode::Best,
        });
        lookup_cache_invalidate(&LookupCacheKey {
            fp: *key,
            mode: LookupMode::Latest,
        });
        let dir = self.key_dir(key);
        let pid = std::process::id();
        let checksum = checksum_hex(payload);
        let objective_finite = objective.filter(|o| o.is_finite());
        // Makes temp file names unique per call within this process.
        let nonce = self.nanos_now();
        let bin_final = dir.join(format!("{run_id}.bin"));
        let meta_final = dir.join(format!("{run_id}.json"));
        let mut attempt = 0u8;
        // Serializes the sidecar for a given write stamp.
        let build_meta_json = |secs: u64, subsec_nanos: u32| -> Result<Vec<u8>, StoreError> {
            let meta = OnDiskMeta {
                schema_version: SCHEMA_VERSION,
                written_unix_secs: secs,
                written_nanos: subsec_nanos,
                objective: objective_finite,
                iteration,
                kind,
                checksum_hex: checksum.clone(),
                payload_bytes: payload.len() as u64,
                accessed: false,
                accessed_unix_secs: 0,
                accessed_nanos: 0,
            };
            Ok(serde_json::to_vec_pretty(&meta)?)
        };
        loop {
            let bin_tmp = dir.join(format!("{run_id}.bin.tmp.{pid}.{nonce}.{attempt}"));
            let meta_tmp = dir.join(format!("{run_id}.json.tmp.{pid}.{nonce}.{attempt}"));
            let stamp_fn = || self.unix_now_parts();
            // Adapt the StoreError-returning builder to the io::Result the writer expects.
            let build_meta_for_io = |secs: u64, subsec_nanos: u32| -> io::Result<Vec<u8>> {
                build_meta_json(secs, subsec_nanos)
                    .map_err(|e| io::Error::other(format!("meta build: {e:?}")))
            };
            match write_and_promote_entry(&EntryWrite {
                dir: &dir,
                bin_tmp: &bin_tmp,
                meta_tmp: &meta_tmp,
                payload,
                bin_final: &bin_final,
                meta_final: &meta_final,
                stamp_fn: &stamp_fn,
                build_meta_json: &build_meta_for_io,
            }) {
                Ok(()) => break,
                // Retry exactly once on NotFound — presumably the key directory
                // was removed concurrently (e.g. by empty-directory cleanup);
                // TODO confirm.
                Err(e) if e.kind() == io::ErrorKind::NotFound && attempt == 0 => {
                    fs::remove_file(&bin_tmp).ok();
                    fs::remove_file(&meta_tmp).ok();
                    attempt += 1;
                    continue;
                }
                Err(e) => {
                    fs::remove_file(&bin_tmp).ok();
                    fs::remove_file(&meta_tmp).ok();
                    // NOTE(review): this also removes a pre-existing payload with
                    // the same run id when the failure happened before the new
                    // payload was renamed into place — confirm that is intended.
                    fs::remove_file(&bin_final).ok();
                    return Err(StoreError::Io(e));
                }
            }
        }
        // Best-effort sync of the directory so the renames are persisted.
        if let Ok(d) = fs::File::open(&dir) {
            d.sync_all().ok();
        }
        self.metadata_index_upsert(&meta_final, &bin_final).ok();
        let approx_added = payload.len() as u64 + APPROX_META_BYTES;
        let new_total = self.byte_total.fetch_add(approx_added, Ordering::Relaxed) + approx_added;
        let n = self.save_counter.fetch_add(1, Ordering::Relaxed);
        if n == 0
            || n.is_multiple_of(EVICT_EVERY_N_SAVES)
            || new_total > self.opts.size_budget_bytes
        {
            self.evict_overflow().ok();
        }
        Ok(())
    }
511
    /// Scans every key directory, drops invalid/expired entries (via the
    /// directory scan), removes key directories that are completely empty,
    /// and — if the measured total still exceeds the size budget — deletes
    /// entries until it fits.
    ///
    /// Deletion order: never-accessed entries first, then oldest write stamp,
    /// then metadata path as a final tie-break. File-removal failures are
    /// ignored. The byte estimate is replaced with the measured total.
    ///
    /// Returns `Ok(())` in every path visible here, including when the root
    /// cannot be listed.
    pub fn evict_overflow(&self) -> Result<(), StoreError> {
        let current_root_mtime = fs::metadata(&self.root)
            .ok()
            .and_then(|m| m.modified().ok());
        // Fast path: estimate within budget and the root directory's mtime is
        // the same as after the last pass that found the store within budget.
        if self.byte_total.load(Ordering::Relaxed) <= self.opts.size_budget_bytes
            && let Some(now_mtime) = current_root_mtime
            && let Ok(last) = self.last_evict_root_mtime.lock()
            && *last == Some(now_mtime)
        {
            return Ok(());
        }
        let read_dir = match fs::read_dir(&self.root) {
            Ok(rd) => rd,
            Err(_) => return Ok(()),
        };
        // (meta path, bin path, total bytes, write stamp in nanos, accessed flag)
        let mut all: Vec<(PathBuf, PathBuf, u64, u128, bool)> = Vec::new();
        let now_nanos = self.nanos_now();
        for key_dir_entry in read_dir {
            let key_dir = match key_dir_entry {
                Ok(e) => e.path(),
                Err(_) => continue,
            };
            if !key_dir.is_dir() {
                continue;
            }
            let scanned = self.scan_key_dir(&key_dir, now_nanos);
            for entry in &scanned {
                let write_nanos = (entry.meta.written_unix_secs as u128) * 1_000_000_000u128
                    + entry.meta.written_nanos as u128;
                let total_bytes = entry.meta_len + entry.bin_len;
                all.push((
                    entry.meta_path.clone(),
                    entry.bin_path.clone(),
                    total_bytes,
                    write_nanos,
                    entry.meta.accessed,
                ));
            }
            // Only remove the directory when it holds no files at all, so
            // in-flight temp files are left alone.
            if scanned.is_empty()
                && fs::read_dir(&key_dir)
                    .map(|mut it| it.next().is_none())
                    .unwrap_or(false)
            {
                fs::remove_dir(&key_dir).ok();
                if let Ok(mut index) = self.index.lock() {
                    index.by_key_dir.remove(&key_dir);
                }
            }
        }
        let total: u64 = all.iter().map(|e| e.2).sum();
        if total <= self.opts.size_budget_bytes {
            self.byte_total.store(total, Ordering::Relaxed);
            // Re-read the mtime: the scan above may itself have modified the root.
            if let (Ok(mut last), Some(m)) = (
                self.last_evict_root_mtime.lock(),
                fs::metadata(&self.root)
                    .ok()
                    .and_then(|m| m.modified().ok()),
            ) {
                *last = Some(m);
            }
            return Ok(());
        }
        // `false < true`, so never-accessed entries sort (and are evicted) first.
        all.sort_by(|a, b| {
            a.4.cmp(&b.4)
                .then_with(|| a.3.cmp(&b.3))
                .then_with(|| a.0.cmp(&b.0))
        });
        let mut remaining = total;
        for (meta, bin, bytes, _, _) in all.into_iter() {
            if remaining <= self.opts.size_budget_bytes {
                break;
            }
            fs::remove_file(&meta).ok();
            fs::remove_file(&bin).ok();
            self.metadata_index_remove(&meta);
            remaining = remaining.saturating_sub(bytes);
        }
        self.byte_total.store(remaining, Ordering::Relaxed);
        Ok(())
    }
634}
635
/// Everything [`write_and_promote_entry`] needs to write one entry.
struct EntryWrite<'a> {
    /// Key directory; created if missing.
    dir: &'a Path,
    /// Temp path the payload is written to before promotion.
    bin_tmp: &'a Path,
    /// Temp path the metadata is written to before promotion.
    meta_tmp: &'a Path,
    /// Payload bytes.
    payload: &'a [u8],
    /// Final payload path.
    bin_final: &'a Path,
    /// Final metadata path.
    meta_final: &'a Path,
    /// Produces the write stamp `(unix seconds, sub-second nanos)`; called
    /// after the payload has been promoted.
    stamp_fn: &'a dyn Fn() -> (u64, u32),
    /// Serializes the metadata for the given write stamp.
    build_meta_json: &'a dyn Fn(u64, u32) -> io::Result<Vec<u8>>,
}

/// Creates `path`, writes `bytes`, and syncs the file on a best-effort basis
/// (a failed sync is ignored). The handle is closed on return.
fn write_file_then_sync(path: &Path, bytes: &[u8]) -> io::Result<()> {
    let mut file = fs::File::create(path)?;
    file.write_all(bytes)?;
    file.sync_all().ok();
    Ok(())
}

/// Writes the payload and then the metadata, each via temp file + rename.
///
/// The payload is promoted first and the metadata second, so a visible
/// metadata file always has its payload in place. If promoting the metadata
/// fails, the already-promoted payload is removed again. Temp files are not
/// cleaned up here on failure.
fn write_and_promote_entry(w: &EntryWrite<'_>) -> io::Result<()> {
    fs::create_dir_all(w.dir)?;

    write_file_then_sync(w.bin_tmp, w.payload)?;
    fs::rename(w.bin_tmp, w.bin_final)?;

    // Stamp only after the payload is in place.
    let (secs, subsec_nanos) = (w.stamp_fn)();
    let meta_bytes = (w.build_meta_json)(secs, subsec_nanos)?;
    write_file_then_sync(w.meta_tmp, &meta_bytes)?;

    fs::rename(w.meta_tmp, w.meta_final).map_err(|err| {
        fs::remove_file(w.bin_final).ok();
        err
    })
}
701
/// Per-entry metadata size assumed when bumping the byte estimate on save;
/// eviction passes later replace the estimate with measured file sizes.
const APPROX_META_BYTES: u64 = 512;
706
/// How a lookup chooses among several entries stored under one key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum LookupMode {
    /// Prefer the lowest objective (see `entry_better`).
    Best,
    /// Prefer the most recent write stamp (see `entry_newer`).
    Latest,
}
715
716impl LookupMode {
717 fn better(&self, candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
718 match self {
719 LookupMode::Best => entry_better(candidate, current),
720 LookupMode::Latest => entry_newer(candidate, current),
721 }
722 }
723}
724
/// Key of the process-wide lookup cache: one slot per (fingerprint, mode).
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct LookupCacheKey {
    /// Fingerprint of the looked-up key.
    fp: Fingerprint,
    /// Lookup mode the cached answer was computed for.
    mode: LookupMode,
}
730
/// A cached lookup result together with the data needed to validate it.
#[derive(Clone)]
struct CachedLookup {
    /// Metadata file of the winning entry.
    meta_path: PathBuf,
    /// Mtime of `meta_path` when the result was cached; a different mtime on
    /// disk invalidates the cached result.
    meta_mtime: SystemTime,
    /// Entry activity stamp (later of write and access) in nanoseconds since
    /// the Unix epoch; used for TTL checks and as the cache eviction order.
    write_nanos: u128,
    /// The entry handed back to callers on a hit.
    entry: WarmStartEntry,
}
741
/// In-memory caches that let scans skip re-reading unchanged files.
#[derive(Debug, Default)]
struct MetadataIndex {
    /// Parsed metadata keyed by metadata file path.
    by_meta_path: HashMap<PathBuf, IndexedMeta>,
    /// Complete scan results keyed by key directory.
    by_key_dir: HashMap<PathBuf, ScannedDir>,
}
757
/// Parsed metadata plus the file attributes it was read under.
#[derive(Debug, Clone)]
struct IndexedMeta {
    /// Mtime of the metadata file when it was parsed.
    meta_mtime: SystemTime,
    /// Length of the metadata file when it was parsed.
    meta_len: u64,
    /// Length of the payload file at that time.
    bin_len: u64,
    /// The parsed metadata.
    meta: OnDiskMeta,
}
765
766impl IndexedMeta {
767 fn matches(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
768 meta_md.modified().ok() == Some(self.meta_mtime)
769 && meta_md.len() == self.meta_len
770 && bin_md.len() == self.bin_len
771 }
772}
773
/// Cached result of scanning one key directory.
#[derive(Debug, Clone)]
struct ScannedDir {
    /// Directory mtime the scan result is valid for.
    dir_mtime: SystemTime,
    /// Entries that survived the scan.
    entries: Vec<ScannedEntry>,
}
781
/// One valid entry found by a directory scan.
#[derive(Debug, Clone)]
struct ScannedEntry {
    /// Path of the `.json` metadata file.
    meta_path: PathBuf,
    /// Path of the `.bin` payload file.
    bin_path: PathBuf,
    /// Metadata file length at scan time.
    meta_len: u64,
    /// Payload file length at scan time.
    bin_len: u64,
    /// Metadata file mtime at scan time, if available.
    meta_mtime: Option<SystemTime>,
    /// Payload file mtime at scan time, if available.
    bin_mtime: Option<SystemTime>,
    /// Parsed metadata.
    meta: OnDiskMeta,
}
795
796impl ScannedEntry {
797 fn matches_files(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
798 meta_md.len() == self.meta_len
799 && bin_md.len() == self.bin_len
800 && meta_md.modified().ok() == self.meta_mtime
801 && bin_md.modified().ok() == self.bin_mtime
802 }
803}
804
/// Whether an entry whose last activity was at `activity_nanos` has outlived
/// `ttl` at time `now_nanos` (all in nanoseconds since the Unix epoch).
///
/// A zero `ttl` disables expiry. An activity stamp in the future counts as
/// age zero.
const fn meta_expired(activity_nanos: u128, ttl: Duration, now_nanos: u128) -> bool {
    let limit = ttl.as_nanos();
    let age = now_nanos.saturating_sub(activity_nanos);
    limit != 0 && age >= limit
}
819
/// The process-wide lookup cache, created on first use. Being a `static`, it
/// is shared by every store instance in the process.
fn lookup_cache() -> &'static Mutex<HashMap<LookupCacheKey, CachedLookup>> {
    static CACHE: OnceLock<Mutex<HashMap<LookupCacheKey, CachedLookup>>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}
830
/// Maximum number of results kept in the lookup cache.
const LOOKUP_CACHE_MAX_ENTRIES: usize = 128;
/// Maximum estimated resident size of the lookup cache: 256 MiB.
const LOOKUP_CACHE_MAX_BYTES: usize = 256 * 1024 * 1024;
833
/// Estimated memory held by one cached result: the struct itself plus the
/// payload buffer's capacity. Other heap data (e.g. the path) is not counted.
const fn cached_lookup_resident_bytes(value: &CachedLookup) -> usize {
    std::mem::size_of::<CachedLookup>().saturating_add(value.entry.payload.capacity())
}
837
838fn lookup_cache_get(key: &LookupCacheKey) -> Option<CachedLookup> {
839 let guard = lookup_cache().lock().ok()?;
840 guard.get(key).cloned()
841}
842
843fn lookup_cache_insert(key: LookupCacheKey, val: CachedLookup) {
844 if let Ok(mut guard) = lookup_cache().lock() {
845 let new_bytes = cached_lookup_resident_bytes(&val);
846 if new_bytes > LOOKUP_CACHE_MAX_BYTES {
847 return;
848 }
849 let mut resident_bytes: usize = guard.values().map(cached_lookup_resident_bytes).sum();
850 if let Some(old) = guard.remove(&key) {
851 resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
852 }
853 while guard.len() >= LOOKUP_CACHE_MAX_ENTRIES
854 || resident_bytes.saturating_add(new_bytes) > LOOKUP_CACHE_MAX_BYTES
855 {
856 let oldest = guard
857 .iter()
858 .min_by_key(|(_, cached)| cached.write_nanos)
859 .map(|(old_key, _)| *old_key);
860 let Some(oldest) = oldest else {
861 break;
862 };
863 if let Some(old) = guard.remove(&oldest) {
864 resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
865 }
866 }
867 guard.insert(key, val);
868 }
869}
870
871fn lookup_cache_invalidate(key: &LookupCacheKey) {
872 if let Ok(mut guard) = lookup_cache().lock() {
873 guard.remove(key);
874 }
875}
876
/// An eviction pass is attempted whenever the save counter is a multiple of
/// this value (in addition to the first save and budget overruns).
const EVICT_EVERY_N_SAVES: u64 = 32;
882
/// Extracts the owning process id from a temp file name of the form
/// `<anything>.tmp.<pid>.<rest>`.
///
/// The temp suffix is always appended at the end of the name, so the *last*
/// `.tmp.` marker is used. This keeps the PID recoverable even when the part
/// before the suffix (e.g. a caller-supplied run id) itself contains `.tmp.`.
///
/// Returns `None` when there is no `.tmp.` marker or the segment after it is
/// not a valid `u32`.
fn parse_tmp_pid(name: &str) -> Option<u32> {
    let (_, tail) = name.rsplit_once(".tmp.")?;
    // `split` always yields at least one item, so this takes the text up to
    // the next '.', or the whole tail when there is none.
    let pid_str = tail.split('.').next()?;
    pid_str.parse::<u32>().ok()
}
891
892fn read_meta(path: &Path) -> Result<OnDiskMeta, StoreError> {
893 let bytes = fs::read(path)?;
894 let parsed: OnDiskMeta = serde_json::from_slice(&bytes)?;
895 Ok(parsed)
896}
897
898fn entry_better(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
899 match (candidate.objective, current.objective) {
900 (Some(c), Some(d)) => {
901 if (c - d).abs() < 1e-12 {
902 match (candidate.kind, current.kind) {
903 (EntryKind::Final, EntryKind::Checkpoint) => true,
904 (EntryKind::Checkpoint, EntryKind::Final) => false,
905 _ => entry_newer(candidate, current),
906 }
907 } else {
908 c < d
909 }
910 }
911 (Some(_), None) => true,
912 (None, Some(_)) => false,
913 (None, None) => entry_newer(candidate, current),
914 }
915}
916
917fn entry_newer(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
918 let candidate_stamp = (
919 candidate.written_unix_secs,
920 candidate.written_nanos,
921 candidate_kind_rank(candidate.kind),
922 );
923 let current_stamp = (
924 current.written_unix_secs,
925 current.written_nanos,
926 candidate_kind_rank(current.kind),
927 );
928 candidate_stamp > current_stamp
929}
930
/// Tie-break rank of an entry kind: higher wins, so `Final` (1) outranks
/// `Checkpoint` (0).
const fn candidate_kind_rank(kind: EntryKind) -> u8 {
    match kind {
        EntryKind::Checkpoint => 0,
        EntryKind::Final => 1,
    }
}
937
938fn checksum_hex(payload: &[u8]) -> String {
939 let mut h = Sha256::new();
940 h.update(payload);
941 let out = h.finalize();
942 let mut s = String::with_capacity(out.len() * 2);
943 for b in out.iter() {
944 use std::fmt::Write;
945 write!(&mut s, "{:02x}", b).expect("writing to String is infallible");
946 }
947 s
948}
949
950impl WarmStartStore {
951 fn touch_lookup_hit(
952 &self,
953 meta_path: &Path,
954 entry: WarmStartEntry,
955 ) -> Result<WarmStartEntry, StoreError> {
956 let meta = read_meta(meta_path)?;
957 let (_meta, entry) = self.touch_lookup_meta(meta_path, meta, entry)?;
958 Ok(entry)
959 }
960
961 fn touch_lookup_meta(
962 &self,
963 meta_path: &Path,
964 mut meta: OnDiskMeta,
965 entry: WarmStartEntry,
966 ) -> Result<(OnDiskMeta, WarmStartEntry), StoreError> {
967 let now = self.nanos_now();
968 let old_access =
974 (meta.accessed_unix_secs as u128) * 1_000_000_000u128 + meta.accessed_nanos as u128;
975 let touched = now.max(old_access.saturating_add(1));
976 meta.accessed_unix_secs = (touched / 1_000_000_000u128) as u64;
977 meta.accessed_nanos = (touched % 1_000_000_000u128) as u32;
978 meta.accessed = true;
979 let json = serde_json::to_vec_pretty(&meta)?;
980 let tmp = meta_path.with_extension(format!(
981 "json.touch.tmp.{}.{}",
982 std::process::id(),
983 self.nanos_now()
984 ));
985 {
986 let mut f = fs::File::create(&tmp)?;
987 f.write_all(&json)?;
988 f.sync_all()?;
989 }
990 fs::rename(&tmp, meta_path)?;
991 if let Some(dir) = meta_path.parent()
992 && let Ok(d) = fs::File::open(dir)
993 {
994 d.sync_all().ok();
995 }
996 self.metadata_index_remove(meta_path);
997 Ok((meta, entry))
1000 }
1001
1002 fn read_meta_indexed(
1003 &self,
1004 path: &Path,
1005 meta_md: &fs::Metadata,
1006 bin_md: &fs::Metadata,
1007 ) -> Result<OnDiskMeta, StoreError> {
1008 if let Ok(index) = self.index.lock()
1009 && let Some(cached) = index.by_meta_path.get(path)
1010 && cached.matches(meta_md, bin_md)
1011 {
1012 return Ok(cached.meta.clone());
1013 }
1014
1015 let meta = read_meta(path)?;
1016 let Some(meta_mtime) = meta_md.modified().ok() else {
1017 return Ok(meta);
1018 };
1019 if let Ok(mut index) = self.index.lock() {
1020 index.by_meta_path.insert(
1021 path.to_path_buf(),
1022 IndexedMeta {
1023 meta_mtime,
1024 meta_len: meta_md.len(),
1025 bin_len: bin_md.len(),
1026 meta: meta.clone(),
1027 },
1028 );
1029 }
1030 Ok(meta)
1031 }
1032
    /// Loads (or refreshes) the index entry for a just-written pair of files
    /// and drops the cached scan of their directory so the next scan sees
    /// the new entry.
    ///
    /// # Errors
    /// Fails if either file cannot be stat'ed or the metadata cannot be read.
    fn metadata_index_upsert(&self, meta_path: &Path, bin_path: &Path) -> Result<(), StoreError> {
        let meta_md = fs::metadata(meta_path)?;
        let bin_md = fs::metadata(bin_path)?;
        // Called for its side effect of populating the index.
        self.read_meta_indexed(meta_path, &meta_md, &bin_md)?;
        if let Some(parent) = meta_path.parent()
            && let Ok(mut index) = self.index.lock()
        {
            index.by_key_dir.remove(parent);
        }
        Ok(())
    }
1046
1047 fn metadata_index_remove(&self, meta_path: &Path) {
1048 if let Ok(mut index) = self.index.lock() {
1049 index.by_meta_path.remove(meta_path);
1050 if let Some(parent) = meta_path.parent() {
1051 index.by_key_dir.remove(parent);
1052 }
1053 }
1054 }
1055
    /// Forgets every index entry under `key`'s directory, including the
    /// cached directory scan. Does nothing if the index mutex is poisoned.
    fn metadata_index_remove_key(&self, key: &Fingerprint) {
        let dir = self.key_dir(key);
        if let Ok(mut index) = self.index.lock() {
            index.by_meta_path.retain(|path, _| !path.starts_with(&dir));
            index.by_key_dir.remove(&dir);
        }
    }
1063
    /// Returns the cached scan of `dir` if it is still trustworthy: the
    /// directory mtime is unchanged and every cached entry's files still have
    /// the recorded lengths and mtimes. Returns `None` otherwise (including
    /// when nothing is cached or the index mutex is poisoned).
    fn cached_dir_scan(&self, dir: &Path, dir_md: &fs::Metadata) -> Option<Vec<ScannedEntry>> {
        let dir_mtime = dir_md.modified().ok()?;
        // NOTE(review): the index lock is held across the per-entry
        // `fs::metadata` calls below.
        let index = self.index.lock().ok()?;
        let cached = index.by_key_dir.get(dir)?;
        if cached.dir_mtime != dir_mtime {
            return None;
        }
        for entry in &cached.entries {
            let meta_md = fs::metadata(&entry.meta_path).ok()?;
            let bin_md = fs::metadata(&entry.bin_path).ok()?;
            if !entry.matches_files(&meta_md, &bin_md) {
                return None;
            }
        }
        Some(cached.entries.clone())
    }
1090
1091 fn store_dir_scan(&self, dir: &Path, dir_mtime: SystemTime, entries: &[ScannedEntry]) {
1092 if let Ok(mut index) = self.index.lock() {
1093 index.by_key_dir.insert(
1094 dir.to_path_buf(),
1095 ScannedDir {
1096 dir_mtime,
1097 entries: entries.to_vec(),
1098 },
1099 );
1100 }
1101 }
1102
    /// Lists the valid entries in one key directory, cleaning up as it goes.
    ///
    /// A still-valid cached scan is reused; entries in it that have expired
    /// since are deleted and the cache is refreshed. Otherwise the directory
    /// is read and, for each file:
    /// * temp files (`*.tmp.*`) owned by another PID are deleted; this
    ///   process's own temp files are left alone;
    /// * non-`.json` files are skipped;
    /// * metadata without a payload, unparsable metadata, schema-mismatched
    ///   entries and expired entries are deleted.
    ///
    /// The result is cached against the directory's mtime. Returns an empty
    /// list if the directory cannot be stat'ed or read.
    fn scan_key_dir(&self, dir: &Path, now_nanos: u128) -> Vec<ScannedEntry> {
        let dir_md = match fs::metadata(dir) {
            Ok(m) => m,
            Err(_) => return Vec::new(),
        };
        if let Some(cached) = self.cached_dir_scan(dir, &dir_md) {
            let any_expired = cached
                .iter()
                .any(|e| meta_expired(meta_activity_nanos(&e.meta), self.opts.ttl, now_nanos));
            if !any_expired {
                return cached;
            }
            // Some cached entries have aged out: delete them, keep the rest.
            let mut survivors = Vec::with_capacity(cached.len());
            for entry in cached {
                if meta_expired(meta_activity_nanos(&entry.meta), self.opts.ttl, now_nanos) {
                    fs::remove_file(&entry.meta_path).ok();
                    fs::remove_file(&entry.bin_path).ok();
                    self.metadata_index_remove(&entry.meta_path);
                } else {
                    survivors.push(entry);
                }
            }
            // The deletions changed the directory; cache against its new mtime.
            if let Some(mtime) = fs::metadata(dir).ok().and_then(|m| m.modified().ok()) {
                self.store_dir_scan(dir, mtime, &survivors);
            }
            return survivors;
        }
        let read_dir = match fs::read_dir(dir) {
            Ok(rd) => rd,
            Err(_) => return Vec::new(),
        };
        let mut entries = Vec::new();
        // Tracks whether this scan deleted anything (and so changed the dir mtime).
        let mut mutated = false;
        for f in read_dir {
            let path = match f {
                Ok(e) => e.path(),
                Err(_) => continue,
            };
            let name = match path.file_name().and_then(|s| s.to_str()) {
                Some(s) => s,
                None => continue,
            };
            if name.contains(".tmp.") {
                // Only temp files of other processes are removed; ours may
                // belong to a write that is still in flight.
                if let Some(pid) = parse_tmp_pid(name)
                    && pid != std::process::id()
                {
                    fs::remove_file(&path).ok();
                    mutated = true;
                }
                continue;
            }
            if path.extension().and_then(|s| s.to_str()) != Some("json") {
                continue;
            }
            let meta_md = match fs::metadata(&path) {
                Ok(m) => m,
                Err(_) => continue,
            };
            let bin = path.with_extension("bin");
            let bin_md = match fs::metadata(&bin) {
                Ok(m) => m,
                Err(_) => {
                    // Metadata without a payload: orphan, remove it.
                    fs::remove_file(&path).ok();
                    self.metadata_index_remove(&path);
                    mutated = true;
                    continue;
                }
            };
            let meta = match self.read_meta_indexed(&path, &meta_md, &bin_md) {
                Ok(m) => m,
                Err(_) => {
                    // Unreadable or unparsable metadata: drop the whole entry.
                    fs::remove_file(&path).ok();
                    fs::remove_file(&bin).ok();
                    self.metadata_index_remove(&path);
                    mutated = true;
                    continue;
                }
            };
            if meta.schema_version != SCHEMA_VERSION {
                fs::remove_file(&path).ok();
                fs::remove_file(&bin).ok();
                self.metadata_index_remove(&path);
                mutated = true;
                continue;
            }
            if meta_expired(meta_activity_nanos(&meta), self.opts.ttl, now_nanos) {
                fs::remove_file(&path).ok();
                fs::remove_file(&bin).ok();
                self.metadata_index_remove(&path);
                mutated = true;
                continue;
            }
            entries.push(ScannedEntry {
                meta_path: path,
                bin_path: bin,
                meta_len: meta_md.len(),
                bin_len: bin_md.len(),
                meta_mtime: meta_md.modified().ok(),
                bin_mtime: bin_md.modified().ok(),
                meta,
            });
        }
        // If this scan deleted files, the mtime read at the top is stale.
        let final_mtime = if mutated {
            fs::metadata(dir).ok().and_then(|m| m.modified().ok())
        } else {
            dir_md.modified().ok()
        };
        if let Some(mtime) = final_mtime {
            self.store_dir_scan(dir, mtime, &entries);
        }
        entries
    }
1237
    /// Current clock offset of this handle, in nanoseconds.
    fn test_time_offset_ns(&self) -> u64 {
        self.test_time_offset_ns.load(Ordering::Relaxed)
    }
1241
1242 fn unix_now_parts(&self) -> (u64, u32) {
1243 let base = SystemTime::now()
1244 .duration_since(UNIX_EPOCH)
1245 .map(|d| d.as_nanos())
1246 .unwrap_or(0);
1247 let total = base.saturating_add(u128::from(self.test_time_offset_ns()));
1248 let secs = (total / 1_000_000_000u128) as u64;
1249 let nanos = (total % 1_000_000_000u128) as u32;
1250 (secs, nanos)
1251 }
1252
1253 fn nanos_now(&self) -> u128 {
1254 let base = SystemTime::now()
1255 .duration_since(UNIX_EPOCH)
1256 .map(|d| d.as_nanos())
1257 .unwrap_or(0);
1258 base.saturating_add(u128::from(self.test_time_offset_ns()))
1259 }
1260
    /// Generates a run id of the form `r<pid hex>-<current nanos hex>`.
    fn fresh_run_id(&self) -> String {
        let pid = std::process::id();
        let nanos = self.nanos_now();
        format!("r{pid:x}-{nanos:x}")
    }
1266}
1267
1268#[cfg(test)]
1269mod tests {
1270 use super::*;
1271 use crate::warm_start::key::Fingerprinter;
1272
    impl WarmStartStore {
        /// Moves this handle's clock forward by `dur` without sleeping.
        /// The nanosecond count is truncated to `u64`.
        fn test_advance_time(&self, dur: Duration) {
            self.test_time_offset_ns
                .fetch_add(dur.as_nanos() as u64, Ordering::Relaxed);
        }
    }
1282
    /// Opens a store in a fresh temp directory with a 1 MiB budget and a
    /// 60-second TTL. The `TempDir` is returned so the caller keeps it alive.
    fn temp_store() -> (tempfile::TempDir, WarmStartStore) {
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStartStore::open(
            dir.path().to_path_buf(),
            StoreOptions {
                size_budget_bytes: 1024 * 1024,
                ttl: Duration::from_secs(60),
            },
        )
        .unwrap();
        (dir, store)
    }
1295
    /// Builds a fingerprint from the string `s` under the `b"test"` tag.
    fn key_for(s: &str) -> Fingerprint {
        let mut fp = Fingerprinter::new();
        fp.absorb_str(b"test", s);
        fp.finalize()
    }
1301
    /// A saved entry comes back from `lookup` with payload and metadata intact.
    #[test]
    fn roundtrip_save_then_lookup() {
        let (_d, store) = temp_store();
        let key = key_for("roundtrip");
        store
            .save(
                &key,
                b"hello-warm",
                Some(1.5),
                Some(7),
                EntryKind::Checkpoint,
            )
            .unwrap();
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"hello-warm");
        assert_eq!(got.objective, Some(1.5));
        assert_eq!(got.iteration, Some(7));
        assert_eq!(got.kind, EntryKind::Checkpoint);
    }
1321
    /// Among several entries for one key, `lookup` returns the one with the
    /// lowest objective regardless of save order.
    #[test]
    fn lookup_picks_lowest_objective() {
        let (_d, store) = temp_store();
        let key = key_for("multi");
        store
            .save(&key, b"worse", Some(3.0), Some(1), EntryKind::Checkpoint)
            .unwrap();
        store
            .save(&key, b"better", Some(1.0), Some(2), EntryKind::Checkpoint)
            .unwrap();
        store
            .save(&key, b"mid", Some(2.0), Some(3), EntryKind::Checkpoint)
            .unwrap();
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"better");
        assert_eq!(got.objective, Some(1.0));
    }
1339
    /// `lookup` prefers the lower objective while `lookup_latest` prefers the
    /// more recent write, even if its objective is worse.
    #[test]
    fn lookup_latest_ignores_objective_ordering() {
        let (_d, store) = temp_store();
        let key = key_for("latest-vs-best");
        store
            .save(&key, b"low-objective", Some(1.0), Some(1), EntryKind::Final)
            .unwrap();
        // Make the second write stamp strictly later than the first.
        store.test_advance_time(Duration::from_millis(2));
        store
            .save(
                &key,
                b"newer-higher-objective",
                Some(10.0),
                Some(2),
                EntryKind::Checkpoint,
            )
            .unwrap();

        let best = store.lookup(&key).unwrap().unwrap();
        assert_eq!(best.payload, b"low-objective");

        let latest = store.lookup_latest(&key).unwrap().unwrap();
        assert_eq!(latest.payload, b"newer-higher-objective");
        assert_eq!(latest.iteration, Some(2));
    }
1365
    /// With equal objectives, a `Final` entry wins over a `Checkpoint`.
    #[test]
    fn tiebreak_final_beats_checkpoint() {
        let (_d, store) = temp_store();
        let key = key_for("tie");
        store
            .save(&key, b"ckpt", Some(1.0), None, EntryKind::Checkpoint)
            .unwrap();
        store
            .save(&key, b"final", Some(1.0), None, EntryKind::Final)
            .unwrap();
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"final");
        assert_eq!(got.kind, EntryKind::Final);
    }
1381
    /// Without objectives, the more recently written entry wins.
    #[test]
    fn tiebreak_latest_mtime_when_no_objective() {
        let (_d, store) = temp_store();
        let key = key_for("latest");
        store
            .save(&key, b"first", None, None, EntryKind::Checkpoint)
            .unwrap();
        // Advance past a whole second so the seconds field differs too.
        store.test_advance_time(Duration::from_millis(1_100));
        store
            .save(&key, b"second", None, None, EntryKind::Checkpoint)
            .unwrap();
        let got = store.lookup(&key).unwrap().unwrap();
        assert_eq!(got.payload, b"second");
    }
1396
    /// A payload whose bytes no longer match the recorded checksum is
    /// rejected by `lookup` and both of its files are deleted.
    #[test]
    fn corrupt_payload_is_cleaned_up() {
        let (_d, store) = temp_store();
        let key = key_for("corrupt");
        store
            .save(&key, b"original", Some(0.0), None, EntryKind::Checkpoint)
            .unwrap();
        let dir = store.key_dir(&key);
        // Overwrite the payload behind the store's back.
        for entry in fs::read_dir(&dir).unwrap() {
            let p = entry.unwrap().path();
            if p.extension().and_then(|s| s.to_str()) == Some("bin") {
                fs::write(&p, b"tampered!").unwrap();
            }
        }
        let got = store.lookup(&key).unwrap();
        assert!(got.is_none(), "tampered entry must be rejected");
        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
        assert!(remaining.is_empty(), "corrupt entry should be removed");
    }
1418
    /// An entry whose metadata file is not valid JSON is treated as absent.
    #[test]
    fn corrupt_meta_json_is_cleaned_up() {
        let (_d, store) = temp_store();
        let key = key_for("badjson");
        store
            .save(&key, b"x", None, None, EntryKind::Checkpoint)
            .unwrap();
        let dir = store.key_dir(&key);
        // Replace the metadata with unparsable content.
        for entry in fs::read_dir(&dir).unwrap() {
            let p = entry.unwrap().path();
            if p.extension().and_then(|s| s.to_str()) == Some("json") {
                fs::write(&p, b"{not valid json").unwrap();
            }
        }
        let got = store.lookup(&key).unwrap();
        assert!(got.is_none());
    }
1436
    /// An entry recorded with a different schema version is ignored by
    /// `lookup` and its files are deleted.
    #[test]
    fn schema_mismatched_entry_is_cleaned_up() {
        let (_d, store) = temp_store();
        let key = key_for("schema");
        store
            .save(&key, b"x", None, None, EntryKind::Checkpoint)
            .unwrap();
        let dir = store.key_dir(&key);
        // Rewrite the metadata with a schema version this build does not use.
        for entry in fs::read_dir(&dir).unwrap() {
            let p = entry.unwrap().path();
            if p.extension().and_then(|s| s.to_str()) == Some("json") {
                let raw = fs::read(&p).unwrap();
                let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
                parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
                fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
            }
        }
        assert!(store.lookup(&key).unwrap().is_none());
        let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
        assert!(
            remaining.is_empty(),
            "schema-mismatched entry should be removed"
        );
    }
1461
/// Saves a 4 KiB entry into a store with a 6 KiB budget, bumps that entry's
/// `schema_version` on disk, and then saves a second 2 KiB entry under another
/// key.
///
/// After the second save the test expects both files of the first entry to be
/// gone, the summed size of all files under the store root to be within the
/// configured budget, the first key to look up as absent, and the second key to
/// look up as present.
#[test]
fn schema_mismatched_entry_is_removed_during_save_eviction_path() {
    let tmp = tempfile::tempdir().unwrap();
    let options = StoreOptions {
        size_budget_bytes: 6 * 1024,
        ttl: Duration::from_secs(3600),
    };
    let store = WarmStartStore::open(tmp.path().to_path_buf(), options).unwrap();

    let stale_key = key_for("schema-size-stale");
    store
        .save(
            &stale_key,
            &[0u8; 4 * 1024],
            None,
            None,
            EntryKind::Checkpoint,
        )
        .unwrap();

    // Locate the entry's two files; the metadata is rewritten in place with a
    // schema version different from SCHEMA_VERSION.
    let stale_dir = store.key_dir(&stale_key);
    let mut stale_meta = None;
    let mut stale_bin = None;
    for entry in fs::read_dir(&stale_dir).unwrap() {
        let path = entry.unwrap().path();
        let ext = path.extension().and_then(|e| e.to_str());
        if ext == Some("json") {
            let mut doc: serde_json::Value =
                serde_json::from_slice(&fs::read(&path).unwrap()).unwrap();
            doc["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
            fs::write(&path, serde_json::to_vec_pretty(&doc).unwrap()).unwrap();
            stale_meta = Some(path);
        } else if ext == Some("bin") {
            stale_bin = Some(path);
        }
    }
    let stale_meta = stale_meta.expect("saved entry should have metadata");
    let stale_bin = stale_bin.expect("saved entry should have payload");

    let fresh_key = key_for("schema-size-fresh");
    store
        .save(
            &fresh_key,
            &[1u8; 2 * 1024],
            None,
            None,
            EntryKind::Checkpoint,
        )
        .unwrap();

    assert!(
        !stale_meta.exists(),
        "schema-mismatched metadata should be removed during eviction scan"
    );
    assert!(
        !stale_bin.exists(),
        "schema-mismatched payload should be removed during eviction scan"
    );

    // Sum the size of every file one level below each key directory.
    let total: u64 = fs::read_dir(store.root())
        .unwrap()
        .map(|key_dir| key_dir.unwrap().path())
        .filter(|key_dir| key_dir.is_dir())
        .flat_map(|key_dir| fs::read_dir(key_dir).unwrap())
        .map(|file| fs::metadata(file.unwrap().path()).unwrap().len())
        .sum();
    assert!(
        total <= store.options().size_budget_bytes,
        "schema-mismatched bytes must not leak past size accounting (got {total})"
    );
    assert!(store.lookup(&stale_key).unwrap().is_none());
    assert!(store.lookup(&fresh_key).unwrap().is_some());
}
1540
/// Deletes the `.bin` file of a saved entry and checks that `lookup` then
/// reports no entry for the key.
#[test]
fn missing_bin_treated_as_missing() {
    let (_tmp, store) = temp_store();
    let key = key_for("nobin");
    store
        .save(&key, b"x", None, None, EntryKind::Checkpoint)
        .unwrap();

    // Remove every payload file, leaving whatever else is in the directory.
    let key_dir = store.key_dir(&key);
    fs::read_dir(&key_dir)
        .unwrap()
        .map(|entry| entry.unwrap().path())
        .filter(|path| path.extension().is_some_and(|ext| ext == "bin"))
        .for_each(|path| fs::remove_file(&path).unwrap());

    assert!(store.lookup(&key).unwrap().is_none());
}
1557
/// Looking up a key that was never saved succeeds and returns `None`.
#[test]
fn missing_key_returns_none() {
    let (_tmp, store) = temp_store();
    let found = store.lookup(&key_for("absent")).unwrap();
    assert!(found.is_none());
}
1564
/// Saves twenty 256-byte checkpoints, each under its own key, into a store
/// opened with a 4 KiB size budget.
///
/// Afterwards the summed size of all files under the store root must not exceed
/// 8 KiB, the first key saved must no longer be found, and the last key saved
/// must still be found.
#[test]
fn lru_eviction_under_size_budget() {
    let tmp = tempfile::tempdir().unwrap();
    let options = StoreOptions {
        size_budget_bytes: 4 * 1024,
        ttl: Duration::from_secs(3600),
    };
    let store = WarmStartStore::open(tmp.path().to_path_buf(), options).unwrap();

    // Each key is derived from the loop counter; the entry is saved as soon as
    // its key has been computed, so saves happen in counter order.
    let keys: Vec<_> = (0..20)
        .map(|i| {
            let mut fp = Fingerprinter::new();
            fp.absorb_u64(b"i", i);
            let key = fp.finalize();
            store
                .save(&key, &[0u8; 256], Some(i as f64), None, EntryKind::Checkpoint)
                .unwrap();
            key
        })
        .collect();

    let total: u64 = fs::read_dir(store.root())
        .unwrap()
        .map(|kd| kd.unwrap().path())
        .filter(|kd| kd.is_dir())
        .flat_map(|kd| fs::read_dir(&kd).unwrap())
        .map(|f| fs::metadata(f.unwrap().path()).unwrap().len())
        .sum();
    assert!(
        total <= 8 * 1024,
        "eviction failed to bound size (got {total})"
    );

    assert!(store.lookup(&keys[0]).unwrap().is_none());
    assert!(store.lookup(keys.last().unwrap()).unwrap().is_some());
}
1606
/// Saves an entry into a store with a 60-second TTL, confirms it can be looked
/// up, then calls `test_advance_time` with the TTL plus five seconds and saves
/// a second entry under a different key.
///
/// After that the first key must look up as absent while the second key is
/// still present.
#[test]
fn ttl_drops_old_entries() {
    let tmp = tempfile::tempdir().unwrap();
    let ttl = Duration::from_secs(60);
    let options = StoreOptions {
        size_budget_bytes: 1024 * 1024,
        ttl,
    };
    let store = WarmStartStore::open(tmp.path().to_path_buf(), options).unwrap();

    let expiring = key_for("ttl");
    store
        .save(&expiring, b"x", None, None, EntryKind::Checkpoint)
        .unwrap();
    assert!(store.lookup(&expiring).unwrap().is_some());

    store.test_advance_time(ttl + Duration::from_secs(5));

    let fresh = key_for("ttl-other");
    store
        .save(&fresh, b"y", None, None, EntryKind::Checkpoint)
        .unwrap();

    assert!(store.lookup(&expiring).unwrap().is_none());
    assert!(store.lookup(&fresh).unwrap().is_some());
}
1641
/// Plants two temp-style files in a key directory, one whose name carries
/// PID `1` and one whose name carries this process's PID, then runs
/// `evict_overflow`.
///
/// The file tagged with PID `1` must be gone afterwards and the file tagged
/// with the current PID must still exist.
#[test]
fn orphan_temp_files_from_dead_processes_are_swept() {
    let (_tmp, store) = temp_store();
    let key = key_for("tmp");
    let dir = store.key_dir(&key);
    fs::create_dir_all(&dir).unwrap();

    let orphan_other = dir.join("r0-0.json.tmp.1.0");
    let mine = dir.join(format!("r0-0.bin.tmp.{}.0", std::process::id()));
    for (path, contents) in [(&orphan_other, &b"orphan"[..]), (&mine, &b"mine"[..])] {
        fs::write(path, contents).unwrap();
    }

    store.evict_overflow().unwrap();

    assert!(!orphan_other.exists(), "other-PID tmp file should be swept");
    assert!(mine.exists(), "same-PID tmp file must be left alone");
}
1657
/// Writes a file named `garbage.tmp.notapid.suffix` (no numeric PID segment)
/// into a key directory and checks that it still exists after
/// `evict_overflow` has run.
#[test]
fn tmp_filenames_without_pid_are_skipped() {
    let (_tmp, store) = temp_store();
    let key = key_for("malformed");

    let key_dir = store.key_dir(&key);
    fs::create_dir_all(&key_dir).unwrap();
    let weird = key_dir.join("garbage.tmp.notapid.suffix");
    fs::write(&weird, b"x").unwrap();

    store.evict_overflow().unwrap();

    assert!(weird.exists());
}
1671
/// Saves an entry, then calls `save_overwrite` with the id returned by that
/// save and new contents.
///
/// The key directory must then hold exactly two directory entries, and
/// `lookup` must return the overwritten payload and objective.
#[test]
fn save_overwrite_keeps_single_entry() {
    let (_tmp, store) = temp_store();
    let key = key_for("overwrite");

    let run_id = store
        .save(&key, b"v1", Some(2.0), Some(1), EntryKind::Checkpoint)
        .unwrap();
    store
        .save_overwrite(
            &key,
            &run_id,
            b"v2",
            Some(1.0),
            Some(2),
            EntryKind::Checkpoint,
        )
        .unwrap();

    let file_count = fs::read_dir(store.key_dir(&key)).unwrap().count();
    assert_eq!(file_count, 2, "overwrite should not create a new run-id");

    let got = store.lookup(&key).unwrap().unwrap();
    assert_eq!(got.payload, b"v2");
    assert_eq!(got.objective, Some(1.0));
}
1690
/// Calls `write_and_promote_entry` for a key directory that does not exist yet
/// and expects the call to succeed, both final files to exist, and the final
/// payload file to contain the bytes that were passed in.
#[test]
fn write_and_promote_recreates_dir_removed_before_write() {
    let (_tmp, store) = temp_store();
    let key = key_for("race-recreate");
    let dir = store.key_dir(&key);
    // Precondition: nothing has created the directory so far.
    assert!(!dir.exists());

    let bin_tmp = dir.join("r0.bin.tmp.1.0.0");
    let bin_final = dir.join("r0.bin");
    let meta_tmp = dir.join("r0.json.tmp.1.0.0");
    let meta_final = dir.join("r0.json");

    // Stub callbacks: a constant timestamp and a constant metadata document.
    let fixed_stamp = || (0u64, 0u32);
    let empty_meta = |_: u64, _: u32| -> io::Result<Vec<u8>> { Ok(b"{}".to_vec()) };

    let request = EntryWrite {
        dir: &dir,
        payload: b"payload",
        bin_tmp: &bin_tmp,
        bin_final: &bin_final,
        meta_tmp: &meta_tmp,
        meta_final: &meta_final,
        stamp_fn: &fixed_stamp,
        build_meta_json: &empty_meta,
    };
    write_and_promote_entry(&request)
        .expect("promote into a missing dir must recreate it and succeed");

    assert!(bin_final.exists() && meta_final.exists());
    assert_eq!(fs::read(&bin_final).unwrap(), b"payload");
}
1722
/// Runs four writer threads, each performing 200 saves to the same key, while
/// a fifth thread calls `evict_overflow` in a loop on a store opened with a
/// size budget of zero.
///
/// Every save must return `Ok`; the eviction thread's own results are ignored.
/// The eviction thread is told to stop only after all writers have been joined.
#[test]
fn save_survives_concurrent_eviction_removing_key_dir() {
    use std::sync::atomic::AtomicBool;
    use std::sync::Arc;

    let tmp = tempfile::tempdir().unwrap();
    let options = StoreOptions {
        size_budget_bytes: 0,
        ttl: Duration::from_secs(60),
    };
    let store = Arc::new(WarmStartStore::open(tmp.path().to_path_buf(), options).unwrap());
    let key = key_for("concurrent-evict");
    let stop = Arc::new(AtomicBool::new(false));

    let evictor = {
        let (store, stop) = (Arc::clone(&store), Arc::clone(&stop));
        std::thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                // Eviction failures are deliberately not treated as test failures.
                let _ = store.evict_overflow();
            }
        })
    };

    // Start every writer before joining any of them.
    let mut writers = Vec::with_capacity(4);
    for w in 0..4 {
        let store = Arc::clone(&store);
        writers.push(std::thread::spawn(move || {
            for i in 0..200u32 {
                let payload = format!("w{w}-i{i}");
                store
                    .save(
                        &key,
                        payload.as_bytes(),
                        Some(f64::from(i)),
                        Some(u64::from(i)),
                        EntryKind::Checkpoint,
                    )
                    .expect("save must not fail with ENOENT under concurrent eviction");
            }
        }));
    }

    for writer in writers {
        writer.join().unwrap();
    }
    stop.store(true, Ordering::Relaxed);
    evictor.join().unwrap();
}
1787
/// Saves different payloads under two different keys and checks that looking
/// up each key returns the payload that was saved under that key.
#[test]
fn keys_are_isolated() {
    let (_tmp, store) = temp_store();
    let cases = [(key_for("a"), b"AAA"), (key_for("b"), b"BBB")];

    // Write both entries first, then read both back.
    for (key, payload) in &cases {
        store
            .save(key, *payload, Some(1.0), None, EntryKind::Final)
            .unwrap();
    }
    for (key, payload) in &cases {
        assert_eq!(store.lookup(key).unwrap().unwrap().payload, *payload);
    }
}
1802}