1use serde::{Deserialize, Serialize};
20use std::fs;
21use std::io::{self, Read as _, Write as _};
22use std::path::{Path, PathBuf};
23
24#[derive(Debug, thiserror::Error)]
25pub enum StoreError {
26 #[error("io: {0}")]
27 Io(#[from] io::Error),
28 #[error("malformed key: {0}")]
29 BadKey(String),
30 #[error("metadata decode failed: {0}")]
31 Meta(#[from] serde_json::Error),
32 #[error(
33 "integrity check failed for key {key}: stored payload hash {actual} != expected {expected}"
34 )]
35 Integrity {
36 key: String,
37 expected: String,
38 actual: String,
39 },
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
45pub struct Key(pub String);
46
47impl Key {
48 pub fn from_bytes(bytes: &[u8]) -> Self {
49 Key(blake3::hash(bytes).to_hex().to_string())
50 }
51
52 pub fn as_str(&self) -> &str {
53 &self.0
54 }
55
56 fn validate(&self) -> Result<(), StoreError> {
57 if self.0.len() != 64 || !self.0.chars().all(|c| c.is_ascii_hexdigit()) {
58 return Err(StoreError::BadKey(self.0.clone()));
59 }
60 Ok(())
61 }
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
65pub struct PayloadMeta {
66 pub payload_hash: String,
70 pub bytes: u64,
72 pub tool_kind: String,
76 #[serde(default)]
83 pub file_roots: Vec<FileRootSerde>,
84 #[serde(default)]
93 pub upstream_keys: Vec<String>,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub struct FileRootSerde {
98 pub path: String,
99 pub expected_hash: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Payload {
104 pub bytes: Vec<u8>,
105 pub meta: PayloadMeta,
106}
107
108pub trait Store: Send + Sync {
115 fn persist_with_upstreams(
116 &self,
117 key: &Key,
118 bytes: &[u8],
119 tool_kind: &str,
120 file_roots: Vec<FileRootSerde>,
121 upstream_keys: Vec<String>,
122 ) -> Result<(), StoreError>;
123
124 fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError>;
125
126 fn remove(&self, key: &Key) -> Result<(), StoreError>;
127
128 fn total_bytes(&self) -> Result<u64, StoreError>;
129
130 fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError>;
131
132 fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError>;
133
134 fn contains(&self, key: &Key) -> bool;
135
136 fn persist(
137 &self,
138 key: &Key,
139 bytes: &[u8],
140 tool_kind: &str,
141 file_roots: Vec<FileRootSerde>,
142 ) -> Result<(), StoreError> {
143 self.persist_with_upstreams(key, bytes, tool_kind, file_roots, Vec::new())
144 }
145}
146
/// Filesystem-backed [`Store`].
///
/// On-disk layout:
/// - `<root>/<first two key chars>/<key>.payload` holds the raw bytes.
/// - `<key>.meta.json`, in the same directory, holds the JSON-encoded
///   [`PayloadMeta`].
#[derive(Debug)]
pub struct FileStore {
    /// Directory under which all shard directories are created.
    root: PathBuf,
}
151
152impl FileStore {
153 pub fn open(root: impl Into<PathBuf>) -> Result<Self, StoreError> {
154 let root = root.into();
155 fs::create_dir_all(&root)?;
156 Ok(Self { root })
157 }
158
159 pub fn root(&self) -> &Path {
160 &self.root
161 }
162
163 fn shard_dir(&self, key: &Key) -> PathBuf {
164 self.root.join(&key.0[..2])
165 }
166
167 fn payload_path(&self, key: &Key) -> PathBuf {
168 self.shard_dir(key).join(format!("{}.payload", key.0))
169 }
170
171 fn meta_path(&self, key: &Key) -> PathBuf {
172 self.shard_dir(key).join(format!("{}.meta.json", key.0))
173 }
174
175 pub fn persist(
179 &self,
180 key: &Key,
181 bytes: &[u8],
182 tool_kind: &str,
183 file_roots: Vec<FileRootSerde>,
184 ) -> Result<(), StoreError> {
185 self.persist_with_upstreams(key, bytes, tool_kind, file_roots, Vec::new())
186 }
187
188 pub fn persist_with_upstreams(
194 &self,
195 key: &Key,
196 bytes: &[u8],
197 tool_kind: &str,
198 file_roots: Vec<FileRootSerde>,
199 upstream_keys: Vec<String>,
200 ) -> Result<(), StoreError> {
201 key.validate()?;
202 fs::create_dir_all(self.shard_dir(key))?;
203
204 let payload_hash = blake3::hash(bytes).to_hex().to_string();
205 let meta = PayloadMeta {
206 payload_hash,
207 bytes: bytes.len() as u64,
208 tool_kind: tool_kind.to_string(),
209 file_roots,
210 upstream_keys,
211 };
212
213 write_atomic(&self.payload_path(key), bytes)?;
214 let meta_bytes = serde_json::to_vec(&meta)?;
215 write_atomic(&self.meta_path(key), &meta_bytes)?;
216 Ok(())
217 }
218
219 pub fn remove(&self, key: &Key) -> Result<(), StoreError> {
225 key.validate()?;
226 let pp = self.payload_path(key);
227 let mp = self.meta_path(key);
228 if pp.exists() {
229 fs::remove_file(&pp)?;
230 }
231 if mp.exists() {
232 fs::remove_file(&mp)?;
233 }
234 Ok(())
235 }
236
237 pub fn total_bytes(&self) -> Result<u64, StoreError> {
243 let mut total: u64 = 0;
244 let entries = match fs::read_dir(&self.root) {
245 Ok(e) => e,
246 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
247 Err(e) => return Err(e.into()),
248 };
249 for shard in entries.flatten() {
250 let shard_path = shard.path();
251 if !shard_path.is_dir() {
252 continue;
253 }
254 for entry in fs::read_dir(&shard_path)?.flatten() {
255 #[allow(clippy::collapsible_if)]
256 if let Ok(md) = entry.metadata() {
257 if md.is_file() {
258 total = total.saturating_add(md.len());
259 }
260 }
261 }
262 }
263 Ok(total)
264 }
265
266 pub fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
280 let mut current = self.total_bytes()?;
281 if current <= cap_bytes {
282 return Ok(0);
283 }
284
285 let mut entries: Vec<(std::time::SystemTime, Key, u64)> = Vec::new();
286 let dir = match fs::read_dir(&self.root) {
287 Ok(e) => e,
288 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
289 Err(e) => return Err(e.into()),
290 };
291 for shard in dir.flatten() {
292 let shard_path = shard.path();
293 if !shard_path.is_dir() {
294 continue;
295 }
296 for entry in fs::read_dir(&shard_path)?.flatten() {
297 let p = entry.path();
298 let name = match p.file_name().and_then(|n| n.to_str()) {
299 Some(s) => s.to_string(),
300 None => continue,
301 };
302 if let Some(stem) = name.strip_suffix(".payload") {
303 let key = Key(stem.to_string());
304 if key.validate().is_err() {
305 continue;
306 }
307 let md = entry.metadata()?;
308 let payload_len = md.len();
309 let meta_len = fs::metadata(self.meta_path(&key))
310 .map(|m| m.len())
311 .unwrap_or(0);
312 let mtime = md.modified().unwrap_or(std::time::UNIX_EPOCH);
313 entries.push((mtime, key, payload_len + meta_len));
314 } else if let Some(stem) = name.strip_suffix(".meta.json") {
315 let key = Key(stem.to_string());
321 if key.validate().is_err() || self.payload_path(&key).exists() {
322 continue;
323 }
324 let md = entry.metadata()?;
325 let mtime = md.modified().unwrap_or(std::time::UNIX_EPOCH);
326 entries.push((mtime, key, md.len()));
327 }
328 }
329 }
330 entries.sort_by(|(ta, ka, _), (tb, kb, _)| ta.cmp(tb).then_with(|| ka.0.cmp(&kb.0)));
335
336 let mut dropped = 0usize;
337 for (_, key, size) in entries {
338 if current <= cap_bytes {
339 break;
340 }
341 if self.remove(&key).is_ok() {
342 current = current.saturating_sub(size);
343 dropped += 1;
344 }
345 }
346 Ok(dropped)
347 }
348
349 pub fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
355 let mut out = Vec::new();
356 let entries = match fs::read_dir(&self.root) {
357 Ok(e) => e,
358 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
359 Err(e) => return Err(e.into()),
360 };
361 for shard in entries.flatten() {
362 let shard_path = shard.path();
363 if !shard_path.is_dir() {
364 continue;
365 }
366 for entry in fs::read_dir(&shard_path)?.flatten() {
367 let p = entry.path();
368 let name = match p.file_name().and_then(|n| n.to_str()) {
369 Some(s) if s.ends_with(".meta.json") => s.to_string(),
370 _ => continue,
371 };
372 let key_hex = name.trim_end_matches(".meta.json").to_string();
373 let key = Key(key_hex);
374 if key.validate().is_err() {
375 continue;
376 }
377 let meta: PayloadMeta = match fs::read(&p)
378 .ok()
379 .and_then(|b| serde_json::from_slice(&b).ok())
380 {
381 Some(m) => m,
382 None => continue,
383 };
384 out.push((key, meta));
385 }
386 }
387 Ok(out)
388 }
389
390 pub fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
396 key.validate()?;
397 let pp = self.payload_path(key);
398 let mp = self.meta_path(key);
399 let mut bytes = Vec::new();
404 match fs::File::open(&pp).and_then(|mut f| f.read_to_end(&mut bytes)) {
405 Ok(_) => {}
406 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
407 Err(e) => return Err(e.into()),
408 }
409 let meta_bytes = match fs::read(&mp) {
410 Ok(b) => b,
411 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
412 Err(e) => return Err(e.into()),
413 };
414 let meta: PayloadMeta = serde_json::from_slice(&meta_bytes)?;
415
416 let actual = blake3::hash(&bytes).to_hex().to_string();
417 if actual != meta.payload_hash {
418 return Err(StoreError::Integrity {
419 key: key.0.clone(),
420 expected: meta.payload_hash.clone(),
421 actual,
422 });
423 }
424 Ok(Some(Payload { bytes, meta }))
425 }
426
427 pub fn contains(&self, key: &Key) -> bool {
431 key.validate().is_ok() && self.payload_path(key).exists() && self.meta_path(key).exists()
432 }
433}
434
435impl Store for FileStore {
436 fn persist_with_upstreams(
437 &self,
438 key: &Key,
439 bytes: &[u8],
440 tool_kind: &str,
441 file_roots: Vec<FileRootSerde>,
442 upstream_keys: Vec<String>,
443 ) -> Result<(), StoreError> {
444 FileStore::persist_with_upstreams(self, key, bytes, tool_kind, file_roots, upstream_keys)
445 }
446
447 fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
448 FileStore::lookup(self, key)
449 }
450
451 fn remove(&self, key: &Key) -> Result<(), StoreError> {
452 FileStore::remove(self, key)
453 }
454
455 fn total_bytes(&self) -> Result<u64, StoreError> {
456 FileStore::total_bytes(self)
457 }
458
459 fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
460 FileStore::evict_to_cap(self, cap_bytes)
461 }
462
463 fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
464 FileStore::iter_meta(self)
465 }
466
467 fn contains(&self, key: &Key) -> bool {
468 FileStore::contains(self, key)
469 }
470}
471
/// Replaces `target` with `bytes` via a temp file and a rename.
///
/// The data goes to a uniquely named temp file in `target`'s own directory,
/// which keeps the rename on one filesystem. The file is then synced to
/// stable storage and renamed over `target`. Where `rename` replaces
/// atomically (POSIX), readers see either the old file or the complete new
/// one.
///
/// # Errors
/// - `InvalidInput` if `target` has no parent.
/// - Otherwise, the I/O error from create, write, sync, or rename.
///
/// The temp file is deleted on every failure path.
fn write_atomic(target: &Path, bytes: &[u8]) -> io::Result<()> {
    let parent = target
        .parent()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "target has no parent"))?;
    let mut guard = TempGuard::create_in(parent)?;
    guard.file.write_all(bytes)?;
    // `File::flush` is a no-op. `sync_all` is what forces the contents to
    // disk before the rename publishes them, so a crash cannot leave a
    // renamed but empty or short file behind. The parent directory entry
    // itself is not fsynced.
    guard.file.sync_all()?;
    guard.persist(target)
}

/// A temp file that deletes itself on drop, unless
/// [`TempGuard::persist`] successfully renames it into place.
struct TempGuard {
    /// Location of the temp file.
    path: PathBuf,
    /// Open write handle to the temp file.
    file: fs::File,
    /// True while the guard still owns `path` and must remove it on drop.
    armed: bool,
}

impl TempGuard {
    /// Maximum number of already-taken names skipped before giving up.
    const MAX_ATTEMPTS: u32 = 128;

    /// Creates a new temp file in `dir` named `.verdant-tmp-<pid>-<n>`.
    ///
    /// `create_new` guarantees another writer's file is never clobbered. The
    /// counter restarts at 0 in every process, so a name can already exist
    /// when a crashed process with the same PID left its temp file behind.
    /// PID reuse is common, e.g. PID 1 in containers. Such names are skipped
    /// rather than failing the write permanently.
    fn create_in(dir: &Path) -> io::Result<Self> {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        for _ in 0..Self::MAX_ATTEMPTS {
            // Only uniqueness of `n` matters, which `fetch_add` provides under
            // any ordering.
            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
            let path = dir.join(format!(".verdant-tmp-{pid}-{n}"));
            match fs::OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&path)
            {
                Ok(file) => {
                    return Ok(Self {
                        path,
                        file,
                        armed: true,
                    })
                }
                Err(e) if e.kind() == io::ErrorKind::AlreadyExists => continue,
                Err(e) => return Err(e),
            }
        }
        Err(io::Error::new(
            io::ErrorKind::AlreadyExists,
            format!(
                "no free temp file name in {} after {} attempts",
                dir.display(),
                Self::MAX_ATTEMPTS
            ),
        ))
    }

    /// Renames the temp file to `target`.
    ///
    /// The guard is disarmed only after the rename succeeds. If the rename
    /// fails, the temp file is still deleted when the guard drops.
    fn persist(mut self, target: &Path) -> io::Result<()> {
        fs::rename(&self.path, target)?;
        self.armed = false;
        Ok(())
    }
}

impl Drop for TempGuard {
    /// Best-effort cleanup of an unpersisted temp file. Errors are ignored
    /// because `drop` cannot report them.
    fn drop(&mut self) {
        if self.armed {
            let _ = fs::remove_file(&self.path);
        }
    }
}
529
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A fresh store in its own temp dir.
    ///
    /// Keep the returned `TempDir` alive for the whole test: dropping it
    /// deletes the directory.
    fn store() -> (TempDir, FileStore) {
        let dir = TempDir::new().unwrap();
        let s = FileStore::open(dir.path().to_path_buf()).unwrap();
        (dir, s)
    }

    /// Serialized metadata describing `payload`, for hand-built orphan
    /// fixtures.
    fn meta_json_for(payload: &[u8]) -> Vec<u8> {
        serde_json::to_vec(&PayloadMeta {
            payload_hash: blake3::hash(payload).to_hex().to_string(),
            bytes: payload.len() as u64,
            tool_kind: "read".into(),
            file_roots: vec![],
            upstream_keys: vec![],
        })
        .unwrap()
    }

    #[test]
    fn persist_then_lookup_roundtrip() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"input-1");
        s.persist(&k, b"hello world", "read", vec![]).unwrap();
        let p = s.lookup(&k).unwrap().expect("must exist");
        assert_eq!(p.bytes, b"hello world");
        assert_eq!(p.meta.tool_kind, "read");
        assert_eq!(p.meta.bytes, 11);
    }

    #[test]
    fn lookup_missing_returns_none() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"never-written");
        assert!(s.lookup(&k).unwrap().is_none());
    }

    #[test]
    fn integrity_violation_detected() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"input-2");
        s.persist(&k, b"trusted", "read", vec![]).unwrap();
        // Tamper with the payload behind the store's back.
        fs::write(s.payload_path(&k), b"tampered").unwrap();
        let err = s.lookup(&k).expect_err("integrity must fire");
        assert!(matches!(err, StoreError::Integrity { .. }));
    }

    #[test]
    fn partial_write_only_meta_returns_none() {
        let (_d, s) = store();
        // Simulate an entry whose payload never landed: only the metadata
        // file exists.
        let k = Key::from_bytes(b"input-3");
        fs::create_dir_all(s.shard_dir(&k)).unwrap();
        fs::write(s.meta_path(&k), meta_json_for(b"orphan")).unwrap();
        assert!(s.lookup(&k).unwrap().is_none());
    }

    #[test]
    fn lookup_orphan_missing_payload_returns_none_not_err() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"orphan-meta");
        s.persist(&k, b"payload bytes", "read", vec![]).unwrap();
        fs::remove_file(s.payload_path(&k)).unwrap();
        assert!(
            matches!(s.lookup(&k), Ok(None)),
            "payload-missing/meta-present must be Ok(None)"
        );
    }

    #[test]
    fn lookup_orphan_missing_meta_returns_none_not_err() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"orphan-payload");
        s.persist(&k, b"payload bytes", "read", vec![]).unwrap();
        fs::remove_file(s.meta_path(&k)).unwrap();
        assert!(
            matches!(s.lookup(&k), Ok(None)),
            "meta-missing/payload-present must be Ok(None)"
        );
    }

    #[test]
    fn evict_reclaims_meta_only_orphans() {
        let (_d, s) = store();
        let healthy = Key::from_bytes(b"healthy");
        s.persist(&healthy, &[b'h'; 4096], "read", vec![]).unwrap();

        let orphan = Key::from_bytes(b"orphan-entry");
        fs::create_dir_all(s.shard_dir(&orphan)).unwrap();
        fs::write(s.meta_path(&orphan), meta_json_for(b"gone")).unwrap();

        let dropped = s.evict_to_cap(0).unwrap();
        assert!(dropped >= 2, "both healthy entry and orphan must drop");
        assert!(
            !s.meta_path(&orphan).exists(),
            "meta-only orphan must be removed"
        );
        assert_eq!(s.total_bytes().unwrap(), 0);
    }

    #[test]
    fn evict_order_is_deterministic_for_equal_mtimes() {
        let fixed = std::time::UNIX_EPOCH + std::time::Duration::from_secs(1_700_000_000);
        let dir = TempDir::new().unwrap();
        let s = FileStore::open(dir.path().to_path_buf()).unwrap();
        let mut keys: Vec<Key> = (0..8)
            .map(|i| Key::from_bytes(format!("dk{i}").as_bytes()))
            .collect();
        for k in &keys {
            s.persist(k, &[b'x'; 4096], "read", vec![]).unwrap();
            fs::File::options()
                .write(true)
                .open(s.payload_path(k))
                .unwrap()
                .set_modified(fixed)
                .unwrap();
        }
        let before = s.total_bytes().unwrap();
        let dropped = s.evict_to_cap(before / 2).unwrap();
        assert!(dropped > 0, "eviction must drop at least one entry");

        let evicted: std::collections::HashSet<String> = keys
            .iter()
            .filter(|k| s.lookup(k).unwrap().is_none())
            .map(|k| k.0.clone())
            .collect();
        keys.sort_by(|a, b| a.0.cmp(&b.0));
        let expected: std::collections::HashSet<String> = keys
            .iter()
            .take(evicted.len())
            .map(|k| k.0.clone())
            .collect();
        assert_eq!(
            evicted, expected,
            "with equal mtimes the lowest store keys must be the deterministic victims"
        );
    }

    #[test]
    fn total_bytes_sums_payloads_and_meta() {
        let (_d, s) = store();
        assert_eq!(s.total_bytes().unwrap(), 0, "fresh store is zero bytes");
        let k = Key::from_bytes(b"size-test");
        s.persist(&k, &[b'x'; 1024], "read", vec![]).unwrap();
        let meta_len = fs::metadata(s.meta_path(&k)).unwrap().len();
        let bytes = s.total_bytes().unwrap();
        assert_eq!(bytes, 1024 + meta_len, "payload + meta, got {bytes}");
    }

    #[test]
    fn evict_to_cap_drops_oldest_first() {
        let (_d, s) = store();
        let keys: Vec<Key> = (0..4)
            .map(|i| Key::from_bytes(format!("k{i}").as_bytes()))
            .collect();
        for (i, k) in keys.iter().enumerate() {
            s.persist(k, &[b'A' + i as u8; 4096], "read", vec![])
                .unwrap();
            // Spread the payload mtimes so the age order is unambiguous.
            std::thread::sleep(std::time::Duration::from_millis(20));
        }
        let before = s.total_bytes().unwrap();
        assert!(before >= 4 * 4096);

        let cap = before / 2;
        let dropped = s.evict_to_cap(cap).unwrap();
        assert!(dropped >= 1, "should drop at least one entry");
        let after = s.total_bytes().unwrap();
        assert!(
            after <= cap,
            "after eviction must fit cap; got {after}/{cap}"
        );

        assert!(s.lookup(&keys[0]).unwrap().is_none(), "oldest must evict");
        assert!(s.lookup(&keys[3]).unwrap().is_some(), "newest must survive");
    }

    #[test]
    fn evict_below_cap_is_noop() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"small");
        s.persist(&k, b"tiny", "read", vec![]).unwrap();
        let dropped = s.evict_to_cap(u64::MAX).unwrap();
        assert_eq!(dropped, 0);
        assert!(s.lookup(&k).unwrap().is_some());
    }

    #[test]
    fn malformed_key_rejected() {
        let (_d, s) = store();
        let bad = Key("not-hex".to_string());
        assert!(s.persist(&bad, b"x", "read", vec![]).is_err());
        assert!(s.lookup(&bad).is_err());
        assert!(s.remove(&bad).is_err());
        assert!(!s.contains(&bad));
        // Shorter than the two-character shard prefix: must not panic.
        assert!(!s.contains(&Key("a".to_string())));
    }

    #[test]
    fn shard_dirs_distribute_keys() {
        let (_d, s) = store();
        for i in 0..16u8 {
            let k = Key::from_bytes(&[i, i, i]);
            s.persist(&k, &[i], "read", vec![]).unwrap();
        }
        let mut shards = std::collections::HashSet::new();
        for entry in fs::read_dir(s.root()).unwrap() {
            let e = entry.unwrap();
            if e.path().is_dir() {
                shards.insert(e.file_name().to_string_lossy().to_string());
            }
        }
        assert!(shards.len() >= 4, "shards = {shards:?}");
    }

    #[test]
    fn contains_only_when_complete() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"x");
        assert!(!s.contains(&k));
        s.persist(&k, b"y", "read", vec![]).unwrap();
        assert!(s.contains(&k));
        fs::remove_file(s.meta_path(&k)).unwrap();
        assert!(!s.contains(&k), "payload without meta is not complete");
    }

    #[test]
    fn remove_is_idempotent() {
        let (_d, s) = store();
        let k = Key::from_bytes(b"to-remove");
        s.persist(&k, b"bye", "read", vec![]).unwrap();
        s.remove(&k).unwrap();
        assert!(!s.contains(&k));
        assert!(s.lookup(&k).unwrap().is_none());
        s.remove(&k).expect("removing an absent entry is a no-op");
        s.remove(&Key::from_bytes(b"never-written")).unwrap();
    }

    #[test]
    fn iter_meta_lists_persisted_entries() {
        let (_d, s) = store();
        let a = Key::from_bytes(b"meta-a");
        let b = Key::from_bytes(b"meta-b");
        s.persist(&a, b"aaa", "read", vec![]).unwrap();
        s.persist_with_upstreams(&b, b"bb", "write", vec![], vec![a.0.clone()])
            .unwrap();

        let listed = s.iter_meta().unwrap();
        assert_eq!(listed.len(), 2);
        let keys: std::collections::HashSet<String> =
            listed.iter().map(|(k, _)| k.0.clone()).collect();
        let expected: std::collections::HashSet<String> =
            vec![a.0.clone(), b.0.clone()].into_iter().collect();
        assert_eq!(keys, expected);

        let meta_b = &listed.iter().find(|(k, _)| *k == b).unwrap().1;
        assert_eq!(meta_b.tool_kind, "write");
        assert_eq!(meta_b.bytes, 2);
        assert_eq!(meta_b.upstream_keys, vec![a.0.clone()]);
    }
}