use serde::{Deserialize, Serialize};
use std::fs;
use std::io::{self, Read as _, Write as _};
use std::path::{Path, PathBuf};
/// Errors produced by the store operations in this file.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
/// A filesystem operation failed; wraps the underlying `io::Error`.
#[error("io: {0}")]
Io(#[from] io::Error),
/// The key failed `Key::validate` (it is not exactly 64 ASCII hex digits).
/// Carries the rejected key string.
#[error("malformed key: {0}")]
BadKey(String),
/// Any `serde_json::Error`. Because of `#[from]`, this covers decoding a
/// metadata file as well as encoding one in `persist_with_upstreams`, even
/// though the message only mentions decoding.
#[error("metadata decode failed: {0}")]
Meta(#[from] serde_json::Error),
/// The payload read from disk does not hash to the value recorded in its
/// metadata.
#[error(
"integrity check failed for key {key}: stored payload hash {actual} != expected {expected}"
)]
Integrity {
/// Hex key of the entry that failed verification.
key: String,
/// Payload hash recorded in the entry's metadata.
expected: String,
/// Hash computed from the payload bytes that were actually read.
actual: String,
},
}
/// Store key: a hex string wrapped in a newtype.
///
/// The inner string is public, so a `Key` may hold arbitrary text; store
/// operations call `validate` before using it to build paths.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Key(pub String);

impl Key {
    /// Builds a key from the hex-encoded BLAKE3 hash of `bytes`.
    pub fn from_bytes(bytes: &[u8]) -> Self {
        let digest = blake3::hash(bytes);
        Self(digest.to_hex().to_string())
    }

    /// Borrows the key text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Accepts the key only when it is exactly 64 ASCII hex digits.
    ///
    /// # Errors
    /// Returns `StoreError::BadKey` carrying a copy of the rejected text.
    fn validate(&self) -> Result<(), StoreError> {
        // Any byte of a multi-byte UTF-8 sequence fails the hex test, so a
        // byte-wise check rejects exactly the same strings as a char-wise one.
        let well_formed = self.0.len() == 64 && self.0.bytes().all(|b| b.is_ascii_hexdigit());
        if well_formed {
            Ok(())
        } else {
            Err(StoreError::BadKey(self.0.clone()))
        }
    }
}
/// Metadata stored next to each payload as a JSON document.
///
/// Fields are kept in their declared order because serde serializes struct
/// fields in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PayloadMeta {
/// Hex BLAKE3 hash of the payload bytes, computed when the entry is persisted
/// and compared again by `FileStore::lookup`.
pub payload_hash: String,
/// Length of the payload in bytes.
pub bytes: u64,
/// Caller-supplied label, stored verbatim (the tests in this file use `"read"`).
pub tool_kind: String,
/// Caller-supplied file roots; an absent JSON field decodes to an empty list.
#[serde(default)]
pub file_roots: Vec<FileRootSerde>,
/// Caller-supplied key strings; an absent JSON field decodes to an empty list.
/// NOTE(review): presumably the keys of entries this one was derived from —
/// confirm against callers.
#[serde(default)]
pub upstream_keys: Vec<String>,
}
/// Serializable `(path, expected_hash)` pair embedded in `PayloadMeta::file_roots`.
///
/// This file only stores and returns these values; it never interprets them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileRootSerde {
/// Path text as supplied by the caller.
pub path: String,
/// Hash text as supplied by the caller.
/// NOTE(review): presumably the hash the file at `path` is expected to have —
/// confirm the algorithm and encoding against callers.
pub expected_hash: String,
}
/// A stored entry as returned by `lookup`: the payload together with its metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Payload {
/// The payload bytes read from the store.
pub bytes: Vec<u8>,
/// The metadata decoded from the entry's JSON metadata file.
pub meta: PayloadMeta,
}
/// Storage interface for keyed payloads plus metadata.
///
/// Implementors must be `Send + Sync`. `FileStore` is the implementation
/// provided in this file.
pub trait Store: Send + Sync {
/// Stores `bytes` under `key`, recording `tool_kind`, `file_roots` and
/// `upstream_keys` in the entry's metadata.
fn persist_with_upstreams(
&self,
key: &Key,
bytes: &[u8],
tool_kind: &str,
file_roots: Vec<FileRootSerde>,
upstream_keys: Vec<String>,
) -> Result<(), StoreError>;
/// Fetches the entry for `key`; `Ok(None)` means there is no such entry.
fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError>;
/// Deletes the entry for `key`.
fn remove(&self, key: &Key) -> Result<(), StoreError>;
/// Reports the number of bytes the store currently occupies.
fn total_bytes(&self) -> Result<u64, StoreError>;
/// Removes entries until the store fits within `cap_bytes`, returning how
/// many entries were removed.
fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError>;
/// Lists the key and metadata of stored entries.
fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError>;
/// Reports whether an entry for `key` is present.
fn contains(&self, key: &Key) -> bool;
/// Convenience wrapper: `persist_with_upstreams` with an empty upstream-key list.
fn persist(
&self,
key: &Key,
bytes: &[u8],
tool_kind: &str,
file_roots: Vec<FileRootSerde>,
) -> Result<(), StoreError> {
self.persist_with_upstreams(key, bytes, tool_kind, file_roots, Vec::new())
}
}
/// Filesystem-backed store.
///
/// Each entry lives in `<root>/<first two characters of the key>/` as two
/// files: `<key>.payload` and `<key>.meta.json`.
#[derive(Debug)]
pub struct FileStore {
// Directory that contains the shard directories.
root: PathBuf,
}
impl FileStore {
/// Opens a store rooted at `root`, creating the directory (and any missing
/// parents) first.
///
/// # Errors
/// Returns `StoreError::Io` when the directory cannot be created.
pub fn open(root: impl Into<PathBuf>) -> Result<Self, StoreError> {
    let store = Self { root: root.into() };
    fs::create_dir_all(&store.root)?;
    Ok(store)
}
/// Returns the store's root directory.
pub fn root(&self) -> &Path {
    self.root.as_path()
}
/// Returns the shard directory for `key`: the root joined with the key's first
/// two characters.
///
/// Slicing panics on a key shorter than two bytes, so callers validate first.
fn shard_dir(&self, key: &Key) -> PathBuf {
    let prefix = &key.0[..2];
    self.root.join(prefix)
}
/// Returns the path of the payload file for `key`: `<shard>/<key>.payload`.
fn payload_path(&self, key: &Key) -> PathBuf {
    let file_name = format!("{}.payload", key.0);
    let mut path = self.shard_dir(key);
    path.push(file_name);
    path
}
/// Returns the path of the metadata file for `key`: `<shard>/<key>.meta.json`.
fn meta_path(&self, key: &Key) -> PathBuf {
    let file_name = format!("{}.meta.json", key.0);
    let mut path = self.shard_dir(key);
    path.push(file_name);
    path
}
/// Stores `bytes` under `key` with no upstream keys recorded.
///
/// Shorthand for `persist_with_upstreams` with an empty upstream list; errors
/// are the ones that method returns.
pub fn persist(
    &self,
    key: &Key,
    bytes: &[u8],
    tool_kind: &str,
    file_roots: Vec<FileRootSerde>,
) -> Result<(), StoreError> {
    let no_upstreams: Vec<String> = Vec::new();
    self.persist_with_upstreams(key, bytes, tool_kind, file_roots, no_upstreams)
}
/// Writes the payload and its JSON metadata for `key`.
///
/// The payload file is written first and the metadata file second, each
/// through `write_atomic`. The metadata records the BLAKE3 hash and length of
/// `bytes` together with the caller-supplied `tool_kind`, `file_roots` and
/// `upstream_keys`.
///
/// # Errors
/// * `StoreError::BadKey` when `key` fails validation.
/// * `StoreError::Io` when creating the shard directory or writing a file fails.
/// * `StoreError::Meta` when the metadata cannot be encoded as JSON.
pub fn persist_with_upstreams(
    &self,
    key: &Key,
    bytes: &[u8],
    tool_kind: &str,
    file_roots: Vec<FileRootSerde>,
    upstream_keys: Vec<String>,
) -> Result<(), StoreError> {
    key.validate()?;

    let shard = self.shard_dir(key);
    fs::create_dir_all(shard)?;

    let meta = PayloadMeta {
        payload_hash: blake3::hash(bytes).to_hex().to_string(),
        bytes: bytes.len() as u64,
        tool_kind: tool_kind.to_owned(),
        file_roots,
        upstream_keys,
    };

    // Payload first, metadata second.
    let payload_file = self.payload_path(key);
    write_atomic(&payload_file, bytes)?;

    let encoded = serde_json::to_vec(&meta)?;
    let meta_file = self.meta_path(key);
    write_atomic(&meta_file, &encoded)?;
    Ok(())
}
pub fn remove(&self, key: &Key) -> Result<(), StoreError> {
key.validate()?;
let pp = self.payload_path(key);
let mp = self.meta_path(key);
if pp.exists() {
fs::remove_file(&pp)?;
}
if mp.exists() {
fs::remove_file(&mp)?;
}
Ok(())
}
/// Sums the sizes of all regular files found directly inside the shard
/// directories under the root.
///
/// Entries whose directory record or metadata cannot be read are skipped, and
/// non-directories at the root level are ignored. A missing root counts as an
/// empty store. The sum saturates instead of overflowing.
///
/// # Errors
/// Returns `StoreError::Io` when the root (for a reason other than not
/// existing) or a shard directory cannot be listed.
pub fn total_bytes(&self) -> Result<u64, StoreError> {
    let shards = match fs::read_dir(&self.root) {
        Ok(iter) => iter,
        Err(err) => {
            return if err.kind() == io::ErrorKind::NotFound {
                Ok(0)
            } else {
                Err(err.into())
            };
        }
    };

    let mut sum = 0u64;
    let shard_dirs = shards
        .flatten()
        .map(|shard| shard.path())
        .filter(|path| path.is_dir());
    for shard_path in shard_dirs {
        sum = fs::read_dir(&shard_path)?
            .flatten()
            .filter_map(|entry| entry.metadata().ok())
            .filter(|md| md.is_file())
            .fold(sum, |acc, md| acc.saturating_add(md.len()));
    }
    Ok(sum)
}
/// Removes entries, oldest payload first, until the store occupies at most
/// `cap_bytes`, and returns how many entries were removed.
///
/// The starting size comes from `total_bytes`. Candidates are the `.payload`
/// files inside the shard directories whose stem is a valid key. Each
/// candidate's size is its payload length plus the length of its metadata
/// file (0 when that file cannot be inspected). Candidates are ordered by
/// payload modification time, with the key text as a tie-breaker so the order
/// does not depend on directory iteration order. An entry whose removal fails
/// is skipped and not counted.
///
/// Returns 0 without scanning when the store already fits, or when the root
/// directory does not exist.
///
/// # Errors
/// Returns `StoreError::Io` when the root or a shard directory cannot be
/// listed, or when a payload's metadata cannot be read for a reason other than
/// the file having disappeared.
pub fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
    let mut current = self.total_bytes()?;
    if current <= cap_bytes {
        return Ok(0);
    }
    let dir = match fs::read_dir(&self.root) {
        Ok(d) => d,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(0),
        Err(e) => return Err(e.into()),
    };

    let mut entries: Vec<(std::time::SystemTime, Key, u64)> = Vec::new();
    for shard in dir.flatten() {
        let shard_path = shard.path();
        if !shard_path.is_dir() {
            continue;
        }
        for entry in fs::read_dir(&shard_path)?.flatten() {
            let file_name = entry.file_name();
            // `strip_suffix` removes exactly one ".payload"; a stray
            // "<key>.payload.payload" must not be mistaken for entry "<key>".
            let key = match file_name.to_str().and_then(|n| n.strip_suffix(".payload")) {
                Some(hex) => Key(hex.to_string()),
                None => continue,
            };
            if key.validate().is_err() {
                continue;
            }
            let md = match entry.metadata() {
                Ok(md) => md,
                // The file vanished between listing and stat (for example a
                // concurrent removal); there is nothing left to evict here.
                Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
                Err(e) => return Err(e.into()),
            };
            let meta_len = fs::metadata(self.meta_path(&key))
                .map(|m| m.len())
                .unwrap_or(0);
            let mtime = md.modified().unwrap_or(std::time::UNIX_EPOCH);
            entries.push((mtime, key, md.len().saturating_add(meta_len)));
        }
    }

    // Oldest first; equal timestamps fall back to key order for determinism.
    entries.sort_by(|a, b| {
        a.0.cmp(&b.0)
            .then_with(|| a.1.as_str().cmp(b.1.as_str()))
    });

    let mut dropped = 0usize;
    for (_, key, size) in entries {
        if current <= cap_bytes {
            break;
        }
        if self.remove(&key).is_ok() {
            current = current.saturating_sub(size);
            dropped += 1;
        }
    }
    Ok(dropped)
}
/// Collects the key and decoded metadata of every `<key>.meta.json` file found
/// inside the shard directories.
///
/// This is a best-effort listing: files whose stem is not a valid key, and
/// metadata files that cannot be read or decoded, are skipped silently. A
/// missing root yields an empty list. The presence of the matching payload
/// file is not checked.
///
/// # Errors
/// Returns `StoreError::Io` when the root (for a reason other than not
/// existing) or a shard directory cannot be listed.
pub fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
    let mut out = Vec::new();
    let shards = match fs::read_dir(&self.root) {
        Ok(s) => s,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(out),
        Err(e) => return Err(e.into()),
    };
    for shard in shards.flatten() {
        let shard_path = shard.path();
        if !shard_path.is_dir() {
            continue;
        }
        for entry in fs::read_dir(&shard_path)?.flatten() {
            let file_name = entry.file_name();
            // `strip_suffix` removes exactly one ".meta.json"; a stray
            // "<key>.meta.json.meta.json" must not be reported as entry "<key>".
            let key = match file_name
                .to_str()
                .and_then(|n| n.strip_suffix(".meta.json"))
            {
                Some(hex) => Key(hex.to_string()),
                None => continue,
            };
            if key.validate().is_err() {
                continue;
            }
            let decoded = fs::read(entry.path())
                .ok()
                .and_then(|raw| serde_json::from_slice::<PayloadMeta>(&raw).ok());
            if let Some(meta) = decoded {
                out.push((key, meta));
            }
        }
    }
    Ok(out)
}
pub fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
key.validate()?;
let pp = self.payload_path(key);
let mp = self.meta_path(key);
if !pp.exists() || !mp.exists() {
return Ok(None);
}
let mut bytes = Vec::new();
fs::File::open(&pp)?.read_to_end(&mut bytes)?;
let meta: PayloadMeta = serde_json::from_slice(&fs::read(&mp)?)?;
let actual = blake3::hash(&bytes).to_hex().to_string();
if actual != meta.payload_hash {
return Err(StoreError::Integrity {
key: key.0.clone(),
expected: meta.payload_hash.clone(),
actual,
});
}
Ok(Some(Payload { bytes, meta }))
}
/// Reports whether both the payload file and the metadata file for `key`
/// exist. An invalid key is simply "not contained".
pub fn contains(&self, key: &Key) -> bool {
    if key.validate().is_err() {
        return false;
    }
    let has_payload = self.payload_path(key).exists();
    has_payload && self.meta_path(key).exists()
}
}
/// `Store` implementation for `FileStore`: every method forwards to the
/// inherent method of the same name.
///
/// The calls use path syntax (`Self::name(self, ..)`), which resolves to the
/// inherent method rather than back to this trait method.
impl Store for FileStore {
    fn persist_with_upstreams(
        &self,
        key: &Key,
        bytes: &[u8],
        tool_kind: &str,
        file_roots: Vec<FileRootSerde>,
        upstream_keys: Vec<String>,
    ) -> Result<(), StoreError> {
        Self::persist_with_upstreams(self, key, bytes, tool_kind, file_roots, upstream_keys)
    }

    fn lookup(&self, key: &Key) -> Result<Option<Payload>, StoreError> {
        Self::lookup(self, key)
    }

    fn remove(&self, key: &Key) -> Result<(), StoreError> {
        Self::remove(self, key)
    }

    fn total_bytes(&self) -> Result<u64, StoreError> {
        Self::total_bytes(self)
    }

    fn evict_to_cap(&self, cap_bytes: u64) -> Result<usize, StoreError> {
        Self::evict_to_cap(self, cap_bytes)
    }

    fn iter_meta(&self) -> Result<Vec<(Key, PayloadMeta)>, StoreError> {
        Self::iter_meta(self)
    }

    fn contains(&self, key: &Key) -> bool {
        Self::contains(self, key)
    }
}
/// Writes `bytes` to `target` by filling a temporary file in the same
/// directory and renaming it over `target`.
///
/// The temporary file is synced to disk before the rename so that the name
/// `target` never refers to data that has not been written out. If writing or
/// syncing fails, the temporary file is removed when the guard is dropped.
///
/// # Errors
/// Returns `InvalidInput` when `target` has no parent directory, or any I/O
/// error from creating, writing, syncing or renaming the temporary file.
fn write_atomic(target: &Path, bytes: &[u8]) -> io::Result<()> {
    let parent = target
        .parent()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "target has no parent"))?;
    let mut guard = TempGuard::create_in(parent)?;
    guard.file.write_all(bytes)?;
    guard.file.flush()?;
    // `flush` alone does not force file contents to stable storage; without
    // this a crash shortly after the rename could leave `target` empty or
    // truncated.
    guard.file.sync_all()?;
    guard.persist(target)
}
/// A freshly created temporary file that deletes itself on drop unless it has
/// been renamed into place with `persist`.
struct TempGuard {
    /// Location of the temporary file.
    path: PathBuf,
    /// Open write handle to the temporary file.
    file: fs::File,
    /// While `true`, dropping the guard removes the file at `path`.
    armed: bool,
}

impl TempGuard {
    /// Creates a new, empty temporary file inside `dir`.
    ///
    /// The name is `.verdant-tmp-<pid>-<n>`, where `n` comes from a
    /// process-wide counter. The file is opened with `create_new`, so an
    /// existing file of that name is an error rather than being reused.
    fn create_in(dir: &Path) -> io::Result<Self> {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let path = dir.join(format!(".verdant-tmp-{pid}-{n}"));
        let file = fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&path)?;
        Ok(Self {
            path,
            file,
            armed: true,
        })
    }

    /// Renames the temporary file to `target`, consuming the guard.
    ///
    /// The guard is disarmed only after the rename has succeeded. If the
    /// rename fails, the guard is still armed when it goes out of scope, so the
    /// temporary file is deleted instead of being left behind.
    ///
    /// # Errors
    /// Returns the error from `fs::rename`.
    fn persist(mut self, target: &Path) -> io::Result<()> {
        fs::rename(&self.path, target)?;
        self.armed = false;
        Ok(())
    }
}

impl Drop for TempGuard {
    /// Removes the temporary file if the guard is still armed. Cleanup is
    /// best-effort: a failure to remove the file is ignored.
    fn drop(&mut self) {
        if self.armed {
            let _ = fs::remove_file(&self.path);
        }
    }
}
// Unit tests for `FileStore`. Every test works in its own temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Creates a store in a fresh temporary directory. The `TempDir` is returned
// as well so the caller keeps it alive for the duration of the test.
fn store() -> (TempDir, FileStore) {
let dir = TempDir::new().unwrap();
let s = FileStore::open(dir.path().to_path_buf()).unwrap();
(dir, s)
}
// A persisted payload is returned unchanged, with its metadata filled in.
#[test]
fn persist_then_lookup_roundtrip() {
let (_d, s) = store();
let k = Key::from_bytes(b"input-1");
s.persist(&k, b"hello world", "read", vec![]).unwrap();
let p = s.lookup(&k).unwrap().expect("must exist");
assert_eq!(p.bytes, b"hello world");
assert_eq!(p.meta.tool_kind, "read");
assert_eq!(p.meta.bytes, 11);
}
// Looking up a key that was never written is `Ok(None)`, not an error.
#[test]
fn lookup_missing_returns_none() {
let (_d, s) = store();
let k = Key::from_bytes(b"never-written");
assert!(s.lookup(&k).unwrap().is_none());
}
// Overwriting the payload file behind the store's back makes `lookup` fail
// with `StoreError::Integrity`.
#[test]
fn integrity_violation_detected() {
let (_d, s) = store();
let k = Key::from_bytes(b"input-2");
s.persist(&k, b"trusted", "read", vec![]).unwrap();
let pp = s.root.join(&k.0[..2]).join(format!("{}.payload", k.0));
fs::write(&pp, b"tampered").unwrap();
let err = s.lookup(&k).err().expect("integrity must fire");
assert!(matches!(err, StoreError::Integrity { .. }));
}
// A metadata file without a matching payload file is treated as a missing entry.
#[test]
fn partial_write_only_meta_returns_none() {
let (_d, s) = store();
let k = Key::from_bytes(b"input-3");
fs::create_dir_all(s.root.join(&k.0[..2])).unwrap();
let mp = s.root.join(&k.0[..2]).join(format!("{}.meta.json", k.0));
fs::write(
&mp,
serde_json::to_vec(&PayloadMeta {
payload_hash: blake3::hash(b"orphan").to_hex().to_string(),
bytes: 6,
tool_kind: "read".into(),
file_roots: vec![],
upstream_keys: vec![],
})
.unwrap(),
)
.unwrap();
assert!(s.lookup(&k).unwrap().is_none());
}
// An empty store reports zero bytes; after one persist the total is at least
// the payload size (only a lower bound is asserted).
#[test]
fn total_bytes_sums_payloads_and_meta() {
let (_d, s) = store();
assert_eq!(s.total_bytes().unwrap(), 0, "fresh store is zero bytes");
let k = Key::from_bytes(b"size-test");
s.persist(&k, &vec![b'x'; 1024], "read", vec![]).unwrap();
let bytes = s.total_bytes().unwrap();
assert!(bytes >= 1024, "payload alone is ≥1024, got {bytes}");
}
// Four entries are written with a pause between them, then the store is
// evicted to half its size: the first-written entry must be gone and the
// last-written one must remain.
#[test]
fn evict_to_cap_drops_oldest_first() {
let (_d, s) = store();
let keys: Vec<Key> = (0..4)
.map(|i| Key::from_bytes(format!("k{i}").as_bytes()))
.collect();
for (i, k) in keys.iter().enumerate() {
s.persist(k, &vec![b'A' + i as u8; 4096], "read", vec![])
.unwrap();
// Space the writes out so the payload files get distinct modification times.
std::thread::sleep(std::time::Duration::from_millis(20));
}
let before = s.total_bytes().unwrap();
assert!(before >= 4 * 4096);
let cap = before / 2;
let dropped = s.evict_to_cap(cap).unwrap();
assert!(dropped >= 1, "should drop at least one entry");
let after = s.total_bytes().unwrap();
assert!(
after <= cap,
"after eviction must fit cap; got {after}/{cap}"
);
assert!(s.lookup(&keys[0]).unwrap().is_none(), "oldest must evict");
assert!(s.lookup(&keys[3]).unwrap().is_some(), "newest must survive");
}
// With a cap the store already satisfies, nothing is evicted.
#[test]
fn evict_below_cap_is_noop() {
let (_d, s) = store();
let k = Key::from_bytes(b"small");
s.persist(&k, b"tiny", "read", vec![]).unwrap();
let dropped = s.evict_to_cap(u64::MAX).unwrap();
assert_eq!(dropped, 0);
assert!(s.lookup(&k).unwrap().is_some());
}
// A key that is not 64 hex digits is rejected by both `persist` and `lookup`.
#[test]
fn malformed_key_rejected() {
let (_d, s) = store();
let bad = Key("not-hex".to_string());
assert!(s.persist(&bad, b"x", "read", vec![]).is_err());
assert!(s.lookup(&bad).is_err());
}
// Sixteen different keys must land in at least four distinct shard directories.
#[test]
fn shard_dirs_distribute_keys() {
let (_d, s) = store();
for i in 0..16u8 {
let k = Key::from_bytes(&[i, i, i]);
s.persist(&k, &[i], "read", vec![]).unwrap();
}
let mut shards = std::collections::HashSet::new();
for entry in fs::read_dir(s.root()).unwrap() {
let e = entry.unwrap();
if e.path().is_dir() {
shards.insert(e.file_name().to_string_lossy().to_string());
}
}
assert!(shards.len() >= 4, "shards = {shards:?}");
}
// `contains` is false before the entry is persisted and true afterwards.
#[test]
fn contains_only_when_complete() {
let (_d, s) = store();
let k = Key::from_bytes(b"x");
assert!(!s.contains(&k));
s.persist(&k, b"y", "read", vec![]).unwrap();
assert!(s.contains(&k));
}
}