verdant-cache-runtime 0.1.0

Live cache runtime for the verdant agent-loop cache: content-addressed payload store + DDG
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
//! Content-addressed payload store. The cache key is the blake3 hash of
//! the canonical *input* bytes for a tool call (not the output); the value
//! is the exact output payload bytes that the MCP tool fed back to the
//! model on its original execution. The store is append-only for M1; M2
//! will add eviction.
//!
//! Layout on disk:
//!
//! ```text
//! <root>/
//!   ab/                       # first two hex chars of the key
//!     ab12cd...ef.payload     # raw bytes
//!     ab12cd...ef.meta.json   # invalidation metadata as JSON (payload_hash, bytes, tool_kind, file_roots, upstream_keys)
//! ```
//!
//! Keys are 64-char lowercase hex strings; we shard by the first two chars
//! so a single project does not produce a directory with 100k+ entries.

use serde::{Deserialize, Serialize};
use std::fs;
use std::io::{self, Read as _, Write as _};
use std::path::{Path, PathBuf};

/// Errors returned by the payload store.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// An underlying filesystem operation failed.
    #[error("io: {0}")]
    Io(#[from] io::Error),
    /// The key did not pass the store's key-format check; carries the
    /// offending key string.
    #[error("malformed key: {0}")]
    BadKey(String),
    /// `serde_json` failed while encoding or decoding a `PayloadMeta`
    /// (the `#[from]` conversion is used on both the write and the read
    /// path, even though the message says "decode").
    #[error("metadata decode failed: {0}")]
    Meta(#[from] serde_json::Error),
    /// The payload bytes read from disk hash to `actual`, but the
    /// metadata recorded `expected` when the entry was written.
    #[error(
        "integrity check failed for key {key}: stored payload hash {actual} != expected {expected}"
    )]
    Integrity {
        key: String,
        expected: String,
        actual: String,
    },
}

/// Lowercase hex blake3 digest. Wrapping in a newtype so we can't confuse
/// a payload-hash with the cache key (which hashes inputs, not outputs).
///
/// The inner string is public, so a `Key` can be built from arbitrary
/// text; the public `FileStore` operations that take a key validate it
/// before deriving any filesystem path from it.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Key(pub String);

impl Key {
    /// Derive the cache key for `bytes`: their blake3 digest rendered as
    /// a hex string.
    pub fn from_bytes(bytes: &[u8]) -> Self {
        Key(blake3::hash(bytes).to_hex().to_string())
    }

    /// Borrow the key as its hex string.
    pub fn as_str(&self) -> &str {
        &self.0
    }

    /// Check that the key is exactly 64 *lowercase* hex characters.
    ///
    /// Uppercase digits are rejected on purpose: the store documents its
    /// keys as lowercase, and accepting both cases would let two
    /// spellings of the same digest map to different files on
    /// case-sensitive filesystems and to the same file on
    /// case-insensitive ones. A key that passes is pure ASCII, which is
    /// also what makes the byte-index slice used for sharding safe.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::BadKey` carrying the offending string.
    fn validate(&self) -> Result<(), StoreError> {
        let well_formed = self.0.len() == 64
            && self
                .0
                .bytes()
                .all(|b| matches!(b, b'0'..=b'9' | b'a'..=b'f'));
        if well_formed {
            Ok(())
        } else {
            Err(StoreError::BadKey(self.0.clone()))
        }
    }
}

/// Metadata stored next to each payload as `<key>.meta.json`.
///
/// Fields marked `#[serde(default)]` may be absent from a meta file; they
/// then deserialize to an empty list.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PayloadMeta {
    /// blake3 of the raw payload bytes; recorded so reads can detect a
    /// torn write or external file corruption rather than silently
    /// returning bad bytes.
    pub payload_hash: String,
    /// Length of the payload bytes.
    pub bytes: u64,
    /// Tool kind tag, free-form ("read", "bash", etc.) — used for
    /// telemetry and for tool-specific revalidation logic that the
    /// runtime layer applies on green hits.
    pub tool_kind: String,
    /// File dependencies of this entry. Each entry pairs a path with the
    /// blake3 of that file at the time the cache entry was written; on
    /// every green-hit lookup the file is re-blake3'd and the entry is
    /// invalidated on mismatch. Stored on disk so a fresh process can
    /// restore the cache state without depending on an in-memory
    /// registry that does not survive restart.
    #[serde(default)]
    pub file_roots: Vec<FileRootSerde>,
    /// Upstream cache-key dependencies. For LlmCall entries this is the
    /// set of tool-call cache keys whose results appeared in the
    /// prompt's `tool_result` blocks; when one of those tool entries is
    /// invalidated by a file edit, every LlmCall whose upstream set
    /// contains that key is invalidated too. Tool-call entries
    /// typically leave this empty (their dependencies are encoded in
    /// `file_roots`); future M3+ extensions may use it for nested
    /// composite nodes.
    #[serde(default)]
    pub upstream_keys: Vec<String>,
}

/// Serialized form of one file dependency, as stored in
/// `PayloadMeta::file_roots`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileRootSerde {
    /// Path of the file the cache entry depends on.
    pub path: String,
    /// Hash the file is expected to have (per the `file_roots` docs, its
    /// blake3 at the time the entry was written).
    pub expected_hash: String,
}

/// A cache entry as returned by a successful lookup: the payload bytes
/// together with the metadata that was stored alongside them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Payload {
    /// Raw payload bytes read from the store.
    pub bytes: Vec<u8>,
    /// Metadata decoded from the entry's meta file.
    pub meta: PayloadMeta,
}

/// Backend abstraction for the content-addressed payload store.
///
/// Concrete impls: `FileStore` (local content-addressed disk), and
/// (M4 step 7+) `RemoteStore` over HTTP. `LiveCache` owns one
/// `Box<dyn Store>` and sends every content-addressed read/write through
/// this trait, so the same cache state machine runs against either
/// backend without conditional compilation in the runtime layer.
pub trait Store: Send + Sync {
    // ---- writes -------------------------------------------------------

    /// Store `bytes` under `key` with no upstream cache-key
    /// dependencies. Provided method: forwards to
    /// `persist_with_upstreams` with an empty upstream list.
    fn persist(
        &self,
        key: &Key,
        bytes: &[u8],
        tool_kind: &str,
        file_roots: Vec<FileRootSerde>,
    ) -> Result<(), StoreError> {
        let no_upstreams: Vec<String> = Vec::new();
        self.persist_with_upstreams(key, bytes, tool_kind, file_roots, no_upstreams)
    }

    /// Store `bytes` under `key`, recording the tool kind, the file
    /// dependencies and the upstream cache keys in the entry's metadata.
    fn persist_with_upstreams(
        &self,
        key: &Key,
        bytes: &[u8],
        tool_kind: &str,
        file_roots: Vec<FileRootSerde>,
        upstream_keys: Vec<String>,
    ) -> Result<(), StoreError>;

    /// Drop the entry stored under `key`.
    fn remove(&self, key: &Key) -> Result<(), StoreError>;

    /// Evict entries until the store fits within `cap_bytes`; the
    /// returned count is the number of entries dropped.
    fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError>;

    // ---- reads --------------------------------------------------------

    /// Fetch the entry stored under `key`, or `Ok(None)` when there is
    /// none.
    fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError>;

    /// Whether an entry exists for `key`.
    fn contains(&self, key: &Key) -> bool;

    /// Every `(key, metadata)` pair currently held by the backend.
    fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError>;

    /// Total number of bytes the backend currently occupies.
    fn total_bytes(&self) -> Result<u64, StoreError>;
}

/// Local on-disk `Store` backend. Each entry is a `<key>.payload` /
/// `<key>.meta.json` pair inside a shard directory named after the first
/// two hex characters of the key.
#[derive(Debug)]
pub struct FileStore {
    /// Directory under which the shard directories are created.
    root: PathBuf,
}

impl FileStore {
    /// Open a store rooted at `root`, creating the directory (and any
    /// missing parents) first.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::Io` when the directory cannot be created.
    pub fn open(root: impl Into<PathBuf>) -> Result<Self, StoreError> {
        let store = Self { root: root.into() };
        fs::create_dir_all(&store.root)?;
        Ok(store)
    }

    /// Root directory this store was opened on.
    pub fn root(&self) -> &Path {
        self.root.as_path()
    }

    /// Shard directory for `key`: `<root>/<first two characters of the key>`.
    /// Slices the key by byte index, so callers validate the key first.
    fn shard_dir(&self, key: &Key) -> PathBuf {
        let prefix = &key.0[..2];
        self.root.join(prefix)
    }

    /// Path of the raw payload file for `key` inside its shard directory.
    fn payload_path(&self, key: &Key) -> PathBuf {
        let file_name = format!("{}.payload", key.0);
        self.shard_dir(key).join(file_name)
    }

    /// Path of the JSON metadata file for `key` inside its shard directory.
    fn meta_path(&self, key: &Key) -> PathBuf {
        let file_name = format!("{}.meta.json", key.0);
        self.shard_dir(key).join(file_name)
    }

    /// Store `bytes` under `key` with no upstream cache-key dependencies.
    ///
    /// Shorthand for [`FileStore::persist_with_upstreams`] with an empty
    /// upstream list. Each of the two files is written to a temp file and
    /// renamed into place, so `lookup` does not observe a half-written
    /// payload or meta file.
    pub fn persist(
        &self,
        key: &Key,
        bytes: &[u8],
        tool_kind: &str,
        file_roots: Vec<FileRootSerde>,
    ) -> Result<(), StoreError> {
        let no_upstreams: Vec<String> = Vec::new();
        self.persist_with_upstreams(key, bytes, tool_kind, file_roots, no_upstreams)
    }

    /// Store `bytes` under `key` together with its dependency metadata.
    ///
    /// `upstream_keys` lists previously-cached entries this payload was
    /// derived from. The LlmCall path records the tool-call keys it
    /// consumed here, so that invalidation can follow the dependency
    /// edge and drop dependent entries when a tool key is marked dirty.
    ///
    /// The payload file is renamed into place before the meta file, so a
    /// meta file never appears ahead of the payload it describes.
    ///
    /// # Errors
    ///
    /// `StoreError::BadKey` for a malformed key, `StoreError::Io` for
    /// filesystem failures, `StoreError::Meta` if the metadata cannot be
    /// serialized.
    pub fn persist_with_upstreams(
        &self,
        key: &Key,
        bytes: &[u8],
        tool_kind: &str,
        file_roots: Vec<FileRootSerde>,
        upstream_keys: Vec<String>,
    ) -> Result<(), StoreError> {
        key.validate()?;

        let shard = self.shard_dir(key);
        fs::create_dir_all(shard)?;

        // Hash of the *output* bytes; `lookup` re-derives it to detect
        // corruption.
        let digest = blake3::hash(bytes);
        let sidecar = PayloadMeta {
            payload_hash: digest.to_hex().to_string(),
            bytes: bytes.len() as u64,
            tool_kind: tool_kind.to_owned(),
            file_roots,
            upstream_keys,
        };

        let payload_target = self.payload_path(key);
        write_atomic(&payload_target, bytes)?;

        let encoded = serde_json::to_vec(&sidecar)?;
        let meta_target = self.meta_path(key);
        write_atomic(&meta_target, &encoded)?;

        Ok(())
    }

    /// Delete the payload + meta for a key. Used by `LiveCache` when an
    /// upstream invalidation makes a cached entry definitely-stale; the
    /// caller has already removed the registry entry, and this drops
    /// the bytes from disk so a future rehydration does not resurrect
    /// the entry.
    pub fn remove(&self, key: &Key) -> Result<(), StoreError> {
        key.validate()?;
        let pp = self.payload_path(key);
        let mp = self.meta_path(key);
        if pp.exists() {
            fs::remove_file(&pp)?;
        }
        if mp.exists() {
            fs::remove_file(&mp)?;
        }
        Ok(())
    }

    /// Sum of the sizes of every regular file found inside the shard
    /// directories under the root (payloads, meta files, and anything
    /// else that happens to sit there).
    ///
    /// Feeds `evict_to_cap` and operator-facing stats. The cost is a
    /// full directory walk, which is acceptable because eviction runs
    /// periodically rather than on a hot path.
    ///
    /// A missing root counts as zero bytes. Directory entries or file
    /// metadata that cannot be read are skipped; failing to list a shard
    /// directory is reported as an error.
    pub fn total_bytes(&self) -> Result<u64, StoreError> {
        let shards = match fs::read_dir(&self.root) {
            Ok(listing) => listing,
            Err(err) => {
                return if err.kind() == io::ErrorKind::NotFound {
                    Ok(0)
                } else {
                    Err(err.into())
                };
            }
        };

        let mut sum = 0u64;
        for shard_dir in shards.flatten().map(|d| d.path()).filter(|p| p.is_dir()) {
            let file_sizes = fs::read_dir(&shard_dir)?
                .flatten()
                .filter_map(|f| f.metadata().ok())
                .filter(|md| md.is_file())
                .map(|md| md.len());
            for len in file_sizes {
                sum = sum.saturating_add(len);
            }
        }
        Ok(sum)
    }

    /// Evict oldest entries until `total_bytes() <= cap_bytes`. Order
    /// is by payload-file `mtime` ascending so the least-recently-
    /// modified entry is removed first; on filesystems where reads
    /// update atime but not mtime this is a true write-order eviction
    /// (entries that have never been re-persisted go first), which is
    /// the right default for an append-only cache because a hot key
    /// gets re-persisted on every backend fall-through and a cold key
    /// does not. Returns the number of entries dropped.
    ///
    /// The function intentionally does not touch the in-memory
    /// `LiveCache` registry; the caller is expected to either call
    /// this at process startup (before `LiveCache::new` rehydrates)
    /// or to recreate the cache afterwards.
    ///
    /// Only files named exactly `<valid key>.payload` are treated as
    /// entries. A file that disappears between the directory listing
    /// and the metadata read is skipped rather than aborting the pass.
    /// An entry whose removal fails is left in place and not counted.
    ///
    /// # Errors
    ///
    /// `StoreError::Io` if the root or a shard directory cannot be
    /// listed (a missing root is not an error and yields `Ok(0)`).
    pub fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
        let mut current = self.total_bytes()?;
        if current <= cap_bytes {
            return Ok(0);
        }

        // Collect (mtime, key, payload + meta size) triples, then sort by
        // mtime ascending.
        let mut entries: Vec<(std::time::SystemTime, Key, u64)> = Vec::new();
        let dir = match fs::read_dir(&self.root) {
            Ok(e) => e,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
            Err(e) => return Err(e.into()),
        };
        for shard in dir.flatten() {
            let shard_path = shard.path();
            if !shard_path.is_dir() {
                continue;
            }
            for entry in fs::read_dir(&shard_path)?.flatten() {
                let p = entry.path();
                // `strip_suffix` removes the extension exactly once.
                // `trim_end_matches` would also accept
                // `<key>.payload.payload` and attribute that stray file
                // to `<key>`.
                let key_hex = match p
                    .file_name()
                    .and_then(|n| n.to_str())
                    .and_then(|s| s.strip_suffix(".payload"))
                {
                    Some(hex) => hex,
                    None => continue,
                };
                let key = Key(key_hex.to_string());
                if key.validate().is_err() {
                    continue;
                }
                // The file may have been removed since the listing was
                // taken (e.g. by a concurrent eviction); skip it, the
                // same way `total_bytes` tolerates unreadable metadata.
                let md = match entry.metadata() {
                    Ok(md) => md,
                    Err(_) => continue,
                };
                let meta_len = fs::metadata(self.meta_path(&key))
                    .map(|m| m.len())
                    .unwrap_or(0);
                // No mtime support on this platform/filesystem: treat the
                // entry as the oldest possible.
                let mtime = md.modified().unwrap_or(std::time::UNIX_EPOCH);
                entries.push((mtime, key, md.len().saturating_add(meta_len)));
            }
        }
        entries.sort_by_key(|(t, _, _)| *t);

        let mut dropped = 0usize;
        for (_, key, size) in entries {
            if current <= cap_bytes {
                break;
            }
            if self.remove(&key).is_ok() {
                current = current.saturating_sub(size);
                dropped += 1;
            }
        }
        Ok(dropped)
    }

    /// Iterate every (key, meta) pair in the store. Used by `LiveCache`
    /// to rehydrate its in-memory registry on startup so cross-process
    /// cache hits work — without this, a fresh MCP server would see an
    /// empty registry and miss every lookup until it re-populated each
    /// entry from scratch.
    ///
    /// Entries are discovered through their meta files: only files named
    /// exactly `<valid key>.meta.json` are considered, and a meta file
    /// that cannot be read or decoded is skipped. The presence of the
    /// matching payload file is not checked here.
    ///
    /// # Errors
    ///
    /// `StoreError::Io` if the root or a shard directory cannot be
    /// listed (a missing root yields an empty list).
    pub fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
        let mut out = Vec::new();
        let entries = match fs::read_dir(&self.root) {
            Ok(e) => e,
            Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
            Err(e) => return Err(e.into()),
        };
        for shard in entries.flatten() {
            let shard_path = shard.path();
            if !shard_path.is_dir() {
                continue;
            }
            for entry in fs::read_dir(&shard_path)?.flatten() {
                let p = entry.path();
                // `strip_suffix` removes the extension exactly once.
                // `trim_end_matches` would also accept
                // `<key>.meta.json.meta.json` and report `<key>` a
                // second time with that stray file's contents.
                let key_hex = match p
                    .file_name()
                    .and_then(|n| n.to_str())
                    .and_then(|s| s.strip_suffix(".meta.json"))
                {
                    Some(hex) => hex,
                    None => continue,
                };
                let key = Key(key_hex.to_string());
                if key.validate().is_err() {
                    continue;
                }
                let meta: PayloadMeta = match fs::read(&p)
                    .ok()
                    .and_then(|b| serde_json::from_slice(&b).ok())
                {
                    Some(m) => m,
                    None => continue,
                };
                out.push((key, meta));
            }
        }
        Ok(out)
    }

    /// Look up a payload. Returns `None` if absent. Returns
    /// `StoreError::Integrity` if the payload bytes on disk do not match
    /// the recorded hash, which indicates corruption (torn write that
    /// somehow survived, on-disk tamper, hardware fault) and must not
    /// silently return wrong bytes to the model.
    pub fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
        key.validate()?;
        let pp = self.payload_path(key);
        let mp = self.meta_path(key);
        if !pp.exists() || !mp.exists() {
            return Ok(None);
        }
        let mut bytes = Vec::new();
        fs::File::open(&pp)?.read_to_end(&mut bytes)?;
        let meta: PayloadMeta = serde_json::from_slice(&fs::read(&mp)?)?;

        let actual = blake3::hash(&bytes).to_hex().to_string();
        if actual != meta.payload_hash {
            return Err(StoreError::Integrity {
                key: key.0.clone(),
                expected: meta.payload_hash.clone(),
                actual,
            });
        }
        Ok(Some(Payload { bytes, meta }))
    }

    /// Existence check: `true` only when `key` is well-formed and both
    /// its payload file and its meta file are present.
    ///
    /// Handy for tests and stats. No hashing happens here; corruption is
    /// only detected by `lookup`.
    pub fn contains(&self, key: &Key) -> bool {
        if key.validate().is_err() {
            return false;
        }
        let has_payload = self.payload_path(key).exists();
        has_payload && self.meta_path(key).exists()
    }
}

// `Store` implementation for the on-disk backend. Every method forwards
// to the inherent `FileStore` method of the same name; the calls are
// written as `Self::name(self, ..)` paths, which resolve to the inherent
// methods (inherent items take precedence over trait items), so none of
// these recurse into the trait method.
impl Store for FileStore {
    fn persist_with_upstreams(
        &self,
        key: &Key,
        bytes: &[u8],
        tool_kind: &str,
        file_roots: Vec<FileRootSerde>,
        upstream_keys: Vec<String>,
    ) -> Result<(), StoreError> {
        Self::persist_with_upstreams(self, key, bytes, tool_kind, file_roots, upstream_keys)
    }

    fn remove(&self, key: &Key) -> Result<(), StoreError> {
        Self::remove(self, key)
    }

    fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
        Self::evict_to_cap(self, cap_bytes)
    }

    fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
        Self::lookup(self, key)
    }

    fn contains(&self, key: &Key) -> bool {
        Self::contains(self, key)
    }

    fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
        Self::iter_meta(self)
    }

    fn total_bytes(&self) -> Result<u64, StoreError> {
        Self::total_bytes(self)
    }
}

/// Write `bytes` to `target` so that readers see either the previous
/// file (or no file) or the complete new contents.
///
/// The data goes to a sibling temp file in the same directory, is synced
/// to disk, and is then renamed over `target`. Keeping the temp file in
/// the same directory keeps the rename on one filesystem, where it is
/// atomic on POSIX.
///
/// # Errors
///
/// Returns `InvalidInput` if `target` has no parent directory, otherwise
/// any error from creating, writing, syncing or renaming the temp file.
fn write_atomic(target: &Path, bytes: &[u8]) -> io::Result<()> {
    let parent = target
        .parent()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "target has no parent"))?;
    let mut guard = TempGuard::create_in(parent)?;
    guard.file.write_all(bytes)?;
    // `flush` on an `fs::File` does nothing (there is no userspace
    // buffer), so it gives no guarantee that the data is on disk when
    // the rename lands. Without a real sync, a crash shortly after the
    // rename can leave `target` pointing at an empty or truncated file,
    // which is exactly the torn state this helper exists to prevent.
    // NOTE(review): the rename itself is not made durable here (the
    // parent directory is not synced) — confirm whether that matters.
    guard.file.sync_all()?;
    guard.persist(target)
}

// Minimal in-tree temp-file guard. Holds a file handle plus its path; the
// guard removes the file on drop so a failed write does not leak. Calling
// `persist` consumes the guard and renames the temp file to the target.
// Cleanup is disarmed only after the rename has succeeded: the guard is
// dropped when `persist` returns, i.e. after the rename has already run,
// so there is no ordering hazard in leaving it armed until then, and a
// failed rename still gets its temp file removed.
struct TempGuard {
    /// Location of the temp file.
    path: PathBuf,
    /// Open write handle to the temp file.
    file: fs::File,
    /// While `true`, dropping the guard deletes `path`.
    armed: bool,
}

impl TempGuard {
    /// Create a new, empty temp file inside `dir`.
    ///
    /// The name combines the process id with a process-wide counter, and
    /// the file is opened with `create_new`, so an existing file of the
    /// same name makes this fail instead of being overwritten.
    fn create_in(dir: &Path) -> io::Result<Self> {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let path = dir.join(format!(".verdant-tmp-{pid}-{n}"));
        let file = fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&path)?;
        Ok(Self {
            path,
            file,
            armed: true,
        })
    }

    /// Rename the temp file to `target`, consuming the guard.
    ///
    /// On success the cleanup is disarmed, since the temp path no longer
    /// exists. On failure the guard stays armed and its `Drop` removes
    /// the temp file, so a failed rename does not leave a stray
    /// `.verdant-tmp-*` file behind.
    fn persist(mut self, target: &Path) -> io::Result<()> {
        fs::rename(&self.path, target)?;
        self.armed = false;
        Ok(())
    }
}

impl Drop for TempGuard {
    /// Best-effort removal of the temp file unless the guard was
    /// disarmed by a successful `persist`.
    fn drop(&mut self) {
        if self.armed {
            let _ = fs::remove_file(&self.path);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a `FileStore` rooted in a fresh temporary directory. The
    /// `TempDir` handle is returned as well so the directory stays alive
    /// until the test drops it.
    fn store() -> (TempDir, FileStore) {
        let tmp = TempDir::new().unwrap();
        let cache = FileStore::open(tmp.path().to_path_buf()).unwrap();
        (tmp, cache)
    }

    /// On-disk location of one of `key`'s files, assembled by hand
    /// (not through the store's own path helpers) so tests can plant or
    /// corrupt files behind the store's back.
    fn raw_path(cache: &FileStore, key: &Key, suffix: &str) -> PathBuf {
        cache
            .root
            .join(&key.0[..2])
            .join(format!("{}.{}", key.0, suffix))
    }

    #[test]
    fn persist_then_lookup_roundtrip() {
        let (_tmp, cache) = store();
        let key = Key::from_bytes(b"input-1");
        cache
            .persist(&key, b"hello world", "read", Vec::new())
            .unwrap();

        let hit = cache.lookup(&key).unwrap().expect("must exist");
        assert_eq!(hit.bytes, b"hello world");
        assert_eq!(hit.meta.tool_kind, "read");
        assert_eq!(hit.meta.bytes, 11);
    }

    #[test]
    fn lookup_missing_returns_none() {
        let (_tmp, cache) = store();
        let key = Key::from_bytes(b"never-written");
        let outcome = cache.lookup(&key).unwrap();
        assert!(outcome.is_none());
    }

    #[test]
    fn integrity_violation_detected() {
        let (_tmp, cache) = store();
        let key = Key::from_bytes(b"input-2");
        cache.persist(&key, b"trusted", "read", Vec::new()).unwrap();

        // Overwrite the payload file directly. Real corruption would come
        // from hardware or an external writer, but a plain overwrite is
        // the cheapest way to reproduce a hash mismatch.
        let payload_file = raw_path(&cache, &key, "payload");
        fs::write(&payload_file, b"tampered").unwrap();

        let err = cache.lookup(&key).err().expect("integrity must fire");
        assert!(matches!(err, StoreError::Integrity { .. }));
    }

    #[test]
    fn partial_write_only_meta_returns_none() {
        // Each file is renamed into place atomically, but the pair is
        // not: a crash between the two renames can leave one half
        // without the other. Here only the meta file exists, and lookup
        // has to report a miss rather than partial data.
        let (_tmp, cache) = store();
        let key = Key::from_bytes(b"input-3");

        let orphan = PayloadMeta {
            payload_hash: blake3::hash(b"orphan").to_hex().to_string(),
            bytes: 6,
            tool_kind: "read".into(),
            file_roots: vec![],
            upstream_keys: vec![],
        };
        let encoded = serde_json::to_vec(&orphan).unwrap();

        // Plant the meta file by hand; no payload is written.
        let meta_file = raw_path(&cache, &key, "meta.json");
        fs::create_dir_all(meta_file.parent().unwrap()).unwrap();
        fs::write(&meta_file, encoded).unwrap();

        assert!(cache.lookup(&key).unwrap().is_none());
    }

    #[test]
    fn total_bytes_sums_payloads_and_meta() {
        let (_tmp, cache) = store();
        assert_eq!(cache.total_bytes().unwrap(), 0, "fresh store is zero bytes");

        let key = Key::from_bytes(b"size-test");
        let payload = vec![b'x'; 1024];
        cache.persist(&key, &payload, "read", Vec::new()).unwrap();

        let bytes = cache.total_bytes().unwrap();
        assert!(bytes >= 1024, "payload alone is ≥1024, got {bytes}");
    }

    #[test]
    fn evict_to_cap_drops_oldest_first() {
        let (_tmp, cache) = store();

        // Four entries written one after another, with a short pause in
        // between so coarse filesystem mtime resolution does not give
        // them identical timestamps.
        let mut keys: Vec<Key> = Vec::new();
        for i in 0..4 {
            keys.push(Key::from_bytes(format!("k{i}").as_bytes()));
        }
        for (i, key) in keys.iter().enumerate() {
            let payload = vec![b'A' + i as u8; 4096];
            cache.persist(key, &payload, "read", Vec::new()).unwrap();
            std::thread::sleep(std::time::Duration::from_millis(20));
        }
        let before = cache.total_bytes().unwrap();
        assert!(before >= 4 * 4096);

        // Ask for roughly half the current footprint.
        let cap = before / 2;
        let dropped = cache.evict_to_cap(cap).unwrap();
        assert!(dropped >= 1, "should drop at least one entry");
        let after = cache.total_bytes().unwrap();
        assert!(
            after <= cap,
            "after eviction must fit cap; got {after}/{cap}"
        );

        // First-written key is evicted, last-written key is kept.
        let oldest = cache.lookup(&keys[0]).unwrap();
        assert!(oldest.is_none(), "oldest must evict");
        let newest = cache.lookup(&keys[3]).unwrap();
        assert!(newest.is_some(), "newest must survive");
    }

    #[test]
    fn evict_below_cap_is_noop() {
        let (_tmp, cache) = store();
        let key = Key::from_bytes(b"small");
        cache.persist(&key, b"tiny", "read", Vec::new()).unwrap();

        let dropped = cache.evict_to_cap(u64::MAX).unwrap();
        assert_eq!(dropped, 0);
        assert!(cache.lookup(&key).unwrap().is_some());
    }

    #[test]
    fn malformed_key_rejected() {
        let (_tmp, cache) = store();
        let bogus = Key(String::from("not-hex"));
        let write_result = cache.persist(&bogus, b"x", "read", Vec::new());
        assert!(write_result.is_err());
        let read_result = cache.lookup(&bogus);
        assert!(read_result.is_err());
    }

    #[test]
    fn shard_dirs_distribute_keys() {
        let (_tmp, cache) = store();
        for i in 0..16u8 {
            let key = Key::from_bytes(&[i, i, i]);
            cache.persist(&key, &[i], "read", Vec::new()).unwrap();
        }

        // Gather the names of the shard directories that were created.
        // Sixteen blake3 digests landing in a single two-character shard
        // is astronomically unlikely, so at least four distinct shards
        // are expected.
        let shards: std::collections::HashSet<String> = fs::read_dir(cache.root())
            .unwrap()
            .map(|entry| entry.unwrap())
            .filter(|e| e.path().is_dir())
            .map(|e| e.file_name().to_string_lossy().to_string())
            .collect();
        assert!(shards.len() >= 4, "shards = {shards:?}");
    }

    #[test]
    fn contains_only_when_complete() {
        let (_tmp, cache) = store();
        let key = Key::from_bytes(b"x");
        assert!(!cache.contains(&key));

        cache.persist(&key, b"y", "read", Vec::new()).unwrap();
        assert!(cache.contains(&key));
    }
}