use crate::digest::Digest256;
use crate::error::{Error, Result};
use dashmap::DashMap;
use parking_lot::Mutex;
use std::fs;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
/// zstd compression level passed to `zstd::encode_all` when `FsBlobStore::put`
/// compresses a new blob before writing it to disk.
///
/// NOTE(review): 19 is near the top of zstd's standard level range and favours
/// compression ratio over write speed — confirm that trade-off is intended for
/// write-heavy workloads.
pub const ZSTD_LEVEL: i32 = 19;
/// Storage for immutable byte blobs, addressed by a `Digest256`.
///
/// Both implementations in this file derive the key by hashing the content with
/// `Digest256::of`. The `Send + Sync` bound means a single store can be shared
/// by reference across threads.
pub trait BlobStore: Send + Sync {
/// Stores `bytes` and returns the digest under which they can be fetched again.
fn put(&self, bytes: &[u8]) -> Result<Digest256>;
/// Returns the bytes previously stored under `digest`, or an error if the blob
/// is absent or cannot be read back.
fn get(&self, digest: &Digest256) -> Result<Vec<u8>>;
/// Reports whether a blob for `digest` is present in the store.
fn contains(&self, digest: &Digest256) -> Result<bool>;
/// Bytes of storage consumed by the store. The exact measure is
/// implementation-defined (e.g. on-disk file sizes vs. in-memory payload lengths).
fn physical_bytes(&self) -> Result<u64>;
}
/// Filesystem-backed `BlobStore`.
///
/// Each blob is stored zstd-compressed at
/// `<root>/blobs/sha256/<first two hex chars of digest>/<hex digest>.zst`.
#[derive(Debug)]
pub struct FsBlobStore {
/// Directory passed to `FsBlobStore::open`; all blobs live beneath `root/blobs`.
root: PathBuf,
/// Incremented (wrapping) by every `put` that has to write a new blob; combined
/// with the process id to give that write a distinct temporary file name.
counter: Mutex<u64>,
}
impl FsBlobStore {
    /// Opens (creating if necessary) a blob store rooted at `root`.
    ///
    /// Ensures `<root>/blobs/sha256` exists; the per-shard directories beneath
    /// it are created lazily when a blob is first written there.
    ///
    /// # Errors
    ///
    /// Returns an error if the directory tree cannot be created.
    pub fn open(root: impl AsRef<Path>) -> Result<Self> {
        let root = root.as_ref().to_path_buf();
        let blobs = root.join("blobs").join("sha256");
        fs::create_dir_all(&blobs)?;
        Ok(Self {
            root,
            counter: Mutex::new(0),
        })
    }

    /// Directory this store was opened on.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Computes where the blob for `digest` lives:
    /// `<root>/blobs/sha256/<hex[..2]>/<hex>.zst`.
    ///
    /// Pure path arithmetic with no filesystem access, so read-only operations
    /// (`get`, `contains`) never modify the store.
    fn blob_path(&self, digest: &Digest256) -> PathBuf {
        let hex = digest.hex();
        self.root
            .join("blobs")
            .join("sha256")
            .join(&hex[..2])
            .join(format!("{hex}.zst"))
    }

    /// Like `blob_path`, but also creates the shard directory so the caller can
    /// write a file at the returned path.
    ///
    /// # Errors
    ///
    /// Returns an error if the shard directory cannot be created.
    fn path_for(&self, digest: &Digest256) -> Result<PathBuf> {
        let path = self.blob_path(digest);
        if let Some(shard_dir) = path.parent() {
            fs::create_dir_all(shard_dir)?;
        }
        Ok(path)
    }

    /// Creates (or truncates) the file at `path`, writes `data`, and fsyncs it.
    fn write_synced(path: &Path, data: &[u8]) -> std::io::Result<()> {
        let mut file = fs::File::create(path)?;
        file.write_all(data)?;
        file.sync_all()
    }

    /// Turns `NotFound` into `Ok(None)` so directory entries that disappear
    /// mid-walk (e.g. a temp file renamed by a concurrent `put`) are skipped,
    /// while every other error still propagates.
    fn skip_vanished<T>(res: std::io::Result<T>) -> std::io::Result<Option<T>> {
        match res {
            Ok(value) => Ok(Some(value)),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Sums the lengths of regular files under `dir`, recursing into
    /// subdirectories.
    ///
    /// Symlinks are neither followed nor counted. A missing `dir`, or entries
    /// removed while the walk is in progress, count as zero bytes; any other
    /// I/O error (e.g. permission denied) is returned rather than hidden.
    fn walk_size(dir: &Path) -> Result<u64> {
        let entries = match Self::skip_vanished(fs::read_dir(dir))? {
            Some(entries) => entries,
            None => return Ok(0),
        };
        let mut total = 0u64;
        for entry in entries {
            let entry = entry?;
            let ty = match Self::skip_vanished(entry.file_type())? {
                Some(ty) => ty,
                None => continue,
            };
            if ty.is_dir() {
                total = total.saturating_add(Self::walk_size(&entry.path())?);
            } else if ty.is_file() {
                if let Some(meta) = Self::skip_vanished(entry.metadata())? {
                    total = total.saturating_add(meta.len());
                }
            }
        }
        Ok(total)
    }
}

impl BlobStore for FsBlobStore {
    /// Compresses and stores `bytes`, returning their digest.
    ///
    /// If a blob with the same digest already exists on disk it is left
    /// untouched. Otherwise the compressed data is written to a uniquely named
    /// temp file in the shard directory, fsynced, and renamed onto the final
    /// path, so the final path only ever appears with complete contents.
    ///
    /// # Errors
    ///
    /// I/O or compression failures. Any temp file created along the way is
    /// removed (best effort) before the error is returned.
    fn put(&self, bytes: &[u8]) -> Result<Digest256> {
        let digest = Digest256::of(bytes);
        let final_path = self.path_for(&digest)?;
        if final_path.exists() {
            return Ok(digest);
        }
        let counter = {
            let mut g = self.counter.lock();
            *g = g.wrapping_add(1);
            *g
        };
        let pid = std::process::id();
        let tmp_path = final_path.with_extension(format!("tmp.{pid}.{counter}"));
        let compressed = zstd::encode_all(bytes, ZSTD_LEVEL)?;
        let stored = Self::write_synced(&tmp_path, &compressed)
            .and_then(|()| fs::rename(&tmp_path, &final_path));
        if let Err(e) = stored {
            // Clean up on ANY failure (create, write, fsync or rename): a leftover
            // temp file would otherwise be counted by `physical_bytes` forever.
            // Cleanup errors are ignored so the original error reaches the caller.
            let _ = fs::remove_file(&tmp_path);
            return Err(e.into());
        }
        Ok(digest)
    }

    /// Reads, decompresses, and verifies the blob for `digest`.
    ///
    /// Performs no writes: no directories are created even when the blob is
    /// absent.
    ///
    /// # Errors
    ///
    /// I/O or decompression errors (including `NotFound` when no blob is stored
    /// for `digest`), or `Error::Integrity` if the decompressed bytes hash to a
    /// different digest.
    fn get(&self, digest: &Digest256) -> Result<Vec<u8>> {
        let path = self.blob_path(digest);
        let mut f = fs::File::open(&path)?;
        let mut compressed = Vec::new();
        f.read_to_end(&mut compressed)?;
        let bytes = zstd::decode_all(compressed.as_slice())?;
        let observed = Digest256::of(&bytes);
        if &observed != digest {
            return Err(Error::Integrity(format!(
                "blob {digest} on disk hashes to {observed}"
            )));
        }
        Ok(bytes)
    }

    /// Reports whether a blob file for `digest` exists on disk, without
    /// creating directories or verifying the file's contents.
    fn contains(&self, digest: &Digest256) -> Result<bool> {
        Ok(self.blob_path(digest).exists())
    }

    /// Total size in bytes of all regular files under `<root>/blobs`
    /// (compressed blobs plus any in-flight temp files).
    fn physical_bytes(&self) -> Result<u64> {
        Self::walk_size(&self.root.join("blobs"))
    }
}
#[derive(Debug, Default)]
pub struct MemBlobStore {
inner: DashMap<Digest256, Vec<u8>>,
}
impl MemBlobStore {
pub fn new() -> Self {
Self::default()
}
}
impl BlobStore for MemBlobStore {
fn put(&self, bytes: &[u8]) -> Result<Digest256> {
let d = Digest256::of(bytes);
self.inner
.entry(d.clone())
.or_insert_with(|| bytes.to_vec());
Ok(d)
}
fn get(&self, digest: &Digest256) -> Result<Vec<u8>> {
self.inner
.get(digest)
.map(|r| r.value().clone())
.ok_or_else(|| Error::Integrity(format!("not in mem store: {digest}")))
}
fn contains(&self, digest: &Digest256) -> Result<bool> {
Ok(self.inner.contains_key(digest))
}
fn physical_bytes(&self) -> Result<u64> {
Ok(self
.inner
.iter()
.map(|kv| kv.value().len() as u64)
.sum::<u64>())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opens an `FsBlobStore` in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the store; callers must bind it
    /// to a named variable (not `_`) so the directory outlives the store.
    fn temp_fs_store() -> (TempDir, FsBlobStore) {
        let tmp = TempDir::new().unwrap();
        let store = FsBlobStore::open(tmp.path()).unwrap();
        (tmp, store)
    }

    #[test]
    fn fs_round_trip_byte_identical() {
        let (_tmp, store) = temp_fs_store();
        let payload = b"the quick brown fox".to_vec();
        let digest = store.put(&payload).unwrap();
        assert_eq!(digest, Digest256::of(&payload));
        assert!(store.contains(&digest).unwrap());
        let fetched = store.get(&digest).unwrap();
        assert_eq!(fetched, payload);
    }

    #[test]
    fn fs_dedupes_identical_writes() {
        let (_tmp, store) = temp_fs_store();
        let payload = vec![0xABu8; 1024];
        let first = store.put(&payload).unwrap();
        let bytes_after_first = store.physical_bytes().unwrap();
        let second = store.put(&payload).unwrap();
        let bytes_after_second = store.physical_bytes().unwrap();
        assert_eq!(first, second);
        assert_eq!(
            bytes_after_first, bytes_after_second,
            "second put must be a no-op"
        );
    }

    #[test]
    fn fs_detects_corruption() {
        let (_tmp, store) = temp_fs_store();
        let digest = store.put(b"original").unwrap();
        let blob_file = store.path_for(&digest).unwrap();
        // zstd frame magic followed by junk: decoding or the digest check must reject it.
        fs::write(&blob_file, b"\x28\xb5\x2f\xfd\x00garbage").unwrap();
        let err = store.get(&digest).unwrap_err();
        assert!(matches!(err, Error::Integrity(_) | Error::Io(_)));
    }

    #[test]
    fn mem_round_trip() {
        let store = MemBlobStore::new();
        let digest = store.put(b"hello").unwrap();
        let fetched = store.get(&digest).unwrap();
        assert_eq!(fetched, b"hello".to_vec());
        assert!(store.contains(&digest).unwrap());
    }

    #[test]
    fn fs_sharding_uses_first_two_hex_chars() {
        let (_tmp, store) = temp_fs_store();
        let digest = store.put(b"sharding test").unwrap();
        let blob_file = store.path_for(&digest).unwrap();
        let shard = blob_file
            .parent()
            .and_then(|dir| dir.file_name())
            .and_then(|name| name.to_str())
            .unwrap();
        assert_eq!(shard, &digest.hex()[..2]);
    }
}