1use crate::warm_start::key::Fingerprint;
11use serde::{Deserialize, Serialize};
12use sha2::{Digest, Sha256};
13use std::collections::HashMap;
14use std::fs;
15use std::io::{self, Write as _};
16use std::path::{Path, PathBuf};
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex, OnceLock};
19use std::time::{Duration, SystemTime, UNIX_EPOCH};
20
/// Version tag written into every metadata file; entries carrying a different
/// value are discarded when a key directory is scanned.
pub(crate) const SCHEMA_VERSION: u32 = 1;

/// Default on-disk size budget: 1 GiB.
pub(crate) const DEFAULT_SIZE_BUDGET_BYTES: u64 = 1 << 30;

/// Default entry time-to-live: 30 days, expressed in seconds.
pub(crate) const DEFAULT_TTL_SECS: u64 = 30 * 24 * 60 * 60;
30
/// Errors surfaced by the warm-start store's fallible operations.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// A filesystem operation failed.
    #[error("io: {0}")]
    Io(#[from] io::Error),
    /// Serializing or parsing entry metadata JSON failed.
    #[error("json: {0}")]
    Json(#[from] serde_json::Error),
}
38
/// A stored entry as returned to callers by the lookup methods.
#[derive(Debug, Clone)]
pub struct WarmStartEntry {
    /// Raw payload bytes, already verified against the stored checksum.
    pub payload: Vec<u8>,
    /// Objective recorded at save time; non-finite values are stored as `None`.
    pub objective: Option<f64>,
    /// Iteration counter supplied by the writer, if any.
    pub iteration: Option<u64>,
    /// Whole seconds since the Unix epoch at which the entry was written.
    pub written_unix_secs: u64,
    /// Whether the entry is an intermediate checkpoint or a final result.
    pub kind: EntryKind,
}
48
/// Classifies a stored entry; `Final` wins over `Checkpoint` when two entries
/// tie on objective, and breaks ties between identical write timestamps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryKind {
    /// Intermediate state saved while a run is in progress.
    Checkpoint,
    /// State saved at the end of a run.
    Final,
}
56
/// JSON sidecar stored next to each payload (`<run_id>.json` beside
/// `<run_id>.bin`).
#[derive(Debug, Clone, Serialize, Deserialize)]
struct OnDiskMeta {
    /// Schema tag; compared against `SCHEMA_VERSION` when scanning.
    schema_version: u32,
    /// Whole seconds since the Unix epoch at write time.
    written_unix_secs: u64,
    /// Sub-second part of the write time in nanoseconds; defaults to 0 when
    /// the field is absent from the JSON.
    #[serde(default)]
    written_nanos: u32,
    /// Finite objective value, if one was supplied.
    objective: Option<f64>,
    /// Iteration counter, if one was supplied.
    iteration: Option<u64>,
    /// Checkpoint vs. final marker.
    kind: EntryKind,
    /// Lower-case hex SHA-256 of the payload bytes.
    checksum_hex: String,
    /// Payload length in bytes as recorded at write time.
    payload_bytes: u64,
}
72
/// Tunables for a [`WarmStartStore`].
#[derive(Debug, Clone)]
pub struct StoreOptions {
    /// Total byte budget; eviction removes the oldest-written entries once the
    /// measured total exceeds it.
    pub size_budget_bytes: u64,
    /// Maximum entry age; a zero duration disables expiry.
    pub ttl: Duration,
}
78
79impl Default for StoreOptions {
80 fn default() -> Self {
81 Self {
82 size_budget_bytes: DEFAULT_SIZE_BUDGET_BYTES,
83 ttl: Duration::from_secs(DEFAULT_TTL_SECS),
84 }
85 }
86}
87
/// Filesystem-backed store of warm-start payloads, one subdirectory per key.
#[derive(Debug)]
pub struct WarmStartStore {
    /// Directory under which each key gets a subdirectory named by its hex
    /// fingerprint.
    root: PathBuf,
    /// Size budget and TTL applied by this store.
    opts: StoreOptions,
    /// In-memory cache of parsed metadata and per-directory scan results;
    /// shared between clones.
    index: Arc<Mutex<MetadataIndex>>,
    /// Approximate bytes on disk: bumped on every save and reset to the
    /// measured total by `evict_overflow`. Shared between clones.
    byte_total: Arc<AtomicU64>,
    /// Number of saves performed; drives the periodic eviction pass.
    save_counter: Arc<AtomicU64>,
    /// Root-directory mtime recorded after an under-budget eviction pass, used
    /// to skip passes when the root has not changed since.
    last_evict_root_mtime: Arc<Mutex<Option<SystemTime>>>,
    /// Nanoseconds added to the wall clock by the store's time helpers. Each
    /// clone gets its own copy of the value rather than sharing it.
    test_time_offset_ns: AtomicU64,
}
132
133impl Clone for WarmStartStore {
134 fn clone(&self) -> Self {
135 Self {
136 root: self.root.clone(),
137 opts: self.opts.clone(),
138 index: Arc::clone(&self.index),
139 byte_total: Arc::clone(&self.byte_total),
144 save_counter: Arc::clone(&self.save_counter),
145 last_evict_root_mtime: Arc::clone(&self.last_evict_root_mtime),
146 test_time_offset_ns: AtomicU64::new(self.test_time_offset_ns.load(Ordering::Relaxed)),
147 }
148 }
149}
150
151impl WarmStartStore {
152 pub fn open(root: PathBuf, opts: StoreOptions) -> Result<Self, StoreError> {
154 fs::create_dir_all(&root)?;
155 Ok(Self {
156 root,
157 opts,
158 index: Arc::new(Mutex::new(MetadataIndex::default())),
159 byte_total: Arc::new(AtomicU64::new(0)),
160 save_counter: Arc::new(AtomicU64::new(0)),
161 last_evict_root_mtime: Arc::new(Mutex::new(None)),
162 test_time_offset_ns: AtomicU64::new(0),
163 })
164 }
165
166 pub fn root(&self) -> &Path {
167 &self.root
168 }
169
170 pub fn options(&self) -> &StoreOptions {
171 &self.opts
172 }
173
174 fn key_dir(&self, key: &Fingerprint) -> PathBuf {
175 self.root.join(key.to_hex())
176 }
177
178 pub fn lookup(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
186 self.lookup_with(key, LookupMode::Best)
187 }
188
189 pub fn lookup_latest(&self, key: &Fingerprint) -> Result<Option<WarmStartEntry>, StoreError> {
197 self.lookup_with(key, LookupMode::Latest)
198 }
199
200 fn lookup_with(
201 &self,
202 key: &Fingerprint,
203 mode: LookupMode,
204 ) -> Result<Option<WarmStartEntry>, StoreError> {
205 let dir = self.key_dir(key);
206 if !dir.exists() {
207 lookup_cache_invalidate(&LookupCacheKey { fp: *key, mode });
211 self.metadata_index_remove_key(key);
212 return Ok(None);
213 }
214 let cache_key = LookupCacheKey { fp: *key, mode };
223 let now_nanos = self.nanos_now();
224 if let Some(hit) = lookup_cache_get(&cache_key) {
225 if let Ok(md) = fs::metadata(&hit.meta_path)
226 && md.modified().ok() == Some(hit.meta_mtime)
227 {
228 let expired = self.opts.ttl.as_nanos() > 0
229 && now_nanos.saturating_sub(hit.write_nanos) >= self.opts.ttl.as_nanos();
230 if !expired {
231 return Ok(Some(hit.entry));
232 }
233 lookup_cache_invalidate(&cache_key);
234 let bin = hit.meta_path.with_extension("bin");
235 fs::remove_file(&hit.meta_path).ok();
236 fs::remove_file(&bin).ok();
237 self.metadata_index_remove(&hit.meta_path);
239 return Ok(None);
240 }
241 lookup_cache_invalidate(&cache_key);
242 }
243 let mut best: Option<(OnDiskMeta, PathBuf)> = None;
249 for scanned in self.scan_key_dir(&dir, now_nanos) {
250 let take = match best {
251 None => true,
252 Some((ref cur, _)) => mode.better(&scanned.meta, cur),
253 };
254 if take {
255 best = Some((scanned.meta, scanned.meta_path));
256 }
257 }
258 let (meta, meta_path) = match best {
259 Some(b) => b,
260 None => {
261 lookup_cache_invalidate(&cache_key);
262 return Ok(None);
263 }
264 };
265 let bin_path = meta_path.with_extension("bin");
266 let payload = match fs::read(&bin_path) {
267 Ok(v) => v,
268 Err(_) => return Ok(None),
269 };
270 if checksum_hex(&payload) != meta.checksum_hex {
272 fs::remove_file(&meta_path).ok();
273 fs::remove_file(&bin_path).ok();
274 lookup_cache_invalidate(&cache_key);
275 self.metadata_index_remove(&meta_path);
276 return Ok(None);
277 }
278 let entry = WarmStartEntry {
279 payload,
280 objective: meta.objective,
281 iteration: meta.iteration,
282 written_unix_secs: meta.written_unix_secs,
283 kind: meta.kind,
284 };
285 if let Ok(md) = fs::metadata(&meta_path)
290 && let Ok(mtime) = md.modified()
291 {
292 let write_nanos =
293 (meta.written_unix_secs as u128) * 1_000_000_000u128 + meta.written_nanos as u128;
294 lookup_cache_insert(
295 cache_key,
296 CachedLookup {
297 meta_path: meta_path.clone(),
298 meta_mtime: mtime,
299 write_nanos,
300 entry: entry.clone(),
301 },
302 );
303 }
304 Ok(Some(entry))
305 }
306
307 pub fn save(
310 &self,
311 key: &Fingerprint,
312 payload: &[u8],
313 objective: Option<f64>,
314 iteration: Option<u64>,
315 kind: EntryKind,
316 ) -> Result<String, StoreError> {
317 let run_id = self.fresh_run_id();
318 self.save_overwrite(key, &run_id, payload, objective, iteration, kind)?;
319 Ok(run_id)
320 }
321
    /// Writes (or replaces) the entry `<run_id>.bin` / `<run_id>.json` for
    /// `key`.
    ///
    /// Both lookup-cache slots for the key are invalidated up front. The
    /// payload and metadata are written through temp files and renamed into
    /// place by `write_and_promote_entry`; a `NotFound` failure is retried
    /// exactly once. Afterwards the metadata index and the approximate byte
    /// total are updated, and an eviction pass runs on the first save, every
    /// `EVICT_EVERY_N_SAVES` saves, or whenever the approximate total exceeds
    /// the budget.
    ///
    /// # Errors
    /// Returns [`StoreError::Io`] when writing or promoting the files fails.
    /// Failures of the directory sync, index update and eviction pass are
    /// ignored.
    pub fn save_overwrite(
        &self,
        key: &Fingerprint,
        run_id: &str,
        payload: &[u8],
        objective: Option<f64>,
        iteration: Option<u64>,
        kind: EntryKind,
    ) -> Result<(), StoreError> {
        // Any cached lookup result for this key may be superseded by the new
        // entry, whichever mode produced it.
        lookup_cache_invalidate(&LookupCacheKey {
            fp: *key,
            mode: LookupMode::Best,
        });
        lookup_cache_invalidate(&LookupCacheKey {
            fp: *key,
            mode: LookupMode::Latest,
        });
        let dir = self.key_dir(key);
        let pid = std::process::id();
        let checksum = checksum_hex(payload);
        // Non-finite objectives (NaN, +/-inf) are recorded as "no objective".
        let objective_finite = objective.filter(|o| o.is_finite());
        // Combined with pid and attempt number to make temp file names unique.
        let nonce = self.nanos_now();
        let bin_final = dir.join(format!("{run_id}.bin"));
        let meta_final = dir.join(format!("{run_id}.json"));
        let mut attempt = 0u8;
        // Serializes the metadata for a given write timestamp.
        let build_meta_json = |secs: u64, subsec_nanos: u32| -> Result<Vec<u8>, StoreError> {
            let meta = OnDiskMeta {
                schema_version: SCHEMA_VERSION,
                written_unix_secs: secs,
                written_nanos: subsec_nanos,
                objective: objective_finite,
                iteration,
                kind,
                checksum_hex: checksum.clone(),
                payload_bytes: payload.len() as u64,
            };
            Ok(serde_json::to_vec_pretty(&meta)?)
        };
        loop {
            let bin_tmp = dir.join(format!("{run_id}.bin.tmp.{pid}.{nonce}.{attempt}"));
            let meta_tmp = dir.join(format!("{run_id}.json.tmp.{pid}.{nonce}.{attempt}"));
            let stamp_fn = || self.unix_now_parts();
            // `write_and_promote_entry` speaks `io::Result`, so metadata build
            // errors are wrapped into an `io::Error`.
            let build_meta_for_io = |secs: u64, subsec_nanos: u32| -> io::Result<Vec<u8>> {
                build_meta_json(secs, subsec_nanos)
                    .map_err(|e| io::Error::other(format!("meta build: {e:?}")))
            };
            match write_and_promote_entry(&EntryWrite {
                dir: &dir,
                bin_tmp: &bin_tmp,
                meta_tmp: &meta_tmp,
                payload,
                bin_final: &bin_final,
                meta_final: &meta_final,
                stamp_fn: &stamp_fn,
                build_meta_json: &build_meta_for_io,
            }) {
                Ok(()) => break,
                // One retry on `NotFound`; presumably this covers the key
                // directory disappearing mid-write — TODO confirm.
                Err(e) if e.kind() == io::ErrorKind::NotFound && attempt == 0 => {
                    fs::remove_file(&bin_tmp).ok();
                    fs::remove_file(&meta_tmp).ok();
                    attempt += 1;
                    continue;
                }
                Err(e) => {
                    fs::remove_file(&bin_tmp).ok();
                    fs::remove_file(&meta_tmp).ok();
                    // NOTE(review): this also removes `bin_final` when the
                    // failure happened before the new payload was promoted, so
                    // a failed overwrite can drop the previous payload for
                    // this run id — confirm this is intended.
                    fs::remove_file(&bin_final).ok();
                    return Err(StoreError::Io(e));
                }
            }
        }
        // Best-effort sync of the key directory after the renames.
        if let Ok(d) = fs::File::open(&dir) {
            d.sync_all().ok();
        }
        self.metadata_index_upsert(&meta_final, &bin_final).ok();
        // The metadata size is approximated instead of stat-ing the file.
        let approx_added = payload.len() as u64 + APPROX_META_BYTES;
        let prev_total = self.byte_total.fetch_add(approx_added, Ordering::Relaxed);
        let new_total = prev_total + approx_added;
        let n = self.save_counter.fetch_add(1, Ordering::Relaxed);
        let over_budget = new_total > self.opts.size_budget_bytes;
        if n == 0 || over_budget || n.is_multiple_of(EVICT_EVERY_N_SAVES) {
            self.evict_overflow().ok();
        }
        Ok(())
    }
464
    /// Measures the store and deletes the oldest-written entries until the
    /// total size fits within `size_budget_bytes`.
    ///
    /// The pass is skipped when the approximate byte total is within budget
    /// and the root directory's mtime equals the one recorded after the last
    /// under-budget pass. Scanning each key directory also removes invalid and
    /// expired entries (see `scan_key_dir`), and key directories left
    /// completely empty are deleted. Individual filesystem failures are
    /// ignored; as written, this function always returns `Ok(())`.
    pub fn evict_overflow(&self) -> Result<(), StoreError> {
        let current_root_mtime = fs::metadata(&self.root)
            .ok()
            .and_then(|m| m.modified().ok());
        // Fast path: believed within budget and root unchanged since the last
        // under-budget pass.
        if self.byte_total.load(Ordering::Relaxed) <= self.opts.size_budget_bytes
            && let Some(now_mtime) = current_root_mtime
            && let Ok(last) = self.last_evict_root_mtime.lock()
            && *last == Some(now_mtime)
        {
            return Ok(());
        }
        let read_dir = match fs::read_dir(&self.root) {
            Ok(rd) => rd,
            Err(_) => return Ok(()),
        };
        // (metadata path, payload path, combined size in bytes, write time in ns)
        let mut all: Vec<(PathBuf, PathBuf, u64, u128)> = Vec::new();
        let now_nanos = self.nanos_now();
        for key_dir_entry in read_dir {
            let key_dir = match key_dir_entry {
                Ok(e) => e.path(),
                Err(_) => continue,
            };
            if !key_dir.is_dir() {
                continue;
            }
            let scanned = self.scan_key_dir(&key_dir, now_nanos);
            for entry in &scanned {
                let write_nanos = (entry.meta.written_unix_secs as u128) * 1_000_000_000u128
                    + entry.meta.written_nanos as u128;
                let total_bytes = entry.meta_len + entry.bin_len;
                all.push((
                    entry.meta_path.clone(),
                    entry.bin_path.clone(),
                    total_bytes,
                    write_nanos,
                ));
            }
            // Remove key directories that contain no files at all (not merely
            // no live entries), and forget their cached scan.
            if scanned.is_empty()
                && fs::read_dir(&key_dir)
                    .map(|mut it| it.next().is_none())
                    .unwrap_or(false)
            {
                fs::remove_dir(&key_dir).ok();
                if let Ok(mut index) = self.index.lock() {
                    index.by_key_dir.remove(&key_dir);
                }
            }
        }
        let total: u64 = all.iter().map(|e| e.2).sum();
        if total <= self.opts.size_budget_bytes {
            self.byte_total.store(total, Ordering::Relaxed);
            // Re-read the root mtime: the scan above may have deleted key
            // directories and thereby changed it.
            if let (Ok(mut last), Some(m)) = (
                self.last_evict_root_mtime.lock(),
                fs::metadata(&self.root)
                    .ok()
                    .and_then(|m| m.modified().ok()),
            ) {
                *last = Some(m);
            }
            return Ok(());
        }
        // Over budget: evict in ascending write-time order (oldest first).
        all.sort_by_key(|e| e.3);
        let mut remaining = total;
        for (meta, bin, bytes, _) in all.into_iter() {
            if remaining <= self.opts.size_budget_bytes {
                break;
            }
            fs::remove_file(&meta).ok();
            fs::remove_file(&bin).ok();
            self.metadata_index_remove(&meta);
            remaining = remaining.saturating_sub(bytes);
        }
        self.byte_total.store(remaining, Ordering::Relaxed);
        Ok(())
    }
582}
583
/// Everything `write_and_promote_entry` needs to write one entry.
struct EntryWrite<'a> {
    /// Key directory; created if missing.
    dir: &'a Path,
    /// Temp path the payload is written to before being renamed.
    bin_tmp: &'a Path,
    /// Temp path the metadata is written to before being renamed.
    meta_tmp: &'a Path,
    /// Payload bytes to store.
    payload: &'a [u8],
    /// Final payload path (`<run_id>.bin`).
    bin_final: &'a Path,
    /// Final metadata path (`<run_id>.json`).
    meta_final: &'a Path,
    /// Returns the write timestamp as `(unix seconds, sub-second nanoseconds)`;
    /// invoked after the payload has been promoted.
    stamp_fn: &'a dyn Fn() -> (u64, u32),
    /// Serializes the metadata for the timestamp produced by `stamp_fn`.
    build_meta_json: &'a dyn Fn(u64, u32) -> io::Result<Vec<u8>>,
}
613
614fn write_and_promote_entry(w: &EntryWrite<'_>) -> io::Result<()> {
615 fs::create_dir_all(w.dir)?;
619 {
620 let mut f = fs::File::create(w.bin_tmp)?;
621 f.write_all(w.payload)?;
622 f.sync_all().ok();
623 }
624 fs::rename(w.bin_tmp, w.bin_final)?;
628 let (secs, subsec_nanos) = (w.stamp_fn)();
635 let meta_json = (w.build_meta_json)(secs, subsec_nanos)?;
636 {
637 let mut f = fs::File::create(w.meta_tmp)?;
638 f.write_all(&meta_json)?;
639 f.sync_all().ok();
640 }
641 if let Err(e) = fs::rename(w.meta_tmp, w.meta_final) {
642 fs::remove_file(w.bin_final).ok();
645 return Err(e);
646 }
647 Ok(())
648}
649
/// Fixed per-entry allowance for the metadata file, added to the payload
/// length when a save bumps the approximate byte total.
const APPROX_META_BYTES: u64 = 512;
654
/// Strategy used to pick one entry among several stored for the same key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum LookupMode {
    /// Rank with `entry_better`: objective first, then kind, then recency.
    Best,
    /// Rank with `entry_newer`: most recent write wins.
    Latest,
}
663
664impl LookupMode {
665 fn better(&self, candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
666 match self {
667 LookupMode::Best => entry_better(candidate, current),
668 LookupMode::Latest => entry_newer(candidate, current),
669 }
670 }
671}
672
/// Key of the process-wide lookup cache. It does not include the store root.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct LookupCacheKey {
    /// Fingerprint of the looked-up key.
    fp: Fingerprint,
    /// Ranking mode the cached result was computed with.
    mode: LookupMode,
}
678
/// A lookup result kept in memory together with what is needed to validate it.
#[derive(Clone)]
struct CachedLookup {
    /// Metadata file the entry was read from.
    meta_path: PathBuf,
    /// Mtime of that file when the entry was cached; a differing mtime
    /// invalidates the hit.
    meta_mtime: SystemTime,
    /// Entry write time in nanoseconds since the Unix epoch; used for TTL
    /// checks and to choose which cached item to evict first.
    write_nanos: u128,
    /// The fully loaded entry, payload included.
    entry: WarmStartEntry,
}
689
/// In-memory caches that let scans skip re-reading unchanged files.
#[derive(Debug, Default)]
struct MetadataIndex {
    /// Parsed metadata keyed by metadata file path.
    by_meta_path: HashMap<PathBuf, IndexedMeta>,
    /// Result of the last scan of each key directory, keyed by directory path.
    by_key_dir: HashMap<PathBuf, ScannedDir>,
}
705
/// Parsed metadata plus the file attributes it was read under.
#[derive(Debug, Clone)]
struct IndexedMeta {
    /// Mtime of the metadata file when it was parsed.
    meta_mtime: SystemTime,
    /// Length of the metadata file when it was parsed.
    meta_len: u64,
    /// Length of the payload file at that time.
    bin_len: u64,
    /// The parsed metadata.
    meta: OnDiskMeta,
}
713
714impl IndexedMeta {
715 fn matches(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
716 meta_md.modified().ok() == Some(self.meta_mtime)
717 && meta_md.len() == self.meta_len
718 && bin_md.len() == self.bin_len
719 }
720}
721
/// Cached outcome of scanning one key directory.
#[derive(Debug, Clone)]
struct ScannedDir {
    /// Directory mtime the scan result is valid for.
    dir_mtime: SystemTime,
    /// Live entries found by the scan.
    entries: Vec<ScannedEntry>,
}
729
/// One live entry discovered while scanning a key directory.
#[derive(Debug, Clone)]
struct ScannedEntry {
    /// Path of the `.json` metadata file.
    meta_path: PathBuf,
    /// Path of the sibling `.bin` payload file.
    bin_path: PathBuf,
    /// Metadata file length at scan time.
    meta_len: u64,
    /// Payload file length at scan time.
    bin_len: u64,
    /// Metadata file mtime at scan time, if it could be read.
    meta_mtime: Option<SystemTime>,
    /// Payload file mtime at scan time, if it could be read.
    bin_mtime: Option<SystemTime>,
    /// Parsed metadata.
    meta: OnDiskMeta,
}
743
744impl ScannedEntry {
745 fn matches_files(&self, meta_md: &fs::Metadata, bin_md: &fs::Metadata) -> bool {
746 meta_md.len() == self.meta_len
747 && bin_md.len() == self.bin_len
748 && meta_md.modified().ok() == self.meta_mtime
749 && bin_md.modified().ok() == self.bin_mtime
750 }
751}
752
/// Reports whether an entry written at `secs` + `nanos` (since the Unix
/// epoch) is at least `ttl` old at `now_nanos`.
///
/// A zero `ttl` disables expiry. Write times in the future count as age zero.
const fn meta_expired(secs: u64, nanos: u32, ttl: Duration, now_nanos: u128) -> bool {
    const NANOS_PER_SEC: u128 = 1_000_000_000u128;
    let ttl_nanos = ttl.as_nanos();
    let written_at = (secs as u128) * NANOS_PER_SEC + nanos as u128;
    let age = now_nanos.saturating_sub(written_at);
    ttl_nanos != 0 && age >= ttl_nanos
}
765
766fn lookup_cache() -> &'static Mutex<HashMap<LookupCacheKey, CachedLookup>> {
773 static CACHE: OnceLock<Mutex<HashMap<LookupCacheKey, CachedLookup>>> = OnceLock::new();
774 CACHE.get_or_init(|| Mutex::new(HashMap::new()))
775}
776
/// Upper bound on the number of cached lookup results.
const LOOKUP_CACHE_MAX_ENTRIES: usize = 128;
/// Upper bound on the estimated resident size of the lookup cache: 256 MiB.
const LOOKUP_CACHE_MAX_BYTES: usize = 256 << 20;
779
780const fn cached_lookup_resident_bytes(value: &CachedLookup) -> usize {
781 std::mem::size_of::<CachedLookup>().saturating_add(value.entry.payload.capacity())
782}
783
784fn lookup_cache_get(key: &LookupCacheKey) -> Option<CachedLookup> {
785 let guard = lookup_cache().lock().ok()?;
786 guard.get(key).cloned()
787}
788
789fn lookup_cache_insert(key: LookupCacheKey, val: CachedLookup) {
790 if let Ok(mut guard) = lookup_cache().lock() {
791 let new_bytes = cached_lookup_resident_bytes(&val);
792 if new_bytes > LOOKUP_CACHE_MAX_BYTES {
793 return;
794 }
795 let mut resident_bytes: usize = guard.values().map(cached_lookup_resident_bytes).sum();
796 if let Some(old) = guard.remove(&key) {
797 resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
798 }
799 while guard.len() >= LOOKUP_CACHE_MAX_ENTRIES
800 || resident_bytes.saturating_add(new_bytes) > LOOKUP_CACHE_MAX_BYTES
801 {
802 let oldest = guard
803 .iter()
804 .min_by_key(|(_, cached)| cached.write_nanos)
805 .map(|(old_key, _)| *old_key);
806 let Some(oldest) = oldest else {
807 break;
808 };
809 if let Some(old) = guard.remove(&oldest) {
810 resident_bytes = resident_bytes.saturating_sub(cached_lookup_resident_bytes(&old));
811 }
812 }
813 guard.insert(key, val);
814 }
815}
816
817fn lookup_cache_invalidate(key: &LookupCacheKey) {
818 if let Ok(mut guard) = lookup_cache().lock() {
819 guard.remove(key);
820 }
821}
822
/// A save triggers an eviction pass whenever the save counter is a multiple
/// of this value (in addition to the over-budget trigger).
const EVICT_EVERY_N_SAVES: u64 = 32;
828
/// Extracts the writer's process id from a temp file name of the form
/// `<stem>.tmp.<pid>.<nonce>.<attempt>`.
///
/// The *last* `.tmp.` marker is used, so a stem that itself contains `.tmp.`
/// (for example a caller-chosen run id) cannot shadow the real pid segment.
/// Returns `None` when the marker is missing or the segment after it is not a
/// valid `u32`.
fn parse_tmp_pid(name: &str) -> Option<u32> {
    let (_, tail) = name.rsplit_once(".tmp.")?;
    let pid_str = tail.split('.').next()?;
    pid_str.parse::<u32>().ok()
}
837
838fn read_meta(path: &Path) -> Result<OnDiskMeta, StoreError> {
839 let bytes = fs::read(path)?;
840 let parsed: OnDiskMeta = serde_json::from_slice(&bytes)?;
841 Ok(parsed)
842}
843
844fn entry_better(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
845 match (candidate.objective, current.objective) {
846 (Some(c), Some(d)) => {
847 if (c - d).abs() < 1e-12 {
848 match (candidate.kind, current.kind) {
849 (EntryKind::Final, EntryKind::Checkpoint) => true,
850 (EntryKind::Checkpoint, EntryKind::Final) => false,
851 _ => entry_newer(candidate, current),
852 }
853 } else {
854 c < d
855 }
856 }
857 (Some(_), None) => true,
858 (None, Some(_)) => false,
859 (None, None) => entry_newer(candidate, current),
860 }
861}
862
863fn entry_newer(candidate: &OnDiskMeta, current: &OnDiskMeta) -> bool {
864 let candidate_stamp = (
865 candidate.written_unix_secs,
866 candidate.written_nanos,
867 candidate_kind_rank(candidate.kind),
868 );
869 let current_stamp = (
870 current.written_unix_secs,
871 current.written_nanos,
872 candidate_kind_rank(current.kind),
873 );
874 candidate_stamp > current_stamp
875}
876
877const fn candidate_kind_rank(kind: EntryKind) -> u8 {
878 match kind {
879 EntryKind::Checkpoint => 0,
880 EntryKind::Final => 1,
881 }
882}
883
884fn checksum_hex(payload: &[u8]) -> String {
885 let mut h = Sha256::new();
886 h.update(payload);
887 let out = h.finalize();
888 let mut s = String::with_capacity(out.len() * 2);
889 for b in out.iter() {
890 use std::fmt::Write;
891 write!(&mut s, "{:02x}", b).expect("writing to String is infallible");
892 }
893 s
894}
895
896impl WarmStartStore {
897 fn read_meta_indexed(
898 &self,
899 path: &Path,
900 meta_md: &fs::Metadata,
901 bin_md: &fs::Metadata,
902 ) -> Result<OnDiskMeta, StoreError> {
903 if let Ok(index) = self.index.lock()
904 && let Some(cached) = index.by_meta_path.get(path)
905 && cached.matches(meta_md, bin_md)
906 {
907 return Ok(cached.meta.clone());
908 }
909
910 let meta = read_meta(path)?;
911 let Some(meta_mtime) = meta_md.modified().ok() else {
912 return Ok(meta);
913 };
914 if let Ok(mut index) = self.index.lock() {
915 index.by_meta_path.insert(
916 path.to_path_buf(),
917 IndexedMeta {
918 meta_mtime,
919 meta_len: meta_md.len(),
920 bin_len: bin_md.len(),
921 meta: meta.clone(),
922 },
923 );
924 }
925 Ok(meta)
926 }
927
928 fn metadata_index_upsert(&self, meta_path: &Path, bin_path: &Path) -> Result<(), StoreError> {
929 let meta_md = fs::metadata(meta_path)?;
930 let bin_md = fs::metadata(bin_path)?;
931 self.read_meta_indexed(meta_path, &meta_md, &bin_md)?;
932 if let Some(parent) = meta_path.parent()
935 && let Ok(mut index) = self.index.lock()
936 {
937 index.by_key_dir.remove(parent);
938 }
939 Ok(())
940 }
941
942 fn metadata_index_remove(&self, meta_path: &Path) {
943 if let Ok(mut index) = self.index.lock() {
944 index.by_meta_path.remove(meta_path);
945 if let Some(parent) = meta_path.parent() {
946 index.by_key_dir.remove(parent);
947 }
948 }
949 }
950
951 fn metadata_index_remove_key(&self, key: &Fingerprint) {
952 let dir = self.key_dir(key);
953 if let Ok(mut index) = self.index.lock() {
954 index.by_meta_path.retain(|path, _| !path.starts_with(&dir));
955 index.by_key_dir.remove(&dir);
956 }
957 }
958
959 fn cached_dir_scan(&self, dir: &Path, dir_md: &fs::Metadata) -> Option<Vec<ScannedEntry>> {
970 let dir_mtime = dir_md.modified().ok()?;
971 let index = self.index.lock().ok()?;
972 let cached = index.by_key_dir.get(dir)?;
973 if cached.dir_mtime != dir_mtime {
974 return None;
975 }
976 for entry in &cached.entries {
977 let meta_md = fs::metadata(&entry.meta_path).ok()?;
978 let bin_md = fs::metadata(&entry.bin_path).ok()?;
979 if !entry.matches_files(&meta_md, &bin_md) {
980 return None;
981 }
982 }
983 Some(cached.entries.clone())
984 }
985
986 fn store_dir_scan(&self, dir: &Path, dir_mtime: SystemTime, entries: &[ScannedEntry]) {
987 if let Ok(mut index) = self.index.lock() {
988 index.by_key_dir.insert(
989 dir.to_path_buf(),
990 ScannedDir {
991 dir_mtime,
992 entries: entries.to_vec(),
993 },
994 );
995 }
996 }
997
    /// Lists the live entries of one key directory, deleting anything invalid
    /// it encounters.
    ///
    /// A still-valid cached scan is reused, with only the TTL re-checked
    /// against `now_nanos`. Otherwise the directory is read and the following
    /// are removed from disk: temp files whose embedded pid differs from the
    /// current process, metadata without a payload file, metadata that cannot
    /// be read or parsed, entries with a different schema version, and expired
    /// entries. The surviving entries are cached in the index together with
    /// the directory mtime. An unreadable directory yields an empty list.
    fn scan_key_dir(&self, dir: &Path, now_nanos: u128) -> Vec<ScannedEntry> {
        let dir_md = match fs::metadata(dir) {
            Ok(m) => m,
            Err(_) => return Vec::new(),
        };
        if let Some(cached) = self.cached_dir_scan(dir, &dir_md) {
            // The cache is validated by file attributes only, so expiry must
            // still be evaluated against the current time.
            let any_expired = cached.iter().any(|e| {
                meta_expired(
                    e.meta.written_unix_secs,
                    e.meta.written_nanos,
                    self.opts.ttl,
                    now_nanos,
                )
            });
            if !any_expired {
                return cached;
            }
            let mut survivors = Vec::with_capacity(cached.len());
            for entry in cached {
                if meta_expired(
                    entry.meta.written_unix_secs,
                    entry.meta.written_nanos,
                    self.opts.ttl,
                    now_nanos,
                ) {
                    fs::remove_file(&entry.meta_path).ok();
                    fs::remove_file(&entry.bin_path).ok();
                    self.metadata_index_remove(&entry.meta_path);
                } else {
                    survivors.push(entry);
                }
            }
            // Deleting files changed the directory mtime; store the survivors
            // under the fresh one.
            if let Some(mtime) = fs::metadata(dir).ok().and_then(|m| m.modified().ok()) {
                self.store_dir_scan(dir, mtime, &survivors);
            }
            return survivors;
        }
        let read_dir = match fs::read_dir(dir) {
            Ok(rd) => rd,
            Err(_) => return Vec::new(),
        };
        let mut entries = Vec::new();
        // Set whenever this scan deletes something, i.e. whenever the
        // directory mtime captured in `dir_md` may be stale.
        let mut mutated = false;
        for f in read_dir {
            let path = match f {
                Ok(e) => e.path(),
                Err(_) => continue,
            };
            let name = match path.file_name().and_then(|s| s.to_str()) {
                Some(s) => s,
                None => continue,
            };
            if name.contains(".tmp.") {
                // Temp files carrying this process's pid are left alone;
                // those with another pid are removed.
                // NOTE(review): no liveness check is made on the other pid —
                // presumably such files are assumed to be leftovers; confirm.
                if let Some(pid) = parse_tmp_pid(name)
                    && pid != std::process::id()
                {
                    fs::remove_file(&path).ok();
                    mutated = true;
                }
                continue;
            }
            if path.extension().and_then(|s| s.to_str()) != Some("json") {
                continue;
            }
            let meta_md = match fs::metadata(&path) {
                Ok(m) => m,
                Err(_) => continue,
            };
            let bin = path.with_extension("bin");
            let bin_md = match fs::metadata(&bin) {
                Ok(m) => m,
                Err(_) => {
                    // Metadata without a payload: drop the metadata.
                    fs::remove_file(&path).ok();
                    self.metadata_index_remove(&path);
                    mutated = true;
                    continue;
                }
            };
            let meta = match self.read_meta_indexed(&path, &meta_md, &bin_md) {
                Ok(m) => m,
                Err(_) => {
                    // Unreadable or unparseable metadata: drop the pair.
                    fs::remove_file(&path).ok();
                    fs::remove_file(&bin).ok();
                    self.metadata_index_remove(&path);
                    mutated = true;
                    continue;
                }
            };
            if meta.schema_version != SCHEMA_VERSION {
                fs::remove_file(&path).ok();
                fs::remove_file(&bin).ok();
                self.metadata_index_remove(&path);
                mutated = true;
                continue;
            }
            if meta_expired(
                meta.written_unix_secs,
                meta.written_nanos,
                self.opts.ttl,
                now_nanos,
            ) {
                fs::remove_file(&path).ok();
                fs::remove_file(&bin).ok();
                self.metadata_index_remove(&path);
                mutated = true;
                continue;
            }
            entries.push(ScannedEntry {
                meta_path: path,
                bin_path: bin,
                meta_len: meta_md.len(),
                bin_len: bin_md.len(),
                meta_mtime: meta_md.modified().ok(),
                bin_mtime: bin_md.modified().ok(),
                meta,
            });
        }
        // Cache under the mtime that reflects any deletions made above.
        let final_mtime = if mutated {
            fs::metadata(dir).ok().and_then(|m| m.modified().ok())
        } else {
            dir_md.modified().ok()
        };
        if let Some(mtime) = final_mtime {
            self.store_dir_scan(dir, mtime, &entries);
        }
        entries
    }
1147
1148 fn test_time_offset_ns(&self) -> u64 {
1149 self.test_time_offset_ns.load(Ordering::Relaxed)
1150 }
1151
1152 fn unix_now_parts(&self) -> (u64, u32) {
1153 let base = SystemTime::now()
1154 .duration_since(UNIX_EPOCH)
1155 .map(|d| d.as_nanos())
1156 .unwrap_or(0);
1157 let total = base.saturating_add(u128::from(self.test_time_offset_ns()));
1158 let secs = (total / 1_000_000_000u128) as u64;
1159 let nanos = (total % 1_000_000_000u128) as u32;
1160 (secs, nanos)
1161 }
1162
1163 fn nanos_now(&self) -> u128 {
1164 let base = SystemTime::now()
1165 .duration_since(UNIX_EPOCH)
1166 .map(|d| d.as_nanos())
1167 .unwrap_or(0);
1168 base.saturating_add(u128::from(self.test_time_offset_ns()))
1169 }
1170
1171 fn fresh_run_id(&self) -> String {
1172 let pid = std::process::id();
1173 let nanos = self.nanos_now();
1174 format!("r{pid:x}-{nanos:x}")
1175 }
1176}
1177
1178#[cfg(test)]
1179mod tests {
1180 use super::*;
1181 use crate::warm_start::key::Fingerprinter;
1182
1183 impl WarmStartStore {
1184 fn test_advance_time(&self, dur: Duration) {
1188 self.test_time_offset_ns
1189 .fetch_add(dur.as_nanos() as u64, Ordering::Relaxed);
1190 }
1191 }
1192
1193 fn temp_store() -> (tempfile::TempDir, WarmStartStore) {
1194 let dir = tempfile::tempdir().unwrap();
1195 let store = WarmStartStore::open(
1196 dir.path().to_path_buf(),
1197 StoreOptions {
1198 size_budget_bytes: 1024 * 1024,
1199 ttl: Duration::from_secs(60),
1200 },
1201 )
1202 .unwrap();
1203 (dir, store)
1204 }
1205
1206 fn key_for(s: &str) -> Fingerprint {
1207 let mut fp = Fingerprinter::new();
1208 fp.absorb_str(b"test", s);
1209 fp.finalize()
1210 }
1211
1212 #[test]
1213 fn roundtrip_save_then_lookup() {
1214 let (_d, store) = temp_store();
1215 let key = key_for("roundtrip");
1216 let _id = store
1217 .save(
1218 &key,
1219 b"hello-warm",
1220 Some(1.5),
1221 Some(7),
1222 EntryKind::Checkpoint,
1223 )
1224 .unwrap();
1225 let got = store.lookup(&key).unwrap().unwrap();
1226 assert_eq!(got.payload, b"hello-warm");
1227 assert_eq!(got.objective, Some(1.5));
1228 assert_eq!(got.iteration, Some(7));
1229 assert_eq!(got.kind, EntryKind::Checkpoint);
1230 }
1231
1232 #[test]
1233 fn lookup_picks_lowest_objective() {
1234 let (_d, store) = temp_store();
1235 let key = key_for("multi");
1236 store
1237 .save(&key, b"worse", Some(3.0), Some(1), EntryKind::Checkpoint)
1238 .unwrap();
1239 store
1240 .save(&key, b"better", Some(1.0), Some(2), EntryKind::Checkpoint)
1241 .unwrap();
1242 store
1243 .save(&key, b"mid", Some(2.0), Some(3), EntryKind::Checkpoint)
1244 .unwrap();
1245 let got = store.lookup(&key).unwrap().unwrap();
1246 assert_eq!(got.payload, b"better");
1247 assert_eq!(got.objective, Some(1.0));
1248 }
1249
1250 #[test]
1251 fn lookup_latest_ignores_objective_ordering() {
1252 let (_d, store) = temp_store();
1253 let key = key_for("latest-vs-best");
1254 store
1255 .save(&key, b"low-objective", Some(1.0), Some(1), EntryKind::Final)
1256 .unwrap();
1257 store.test_advance_time(Duration::from_millis(2));
1258 store
1259 .save(
1260 &key,
1261 b"newer-higher-objective",
1262 Some(10.0),
1263 Some(2),
1264 EntryKind::Checkpoint,
1265 )
1266 .unwrap();
1267
1268 let best = store.lookup(&key).unwrap().unwrap();
1269 assert_eq!(best.payload, b"low-objective");
1270
1271 let latest = store.lookup_latest(&key).unwrap().unwrap();
1272 assert_eq!(latest.payload, b"newer-higher-objective");
1273 assert_eq!(latest.iteration, Some(2));
1274 }
1275
1276 #[test]
1277 fn tiebreak_final_beats_checkpoint() {
1278 let (_d, store) = temp_store();
1279 let key = key_for("tie");
1280 store
1281 .save(&key, b"ckpt", Some(1.0), None, EntryKind::Checkpoint)
1282 .unwrap();
1283 store
1285 .save(&key, b"final", Some(1.0), None, EntryKind::Final)
1286 .unwrap();
1287 let got = store.lookup(&key).unwrap().unwrap();
1288 assert_eq!(got.payload, b"final");
1289 assert_eq!(got.kind, EntryKind::Final);
1290 }
1291
1292 #[test]
1293 fn tiebreak_latest_mtime_when_no_objective() {
1294 let (_d, store) = temp_store();
1295 let key = key_for("latest");
1296 store
1297 .save(&key, b"first", None, None, EntryKind::Checkpoint)
1298 .unwrap();
1299 store.test_advance_time(Duration::from_millis(1_100));
1300 store
1301 .save(&key, b"second", None, None, EntryKind::Checkpoint)
1302 .unwrap();
1303 let got = store.lookup(&key).unwrap().unwrap();
1304 assert_eq!(got.payload, b"second");
1305 }
1306
1307 #[test]
1308 fn corrupt_payload_is_cleaned_up() {
1309 let (_d, store) = temp_store();
1310 let key = key_for("corrupt");
1311 store
1312 .save(&key, b"original", Some(0.0), None, EntryKind::Checkpoint)
1313 .unwrap();
1314 let dir = store.key_dir(&key);
1316 for entry in fs::read_dir(&dir).unwrap() {
1317 let p = entry.unwrap().path();
1318 if p.extension().and_then(|s| s.to_str()) == Some("bin") {
1319 fs::write(&p, b"tampered!").unwrap();
1320 }
1321 }
1322 let got = store.lookup(&key).unwrap();
1323 assert!(got.is_none(), "tampered entry must be rejected");
1324 let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1326 assert!(remaining.is_empty(), "corrupt entry should be removed");
1327 }
1328
1329 #[test]
1330 fn corrupt_meta_json_is_cleaned_up() {
1331 let (_d, store) = temp_store();
1332 let key = key_for("badjson");
1333 store
1334 .save(&key, b"x", None, None, EntryKind::Checkpoint)
1335 .unwrap();
1336 let dir = store.key_dir(&key);
1337 for entry in fs::read_dir(&dir).unwrap() {
1338 let p = entry.unwrap().path();
1339 if p.extension().and_then(|s| s.to_str()) == Some("json") {
1340 fs::write(&p, b"{not valid json").unwrap();
1341 }
1342 }
1343 let got = store.lookup(&key).unwrap();
1344 assert!(got.is_none());
1345 }
1346
1347 #[test]
1348 fn schema_mismatched_entry_is_cleaned_up() {
1349 let (_d, store) = temp_store();
1350 let key = key_for("schema");
1351 store
1352 .save(&key, b"x", None, None, EntryKind::Checkpoint)
1353 .unwrap();
1354 let dir = store.key_dir(&key);
1355 for entry in fs::read_dir(&dir).unwrap() {
1356 let p = entry.unwrap().path();
1357 if p.extension().and_then(|s| s.to_str()) == Some("json") {
1358 let raw = fs::read(&p).unwrap();
1359 let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
1360 parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
1361 fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
1362 }
1363 }
1364 assert!(store.lookup(&key).unwrap().is_none());
1365 let remaining: Vec<_> = fs::read_dir(&dir).unwrap().collect();
1366 assert!(
1367 remaining.is_empty(),
1368 "schema-mismatched entry should be removed"
1369 );
1370 }
1371
    /// A schema-mismatched entry is purged by the eviction pass triggered from
    /// a later save, and its bytes do not count against the size budget.
    #[test]
    fn schema_mismatched_entry_is_removed_during_save_eviction_path() {
        // 6 KiB budget: the 4 KiB stale entry plus the 2 KiB fresh payload and
        // its metadata only fit if the stale entry is actually removed.
        let dir = tempfile::tempdir().unwrap();
        let store = WarmStartStore::open(
            dir.path().to_path_buf(),
            StoreOptions {
                size_budget_bytes: 6 * 1024,
                ttl: Duration::from_secs(3600),
            },
        )
        .unwrap();
        let stale_key = key_for("schema-size-stale");
        store
            .save(
                &stale_key,
                &vec![0u8; 4 * 1024],
                None,
                None,
                EntryKind::Checkpoint,
            )
            .unwrap();

        // Corrupt the stale entry's schema version and remember both paths.
        let stale_dir = store.key_dir(&stale_key);
        let mut stale_meta = None;
        let mut stale_bin = None;
        for entry in fs::read_dir(&stale_dir).unwrap() {
            let p = entry.unwrap().path();
            match p.extension().and_then(|s| s.to_str()) {
                Some("json") => {
                    let raw = fs::read(&p).unwrap();
                    let mut parsed: serde_json::Value = serde_json::from_slice(&raw).unwrap();
                    parsed["schema_version"] = serde_json::json!(SCHEMA_VERSION + 99);
                    fs::write(&p, serde_json::to_vec_pretty(&parsed).unwrap()).unwrap();
                    stale_meta = Some(p);
                }
                Some("bin") => stale_bin = Some(p),
                _ => {}
            }
        }
        let stale_meta = stale_meta.expect("saved entry should have metadata");
        let stale_bin = stale_bin.expect("saved entry should have payload");

        // This save pushes the approximate total over budget, which triggers
        // an eviction pass that scans the stale key directory.
        let fresh_key = key_for("schema-size-fresh");
        store
            .save(
                &fresh_key,
                &vec![1u8; 2 * 1024],
                None,
                None,
                EntryKind::Checkpoint,
            )
            .unwrap();

        assert!(
            !stale_meta.exists(),
            "schema-mismatched metadata should be removed during eviction scan"
        );
        assert!(
            !stale_bin.exists(),
            "schema-mismatched payload should be removed during eviction scan"
        );

        // Measure what is really on disk, independently of the store's own
        // accounting.
        let mut total = 0u64;
        for key_dir in fs::read_dir(store.root()).unwrap() {
            let key_dir = key_dir.unwrap().path();
            if key_dir.is_dir() {
                for entry in fs::read_dir(key_dir).unwrap() {
                    total += fs::metadata(entry.unwrap().path()).unwrap().len();
                }
            }
        }
        assert!(
            total <= store.options().size_budget_bytes,
            "schema-mismatched bytes must not leak past size accounting (got {total})"
        );
        assert!(store.lookup(&stale_key).unwrap().is_none());
        assert!(store.lookup(&fresh_key).unwrap().is_some());
    }
1450
/// An entry whose `.bin` file has been deleted behind the store's back is
/// reported by `lookup` as absent (`Ok(None)`), not as an error.
#[test]
fn missing_bin_treated_as_missing() {
    let (_tmp, store) = temp_store();
    let key = key_for("nobin");
    store
        .save(&key, b"x", None, None, EntryKind::Checkpoint)
        .unwrap();

    // Delete every payload file in the key directory, leaving the rest intact.
    let key_dir = store.key_dir(&key);
    let payload_files = fs::read_dir(&key_dir)
        .unwrap()
        .map(|dirent| dirent.unwrap().path())
        .filter(|path| path.extension().and_then(|e| e.to_str()) == Some("bin"));
    for payload in payload_files {
        fs::remove_file(&payload).unwrap();
    }

    let found = store.lookup(&key).unwrap();
    assert!(found.is_none());
}
1467
/// Looking up a key that was never saved succeeds and yields `None`.
#[test]
fn missing_key_returns_none() {
    let (_tmp, store) = temp_store();
    let never_saved = key_for("absent");
    let found = store.lookup(&never_saved).unwrap();
    assert!(found.is_none());
}
1474
/// Saving twenty 256-byte entries into a store with a 4 KiB budget must keep
/// the on-disk total bounded, drop the first entry and keep the last one.
#[test]
fn lru_eviction_under_size_budget() {
    let tmp = tempfile::tempdir().unwrap();
    let opts = StoreOptions {
        size_budget_bytes: 4 * 1024,
        ttl: Duration::from_secs(3600),
    };
    let store = WarmStartStore::open(tmp.path().to_path_buf(), opts).unwrap();

    // One distinct key per index; every entry shares the same zeroed payload.
    let payload = vec![0u8; 256];
    let keys: Vec<_> = (0..20)
        .map(|i| {
            let mut fp = Fingerprinter::new();
            fp.absorb_u64(b"i", i);
            let key = fp.finalize();
            store
                .save(&key, &payload, Some(i as f64), None, EntryKind::Checkpoint)
                .unwrap();
            key
        })
        .collect();

    // Total size of every file found one level below a key directory.
    let total: u64 = fs::read_dir(store.root())
        .unwrap()
        .map(|kd| kd.unwrap().path())
        .filter(|kd| kd.is_dir())
        .flat_map(|kd| fs::read_dir(&kd).unwrap())
        .map(|f| fs::metadata(f.unwrap().path()).unwrap().len())
        .sum();
    // NOTE(review): the bound is twice the configured 4 KiB budget; presumably
    // deliberate slack for when eviction runs relative to a save — confirm.
    assert!(
        total <= 8 * 1024,
        "eviction failed to bound size (got {total})"
    );

    let oldest = keys.first().unwrap();
    let newest = keys.last().unwrap();
    assert!(store.lookup(oldest).unwrap().is_none());
    assert!(store.lookup(newest).unwrap().is_some());
}
1516
/// An entry saved before time is advanced past the TTL is no longer returned
/// by `lookup`, while an entry saved after the advance still is.
#[test]
fn ttl_drops_old_entries() {
    let tmp = tempfile::tempdir().unwrap();
    let ttl = Duration::from_secs(60);
    let opts = StoreOptions {
        size_budget_bytes: 1024 * 1024,
        ttl,
    };
    let store = WarmStartStore::open(tmp.path().to_path_buf(), opts).unwrap();

    let expiring = key_for("ttl");
    store
        .save(&expiring, b"x", None, None, EntryKind::Checkpoint)
        .unwrap();
    assert!(store.lookup(&expiring).unwrap().is_some());

    // Push the store's test-time hook five seconds beyond the TTL, then write a
    // second entry under a different key.
    store.test_advance_time(ttl + Duration::from_secs(5));
    let survivor = key_for("ttl-other");
    store
        .save(&survivor, b"y", None, None, EntryKind::Checkpoint)
        .unwrap();

    assert!(store.lookup(&expiring).unwrap().is_none());
    assert!(store.lookup(&survivor).unwrap().is_some());
}
1551
/// `evict_overflow` deletes a `.tmp.<pid>.` file tagged with another PID but
/// leaves one tagged with this process's own PID untouched.
#[test]
fn orphan_temp_files_from_dead_processes_are_swept() {
    let (_tmp, store) = temp_store();
    let key_dir = store.key_dir(&key_for("tmp"));
    fs::create_dir_all(&key_dir).unwrap();

    // One temp file tagged with PID 1, one tagged with the current process id.
    let foreign_tmp = key_dir.join("r0-0.json.tmp.1.0");
    let own_tmp = key_dir.join(format!("r0-0.bin.tmp.{}.0", std::process::id()));
    fs::write(&foreign_tmp, b"orphan").unwrap();
    fs::write(&own_tmp, b"mine").unwrap();

    store.evict_overflow().unwrap();

    assert!(!foreign_tmp.exists(), "other-PID tmp file should be swept");
    assert!(own_tmp.exists(), "same-PID tmp file must be left alone");
}
1567
/// A `.tmp.` file whose PID segment is not a number survives `evict_overflow`.
#[test]
fn tmp_filenames_without_pid_are_skipped() {
    let (_tmp, store) = temp_store();
    let key_dir = store.key_dir(&key_for("malformed"));
    fs::create_dir_all(&key_dir).unwrap();

    // "notapid" sits where the other tests in this module place a process id.
    let unparseable = key_dir.join("garbage.tmp.notapid.suffix");
    fs::write(&unparseable, b"x").unwrap();

    store.evict_overflow().unwrap();

    assert!(unparseable.exists());
}
1581
/// `save_overwrite` with the id returned by `save` replaces the entry in
/// place: the key directory still holds exactly two files and `lookup` returns
/// the second payload and objective.
#[test]
fn save_overwrite_keeps_single_entry() {
    let (_tmp, store) = temp_store();
    let key = key_for("overwrite");

    let run_id = store
        .save(&key, b"v1", Some(2.0), Some(1), EntryKind::Checkpoint)
        .unwrap();
    store
        .save_overwrite(
            &key,
            &run_id,
            b"v2",
            Some(1.0),
            Some(2),
            EntryKind::Checkpoint,
        )
        .unwrap();

    // Two directory entries: a second run id would have added more files.
    let key_dir = store.key_dir(&key);
    let file_count = fs::read_dir(&key_dir).unwrap().count();
    assert_eq!(file_count, 2, "overwrite should not create a new run-id");

    let current = store.lookup(&key).unwrap().unwrap();
    assert_eq!(current.payload, b"v2");
    assert_eq!(current.objective, Some(1.0));
}
1600
/// `write_and_promote_entry` must succeed when the target key directory does
/// not exist at call time, leaving both final files in place with the payload
/// written verbatim.
#[test]
fn write_and_promote_recreates_dir_removed_before_write() {
    let (_tmp, store) = temp_store();
    let key_dir = store.key_dir(&key_for("race-recreate"));
    // Precondition: nothing has created the key directory yet.
    assert!(!key_dir.exists());

    let payload_tmp = key_dir.join("r0.bin.tmp.1.0.0");
    let metadata_tmp = key_dir.join("r0.json.tmp.1.0.0");
    let payload_final = key_dir.join("r0.bin");
    let metadata_final = key_dir.join("r0.json");

    // Fixed timestamp and an empty JSON object keep the write deterministic.
    let fixed_stamp = || (0u64, 0u32);
    let empty_meta = |_: u64, _: u32| -> io::Result<Vec<u8>> { Ok(b"{}".to_vec()) };

    let request = EntryWrite {
        dir: &key_dir,
        bin_tmp: &payload_tmp,
        meta_tmp: &metadata_tmp,
        payload: b"payload",
        bin_final: &payload_final,
        meta_final: &metadata_final,
        stamp_fn: &fixed_stamp,
        build_meta_json: &empty_meta,
    };
    write_and_promote_entry(&request)
        .expect("promote into a missing dir must recreate it and succeed");

    assert!(payload_final.exists());
    assert!(metadata_final.exists());
    assert_eq!(fs::read(&payload_final).unwrap(), b"payload");
}
1632
/// Four writer threads each save 200 checkpoints under the same key while a
/// fifth thread calls `evict_overflow` in a tight loop; every `save` must
/// succeed.
///
/// The store is opened with a zero size budget (presumably so every eviction
/// pass finds it over budget and removes what the writers just created —
/// confirm against `evict_overflow`).
#[test]
fn save_survives_concurrent_eviction_removing_key_dir() {
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;

    let dir = tempfile::tempdir().unwrap();
    let store = Arc::new(
        WarmStartStore::open(
            dir.path().to_path_buf(),
            StoreOptions {
                size_budget_bytes: 0,
                ttl: Duration::from_secs(60),
            },
        )
        .unwrap(),
    );
    let key = key_for("concurrent-evict");
    let stop = Arc::new(AtomicBool::new(false));

    // Background evictor: runs until `stop` is raised. Eviction errors are
    // deliberately ignored; only the writers' results are under test.
    let evictor = {
        let store = Arc::clone(&store);
        let stop = Arc::clone(&stop);
        std::thread::spawn(move || {
            while !stop.load(Ordering::Relaxed) {
                store.evict_overflow().ok();
            }
        })
    };

    let writers: Vec<_> = (0..4)
        .map(|w| {
            let store = Arc::clone(&store);
            std::thread::spawn(move || {
                for i in 0..200u32 {
                    let payload = format!("w{w}-i{i}");
                    store
                        .save(
                            &key,
                            payload.as_bytes(),
                            Some(i as f64),
                            Some(i as u64),
                            EntryKind::Checkpoint,
                        )
                        .expect("save must not fail with ENOENT under concurrent eviction");
                }
            })
        })
        .collect();

    // Wait for every writer but hold on to the outcomes instead of unwrapping
    // right away: a panicking writer must not skip the shutdown below, or the
    // evictor would keep spinning for the rest of the test process.
    let writer_outcomes: Vec<_> = writers.into_iter().map(|h| h.join()).collect();
    stop.store(true, Ordering::Relaxed);
    let evictor_outcome = evictor.join();

    // Re-raise the first captured panic so the failure carries the original
    // message from the thread that failed.
    for outcome in writer_outcomes
        .into_iter()
        .chain(std::iter::once(evictor_outcome))
    {
        if let Err(panic_payload) = outcome {
            std::panic::resume_unwind(panic_payload);
        }
    }
}
1697
/// Entries saved under two different keys do not shadow each other: each
/// lookup returns the payload stored under its own key.
#[test]
fn keys_are_isolated() {
    let (_tmp, store) = temp_store();
    let (first, second) = (key_for("a"), key_for("b"));

    store
        .save(&first, b"AAA", Some(1.0), None, EntryKind::Final)
        .unwrap();
    store
        .save(&second, b"BBB", Some(1.0), None, EntryKind::Final)
        .unwrap();

    let first_entry = store.lookup(&first).unwrap().unwrap();
    assert_eq!(first_entry.payload, b"AAA");
    let second_entry = store.lookup(&second).unwrap().unwrap();
    assert_eq!(second_entry.payload, b"BBB");
}
1712}