use async_trait::async_trait;
use hashtree_core::store::{Store, StoreError, StoreStats};
use hashtree_core::types::Hash;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use std::time::SystemTime;
/// Content-addressed blob store backed by the local filesystem.
///
/// Blobs live under `base_path` in a two-level fan-out (`ab/cd/<remaining hex>`); blobs in
/// the older single-level layout (`ab/<remaining hex>`) are still found and readable.
/// Pin reference counts are kept in memory and persisted to `pins.json` in `base_path`.
#[derive(Debug)]
pub struct FsBlobStore {
    /// Root directory holding the blob fan-out directories and `pins.json`.
    base_path: PathBuf,
    /// Size limit in bytes used by eviction; 0 means "no limit".
    max_bytes: AtomicU64,
    /// Pin reference counts keyed by lowercase hex-encoded hash.
    pins: RwLock<HashMap<String, u32>>,
}
impl FsBlobStore {
    /// Opens (creating it if needed) a blob store rooted at `path`, with no size limit.
    ///
    /// Pin counts are restored from `pins.json` in the store root. A missing or unparsable
    /// pin file is treated as an empty pin table rather than as an error.
    ///
    /// # Errors
    ///
    /// Returns an error if the root directory cannot be created.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, StoreError> {
        let base_path = path.as_ref().to_path_buf();
        fs::create_dir_all(&base_path)?;
        let pins = Self::load_pins(&base_path).unwrap_or_default();
        Ok(Self {
            base_path,
            // 0 disables the size limit (see `Store::max_bytes` / `evict_if_needed`).
            max_bytes: AtomicU64::new(0),
            pins: RwLock::new(pins),
        })
    }

    /// Like [`FsBlobStore::new`], but starts with a size limit of `max_bytes` (0 = unlimited).
    ///
    /// # Errors
    ///
    /// Same as [`FsBlobStore::new`].
    pub fn with_max_bytes<P: AsRef<Path>>(path: P, max_bytes: u64) -> Result<Self, StoreError> {
        let store = Self::new(path)?;
        store.max_bytes.store(max_bytes, Ordering::Relaxed);
        Ok(store)
    }

    /// Path of the pin table for a store rooted at `base_path` (single source of the name).
    fn pins_path_in(base_path: &Path) -> PathBuf {
        base_path.join("pins.json")
    }

    /// Path of this store's pin table.
    fn pins_path(&self) -> PathBuf {
        Self::pins_path_in(&self.base_path)
    }

    /// Reads the pin table from disk; `None` if it is missing, unreadable or not valid JSON.
    fn load_pins(base_path: &Path) -> Option<HashMap<String, u32>> {
        let contents = fs::read_to_string(Self::pins_path_in(base_path)).ok()?;
        serde_json::from_str(&contents).ok()
    }

    /// Persists the in-memory pin table to `pins.json`.
    ///
    /// The table's *write* lock is held for the whole save, so concurrent saves run one at a
    /// time and the file always ends up holding the most recent table. The JSON goes through
    /// a temporary file that is renamed into place, so an interrupted write does not leave a
    /// truncated `pins.json` behind. `new` would silently load such a file as "no pins",
    /// leaving every previously pinned blob open to eviction.
    ///
    /// # Errors
    ///
    /// Returns an error if the table cannot be serialized or written.
    fn save_pins(&self) -> Result<(), StoreError> {
        let pins = self.pins.write().unwrap();
        let json = serde_json::to_string(&*pins)
            .map_err(|e| StoreError::Other(format!("Failed to serialize pins: {}", e)))?;
        let final_path = self.pins_path();
        // "pins.json" -> "pins.json.tmp"; not 64 hex digits, so never mistaken for a blob.
        let temp_path = final_path.with_extension("json.tmp");
        Self::write_via_temp(&temp_path, &final_path, json.as_bytes())?;
        Ok(())
    }

    /// Writes `data` to `temp_path` and then renames it onto `final_path`.
    ///
    /// Callers pass a sibling of `final_path` as `temp_path`, so on POSIX systems the rename
    /// replaces the destination in one step and readers never see a partially written file.
    /// On failure the temporary file is removed (best effort). Otherwise a failed write,
    /// e.g. on a full disk, would leave an orphaned `.tmp` file that no listing, stats or
    /// eviction pass ever accounts for.
    fn write_via_temp(temp_path: &Path, final_path: &Path, data: &[u8]) -> std::io::Result<()> {
        let result = fs::write(temp_path, data).and_then(|()| fs::rename(temp_path, final_path));
        if result.is_err() {
            let _ = fs::remove_file(temp_path);
        }
        result
    }

    /// Returns a temporary-file path next to `path`, unique per process and per call.
    ///
    /// This keeps concurrent writers of the same blob from sharing, and truncating, one temp
    /// file. With a shared temp file, one writer's rename could fail with NotFound, and the
    /// published blob could briefly be observed truncated.
    fn unique_temp_path(path: &Path) -> PathBuf {
        static NEXT_TEMP_ID: AtomicU64 = AtomicU64::new(0);
        let id = NEXT_TEMP_ID.fetch_add(1, Ordering::Relaxed);
        path.with_extension(format!("{}.{}.tmp", std::process::id(), id))
    }

    /// Two-level layout path for a hex hash: `<base>/ab/cd/<remaining characters>`.
    ///
    /// # Panics
    ///
    /// Panics if `hash_hex` has fewer than 4 characters. Callers always pass a full
    /// hex-encoded hash.
    fn blob_path_from_hex(&self, hash_hex: &str) -> PathBuf {
        let (prefix, rest) = hash_hex.split_at(2);
        let (subdir, filename) = rest.split_at(2);
        self.base_path.join(prefix).join(subdir).join(filename)
    }

    /// Single-level (legacy) layout path: `<base>/ab/<remaining 62 hex chars>`.
    fn legacy_blob_path(&self, hash: &Hash) -> PathBuf {
        let hex = hex::encode(hash);
        let (prefix, rest) = hex.split_at(2);
        self.base_path.join(prefix).join(rest)
    }

    /// Two-level (current) layout path for `hash`; new blobs are always written here.
    fn blob_path(&self, hash: &Hash) -> PathBuf {
        self.blob_path_from_hex(&hex::encode(hash))
    }

    /// Path of the stored blob for `hash`. The current layout is preferred over the legacy
    /// one; returns `None` if neither exists.
    fn existing_blob_path(&self, hash: &Hash) -> Option<PathBuf> {
        let primary = self.blob_path(hash);
        if primary.exists() {
            return Some(primary);
        }
        let legacy = self.legacy_blob_path(hash);
        if legacy.exists() {
            return Some(legacy);
        }
        None
    }

    /// Recovers the lowercase hex hash from a file path under the store root.
    ///
    /// The path components relative to the root are concatenated, so both the two-level and
    /// the legacy layout are recognised. Returns `None` unless the result is exactly 64 hex
    /// digits, which excludes `pins.json` and `.tmp` files.
    fn hash_hex_for_blob_path(&self, path: &Path) -> Option<String> {
        let relative = path.strip_prefix(&self.base_path).ok()?;
        let mut hex = String::with_capacity(64);
        for component in relative.iter() {
            hex.push_str(component.to_str()?);
        }
        if hex.len() != 64 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
            return None;
        }
        Some(hex.to_ascii_lowercase())
    }

    /// Recursively collects `(path, hex hash, metadata)` for every blob file below `dir`.
    ///
    /// A directory or file that disappears mid-walk (removed by a concurrent delete or
    /// eviction) is skipped rather than failing the whole walk.
    fn collect_blob_metadata_recursive(
        &self,
        dir: &Path,
        blobs: &mut Vec<(PathBuf, String, fs::Metadata)>,
    ) -> Result<(), StoreError> {
        let entries = match fs::read_dir(dir) {
            Ok(e) => e,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e.into()),
        };
        for entry in entries {
            let entry = entry?;
            let file_type = entry.file_type()?;
            let path = entry.path();
            if file_type.is_dir() {
                self.collect_blob_metadata_recursive(&path, blobs)?;
                continue;
            }
            if !file_type.is_file() {
                continue;
            }
            if let Some(hex) = self.hash_hex_for_blob_path(&path) {
                match entry.metadata() {
                    Ok(metadata) => blobs.push((path, hex, metadata)),
                    // Listed by `read_dir` but removed before we could stat it.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                    Err(e) => return Err(e.into()),
                }
            }
        }
        Ok(())
    }

    /// Collects `(path, hex hash, metadata)` for every blob file in the store.
    fn collect_blob_metadata(&self) -> Result<Vec<(PathBuf, String, fs::Metadata)>, StoreError> {
        let mut blobs = Vec::new();
        self.collect_blob_metadata_recursive(&self.base_path, &mut blobs)?;
        Ok(blobs)
    }

    /// Stores `data` under `hash`.
    ///
    /// Returns `Ok(false)` without writing if the blob already exists in either layout.
    /// Returns `Ok(true)` after writing it to the two-level layout via a unique temporary
    /// file and a rename. The content is not checked against `hash`.
    ///
    /// # Errors
    ///
    /// Returns an error if the directories or the blob file cannot be created.
    pub fn put_sync(&self, hash: Hash, data: &[u8]) -> Result<bool, StoreError> {
        if self.existing_blob_path(&hash).is_some() {
            return Ok(false);
        }
        let path = self.blob_path(&hash);
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        Self::write_via_temp(&Self::unique_temp_path(&path), &path, data)?;
        Ok(true)
    }

    /// Reads the blob for `hash`, or `Ok(None)` if it is not stored.
    ///
    /// A blob removed between the existence check and the read also yields `Ok(None)`.
    pub fn get_sync(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        match self.existing_blob_path(hash) {
            None => Ok(None),
            Some(path) => match fs::read(&path) {
                Ok(data) => Ok(Some(data)),
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
                Err(e) => Err(e.into()),
            },
        }
    }

    /// Whether a blob for `hash` exists in either layout.
    pub fn exists(&self, hash: &Hash) -> bool {
        self.existing_blob_path(hash).is_some()
    }

    /// Removes the blob for `hash` from both layouts.
    ///
    /// Returns whether any file was removed. A file that is already gone is not an error.
    /// Pins are not touched here.
    pub fn delete_sync(&self, hash: &Hash) -> Result<bool, StoreError> {
        let mut deleted = false;
        for path in [self.blob_path(hash), self.legacy_blob_path(hash)] {
            match fs::remove_file(&path) {
                Ok(()) => deleted = true,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                Err(e) => return Err(e.into()),
            }
        }
        Ok(deleted)
    }

    /// Lists the hashes of all stored blobs, in directory-walk order.
    pub fn list(&self) -> Result<Vec<Hash>, StoreError> {
        let mut hashes = Vec::new();
        for (_, full_hex, _) in self.collect_blob_metadata()? {
            // `hash_hex_for_blob_path` already guarantees 64 hex digits, so this always
            // decodes to 32 bytes; the checks remain as a defensive guard.
            if let Ok(bytes) = hex::decode(&full_hex) {
                if bytes.len() == 32 {
                    let mut hash = [0u8; 32];
                    hash.copy_from_slice(&bytes);
                    hashes.push(hash);
                }
            }
        }
        Ok(hashes)
    }

    /// Counts blob files and their sizes, split out by whether the blob is pinned.
    ///
    /// Every blob file found is counted, so a hash present in both layouts counts twice.
    pub fn stats(&self) -> Result<FsStats, StoreError> {
        let pins = self.pins.read().unwrap();
        let mut count = 0usize;
        let mut total_bytes = 0u64;
        let mut pinned_count = 0usize;
        let mut pinned_bytes = 0u64;
        for (_, hex, metadata) in self.collect_blob_metadata()? {
            let size = metadata.len();
            count += 1;
            total_bytes += size;
            if pins.get(&hex).copied().unwrap_or(0) > 0 {
                pinned_count += 1;
                pinned_bytes += size;
            }
        }
        Ok(FsStats {
            count,
            total_bytes,
            pinned_count,
            pinned_bytes,
        })
    }

    /// Best-effort `(path, hex, mtime, size)` listing for eviction.
    ///
    /// A walk error yields an empty list, and an unreadable mtime sorts as the oldest.
    fn collect_blobs_for_eviction(&self) -> Vec<(PathBuf, String, SystemTime, u64)> {
        self.collect_blob_metadata()
            .map(|blobs| {
                blobs
                    .into_iter()
                    .map(|(path, hex, metadata)| {
                        let mtime = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
                        let size = metadata.len();
                        (path, hex, mtime, size)
                    })
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Deletes unpinned blobs, oldest mtime first, until at least
    /// `current bytes - target_bytes` bytes are freed. Returns the number of bytes freed.
    ///
    /// The store is walked once, so the byte total and the candidate list come from the
    /// same snapshot. The pin lock is held for the whole pass, so a concurrent `pin` waits
    /// until eviction finishes instead of racing with the deletion of the blob it protects.
    fn evict_to_target(&self, target_bytes: u64) -> u64 {
        let pins = self.pins.read().unwrap();
        let mut blobs = self.collect_blobs_for_eviction();

        let current_bytes: u64 = blobs.iter().map(|(_, _, _, size)| *size).sum();
        if current_bytes <= target_bytes {
            return 0;
        }
        let to_free = current_bytes - target_bytes;

        blobs.retain(|(_, hex, _, _)| pins.get(hex).copied().unwrap_or(0) == 0);
        blobs.sort_by_key(|(_, _, mtime, _)| *mtime);

        let mut freed = 0u64;
        for (path, _, _, size) in blobs {
            if freed >= to_free {
                break;
            }
            // Best effort: a blob that cannot be removed is skipped, not fatal.
            if fs::remove_file(&path).is_ok() {
                freed += size;
            }
        }
        freed
    }
}
/// Snapshot of blob counts and sizes, as produced by [`FsBlobStore::stats`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FsStats {
    /// Number of blob files found.
    pub count: usize,
    /// Sum of the sizes of all blob files, in bytes.
    pub total_bytes: u64,
    /// Number of blob files whose pin count is greater than zero.
    pub pinned_count: usize,
    /// Sum of the sizes of the pinned blob files, in bytes.
    pub pinned_bytes: u64,
}
#[async_trait]
impl Store for FsBlobStore {
    /// Stores `data` under `hash`; `Ok(false)` if the blob was already present.
    async fn put(&self, hash: Hash, data: Vec<u8>) -> Result<bool, StoreError> {
        self.put_sync(hash, &data)
    }

    /// Reads the blob for `hash`, or `Ok(None)` if it is not stored.
    async fn get(&self, hash: &Hash) -> Result<Option<Vec<u8>>, StoreError> {
        self.get_sync(hash)
    }

    /// Whether a blob for `hash` exists in either on-disk layout.
    async fn has(&self, hash: &Hash) -> Result<bool, StoreError> {
        Ok(self.exists(hash))
    }

    /// Deletes the blob for `hash` (both layouts), then forgets any pins on it.
    ///
    /// Returns whether a blob file was removed.
    async fn delete(&self, hash: &Hash) -> Result<bool, StoreError> {
        // Remove the blob first. If that fails, the pin survives and the still-present blob
        // stays protected from eviction instead of silently losing its pin.
        let deleted = self.delete_sync(hash)?;
        // The write guard is a temporary, so the lock is released before `save_pins` runs.
        let had_pin = self
            .pins
            .write()
            .unwrap()
            .remove(&hex::encode(hash))
            .is_some();
        if had_pin {
            // Best effort: the blob is already gone, so failing to persist the pin table must
            // not turn a successful delete into an error.
            let _ = self.save_pins();
        }
        Ok(deleted)
    }

    /// Sets the size limit in bytes; 0 disables the limit.
    fn set_max_bytes(&self, max: u64) {
        self.max_bytes.store(max, Ordering::Relaxed);
    }

    /// The configured size limit, or `None` when unlimited (stored as 0).
    fn max_bytes(&self) -> Option<u64> {
        Some(self.max_bytes.load(Ordering::Relaxed)).filter(|&max| max > 0)
    }

    /// Store statistics; a failed directory walk is reported as all-zero stats.
    async fn stats(&self) -> StoreStats {
        match self.stats() {
            Ok(fs_stats) => StoreStats {
                count: fs_stats.count as u64,
                bytes: fs_stats.total_bytes,
                pinned_count: fs_stats.pinned_count as u64,
                pinned_bytes: fs_stats.pinned_bytes,
            },
            Err(_) => StoreStats::default(),
        }
    }

    /// If a limit is set and exceeded, evicts unpinned blobs down to 90% of the limit.
    ///
    /// Returns the number of bytes freed (0 when no eviction was needed or possible).
    async fn evict_if_needed(&self) -> Result<u64, StoreError> {
        let max = self.max_bytes.load(Ordering::Relaxed);
        if max == 0 {
            return Ok(0);
        }
        // Best effort: if the store cannot be scanned, report that nothing was freed.
        let current = match self.stats() {
            Ok(s) => s.total_bytes,
            Err(_) => return Ok(0),
        };
        if current <= max {
            return Ok(0);
        }
        // floor(max * 9 / 10), leaving headroom below the limit. It is written as
        // 9q + floor(9r / 10) with max = 10q + r, so `max * 9` cannot overflow for
        // limits above u64::MAX / 9.
        let target = max / 10 * 9 + max % 10 * 9 / 10;
        Ok(self.evict_to_target(target))
    }

    /// Increments the pin count for `hash` and persists the pin table.
    async fn pin(&self, hash: &Hash) -> Result<(), StoreError> {
        {
            let mut pins = self.pins.write().unwrap();
            let count = pins.entry(hex::encode(hash)).or_insert(0);
            // Saturate instead of overflowing, which would panic in debug builds.
            *count = count.saturating_add(1);
        }
        self.save_pins()
    }

    /// Decrements the pin count for `hash` (dropping the entry at zero) and persists the
    /// pin table. Unpinning a hash that is not pinned is a no-op apart from the save.
    async fn unpin(&self, hash: &Hash) -> Result<(), StoreError> {
        let hex = hex::encode(hash);
        {
            let mut pins = self.pins.write().unwrap();
            if let Some(count) = pins.get_mut(&hex) {
                *count = count.saturating_sub(1);
                if *count == 0 {
                    pins.remove(&hex);
                }
            }
        }
        self.save_pins()
    }

    /// Current pin count for `hash` (0 if not pinned).
    fn pin_count(&self, hash: &Hash) -> u32 {
        let hex = hex::encode(hash);
        self.pins.read().unwrap().get(&hex).copied().unwrap_or(0)
    }
}
#[cfg(test)]
mod tests {
// Unit tests for FsBlobStore: basic CRUD, on-disk layout (including the legacy
// single-level layout), stats, pin reference counting/persistence and eviction.
// Every test uses its own TempDir so tests are isolated from one another.
use super::*;
use hashtree_core::sha256;
use tempfile::TempDir;
// A stored blob is reported present and reads back byte-for-byte.
#[tokio::test]
async fn test_put_get() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let data = b"hello filesystem";
let hash = sha256(data);
store.put(hash, data.to_vec()).await.unwrap();
assert!(store.has(&hash).await.unwrap());
assert_eq!(store.get(&hash).await.unwrap(), Some(data.to_vec()));
}
// An unknown hash is absent and `get` returns Ok(None) rather than an error.
#[tokio::test]
async fn test_get_missing() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let hash = [0u8; 32];
assert!(!store.has(&hash).await.unwrap());
assert_eq!(store.get(&hash).await.unwrap(), None);
}
// `delete` reports true when it removed something and false on a second call.
#[tokio::test]
async fn test_delete() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let data = b"delete me";
let hash = sha256(data);
store.put(hash, data.to_vec()).await.unwrap();
assert!(store.has(&hash).await.unwrap());
assert!(store.delete(&hash).await.unwrap());
assert!(!store.has(&hash).await.unwrap());
assert!(!store.delete(&hash).await.unwrap());
}
// A second put of the same hash returns false and does not create a second file.
#[tokio::test]
async fn test_deduplication() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let data = b"same content";
let hash = sha256(data);
assert!(store.put(hash, data.to_vec()).await.unwrap());
assert!(!store.put(hash, data.to_vec()).await.unwrap());
assert_eq!(store.list().unwrap().len(), 1);
}
// `list` returns every stored hash; order is not asserted (directory-walk order).
#[tokio::test]
async fn test_list() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let d1 = b"one";
let d2 = b"two";
let d3 = b"three";
let h1 = sha256(d1);
let h2 = sha256(d2);
let h3 = sha256(d3);
store.put(h1, d1.to_vec()).await.unwrap();
store.put(h2, d2.to_vec()).await.unwrap();
store.put(h3, d3.to_vec()).await.unwrap();
let hashes = store.list().unwrap();
assert_eq!(hashes.len(), 3);
assert!(hashes.contains(&h1));
assert!(hashes.contains(&h2));
assert!(hashes.contains(&h3));
}
// Two 5-byte blobs give 10 total bytes; pinning one moves 1 blob / 5 bytes into
// the pinned totals. Calls the inherent `stats()`, which returns FsStats.
#[tokio::test]
async fn test_stats() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let d1 = b"hello";
let d2 = b"world";
let h1 = sha256(d1);
store.put(h1, d1.to_vec()).await.unwrap();
store.put(sha256(d2), d2.to_vec()).await.unwrap();
let stats = store.stats().unwrap();
assert_eq!(stats.count, 2);
assert_eq!(stats.total_bytes, 10);
assert_eq!(stats.pinned_count, 0);
assert_eq!(stats.pinned_bytes, 0);
store.pin(&h1).await.unwrap();
let stats = store.stats().unwrap();
assert_eq!(stats.pinned_count, 1);
assert_eq!(stats.pinned_bytes, 5);
}
// New blobs land in the two-level layout: <root>/<hex[0..2]>/<hex[2..4]>/<hex[4..]>.
#[tokio::test]
async fn test_directory_structure() {
let temp = TempDir::new().unwrap();
let blobs_path = temp.path().join("blobs");
let store = FsBlobStore::new(&blobs_path).unwrap();
let data = b"test data";
let hash = sha256(data);
let hex = hex::encode(hash);
store.put(hash, data.to_vec()).await.unwrap();
let prefix = &hex[..2];
let subdir = &hex[2..4];
let rest = &hex[4..];
let expected_path = blobs_path.join(prefix).join(subdir).join(rest);
assert!(
expected_path.exists(),
"Blob should be at {:?}",
expected_path
);
assert_eq!(fs::read(&expected_path).unwrap(), data);
}
// Checks the private `blob_path` helper directly: a hash starting 00 11 22 maps to
// .../00/11/<60-char file name>.
// NOTE(review): the "/00/" and "/11/" substring checks assume '/' path separators,
// so this test would fail on Windows.
#[test]
fn test_blob_path_format() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path()).unwrap();
let mut hash = [0u8; 32];
hash[0] = 0x00;
hash[1] = 0x11;
hash[2] = 0x22;
let path = store.blob_path(&hash);
let path_str = path.to_string_lossy();
assert!(
path_str.contains("/00/"),
"Path should contain /00/ directory: {}",
path_str
);
assert!(
path_str.contains("/11/"),
"Path should contain /11/ directory: {}",
path_str
);
assert!(path.file_name().unwrap().len() == 60);
}
// A blob written by hand in the legacy single-level layout (<root>/<hex[0..2]>/<hex[2..]>)
// must be visible to has/get/list/stats, deduplicated by put, and removable by delete.
#[tokio::test]
async fn test_legacy_single_level_layout_remains_readable() {
let temp = TempDir::new().unwrap();
let blobs_path = temp.path().join("blobs");
let store = FsBlobStore::new(&blobs_path).unwrap();
let data = b"legacy blob";
let hash = sha256(data);
let hex = hex::encode(hash);
let legacy_path = blobs_path.join(&hex[..2]).join(&hex[2..]);
fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
fs::write(&legacy_path, data).unwrap();
assert!(store.has(&hash).await.unwrap());
assert_eq!(store.get(&hash).await.unwrap(), Some(data.to_vec()));
let listed = store.list().unwrap();
assert_eq!(listed, vec![hash]);
let stats = store.stats().unwrap();
assert_eq!(stats.count, 1);
assert_eq!(stats.total_bytes, data.len() as u64);
assert!(!store.put(hash, data.to_vec()).await.unwrap());
assert!(store.delete(&hash).await.unwrap());
assert!(!legacy_path.exists());
}
// A freshly created store reports zero blobs and zero bytes.
#[tokio::test]
async fn test_empty_store_stats() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let stats = store.stats().unwrap();
assert_eq!(stats.count, 0);
assert_eq!(stats.total_bytes, 0);
}
// A freshly created store lists no hashes.
#[tokio::test]
async fn test_empty_store_list() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let hashes = store.list().unwrap();
assert!(hashes.is_empty());
}
// Single pin/unpin round trip. `is_pinned` is not defined in this file; presumably
// it is a default method of the `Store` trait built on `pin_count` (verify).
#[tokio::test]
async fn test_pin_and_unpin() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let data = b"pin me";
let hash = sha256(data);
store.put(hash, data.to_vec()).await.unwrap();
assert!(!store.is_pinned(&hash));
assert_eq!(store.pin_count(&hash), 0);
store.pin(&hash).await.unwrap();
assert!(store.is_pinned(&hash));
assert_eq!(store.pin_count(&hash), 1);
store.unpin(&hash).await.unwrap();
assert!(!store.is_pinned(&hash));
assert_eq!(store.pin_count(&hash), 0);
}
// Pins are reference-counted: three pins need three unpins to reach zero.
#[tokio::test]
async fn test_pin_ref_counting() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let data = b"multi pin";
let hash = sha256(data);
store.put(hash, data.to_vec()).await.unwrap();
store.pin(&hash).await.unwrap();
store.pin(&hash).await.unwrap();
store.pin(&hash).await.unwrap();
assert_eq!(store.pin_count(&hash), 3);
store.unpin(&hash).await.unwrap();
assert_eq!(store.pin_count(&hash), 2);
assert!(store.is_pinned(&hash));
store.unpin(&hash).await.unwrap();
store.unpin(&hash).await.unwrap();
assert_eq!(store.pin_count(&hash), 0);
}
// Pin counts are written to pins.json and restored by a new store opened on the
// same directory.
#[tokio::test]
async fn test_pins_persist_across_reload() {
let temp = TempDir::new().unwrap();
let blobs_path = temp.path().join("blobs");
let data = b"persist me";
let hash = sha256(data);
{
let store = FsBlobStore::new(&blobs_path).unwrap();
store.put(hash, data.to_vec()).await.unwrap();
store.pin(&hash).await.unwrap();
store.pin(&hash).await.unwrap();
assert_eq!(store.pin_count(&hash), 2);
}
{
let store = FsBlobStore::new(&blobs_path).unwrap();
assert_eq!(store.pin_count(&hash), 2);
assert!(store.is_pinned(&hash));
}
}
// A max of 0 is reported as "no limit" (None).
#[tokio::test]
async fn test_max_bytes() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
assert!(store.max_bytes().is_none());
store.set_max_bytes(1000);
assert_eq!(store.max_bytes(), Some(1000));
store.set_max_bytes(0);
assert!(store.max_bytes().is_none());
}
// The constructor's limit is visible through the trait getter.
#[tokio::test]
async fn test_with_max_bytes() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::with_max_bytes(temp.path().join("blobs"), 500).unwrap();
assert_eq!(store.max_bytes(), Some(500));
}
// Eviction arithmetic: 5 blobs x 5 bytes = 25 bytes > max 20, so the target is
// 20 * 9 / 10 = 18 and at least 7 bytes must go. Unpinned blobs are evicted oldest
// mtime first, so h2 and h3 are removed; pinned h1 and newer h4/h5 survive.
// NOTE(review): the ordering relies on the filesystem recording mtimes with better
// than ~10ms resolution; on coarse-mtime filesystems this test may be flaky.
// `std::thread::sleep` blocks the test runtime's thread, which is harmless here.
#[tokio::test]
async fn test_eviction_respects_pins() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::with_max_bytes(temp.path().join("blobs"), 20).unwrap();
let d1 = b"aaaaa"; let d2 = b"bbbbb";
let d3 = b"ccccc";
let h1 = sha256(d1);
let h2 = sha256(d2);
let h3 = sha256(d3);
store.put(h1, d1.to_vec()).await.unwrap();
std::thread::sleep(std::time::Duration::from_millis(10)); store.put(h2, d2.to_vec()).await.unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
store.put(h3, d3.to_vec()).await.unwrap();
// h1 is the oldest blob but pinned, so it must never be an eviction candidate.
store.pin(&h1).await.unwrap();
let d4 = b"ddddd";
let h4 = sha256(d4);
std::thread::sleep(std::time::Duration::from_millis(10));
store.put(h4, d4.to_vec()).await.unwrap();
let d5 = b"eeeee";
let h5 = sha256(d5);
std::thread::sleep(std::time::Duration::from_millis(10));
store.put(h5, d5.to_vec()).await.unwrap();
let freed = store.evict_if_needed().await.unwrap();
assert!(freed > 0, "Should have freed some bytes");
assert!(store.has(&h1).await.unwrap(), "Pinned item should exist");
assert!(
!store.has(&h2).await.unwrap(),
"Oldest unpinned should be evicted"
);
assert!(store.has(&h5).await.unwrap(), "Newest should exist");
}
// Usage below the limit triggers no eviction.
#[tokio::test]
async fn test_no_eviction_when_under_limit() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::with_max_bytes(temp.path().join("blobs"), 1000).unwrap();
let data = b"small";
let hash = sha256(data);
store.put(hash, data.to_vec()).await.unwrap();
let freed = store.evict_if_needed().await.unwrap();
assert_eq!(freed, 0);
assert!(store.has(&hash).await.unwrap());
}
// Without a limit (max 0), eviction never removes anything.
#[tokio::test]
async fn test_no_eviction_without_limit() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
for i in 0..10u8 {
let data = vec![i; 100];
let hash = sha256(&data);
store.put(hash, data).await.unwrap();
}
let freed = store.evict_if_needed().await.unwrap();
assert_eq!(freed, 0);
assert_eq!(store.list().unwrap().len(), 10);
}
// Deleting a pinned blob also clears its pin count.
#[tokio::test]
async fn test_delete_removes_pin() {
let temp = TempDir::new().unwrap();
let store = FsBlobStore::new(temp.path().join("blobs")).unwrap();
let data = b"delete pinned";
let hash = sha256(data);
store.put(hash, data.to_vec()).await.unwrap();
store.pin(&hash).await.unwrap();
assert!(store.is_pinned(&hash));
store.delete(&hash).await.unwrap();
assert_eq!(store.pin_count(&hash), 0);
}
}