1use serde::{Deserialize, Serialize};
20use std::fs;
21use std::io::{self, Read as _, Write as _};
22use std::path::{Path, PathBuf};
23
24#[derive(Debug, thiserror::Error)]
25pub enum StoreError {
26 #[error("io: {0}")]
27 Io(#[from] io::Error),
28 #[error("malformed key: {0}")]
29 BadKey(String),
30 #[error("metadata decode failed: {0}")]
31 Meta(#[from] serde_json::Error),
32 #[error(
33 "integrity check failed for key {key}: stored payload hash {actual} != expected {expected}"
34 )]
35 Integrity {
36 key: String,
37 expected: String,
38 actual: String,
39 },
40}
41
42#[derive(Debug, Clone, PartialEq, Eq, Hash)]
45pub struct Key(pub String);
46
47impl Key {
48 pub fn from_bytes(bytes: &[u8]) -> Self {
49 Key(blake3::hash(bytes).to_hex().to_string())
50 }
51
52 pub fn as_str(&self) -> &str {
53 &self.0
54 }
55
56 fn validate(&self) -> Result<(), StoreError> {
57 if self.0.len() != 64 || !self.0.chars().all(|c| c.is_ascii_hexdigit()) {
58 return Err(StoreError::BadKey(self.0.clone()));
59 }
60 Ok(())
61 }
62}
63
64#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
65pub struct PayloadMeta {
66 pub payload_hash: String,
70 pub bytes: u64,
72 pub tool_kind: String,
76 #[serde(default)]
83 pub file_roots: Vec<FileRootSerde>,
84 #[serde(default)]
93 pub upstream_keys: Vec<String>,
94}
95
96#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
97pub struct FileRootSerde {
98 pub path: String,
99 pub expected_hash: String,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq)]
103pub struct Payload {
104 pub bytes: Vec<u8>,
105 pub meta: PayloadMeta,
106}
107
108pub trait Store: Send + Sync {
115 fn persist_with_upstreams(
116 &self,
117 key: &Key,
118 bytes: &[u8],
119 tool_kind: &str,
120 file_roots: Vec<FileRootSerde>,
121 upstream_keys: Vec<String>,
122 ) -> Result<(), StoreError>;
123
124 fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError>;
125
126 fn remove(&self, key: &Key) -> Result<(), StoreError>;
127
128 fn total_bytes(&self) -> Result<u64, StoreError>;
129
130 fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError>;
131
132 fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError>;
133
134 fn contains(&self, key: &Key) -> bool;
135
136 fn persist(
137 &self,
138 key: &Key,
139 bytes: &[u8],
140 tool_kind: &str,
141 file_roots: Vec<FileRootSerde>,
142 ) -> Result<(), StoreError> {
143 self.persist_with_upstreams(key, bytes, tool_kind, file_roots, Vec::new())
144 }
145}
146
/// Filesystem-backed store: every entry lives as files beneath a single root directory.
#[derive(Debug)]
pub struct FileStore {
    /// Directory that contains the per-key shard directories.
    root: PathBuf,
}
151
152impl FileStore {
153 pub fn open(root: impl Into<PathBuf>) -> Result<Self, StoreError> {
154 let root = root.into();
155 fs::create_dir_all(&root)?;
156 Ok(Self { root })
157 }
158
159 pub fn root(&self) -> &Path {
160 &self.root
161 }
162
163 fn shard_dir(&self, key: &Key) -> PathBuf {
164 self.root.join(&key.0[..2])
165 }
166
167 fn payload_path(&self, key: &Key) -> PathBuf {
168 self.shard_dir(key).join(format!("{}.payload", key.0))
169 }
170
171 fn meta_path(&self, key: &Key) -> PathBuf {
172 self.shard_dir(key).join(format!("{}.meta.json", key.0))
173 }
174
175 pub fn persist(
179 &self,
180 key: &Key,
181 bytes: &[u8],
182 tool_kind: &str,
183 file_roots: Vec<FileRootSerde>,
184 ) -> Result<(), StoreError> {
185 self.persist_with_upstreams(key, bytes, tool_kind, file_roots, Vec::new())
186 }
187
188 pub fn persist_with_upstreams(
194 &self,
195 key: &Key,
196 bytes: &[u8],
197 tool_kind: &str,
198 file_roots: Vec<FileRootSerde>,
199 upstream_keys: Vec<String>,
200 ) -> Result<(), StoreError> {
201 key.validate()?;
202 fs::create_dir_all(self.shard_dir(key))?;
203
204 let payload_hash = blake3::hash(bytes).to_hex().to_string();
205 let meta = PayloadMeta {
206 payload_hash,
207 bytes: bytes.len() as u64,
208 tool_kind: tool_kind.to_string(),
209 file_roots,
210 upstream_keys,
211 };
212
213 write_atomic(&self.payload_path(key), bytes)?;
214 let meta_bytes = serde_json::to_vec(&meta)?;
215 write_atomic(&self.meta_path(key), &meta_bytes)?;
216 Ok(())
217 }
218
219 pub fn remove(&self, key: &Key) -> Result<(), StoreError> {
225 key.validate()?;
226 let pp = self.payload_path(key);
227 let mp = self.meta_path(key);
228 if pp.exists() {
229 fs::remove_file(&pp)?;
230 }
231 if mp.exists() {
232 fs::remove_file(&mp)?;
233 }
234 Ok(())
235 }
236
237 pub fn total_bytes(&self) -> Result<u64, StoreError> {
243 let mut total: u64 = 0;
244 let entries = match fs::read_dir(&self.root) {
245 Ok(e) => e,
246 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
247 Err(e) => return Err(e.into()),
248 };
249 for shard in entries.flatten() {
250 let shard_path = shard.path();
251 if !shard_path.is_dir() {
252 continue;
253 }
254 for entry in fs::read_dir(&shard_path)?.flatten() {
255 if let Ok(md) = entry.metadata() {
256 if md.is_file() {
257 total = total.saturating_add(md.len());
258 }
259 }
260 }
261 }
262 Ok(total)
263 }
264
265 pub fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
279 let mut current = self.total_bytes()?;
280 if current <= cap_bytes {
281 return Ok(0);
282 }
283
284 let mut entries: Vec<(std::time::SystemTime, Key, u64)> = Vec::new();
286 let dir = match fs::read_dir(&self.root) {
287 Ok(e) => e,
288 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
289 Err(e) => return Err(e.into()),
290 };
291 for shard in dir.flatten() {
292 let shard_path = shard.path();
293 if !shard_path.is_dir() {
294 continue;
295 }
296 for entry in fs::read_dir(&shard_path)?.flatten() {
297 let p = entry.path();
298 let name = match p.file_name().and_then(|n| n.to_str()) {
299 Some(s) if s.ends_with(".payload") => s.to_string(),
300 _ => continue,
301 };
302 let key_hex = name.trim_end_matches(".payload").to_string();
303 let key = Key(key_hex);
304 if key.validate().is_err() {
305 continue;
306 }
307 let md = entry.metadata()?;
308 let payload_len = md.len();
309 let meta_len = fs::metadata(self.meta_path(&key))
310 .map(|m| m.len())
311 .unwrap_or(0);
312 let mtime = md.modified().unwrap_or(std::time::UNIX_EPOCH);
313 entries.push((mtime, key, payload_len + meta_len));
314 }
315 }
316 entries.sort_by_key(|(t, _, _)| *t);
317
318 let mut dropped = 0usize;
319 for (_, key, size) in entries {
320 if current <= cap_bytes {
321 break;
322 }
323 if self.remove(&key).is_ok() {
324 current = current.saturating_sub(size);
325 dropped += 1;
326 }
327 }
328 Ok(dropped)
329 }
330
331 pub fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
337 let mut out = Vec::new();
338 let entries = match fs::read_dir(&self.root) {
339 Ok(e) => e,
340 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
341 Err(e) => return Err(e.into()),
342 };
343 for shard in entries.flatten() {
344 let shard_path = shard.path();
345 if !shard_path.is_dir() {
346 continue;
347 }
348 for entry in fs::read_dir(&shard_path)?.flatten() {
349 let p = entry.path();
350 let name = match p.file_name().and_then(|n| n.to_str()) {
351 Some(s) if s.ends_with(".meta.json") => s.to_string(),
352 _ => continue,
353 };
354 let key_hex = name.trim_end_matches(".meta.json").to_string();
355 let key = Key(key_hex);
356 if key.validate().is_err() {
357 continue;
358 }
359 let meta: PayloadMeta = match fs::read(&p)
360 .ok()
361 .and_then(|b| serde_json::from_slice(&b).ok())
362 {
363 Some(m) => m,
364 None => continue,
365 };
366 out.push((key, meta));
367 }
368 }
369 Ok(out)
370 }
371
372 pub fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
378 key.validate()?;
379 let pp = self.payload_path(key);
380 let mp = self.meta_path(key);
381 if !pp.exists() || !mp.exists() {
382 return Ok(None);
383 }
384 let mut bytes = Vec::new();
385 fs::File::open(&pp)?.read_to_end(&mut bytes)?;
386 let meta: PayloadMeta = serde_json::from_slice(&fs::read(&mp)?)?;
387
388 let actual = blake3::hash(&bytes).to_hex().to_string();
389 if actual != meta.payload_hash {
390 return Err(StoreError::Integrity {
391 key: key.0.clone(),
392 expected: meta.payload_hash.clone(),
393 actual,
394 });
395 }
396 Ok(Some(Payload { bytes, meta }))
397 }
398
399 pub fn contains(&self, key: &Key) -> bool {
403 key.validate().is_ok() && self.payload_path(key).exists() && self.meta_path(key).exists()
404 }
405}
406
407impl Store for FileStore {
408 fn persist_with_upstreams(
409 &self,
410 key: &Key,
411 bytes: &[u8],
412 tool_kind: &str,
413 file_roots: Vec<FileRootSerde>,
414 upstream_keys: Vec<String>,
415 ) -> Result<(), StoreError> {
416 FileStore::persist_with_upstreams(self, key, bytes, tool_kind, file_roots, upstream_keys)
417 }
418
419 fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
420 FileStore::lookup(self, key)
421 }
422
423 fn remove(&self, key: &Key) -> Result<(), StoreError> {
424 FileStore::remove(self, key)
425 }
426
427 fn total_bytes(&self) -> Result<u64, StoreError> {
428 FileStore::total_bytes(self)
429 }
430
431 fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
432 FileStore::evict_to_cap(self, cap_bytes)
433 }
434
435 fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
436 FileStore::iter_meta(self)
437 }
438
439 fn contains(&self, key: &Key) -> bool {
440 FileStore::contains(self, key)
441 }
442}
443
444fn write_atomic(target: &Path, bytes: &[u8]) -> io::Result<()> {
445 let parent = target
450 .parent()
451 .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "target has no parent"))?;
452 let mut guard = TempGuard::create_in(parent)?;
453 guard.file.write_all(bytes)?;
454 guard.file.flush()?;
455 guard.persist(target)
456}
457
/// A freshly created temporary file that is deleted on drop unless it has been
/// successfully renamed to its final location via [`TempGuard::persist`].
struct TempGuard {
    /// Location of the temporary file.
    path: PathBuf,
    /// Open write handle to the temporary file.
    file: fs::File,
    /// While `true`, dropping the guard removes the file at `path`.
    armed: bool,
}

impl TempGuard {
    /// Creates a new, empty temporary file inside `dir`.
    ///
    /// The name combines the process id with a process-wide counter, and the file is
    /// opened with `create_new`, so an existing file is never reused or truncated.
    ///
    /// # Errors
    /// Any error from creating the file, including `AlreadyExists` on a name collision.
    fn create_in(dir: &Path) -> io::Result<Self> {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        // Only uniqueness of the returned value matters, so relaxed ordering suffices.
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let path = dir.join(format!(".verdant-tmp-{pid}-{n}"));
        let file = fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&path)?;
        Ok(Self {
            path,
            file,
            armed: true,
        })
    }

    /// Renames the temporary file to `target`, consuming the guard.
    ///
    /// The guard is disarmed only after the rename has succeeded. If the rename fails,
    /// the guard is dropped while still armed and the temporary file is removed instead
    /// of being left behind.
    ///
    /// # Errors
    /// Any error from the rename.
    fn persist(mut self, target: &Path) -> io::Result<()> {
        fs::rename(&self.path, target)?;
        self.armed = false;
        Ok(())
    }
}

impl Drop for TempGuard {
    /// Removes the temporary file if the guard is still armed; removal errors are ignored
    /// because `drop` has no way to report them.
    fn drop(&mut self) {
        if self.armed {
            let _ = fs::remove_file(&self.path);
        }
    }
}
501
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens a store inside a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller can keep it alive for the whole test.
    fn temp_store() -> (TempDir, FileStore) {
        let tmp = TempDir::new().unwrap();
        let opened = FileStore::open(tmp.path().to_path_buf()).unwrap();
        (tmp, opened)
    }

    /// Shard directory of `key`, computed by hand rather than through the store's helpers.
    fn shard_of(s: &FileStore, key: &Key) -> PathBuf {
        s.root.join(&key.0[..2])
    }

    /// A persisted payload comes back with the same bytes and the recorded metadata.
    #[test]
    fn persist_then_lookup_roundtrip() {
        let (_tmp, s) = temp_store();
        let key = Key::from_bytes(b"input-1");
        s.persist(&key, b"hello world", "read", vec![]).unwrap();

        let found = s.lookup(&key).unwrap().expect("must exist");
        assert_eq!(found.bytes, b"hello world");
        assert_eq!(found.meta.tool_kind, "read");
        assert_eq!(found.meta.bytes, 11);
    }

    /// Looking up a key that was never written yields `None`, not an error.
    #[test]
    fn lookup_missing_returns_none() {
        let (_tmp, s) = temp_store();
        let key = Key::from_bytes(b"never-written");
        let found = s.lookup(&key).unwrap();
        assert!(found.is_none());
    }

    /// Overwriting the payload file behind the store's back trips the integrity check.
    #[test]
    fn integrity_violation_detected() {
        let (_tmp, s) = temp_store();
        let key = Key::from_bytes(b"input-2");
        s.persist(&key, b"trusted", "read", vec![]).unwrap();

        let payload_file = shard_of(&s, &key).join(format!("{}.payload", key.0));
        fs::write(&payload_file, b"tampered").unwrap();

        let err = s.lookup(&key).err().expect("integrity must fire");
        assert!(matches!(err, StoreError::Integrity { .. }));
    }

    /// A metadata file without its payload counts as "no entry".
    #[test]
    fn partial_write_only_meta_returns_none() {
        let (_tmp, s) = temp_store();
        let key = Key::from_bytes(b"input-3");
        let shard = shard_of(&s, &key);
        fs::create_dir_all(&shard).unwrap();

        let orphan_meta = PayloadMeta {
            payload_hash: blake3::hash(b"orphan").to_hex().to_string(),
            bytes: 6,
            tool_kind: "read".into(),
            file_roots: vec![],
            upstream_keys: vec![],
        };
        let meta_file = shard.join(format!("{}.meta.json", key.0));
        fs::write(&meta_file, serde_json::to_vec(&orphan_meta).unwrap()).unwrap();

        assert!(s.lookup(&key).unwrap().is_none());
    }

    /// The reported size is zero for a fresh store and at least the payload size afterwards.
    #[test]
    fn total_bytes_sums_payloads_and_meta() {
        let (_tmp, s) = temp_store();
        assert_eq!(s.total_bytes().unwrap(), 0, "fresh store is zero bytes");

        let key = Key::from_bytes(b"size-test");
        s.persist(&key, &[b'x'; 1024], "read", vec![]).unwrap();

        let bytes = s.total_bytes().unwrap();
        assert!(bytes >= 1024, "payload alone is ≥1024, got {bytes}");
    }

    /// Eviction removes the oldest entries first and leaves the store within the cap.
    #[test]
    fn evict_to_cap_drops_oldest_first() {
        let (_tmp, s) = temp_store();
        let keys: Vec<Key> = (0..4)
            .map(|i| Key::from_bytes(format!("k{i}").as_bytes()))
            .collect();
        for (i, key) in keys.iter().enumerate() {
            let payload = vec![b'A' + i as u8; 4096];
            s.persist(key, &payload, "read", vec![]).unwrap();
            // Space the writes out so the payload mtimes are strictly ordered.
            std::thread::sleep(std::time::Duration::from_millis(20));
        }
        let before = s.total_bytes().unwrap();
        assert!(before >= 4 * 4096);

        let cap = before / 2;
        let dropped = s.evict_to_cap(cap).unwrap();
        assert!(dropped >= 1, "should drop at least one entry");
        let after = s.total_bytes().unwrap();
        assert!(
            after <= cap,
            "after eviction must fit cap; got {after}/{cap}"
        );

        assert!(s.lookup(&keys[0]).unwrap().is_none(), "oldest must evict");
        assert!(s.lookup(&keys[3]).unwrap().is_some(), "newest must survive");
    }

    /// With a cap the store already satisfies, nothing is evicted.
    #[test]
    fn evict_below_cap_is_noop() {
        let (_tmp, s) = temp_store();
        let key = Key::from_bytes(b"small");
        s.persist(&key, b"tiny", "read", vec![]).unwrap();

        assert_eq!(s.evict_to_cap(u64::MAX).unwrap(), 0);
        assert!(s.lookup(&key).unwrap().is_some());
    }

    /// Keys that are not 64 hex digits are rejected by both persist and lookup.
    #[test]
    fn malformed_key_rejected() {
        let (_tmp, s) = temp_store();
        let bad = Key("not-hex".to_string());
        let persisted = s.persist(&bad, b"x", "read", vec![]);
        assert!(persisted.is_err());
        let looked_up = s.lookup(&bad);
        assert!(looked_up.is_err());
    }

    /// Sixteen distinct keys land in at least four different shard directories.
    #[test]
    fn shard_dirs_distribute_keys() {
        let (_tmp, s) = temp_store();
        for i in 0..16u8 {
            let key = Key::from_bytes(&[i, i, i]);
            s.persist(&key, &[i], "read", vec![]).unwrap();
        }

        let shards: std::collections::HashSet<String> = fs::read_dir(s.root())
            .unwrap()
            .map(|entry| entry.unwrap())
            .filter(|entry| entry.path().is_dir())
            .map(|entry| entry.file_name().to_string_lossy().to_string())
            .collect();
        assert!(shards.len() >= 4, "shards = {shards:?}");
    }

    /// `contains` is false before the entry is persisted and true afterwards.
    #[test]
    fn contains_only_when_complete() {
        let (_tmp, s) = temp_store();
        let key = Key::from_bytes(b"x");
        assert!(!s.contains(&key));
        s.persist(&key, b"y", "read", vec![]).unwrap();
        assert!(s.contains(&key));
    }
}