Skip to main content

verdant_runtime/
store.rs

1//! Content-addressed payload store. The cache key is the blake3 hash of
2//! the canonical *input* bytes for a tool call (not the output); the value
3//! is the exact output payload bytes that the MCP tool fed back to the
4//! model on its original execution. The store is append-only for M1; M2
5//! will add eviction.
6//!
7//! Layout on disk:
8//!
9//! ```text
10//! <root>/
11//!   ab/                       # first two hex chars of the key
12//!     ab12cd...ef.payload     # raw bytes
13//!     ab12cd...ef.meta.json   # invalidation metadata (raw_hash, size, kind)
14//! ```
15//!
16//! Keys are 64-char lowercase hex strings; we shard by the first two chars
17//! so a single project does not produce a directory with 100k+ entries.
18
19use serde::{Deserialize, Serialize};
20use std::fs;
21use std::io::{self, Read as _, Write as _};
22use std::path::{Path, PathBuf};
23
24#[derive(Debug, thiserror::Error)]
25pub enum StoreError {
26    #[error("io: {0}")]
27    Io(#[from] io::Error),
28    #[error("malformed key: {0}")]
29    BadKey(String),
30    #[error("metadata decode failed: {0}")]
31    Meta(#[from] serde_json::Error),
32    #[error(
33        "integrity check failed for key {key}: stored payload hash {actual} != expected {expected}"
34    )]
35    Integrity {
36        key: String,
37        expected: String,
38        actual: String,
39    },
40}
41
42/// Lowercase hex blake3 digest. Wrapping in a newtype so we can't confuse
43/// a payload-hash with the cache key (which hashes inputs, not outputs).
44#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
45pub struct Key(pub String);
46
47impl Key {
48    pub fn from_bytes(bytes: &[u8]) -> Self {
49        Key(blake3::hash(bytes).to_hex().to_string())
50    }
51
52    pub fn as_str(&self) -> &str {
53        &self.0
54    }
55
56    fn validate(&self) -> Result<(), StoreError> {
57        if self.0.len() != 64 || !self.0.chars().all(|c| c.is_ascii_hexdigit()) {
58            return Err(StoreError::BadKey(self.0.clone()));
59        }
60        Ok(())
61    }
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
65pub struct PayloadMeta {
66    /// blake3 of the raw payload bytes; recorded so reads can detect a
67    /// torn write or external file corruption rather than silently
68    /// returning bad bytes.
69    pub payload_hash: String,
70    /// Length of the payload bytes.
71    pub bytes: u64,
72    /// Tool kind tag, free-form ("read", "bash", etc.) — used for
73    /// telemetry and for tool-specific revalidation logic that the
74    /// runtime layer applies on green hits.
75    pub tool_kind: String,
76    /// File dependencies of this entry. Each entry pairs a path with the
77    /// blake3 of that file at the time the cache entry was written; on
78    /// every green-hit lookup the file is re-blake3'd and the entry is
79    /// invalidated on mismatch. Stored on disk so a fresh process can
80    /// restore the cache state without depending on an in-memory
81    /// registry that does not survive restart.
82    #[serde(default)]
83    pub file_roots: Vec<FileRootSerde>,
84    /// Upstream cache-key dependencies. For LlmCall entries this is the
85    /// set of tool-call cache keys whose results appeared in the
86    /// prompt's `tool_result` blocks; when one of those tool entries is
87    /// invalidated by a file edit, every LlmCall whose upstream set
88    /// contains that key is invalidated too. Tool-call entries
89    /// typically leave this empty (their dependencies are encoded in
90    /// `file_roots`); future M3+ extensions may use it for nested
91    /// composite nodes.
92    #[serde(default)]
93    pub upstream_keys: Vec<String>,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub struct FileRootSerde {
98    pub path: String,
99    pub expected_hash: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Payload {
104    pub bytes: Vec<u8>,
105    pub meta: PayloadMeta,
106}
107
108/// Storage backend trait. Concrete impls: `FileStore` (local
109/// content-addressed disk), and (M4 step 7+) `RemoteStore` over HTTP.
110/// `LiveCache` owns one `Box<dyn Store>` and routes every
111/// content-addressed read/write through this trait so the same cache
112/// state machine works against either backend without conditional
113/// compilation in the runtime layer.
114pub trait Store: Send + Sync {
115    fn persist_with_upstreams(
116        &self,
117        key: &Key,
118        bytes: &[u8],
119        tool_kind: &str,
120        file_roots: Vec<FileRootSerde>,
121        upstream_keys: Vec<String>,
122    ) -> Result<(), StoreError>;
123
124    fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError>;
125
126    fn remove(&self, key: &Key) -> Result<(), StoreError>;
127
128    fn total_bytes(&self) -> Result<u64, StoreError>;
129
130    fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError>;
131
132    fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError>;
133
134    fn contains(&self, key: &Key) -> bool;
135
136    fn persist(
137        &self,
138        key: &Key,
139        bytes: &[u8],
140        tool_kind: &str,
141        file_roots: Vec<FileRootSerde>,
142    ) -> Result<(), StoreError> {
143        self.persist_with_upstreams(key, bytes, tool_kind, file_roots, Vec::new())
144    }
145}
146
/// Local-disk [`Store`]: each entry is a `<key>.payload` file plus a
/// `<key>.meta.json` sidecar, grouped into shard directories named by the
/// first two hex characters of the key.
#[derive(Debug)]
pub struct FileStore {
    /// Directory the store was opened on (created by `FileStore::open`).
    root: PathBuf,
}
151
152impl FileStore {
153    pub fn open(root: impl Into<PathBuf>) -> Result<Self, StoreError> {
154        let root = root.into();
155        fs::create_dir_all(&root)?;
156        Ok(Self { root })
157    }
158
159    pub fn root(&self) -> &Path {
160        &self.root
161    }
162
163    fn shard_dir(&self, key: &Key) -> PathBuf {
164        self.root.join(&key.0[..2])
165    }
166
167    fn payload_path(&self, key: &Key) -> PathBuf {
168        self.shard_dir(key).join(format!("{}.payload", key.0))
169    }
170
171    fn meta_path(&self, key: &Key) -> PathBuf {
172        self.shard_dir(key).join(format!("{}.meta.json", key.0))
173    }
174
175    /// Write a payload. Uses tempfile + rename so a crash mid-write leaves
176    /// the store in a consistent state (either the entry is fully there
177    /// or it is not), which keeps `lookup` from ever observing torn data.
178    pub fn persist(
179        &self,
180        key: &Key,
181        bytes: &[u8],
182        tool_kind: &str,
183        file_roots: Vec<FileRootSerde>,
184    ) -> Result<(), StoreError> {
185        self.persist_with_upstreams(key, bytes, tool_kind, file_roots, Vec::new())
186    }
187
188    /// Persist a payload that depends on previously-cached upstream
189    /// entries. The LlmCall path uses this to record which tool-call
190    /// keys it consumed, so downstream invalidation can walk the
191    /// dependency edge and drop dependent entries when a tool key is
192    /// marked dirty.
193    pub fn persist_with_upstreams(
194        &self,
195        key: &Key,
196        bytes: &[u8],
197        tool_kind: &str,
198        file_roots: Vec<FileRootSerde>,
199        upstream_keys: Vec<String>,
200    ) -> Result<(), StoreError> {
201        key.validate()?;
202        fs::create_dir_all(self.shard_dir(key))?;
203
204        let payload_hash = blake3::hash(bytes).to_hex().to_string();
205        let meta = PayloadMeta {
206            payload_hash,
207            bytes: bytes.len() as u64,
208            tool_kind: tool_kind.to_string(),
209            file_roots,
210            upstream_keys,
211        };
212
213        write_atomic(&self.payload_path(key), bytes)?;
214        let meta_bytes = serde_json::to_vec(&meta)?;
215        write_atomic(&self.meta_path(key), &meta_bytes)?;
216        Ok(())
217    }
218
219    /// Delete the payload + meta for a key. Used by `LiveCache` when an
220    /// upstream invalidation makes a cached entry definitely-stale; the
221    /// caller has already removed the registry entry, and this drops
222    /// the bytes from disk so a future rehydration does not resurrect
223    /// the entry.
224    pub fn remove(&self, key: &Key) -> Result<(), StoreError> {
225        key.validate()?;
226        let pp = self.payload_path(key);
227        let mp = self.meta_path(key);
228        if pp.exists() {
229            fs::remove_file(&pp)?;
230        }
231        if mp.exists() {
232            fs::remove_file(&mp)?;
233        }
234        Ok(())
235    }
236
237    /// Total bytes occupied by the store, summed across every payload
238    /// and meta file under the root. Used by `evict_to_cap` and by
239    /// operator-facing stats; cheap on small stores, linear-walk on
240    /// large ones (we accept the cost because eviction is a periodic
241    /// operation, not a hot path).
242    pub fn total_bytes(&self) -> Result<u64, StoreError> {
243        let mut total: u64 = 0;
244        let entries = match fs::read_dir(&self.root) {
245            Ok(e) => e,
246            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
247            Err(e) => return Err(e.into()),
248        };
249        for shard in entries.flatten() {
250            let shard_path = shard.path();
251            if !shard_path.is_dir() {
252                continue;
253            }
254            for entry in fs::read_dir(&shard_path)?.flatten() {
255                #[allow(clippy::collapsible_if)]
256                if let Ok(md) = entry.metadata() {
257                    if md.is_file() {
258                        total = total.saturating_add(md.len());
259                    }
260                }
261            }
262        }
263        Ok(total)
264    }
265
266    /// Evict oldest entries until `total_bytes() <= cap_bytes`. Order
267    /// is by payload-file `mtime` ascending so the least-recently-
268    /// modified entry is removed first; on filesystems where reads
269    /// update atime but not mtime this is a true write-order eviction
270    /// (entries that have never been re-persisted go first), which is
271    /// the right default for an append-only cache because a hot key
272    /// gets re-persisted on every backend fall-through and a cold key
273    /// does not. Returns the number of entries dropped.
274    ///
275    /// The function intentionally does not touch the in-memory
276    /// `LiveCache` registry; the caller is expected to either call
277    /// this at process startup (before `LiveCache::new` rehydrates)
278    /// or to recreate the cache afterwards.
279    pub fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
280        let mut current = self.total_bytes()?;
281        if current <= cap_bytes {
282            return Ok(0);
283        }
284
285        let mut entries: Vec<(std::time::SystemTime, Key, u64)> = Vec::new();
286        let dir = match fs::read_dir(&self.root) {
287            Ok(e) => e,
288            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
289            Err(e) => return Err(e.into()),
290        };
291        for shard in dir.flatten() {
292            let shard_path = shard.path();
293            if !shard_path.is_dir() {
294                continue;
295            }
296            for entry in fs::read_dir(&shard_path)?.flatten() {
297                let p = entry.path();
298                let name = match p.file_name().and_then(|n| n.to_str()) {
299                    Some(s) => s.to_string(),
300                    None => continue,
301                };
302                if let Some(stem) = name.strip_suffix(".payload") {
303                    let key = Key(stem.to_string());
304                    if key.validate().is_err() {
305                        continue;
306                    }
307                    let md = entry.metadata()?;
308                    let payload_len = md.len();
309                    let meta_len = fs::metadata(self.meta_path(&key))
310                        .map(|m| m.len())
311                        .unwrap_or(0);
312                    let mtime = md.modified().unwrap_or(std::time::UNIX_EPOCH);
313                    entries.push((mtime, key, payload_len + meta_len));
314                } else if let Some(stem) = name.strip_suffix(".meta.json") {
315                    // A meta.json whose sibling payload is absent is the
316                    // half-written state a crash between the two atomic
317                    // renames leaves; it still occupies disk, so count
318                    // and evict it rather than over-evicting healthy
319                    // entries to meet the cap.
320                    let key = Key(stem.to_string());
321                    if key.validate().is_err() || self.payload_path(&key).exists() {
322                        continue;
323                    }
324                    let md = entry.metadata()?;
325                    let mtime = md.modified().unwrap_or(std::time::UNIX_EPOCH);
326                    entries.push((mtime, key, md.len()));
327                }
328            }
329        }
330        // Secondary sort by store key: filesystem mtime granularity
331        // collapses many same-session writes to identical timestamps,
332        // so without a deterministic tiebreaker the eviction victim at
333        // the cap boundary varies across runs.
334        entries.sort_by(|(ta, ka, _), (tb, kb, _)| ta.cmp(tb).then_with(|| ka.0.cmp(&kb.0)));
335
336        let mut dropped = 0usize;
337        for (_, key, size) in entries {
338            if current <= cap_bytes {
339                break;
340            }
341            if self.remove(&key).is_ok() {
342                current = current.saturating_sub(size);
343                dropped += 1;
344            }
345        }
346        Ok(dropped)
347    }
348
349    /// Iterate every (key, meta) pair in the store. Used by `LiveCache`
350    /// to rehydrate its in-memory registry on startup so cross-process
351    /// cache hits work — without this, a fresh MCP server would see an
352    /// empty registry and miss every lookup until it re-populated each
353    /// entry from scratch.
354    pub fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
355        let mut out = Vec::new();
356        let entries = match fs::read_dir(&self.root) {
357            Ok(e) => e,
358            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
359            Err(e) => return Err(e.into()),
360        };
361        for shard in entries.flatten() {
362            let shard_path = shard.path();
363            if !shard_path.is_dir() {
364                continue;
365            }
366            for entry in fs::read_dir(&shard_path)?.flatten() {
367                let p = entry.path();
368                let name = match p.file_name().and_then(|n| n.to_str()) {
369                    Some(s) if s.ends_with(".meta.json") => s.to_string(),
370                    _ => continue,
371                };
372                let key_hex = name.trim_end_matches(".meta.json").to_string();
373                let key = Key(key_hex);
374                if key.validate().is_err() {
375                    continue;
376                }
377                let meta: PayloadMeta = match fs::read(&p)
378                    .ok()
379                    .and_then(|b| serde_json::from_slice(&b).ok())
380                {
381                    Some(m) => m,
382                    None => continue,
383                };
384                out.push((key, meta));
385            }
386        }
387        Ok(out)
388    }
389
390    /// Look up a payload. Returns `None` if absent. Returns
391    /// `StoreError::Integrity` if the payload bytes on disk do not match
392    /// the recorded hash, which indicates corruption (torn write that
393    /// somehow survived, on-disk tamper, hardware fault) and must not
394    /// silently return wrong bytes to the model.
395    pub fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
396        key.validate()?;
397        let pp = self.payload_path(key);
398        let mp = self.meta_path(key);
399        // A NotFound on either file is a benign race: eviction or
400        // another process removed one sibling between the two reads
401        // (or between this check and the read). Treat it as a cache
402        // miss rather than surfacing a hard io error to the caller.
403        let mut bytes = Vec::new();
404        match fs::File::open(&pp).and_then(|mut f| f.read_to_end(&mut bytes)) {
405            Ok(_) => {}
406            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
407            Err(e) => return Err(e.into()),
408        }
409        let meta_bytes = match fs::read(&mp) {
410            Ok(b) => b,
411            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
412            Err(e) => return Err(e.into()),
413        };
414        let meta: PayloadMeta = serde_json::from_slice(&meta_bytes)?;
415
416        let actual = blake3::hash(&bytes).to_hex().to_string();
417        if actual != meta.payload_hash {
418            return Err(StoreError::Integrity {
419                key: key.0.clone(),
420                expected: meta.payload_hash.clone(),
421                actual,
422            });
423        }
424        Ok(Some(Payload { bytes, meta }))
425    }
426
427    /// True if the key has a complete entry (both payload and meta).
428    /// Useful for tests and stats; the integrity check still fires on
429    /// `lookup`, so this is an existence-only check.
430    pub fn contains(&self, key: &Key) -> bool {
431        key.validate().is_ok() && self.payload_path(key).exists() && self.meta_path(key).exists()
432    }
433}
434
435impl Store for FileStore {
436    fn persist_with_upstreams(
437        &self,
438        key: &Key,
439        bytes: &[u8],
440        tool_kind: &str,
441        file_roots: Vec<FileRootSerde>,
442        upstream_keys: Vec<String>,
443    ) -> Result<(), StoreError> {
444        FileStore::persist_with_upstreams(self, key, bytes, tool_kind, file_roots, upstream_keys)
445    }
446
447    fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
448        FileStore::lookup(self, key)
449    }
450
451    fn remove(&self, key: &Key) -> Result<(), StoreError> {
452        FileStore::remove(self, key)
453    }
454
455    fn total_bytes(&self) -> Result<u64, StoreError> {
456        FileStore::total_bytes(self)
457    }
458
459    fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
460        FileStore::evict_to_cap(self, cap_bytes)
461    }
462
463    fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
464        FileStore::iter_meta(self)
465    }
466
467    fn contains(&self, key: &Key) -> bool {
468        FileStore::contains(self, key)
469    }
470}
471
/// Replace `target` with `bytes` so that readers see either the old
/// contents or the complete new contents, never a partial write.
///
/// The data goes to a sibling temp file in the same directory, so the final
/// rename stays on one filesystem (atomic on POSIX). The file is fsynced
/// before the rename. `File::flush` is a no-op for `std::fs::File`, so
/// without `sync_all` a power loss could persist the rename but not the
/// data. The parent directory is *not* fsynced, so the rename itself may be
/// lost on power failure. That leaves the previous state intact, which
/// `lookup` handles.
///
/// # Errors
/// `InvalidInput` if `target` has no parent. Otherwise any error from
/// creating, writing, syncing or renaming the temp file. On error the temp
/// file is removed.
fn write_atomic(target: &Path, bytes: &[u8]) -> io::Result<()> {
    let parent = target
        .parent()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "target has no parent"))?;
    let mut guard = TempGuard::create_in(parent)?;
    guard.file.write_all(bytes)?;
    guard.file.sync_all()?;
    guard.persist(target)
}

/// Minimal in-tree temp-file guard: an open file handle plus its path.
///
/// While armed, dropping the guard deletes the temp file, so any failure
/// between creation and a successful rename leaves nothing behind. That
/// matters here: `evict_to_cap` never reclaims stray `.verdant-tmp-*`
/// files, yet `total_bytes` counts them. `persist` disarms the guard only
/// after the rename succeeds, at which point the temp path no longer
/// exists.
struct TempGuard {
    path: PathBuf,
    file: fs::File,
    armed: bool,
}

impl TempGuard {
    /// Upper bound on name collisions skipped by `create_in`.
    const MAX_ATTEMPTS: usize = 64;

    /// Create a new, exclusively owned temp file inside `dir`.
    ///
    /// A crashed earlier process whose pid was recycled can leave a stale
    /// `.verdant-tmp-{pid}-{n}` behind. Such collisions (`AlreadyExists`)
    /// are skipped, up to `MAX_ATTEMPTS` names, instead of failing the
    /// write.
    fn create_in(dir: &Path) -> io::Result<Self> {
        for _ in 1..Self::MAX_ATTEMPTS {
            match Self::try_create(dir) {
                Err(e) if e.kind() == io::ErrorKind::AlreadyExists => continue,
                result => return result,
            }
        }
        Self::try_create(dir)
    }

    /// One attempt at a fresh `.verdant-tmp-{pid}-{n}` name; `create_new`
    /// guarantees the file did not exist before.
    fn try_create(dir: &Path) -> io::Result<Self> {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let path = dir.join(format!(".verdant-tmp-{pid}-{n}"));
        let file = fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&path)?;
        Ok(Self {
            path,
            file,
            armed: true,
        })
    }

    /// Atomically rename the temp file onto `target`, consuming the guard.
    fn persist(mut self, target: &Path) -> io::Result<()> {
        fs::rename(&self.path, target)?;
        // Disarm only after success. On failure the guard stays armed and
        // Drop deletes the orphaned temp file instead of leaking it.
        self.armed = false;
        Ok(())
    }
}

impl Drop for TempGuard {
    fn drop(&mut self) {
        if self.armed {
            // Best-effort cleanup; there is nothing useful to do on failure.
            let _ = fs::remove_file(&self.path);
        }
    }
}
529
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Fresh store in its own temp dir; keep the `TempDir` alive for the
    /// duration of the test.
    fn store() -> (TempDir, FileStore) {
        let dir = TempDir::new().unwrap();
        let s = FileStore::open(dir.path().to_path_buf()).unwrap();
        (dir, s)
    }

    #[test]
    fn persist_then_lookup_roundtrip() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"input-1");
        s.persist(&k, b"hello world", "read", vec![]).unwrap();
        let p = s.lookup(&k).unwrap().expect("must exist");
        assert_eq!(p.bytes, b"hello world");
        assert_eq!(p.meta.tool_kind, "read");
        assert_eq!(p.meta.bytes, 11);
    }

    #[test]
    fn lookup_missing_returns_none() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"never-written");
        assert!(s.lookup(&k).unwrap().is_none());
    }

    #[test]
    fn integrity_violation_detected() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"input-2");
        s.persist(&k, b"trusted", "read", vec![]).unwrap();
        // Corrupt the payload on disk under the store's feet. A real
        // failure would be hardware-induced or external tampering, but a
        // direct overwrite is the cheapest reproducible test.
        let pp = s.root.join(&k.0[..2]).join(format!("{}.payload", k.0));
        fs::write(&pp, b"tampered").unwrap();
        let err = s.lookup(&k).expect_err("integrity must fire");
        assert!(matches!(err, StoreError::Integrity { .. }));
    }

    #[test]
    fn partial_write_only_meta_returns_none() {
        // Simulates a case where meta landed but payload did not (or
        // vice-versa). Each file is renamed atomically, but the *pair* is
        // not jointly atomic, so a crash between the two renames can leave
        // one orphan. lookup must treat that as a cache miss, not return
        // partial data.
        let (_d, s) = store();
        let k = Key::from_bytes(b"input-3");
        // Manually drop only a meta file.
        fs::create_dir_all(s.root.join(&k.0[..2])).unwrap();
        let mp = s.root.join(&k.0[..2]).join(format!("{}.meta.json", k.0));
        fs::write(
            &mp,
            serde_json::to_vec(&PayloadMeta {
                payload_hash: blake3::hash(b"orphan").to_hex().to_string(),
                bytes: 6,
                tool_kind: "read".into(),
                file_roots: vec![],
                upstream_keys: vec![],
            })
            .unwrap(),
        )
        .unwrap();
        assert!(s.lookup(&k).unwrap().is_none());
    }

    #[test]
    fn lookup_orphan_missing_payload_returns_none_not_err() {
        // A crash between the two atomic renames (or eviction removing one
        // sibling) leaves a meta with no payload. lookup must treat this
        // benign race as a miss, not surface an io error.
        let (_d, s) = store();
        let k = Key::from_bytes(b"orphan-meta");
        s.persist(&k, b"payload bytes", "read", vec![]).unwrap();
        fs::remove_file(s.payload_path(&k)).unwrap();
        assert!(
            matches!(s.lookup(&k), Ok(None)),
            "payload-missing/meta-present must be Ok(None)"
        );
    }

    #[test]
    fn lookup_orphan_missing_meta_returns_none_not_err() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"orphan-payload");
        s.persist(&k, b"payload bytes", "read", vec![]).unwrap();
        fs::remove_file(s.meta_path(&k)).unwrap();
        assert!(
            matches!(s.lookup(&k), Ok(None)),
            "meta-missing/payload-present must be Ok(None)"
        );
    }

    #[test]
    fn evict_reclaims_meta_only_orphans() {
        // A meta.json whose sibling payload is absent still occupies disk.
        // evict_to_cap must count and remove it, not over-evict healthy
        // entries to compensate.
        let (_d, s) = store();
        let healthy = Key::from_bytes(b"healthy");
        s.persist(&healthy, &[b'h'; 4096], "read", vec![]).unwrap();

        let orphan = Key::from_bytes(b"orphan-entry");
        fs::create_dir_all(s.shard_dir(&orphan)).unwrap();
        let orphan_meta = serde_json::to_vec(&PayloadMeta {
            payload_hash: blake3::hash(b"gone").to_hex().to_string(),
            bytes: 4,
            tool_kind: "read".into(),
            file_roots: vec![],
            upstream_keys: vec![],
        })
        .unwrap();
        fs::write(s.meta_path(&orphan), &orphan_meta).unwrap();

        let dropped = s.evict_to_cap(0).unwrap();
        assert!(dropped >= 2, "both healthy entry and orphan must drop");
        assert!(
            !s.meta_path(&orphan).exists(),
            "meta-only orphan must be removed"
        );
        assert_eq!(s.total_bytes().unwrap(), 0);
    }

    #[test]
    fn evict_order_is_deterministic_for_equal_mtimes() {
        // Filesystem mtime granularity collapses same-session writes to
        // identical timestamps. With every entry sharing one mtime, the
        // primary sort key is a constant, so the eviction victim is fully
        // determined by the secondary key-order tiebreaker. The
        // lexicographically smallest store keys must be the ones dropped,
        // independent of read_dir enumeration order.
        let fixed = std::time::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
        let dir = TempDir::new().unwrap();
        let s = FileStore::open(dir.path().to_path_buf()).unwrap();
        let mut keys: Vec<Key> = (0..8)
            .map(|i| Key::from_bytes(format!("dk{i}").as_bytes()))
            .collect();
        for k in &keys {
            s.persist(k, &[b'x'; 4096], "read", vec![]).unwrap();
            fs::File::options()
                .write(true)
                .open(s.payload_path(k))
                .unwrap()
                .set_modified(fixed)
                .unwrap();
        }
        let before = s.total_bytes().unwrap();
        let dropped = s.evict_to_cap(before / 2).unwrap();
        assert!(dropped > 0, "eviction must drop at least one entry");

        let evicted: std::collections::HashSet<String> = keys
            .iter()
            .filter(|k| s.lookup(k).unwrap().is_none())
            .map(|k| k.0.clone())
            .collect();
        keys.sort_by(|a, b| a.0.cmp(&b.0));
        let expected: std::collections::HashSet<String> = keys
            .iter()
            .take(evicted.len())
            .map(|k| k.0.clone())
            .collect();
        assert_eq!(
            evicted, expected,
            "with equal mtimes the lowest store keys must be the deterministic victims"
        );
    }

    #[test]
    fn total_bytes_sums_payloads_and_meta() {
        let (_d, s) = store();
        assert_eq!(s.total_bytes().unwrap(), 0, "fresh store is zero bytes");
        let k = Key::from_bytes(b"size-test");
        s.persist(&k, &[b'x'; 1024], "read", vec![]).unwrap();
        let bytes = s.total_bytes().unwrap();
        assert!(bytes >= 1024, "payload alone is ≥1024, got {bytes}");
    }

    #[test]
    fn evict_to_cap_drops_oldest_first() {
        let (_d, s) = store();
        // Persist four entries with distinct mtimes (sleep briefly so the
        // filesystem mtime resolution doesn't collapse them).
        let keys: Vec<Key> = (0..4)
            .map(|i| Key::from_bytes(format!("k{i}").as_bytes()))
            .collect();
        for (i, k) in keys.iter().enumerate() {
            s.persist(k, &[b'A' + i as u8; 4096], "read", vec![]).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(20));
        }
        let before = s.total_bytes().unwrap();
        assert!(before >= 4 * 4096);

        // Cap to roughly two entries' worth.
        let cap = before / 2;
        let dropped = s.evict_to_cap(cap).unwrap();
        assert!(dropped >= 1, "should drop at least one entry");
        let after = s.total_bytes().unwrap();
        assert!(
            after <= cap,
            "after eviction must fit cap; got {after}/{cap}"
        );

        // The oldest key (k0) must be gone; the newest (k3) must still be
        // present.
        assert!(s.lookup(&keys[0]).unwrap().is_none(), "oldest must evict");
        assert!(s.lookup(&keys[3]).unwrap().is_some(), "newest must survive");
    }

    #[test]
    fn evict_below_cap_is_noop() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"small");
        s.persist(&k, b"tiny", "read", vec![]).unwrap();
        let dropped = s.evict_to_cap(u64::MAX).unwrap();
        assert_eq!(dropped, 0);
        assert!(s.lookup(&k).unwrap().is_some());
    }

    #[test]
    fn malformed_key_rejected() {
        let (_d, s) = store();
        let bad = Key("not-hex".to_string());
        assert!(s.persist(&bad, b"x", "read", vec![]).is_err());
        assert!(s.lookup(&bad).is_err());
    }

    #[test]
    fn shard_dirs_distribute_keys() {
        let (_d, s) = store();
        for i in 0..16u8 {
            let k = Key::from_bytes(&[i, i, i]);
            s.persist(&k, &[i], "read", vec![]).unwrap();
        }
        // Count distinct two-char shard directories. With 16 random blake3
        // hashes, the chance of every key collapsing into one shard is
        // astronomically small; assert at least four distinct shards.
        let mut shards = std::collections::HashSet::new();
        for entry in fs::read_dir(s.root()).unwrap() {
            let e = entry.unwrap();
            if e.path().is_dir() {
                shards.insert(e.file_name().to_string_lossy().to_string());
            }
        }
        assert!(shards.len() >= 4, "shards = {shards:?}");
    }

    #[test]
    fn contains_only_when_complete() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"x");
        assert!(!s.contains(&k));
        s.persist(&k, b"y", "read", vec![]).unwrap();
        assert!(s.contains(&k));
    }

    #[test]
    fn persist_with_upstreams_roundtrips_dependencies() {
        // file_roots and upstream_keys drive invalidation after a restart,
        // so they must survive the trip through meta.json unchanged.
        let (_d, s) = store();
        let k = Key::from_bytes(b"llm-call");
        let upstream = Key::from_bytes(b"tool-call");
        let roots = vec![FileRootSerde {
            path: "src/lib.rs".into(),
            expected_hash: blake3::hash(b"fn main() {}").to_hex().to_string(),
        }];
        s.persist_with_upstreams(&k, b"answer", "llm", roots.clone(), vec![upstream.0.clone()])
            .unwrap();
        let p = s.lookup(&k).unwrap().expect("must exist");
        assert_eq!(p.meta.file_roots, roots);
        assert_eq!(p.meta.upstream_keys, vec![upstream.0]);
    }

    #[test]
    fn iter_meta_rehydrates_every_entry() {
        let (_d, s) = store();
        let keys: Vec<Key> = (0..5)
            .map(|i| Key::from_bytes(format!("rehydrate{i}").as_bytes()))
            .collect();
        for k in &keys {
            s.persist(k, b"v", "read", vec![]).unwrap();
        }
        let listed = s.iter_meta().unwrap();
        assert!(listed.iter().all(|(_, m)| m.tool_kind == "read"));
        let mut seen: Vec<String> = listed.into_iter().map(|(k, _)| k.0).collect();
        seen.sort();
        let mut expected: Vec<String> = keys.iter().map(|k| k.0.clone()).collect();
        expected.sort();
        assert_eq!(seen, expected);
    }

    #[test]
    fn remove_is_idempotent_and_drops_both_files() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"to-remove");
        s.persist(&k, b"bytes", "read", vec![]).unwrap();
        s.remove(&k).unwrap();
        assert!(!s.payload_path(&k).exists());
        assert!(!s.meta_path(&k).exists());
        // Removing an already-absent key is a no-op, not an error.
        s.remove(&k).unwrap();
        assert!(s.lookup(&k).unwrap().is_none());
    }

    #[test]
    fn persist_leaves_no_temp_files() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"tmp-check");
        s.persist(&k, b"payload", "read", vec![]).unwrap();
        let leftovers: Vec<String> = fs::read_dir(s.shard_dir(&k))
            .unwrap()
            .flatten()
            .map(|e| e.file_name().to_string_lossy().to_string())
            .filter(|n| n.starts_with(".verdant-tmp-"))
            .collect();
        assert!(leftovers.is_empty(), "temp files leaked: {leftovers:?}");
    }
}