use crate::checkpoint::Checkpoint;
use crate::compression::CompressionEngine;
use crate::error::{Result, TitorError};
use crate::types::{FileManifest, StorageMetadata, StorageObject, TitorConfig};
use chrono::Utc;
use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use crate::collections::GxBuildHasher;
use sha2::{Digest, Sha256};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tracing::{debug, info, trace, warn};
/// Content-addressed on-disk store for checkpoints, file objects and
/// reference counts. All interior state is behind `Arc` + concurrent
/// containers so a `Storage` can be shared across threads.
pub struct Storage {
    /// Root directory containing `checkpoints/`, `objects/`, `refs/` and `metadata.json`.
    root: PathBuf,
    /// Shared compression engine; access is serialized behind a mutex.
    compression: Arc<Mutex<CompressionEngine>>,
    /// In-memory cache of object metadata, keyed by content hash.
    object_cache: Arc<DashMap<String, StorageObject, GxBuildHasher>>,
    /// Live reference counts per object hash (mirrors the `refs/` directory).
    ref_counts: Arc<DashMap<String, usize, GxBuildHasher>>,
    /// Ref-count changes not yet written to `refs/`; drained by `flush_ref_counts`.
    pending_ref_updates: Arc<DashMap<String, usize, GxBuildHasher>>,
    /// Storage-wide metadata persisted to `metadata.json`.
    metadata: Arc<RwLock<StorageMetadata>>,
}
impl std::fmt::Debug for Storage {
    /// Manual `Debug`: the compression engine and metadata lock are omitted;
    /// only cheap summary counters and the root path are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("Storage");
        dbg.field("root", &self.root);
        dbg.field("object_cache_size", &self.object_cache.len());
        dbg.field("ref_counts_size", &self.ref_counts.len());
        dbg.field("pending_updates", &self.pending_ref_updates.len());
        dbg.finish()
    }
}
impl Storage {
    /// Initialize a brand-new storage layout at `root`.
    ///
    /// Creates the `checkpoints/`, `objects/` and `refs/` directories and
    /// writes the initial `metadata.json`.
    ///
    /// # Errors
    /// Returns [`TitorError::StorageAlreadyExists`] if `root` already exists.
    pub fn init(root: PathBuf, config: TitorConfig, compression: CompressionEngine) -> Result<Self> {
        if root.exists() {
            return Err(TitorError::StorageAlreadyExists(root));
        }
        fs::create_dir_all(&root)?;
        fs::create_dir_all(root.join("checkpoints"))?;
        fs::create_dir_all(root.join("objects"))?;
        fs::create_dir_all(root.join("refs"))?;
        let metadata = StorageMetadata {
            format_version: 1,
            titor_version: env!("CARGO_PKG_VERSION").to_string(),
            created_at: Utc::now(),
            last_accessed: Utc::now(),
            config,
        };
        let metadata_path = root.join("metadata.json");
        let metadata_json = serde_json::to_string_pretty(&metadata)?;
        fs::write(metadata_path, metadata_json)?;
        info!("Initialized storage at {:?}", root);
        Ok(Self {
            root,
            compression: Arc::new(Mutex::new(compression)),
            object_cache: Arc::new(DashMap::with_capacity_and_hasher(1000, GxBuildHasher::default())),
            ref_counts: Arc::new(DashMap::with_capacity_and_hasher(1000, GxBuildHasher::default())),
            pending_ref_updates: Arc::new(DashMap::with_capacity_and_hasher(1000, GxBuildHasher::default())),
            metadata: Arc::new(RwLock::new(metadata)),
        })
    }

    /// Open an existing storage at `root`, or initialize a new one when no
    /// `metadata.json` is present.
    pub fn init_or_open(root: PathBuf, config: TitorConfig, compression: CompressionEngine) -> Result<Self> {
        if root.join("metadata.json").exists() {
            Storage::open(root, compression)
        } else {
            Storage::init(root, config, compression)
        }
    }

    /// Open an existing storage, rebuilding ref counts from `refs/` and
    /// loading the serialized object cache when present.
    ///
    /// # Errors
    /// Returns [`TitorError::StorageNotInitialized`] if `root` does not exist.
    pub fn open(root: PathBuf, compression: CompressionEngine) -> Result<Self> {
        if !root.exists() {
            return Err(TitorError::StorageNotInitialized(root));
        }
        let metadata_path = root.join("metadata.json");
        let metadata_json = fs::read_to_string(&metadata_path)?;
        let mut metadata: StorageMetadata = serde_json::from_str(&metadata_json)?;
        // Updated in memory only; it reaches disk the next time
        // `update_metadata` writes `metadata.json` back out.
        metadata.last_accessed = Utc::now();
        // Rebuild the ref-count map from `refs/`; entries that fail to
        // read or parse are skipped (best effort).
        let ref_counts = Arc::new(DashMap::with_capacity_and_hasher(1000, GxBuildHasher::default()));
        let refs_dir = root.join("refs");
        if refs_dir.exists() {
            for entry in fs::read_dir(refs_dir)? {
                let entry = entry?;
                let hash = entry.file_name().to_string_lossy().to_string();
                if let Ok(count_str) = fs::read_to_string(entry.path()) {
                    if let Ok(count) = count_str.trim().parse::<usize>() {
                        ref_counts.insert(hash, count);
                    }
                }
            }
        }
        // The cache is an optimization: a corrupt or unreadable cache file
        // is logged and discarded rather than failing the open.
        let object_cache = Arc::new(DashMap::with_capacity_and_hasher(1000, GxBuildHasher::default()));
        let cache_path = root.join("object_cache.bin");
        if cache_path.exists() {
            match fs::read(&cache_path) {
                Ok(cache_bytes) => {
                    match bincode::serde::decode_from_slice::<Vec<(String, StorageObject)>, _>(&cache_bytes, bincode::config::standard()) {
                        Ok((cache_entries, _)) => {
                            debug!("Loaded {} cached object entries", cache_entries.len());
                            for (hash, obj) in cache_entries {
                                object_cache.insert(hash, obj);
                            }
                        }
                        Err(e) => {
                            warn!("Failed to load object cache: {}", e);
                            fs::remove_file(&cache_path).ok();
                        }
                    }
                }
                Err(e) => {
                    warn!("Failed to read object cache: {}", e);
                }
            }
        }
        info!("Opened storage at {:?}", root);
        Ok(Self {
            root,
            compression: Arc::new(Mutex::new(compression)),
            object_cache,
            ref_counts,
            pending_ref_updates: Arc::new(DashMap::with_capacity_and_hasher(1000, GxBuildHasher::default())),
            metadata: Arc::new(RwLock::new(metadata)),
        })
    }

    /// First eight characters of a hash/id for log output.
    ///
    /// Falls back to the whole string when it is shorter — the previous
    /// inline `&id[..8]` slicing panicked on short ids.
    fn log_id(id: &str) -> &str {
        id.get(..8).unwrap_or(id)
    }

    /// Store `content` as a content-addressed object, returning its hash
    /// and the size occupied on disk.
    ///
    /// If an identical object already exists only its reference count is
    /// incremented. `path` is forwarded to the compression engine
    /// (presumably so it can pick a strategy per file — confirm against
    /// the engine).
    pub fn store_object(&self, content: &[u8], path: &Path) -> Result<(String, u64)> {
        let hash = compute_hash(content);
        if self.object_exists(&hash)? {
            debug!("Object {} already exists, incrementing ref count", Self::log_id(&hash));
            self.increment_ref_count(&hash)?;
            // Size lookup can fail if the cached entry outlived the file;
            // fall back to the uncompressed length.
            let compressed_size = self.get_object_size(&hash).unwrap_or(content.len() as u64);
            return Ok((hash, compressed_size));
        }
        // NOTE(review): the exists-check above and the write below are not
        // atomic; two threads storing the same new content can both take
        // this path and record a ref count of 1 instead of 2.
        let compressed = self.compression.lock().compress(path, content)?;
        // Inferred flag: assumes the engine returns bytes no smaller than
        // the input when compression would not help — TODO confirm.
        let is_compressed = compressed.len() < content.len();
        let compressed_size = compressed.len() as u64;
        let object_path = self.get_object_path(&hash);
        let object_dir = object_path
            .parent()
            .expect("object path always has a shard directory parent");
        fs::create_dir_all(object_dir)?;
        // Write via a temp file + rename so a crash mid-write cannot leave
        // a truncated object behind under its final content-hash name.
        let tmp_path = object_path.with_extension("tmp");
        fs::write(&tmp_path, &compressed)?;
        fs::rename(&tmp_path, &object_path)?;
        let obj = StorageObject {
            hash: hash.clone(),
            compressed_size,
            uncompressed_size: content.len() as u64,
            ref_count: 1,
            is_compressed,
            created_at: Utc::now(),
            last_accessed: Utc::now(),
        };
        self.object_cache.insert(hash.clone(), obj);
        self.set_ref_count(&hash, 1)?;
        trace!("Stored object {} ({} bytes)", Self::log_id(&hash), compressed.len());
        Ok((hash, compressed_size))
    }

    /// Load and decompress an object by content hash.
    ///
    /// # Errors
    /// Returns [`TitorError::ObjectNotFound`] when no object file exists
    /// for `hash`.
    pub fn load_object(&self, hash: &str) -> Result<Vec<u8>> {
        let object_path = self.get_object_path(hash);
        if !object_path.exists() {
            return Err(TitorError::ObjectNotFound(hash.to_string()));
        }
        let compressed = fs::read(&object_path)?;
        let content = self.compression.lock().decompress(&compressed)?;
        // Record access time on the cached entry, if we have one.
        if let Some(mut obj) = self.object_cache.get_mut(hash) {
            obj.last_accessed = Utc::now();
        }
        trace!("Loaded object {} ({} bytes)", Self::log_id(hash), content.len());
        Ok(content)
    }

    /// Check whether an object exists, consulting the cache before the
    /// filesystem.
    pub fn object_exists(&self, hash: &str) -> Result<bool> {
        if self.object_cache.contains_key(hash) {
            return Ok(true);
        }
        Ok(self.get_object_path(hash).exists())
    }

    /// Persist a checkpoint's metadata as pretty-printed JSON under
    /// `checkpoints/<id>/metadata.json`.
    pub fn store_checkpoint(&self, checkpoint: &Checkpoint) -> Result<()> {
        let checkpoint_dir = self.root.join("checkpoints").join(&checkpoint.id);
        fs::create_dir_all(&checkpoint_dir)?;
        let metadata_path = checkpoint_dir.join("metadata.json");
        let metadata_json = serde_json::to_string_pretty(checkpoint)?;
        fs::write(metadata_path, metadata_json)?;
        debug!("Stored checkpoint {}", checkpoint.short_id());
        Ok(())
    }

    /// Load a checkpoint's metadata by id.
    ///
    /// # Errors
    /// Returns [`TitorError::CheckpointNotFound`] when the metadata file
    /// does not exist.
    pub fn load_checkpoint(&self, checkpoint_id: &str) -> Result<Checkpoint> {
        let metadata_path = self.root
            .join("checkpoints")
            .join(checkpoint_id)
            .join("metadata.json");
        if !metadata_path.exists() {
            return Err(TitorError::CheckpointNotFound(checkpoint_id.to_string()));
        }
        let metadata_json = fs::read_to_string(metadata_path)?;
        let checkpoint = serde_json::from_str(&metadata_json)?;
        Ok(checkpoint)
    }

    /// Persist a file manifest (bincode) under
    /// `checkpoints/<id>/manifest.bin`.
    pub fn store_manifest(&self, manifest: &FileManifest) -> Result<()> {
        let manifest_path = self.root
            .join("checkpoints")
            .join(&manifest.checkpoint_id)
            .join("manifest.bin");
        let manifest_bytes = bincode::serde::encode_to_vec(manifest, bincode::config::standard())?;
        fs::write(&manifest_path, &manifest_bytes)?;
        debug!("Stored manifest for checkpoint {} ({} files, {} bytes)",
            Self::log_id(&manifest.checkpoint_id), manifest.file_count, manifest_bytes.len());
        Ok(())
    }

    /// Load the file manifest for a checkpoint.
    ///
    /// # Errors
    /// Returns [`TitorError::CheckpointNotFound`] when the manifest file
    /// does not exist.
    pub fn load_manifest(&self, checkpoint_id: &str) -> Result<FileManifest> {
        let manifest_path = self.root
            .join("checkpoints")
            .join(checkpoint_id)
            .join("manifest.bin");
        if !manifest_path.exists() {
            return Err(TitorError::CheckpointNotFound(checkpoint_id.to_string()));
        }
        let manifest_bytes = fs::read(&manifest_path)?;
        let (manifest, _): (FileManifest, _) = bincode::serde::decode_from_slice(&manifest_bytes, bincode::config::standard())?;
        Ok(manifest)
    }

    /// List the ids of all checkpoints on disk (directory names under
    /// `checkpoints/`), in directory-iteration order.
    pub fn list_checkpoints(&self) -> Result<Vec<String>> {
        let checkpoints_dir = self.root.join("checkpoints");
        let mut checkpoint_ids = Vec::new();
        if checkpoints_dir.exists() {
            for entry in fs::read_dir(checkpoints_dir)? {
                let entry = entry?;
                if entry.path().is_dir() {
                    checkpoint_ids.push(entry.file_name().to_string_lossy().to_string());
                }
            }
        }
        Ok(checkpoint_ids)
    }

    /// Whether a checkpoint with this id has persisted metadata.
    pub fn checkpoint_exists(&self, checkpoint_id: &str) -> bool {
        self.root
            .join("checkpoints")
            .join(checkpoint_id)
            .join("metadata.json")
            .exists()
    }

    /// Delete a checkpoint directory (metadata + manifest). Objects it
    /// referenced are not touched here; their ref counts are managed
    /// separately.
    pub fn delete_checkpoint(&self, checkpoint_id: &str) -> Result<()> {
        let checkpoint_dir = self.root.join("checkpoints").join(checkpoint_id);
        if checkpoint_dir.exists() {
            fs::remove_dir_all(checkpoint_dir)?;
            info!("Deleted checkpoint {}", Self::log_id(checkpoint_id));
        }
        Ok(())
    }

    /// Increment the in-memory ref count for `hash` and queue the new
    /// value for persistence.
    fn increment_ref_count(&self, hash: &str) -> Result<()> {
        // Copy the new value out before touching the other map so the
        // ref_counts shard lock is not held across the second insert.
        let new_count = {
            let mut entry = self.ref_counts.entry(hash.to_string()).or_insert(0);
            *entry += 1;
            *entry
        };
        self.pending_ref_updates.insert(hash.to_string(), new_count);
        Ok(())
    }

    /// Decrement the ref count for `hash`, saturating at zero, and queue
    /// the new value for persistence. Returns the remaining count
    /// (0 for unknown hashes).
    pub fn decrement_ref_count(&self, hash: &str) -> Result<usize> {
        let mut remaining = 0;
        if let Some(mut entry) = self.ref_counts.get_mut(hash) {
            if *entry > 0 {
                *entry -= 1;
                remaining = *entry;
            }
        }
        self.pending_ref_updates.insert(hash.to_string(), remaining);
        Ok(remaining)
    }

    /// Overwrite the ref count for `hash` and queue it for persistence.
    fn set_ref_count(&self, hash: &str, count: usize) -> Result<()> {
        self.ref_counts.insert(hash.to_string(), count);
        self.pending_ref_updates.insert(hash.to_string(), count);
        Ok(())
    }

    /// Write all pending ref-count updates to the `refs/` directory.
    ///
    /// Each entry is removed from the pending map *before* it is
    /// persisted; the old order (persist, then remove) silently discarded
    /// any newer update raced in between the snapshot and the removal.
    /// On a persist error the update is re-queued so the next flush
    /// retries it.
    pub fn flush_ref_counts(&self) -> Result<()> {
        let hashes: Vec<String> = self.pending_ref_updates.iter()
            .map(|entry| entry.key().clone())
            .collect();
        for hash in hashes {
            if let Some((_, count)) = self.pending_ref_updates.remove(&hash) {
                if let Err(e) = self.persist_ref_count(&hash, count) {
                    self.pending_ref_updates.insert(hash, count);
                    return Err(e);
                }
            }
        }
        Ok(())
    }

    /// Persist one ref count: write `refs/<hash>`, or delete the file
    /// when the count reaches zero.
    fn persist_ref_count(&self, hash: &str, count: usize) -> Result<()> {
        let ref_path = self.root.join("refs").join(hash);
        if count == 0 {
            if ref_path.exists() {
                fs::remove_file(ref_path)?;
            }
        } else {
            fs::write(ref_path, count.to_string())?;
        }
        Ok(())
    }

    /// Delete an object's file and drop it from the cache. The caller is
    /// responsible for the ref-count bookkeeping.
    pub fn delete_object(&self, hash: &str) -> Result<()> {
        let object_path = self.get_object_path(hash);
        if object_path.exists() {
            fs::remove_file(object_path)?;
            self.object_cache.remove(hash);
            debug!("Deleted object {}", Self::log_id(hash));
        }
        Ok(())
    }

    /// On-disk (compressed) size of an object, from cache when possible.
    ///
    /// # Errors
    /// Returns [`TitorError::ObjectNotFound`] when the object has neither
    /// a cache entry nor a file.
    pub fn get_object_size(&self, hash: &str) -> Result<u64> {
        if let Some(obj) = self.object_cache.get(hash) {
            return Ok(obj.compressed_size);
        }
        let object_path = self.get_object_path(hash);
        if !object_path.exists() {
            return Err(TitorError::ObjectNotFound(hash.to_string()));
        }
        let metadata = fs::metadata(&object_path)?;
        Ok(metadata.len())
    }

    /// Path of an object, sharded by the first two hash characters so no
    /// single directory grows unboundedly: `objects/<hh>/<rest>`.
    ///
    /// Guards against hashes shorter than two characters — the old
    /// unconditional `split_at(2)` panicked; such a hash now maps to a
    /// path that simply never exists.
    fn get_object_path(&self, hash: &str) -> PathBuf {
        let (prefix, suffix) = hash.split_at(hash.len().min(2));
        self.root.join("objects").join(prefix).join(suffix)
    }

    /// Serialize the object cache to `object_cache.bin` (bincode) so a
    /// later `open` can warm-start; an empty cache removes the file.
    pub fn persist_object_cache(&self) -> Result<()> {
        let cache_path = self.root.join("object_cache.bin");
        let cache_entries: Vec<(String, StorageObject)> = self.object_cache
            .iter()
            .map(|entry| (entry.key().clone(), entry.value().clone()))
            .collect();
        if cache_entries.is_empty() {
            if cache_path.exists() {
                fs::remove_file(&cache_path).ok();
            }
            return Ok(());
        }
        let cache_bytes = bincode::serde::encode_to_vec(&cache_entries, bincode::config::standard())?;
        fs::write(&cache_path, &cache_bytes)?;
        debug!("Persisted {} object cache entries ({} bytes)",
            cache_entries.len(), cache_bytes.len());
        Ok(())
    }

    /// Gather summary statistics.
    ///
    /// With a warm cache, counts and sizes come from the cache; with a
    /// cold cache the `objects/` tree is scanned for the real numbers.
    /// The old code guessed `shards * 100` objects, which reported
    /// wildly wrong counts and a zero total size after a fresh `open`.
    pub fn stats(&self) -> Result<StorageStats> {
        let mut stats = StorageStats::default();
        if self.object_cache.is_empty() {
            let all_objects = self.list_all_objects()?;
            stats.object_count = all_objects.len();
            for hash in &all_objects {
                stats.total_size += fs::metadata(self.get_object_path(hash))?.len();
            }
        } else {
            stats.object_count = self.object_cache.len();
            stats.total_size = self.object_cache.iter()
                .map(|entry| entry.value().compressed_size)
                .sum();
        }
        stats.checkpoint_count = self.list_checkpoints()?.len();
        // Number of distinct hashes tracked, not the sum of counts.
        stats.total_references = self.ref_counts.len();
        Ok(stats)
    }

    /// Hashes of objects on disk whose ref count is zero or untracked —
    /// candidates for garbage collection.
    pub fn get_unreferenced_objects(&self) -> Result<Vec<String>> {
        // Reuse the directory walk from `list_all_objects` instead of
        // duplicating it, then keep only hashes with no live references.
        let mut objects = self.list_all_objects()?;
        objects.retain(|hash| self.ref_counts.get(hash).map(|r| *r).unwrap_or(0) == 0);
        Ok(objects)
    }

    /// Hashes of all objects on disk, reconstructed from the sharded
    /// `objects/<hh>/<rest>` layout.
    pub fn list_all_objects(&self) -> Result<Vec<String>> {
        let mut objects = Vec::new();
        let objects_dir = self.root.join("objects");
        if objects_dir.exists() {
            for shard_entry in fs::read_dir(objects_dir)? {
                let shard_entry = shard_entry?;
                if shard_entry.path().is_dir() {
                    let shard_name = shard_entry.file_name().to_string_lossy().to_string();
                    for object_entry in fs::read_dir(shard_entry.path())? {
                        let object_entry = object_entry?;
                        let entry_path = object_entry.path();
                        // Skip in-flight `.tmp` files left by `store_object`.
                        if entry_path.extension().map_or(false, |ext| ext == "tmp") {
                            continue;
                        }
                        if entry_path.is_file() {
                            let object_name = object_entry.file_name().to_string_lossy().to_string();
                            objects.push(format!("{}{}", shard_name, object_name));
                        }
                    }
                }
            }
        }
        Ok(objects)
    }

    /// Mutate the storage metadata under the write lock, refresh its
    /// `last_accessed` stamp, and write `metadata.json` back to disk.
    pub fn update_metadata<F>(&self, updater: F) -> Result<()>
    where
        F: FnOnce(&mut StorageMetadata),
    {
        let mut metadata = self.metadata.write();
        updater(&mut metadata);
        metadata.last_accessed = Utc::now();
        let metadata_path = self.root.join("metadata.json");
        let metadata_json = serde_json::to_string_pretty(&*metadata)?;
        fs::write(metadata_path, metadata_json)?;
        Ok(())
    }

    /// Root directory of this storage.
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// Shared handle to the storage metadata lock.
    pub fn metadata(&self) -> &RwLock<StorageMetadata> {
        &self.metadata
    }
}
/// Summary statistics about a [`Storage`] instance, produced by
/// `Storage::stats`.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Number of stored objects.
    pub object_count: usize,
    /// Total size in bytes of stored objects (compressed, as on disk).
    pub total_size: u64,
    /// Number of checkpoint directories on disk.
    pub checkpoint_count: usize,
    /// Number of distinct hashes with a tracked reference count
    /// (not the sum of the counts).
    pub total_references: usize,
}
/// Compute the lowercase hex SHA-256 digest of `content`.
fn compute_hash(content: &[u8]) -> String {
    // One-shot digest replaces the explicit new/update/finalize dance.
    hex::encode(Sha256::digest(content))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionStrategy;
    use tempfile::TempDir;

    /// Build a `Storage` rooted in a fresh temp directory.
    ///
    /// `TempDir::new` creates the directory, but `Storage::init` refuses a
    /// pre-existing root, so the directory is removed first and re-created
    /// by `init`. The `TempDir` guard is returned so the directory lives
    /// for the duration of the test.
    fn create_test_storage() -> (Storage, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_path_buf();
        std::fs::remove_dir_all(&path).ok();
        let config = TitorConfig {
            root_path: PathBuf::from("/test"),
            storage_path: path.clone(),
            max_file_size: 0,
            parallel_workers: 4,
            ignore_patterns: vec![],
            compression_strategy: "fast".to_string(),
            follow_symlinks: false,
            version: env!("CARGO_PKG_VERSION").to_string(),
        };
        let compression = CompressionEngine::new(CompressionStrategy::Fast);
        let storage = Storage::init(
            path,
            config,
            compression,
        ).unwrap();
        (storage, temp_dir)
    }

    /// `init` creates the full directory layout; `open` succeeds on it.
    #[test]
    fn test_storage_init_and_open() {
        let temp_dir = TempDir::new().unwrap();
        let path = temp_dir.path().to_path_buf();
        // `init` requires a non-existent root (see create_test_storage).
        std::fs::remove_dir_all(&path).ok();
        let config = TitorConfig {
            root_path: PathBuf::from("/test"),
            storage_path: path.clone(),
            max_file_size: 0,
            parallel_workers: 4,
            ignore_patterns: vec![],
            compression_strategy: "fast".to_string(),
            follow_symlinks: false,
            version: env!("CARGO_PKG_VERSION").to_string(),
        };
        let compression = CompressionEngine::new(CompressionStrategy::Fast);
        let _storage = Storage::init(path.clone(), config, compression).unwrap();
        assert!(path.join("checkpoints").exists());
        assert!(path.join("objects").exists());
        assert!(path.join("refs").exists());
        assert!(path.join("metadata.json").exists());
        let compression2 = CompressionEngine::new(CompressionStrategy::Fast);
        let _storage2 = Storage::open(path, compression2).unwrap();
    }

    /// Round-trip an object and verify dedup: storing identical content
    /// again yields the same hash.
    #[test]
    fn test_object_storage() {
        let (storage, _temp_dir) = create_test_storage();
        let content = b"Hello, World!";
        let path = PathBuf::from("test.txt");
        let (hash, _compressed_size) = storage.store_object(content, &path).unwrap();
        // SHA-256 hex digest is always 64 characters.
        assert_eq!(hash.len(), 64);
        assert!(storage.object_exists(&hash).unwrap());
        let loaded = storage.load_object(&hash).unwrap();
        assert_eq!(loaded, content);
        let (hash2, _compressed_size2) = storage.store_object(content, &path).unwrap();
        assert_eq!(hash, hash2);
    }

    /// Storing duplicates increments the ref count; decrementing walks it
    /// back down to zero.
    #[test]
    fn test_reference_counting() {
        let (storage, _temp_dir) = create_test_storage();
        let content = b"Test content";
        let path = PathBuf::from("test.txt");
        let (hash, _compressed_size) = storage.store_object(content, &path).unwrap();
        assert_eq!(*storage.ref_counts.get(&hash).unwrap(), 1);
        let _ = storage.store_object(content, &path).unwrap();
        assert_eq!(*storage.ref_counts.get(&hash).unwrap(), 2);
        let count = storage.decrement_ref_count(&hash).unwrap();
        assert_eq!(count, 1);
        let count = storage.decrement_ref_count(&hash).unwrap();
        assert_eq!(count, 0);
    }

    /// Store, load, list and delete a checkpoint's metadata.
    #[test]
    fn test_checkpoint_storage() {
        let (storage, _temp_dir) = create_test_storage();
        let checkpoint = Checkpoint::new(
            None,
            Some("Test checkpoint".to_string()),
            crate::checkpoint::CheckpointMetadataBuilder::new()
                .file_count(10)
                .total_size(1000)
                .build(),
            "merkle_root".to_string(),
        );
        let checkpoint_id = checkpoint.id.clone();
        storage.store_checkpoint(&checkpoint).unwrap();
        assert!(storage.checkpoint_exists(&checkpoint_id));
        let loaded = storage.load_checkpoint(&checkpoint_id).unwrap();
        assert_eq!(loaded.id, checkpoint_id);
        assert_eq!(loaded.description, Some("Test checkpoint".to_string()));
        let checkpoints = storage.list_checkpoints().unwrap();
        assert_eq!(checkpoints.len(), 1);
        assert_eq!(checkpoints[0], checkpoint_id);
        storage.delete_checkpoint(&checkpoint_id).unwrap();
        assert!(!storage.checkpoint_exists(&checkpoint_id));
    }

    /// Objects land under `objects/<first-two-hash-chars>/<rest>`.
    #[test]
    fn test_sharding() {
        let (storage, temp_dir) = create_test_storage();
        let content = b"Sharded content";
        let path = PathBuf::from("test.txt");
        let (hash, _compressed_size) = storage.store_object(content, &path).unwrap();
        let object_path = storage.get_object_path(&hash);
        assert!(object_path.exists());
        let prefix = &hash[..2];
        let suffix = &hash[2..];
        let expected_path = temp_dir.path()
            .join("objects")
            .join(prefix)
            .join(suffix);
        assert_eq!(object_path, expected_path);
    }
}