use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::RwLock;
use tracing::{debug, info};
/// Configuration for a [`DedupStore`].
#[derive(Debug, Clone)]
pub struct DedupConfig {
/// NOTE(review): not read by any code in this file — confirm where (or whether) it is used.
pub min_ref_count: u32,
/// NOTE(review): not read by any code in this file; `DedupStore::store_chunk` does not consult it.
pub enable_inline_dedup: bool,
/// NOTE(review): not read by any code in this file — confirm where (or whether) it is used.
pub enable_background_dedup: bool,
/// Chunks shorter than this many bytes are skipped by `DedupStore::store_chunk`:
/// they are neither written to disk nor recorded in the index.
pub min_chunk_size: usize,
}
impl Default for DedupConfig {
fn default() -> Self {
Self {
min_ref_count: 2,
enable_inline_dedup: true,
enable_background_dedup: true,
min_chunk_size: 4096, }
}
}
/// Index entry for one stored chunk, keyed in `DedupStore` by its hash.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkRef {
/// Hash of the chunk bytes (produced by `chie_crypto::hash`); same value as the index key.
pub hash: [u8; 32],
/// Chunk length in bytes.
pub size: u64,
/// Number of content-chunk slots pointing at this chunk; a value of 0 marks it
/// as an orphan that `DedupStore::gc` will delete.
pub ref_count: u32,
/// Filesystem path of the chunk file, stored via `to_string_lossy`.
pub storage_path: String,
}
/// A chunk hash together with the content slots that reference it.
///
/// NOTE(review): not constructed or read anywhere in this file — presumably
/// used by other modules or left over; confirm before relying on it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DedupEntry {
pub hash: [u8; 32],
pub references: Vec<ChunkReference>,
// NOTE(review): unit presumably bytes, by analogy with `DedupStats::bytes_saved` — confirm.
pub bytes_saved: u64,
pub created_at: chrono::DateTime<chrono::Utc>,
}
/// One slot that references a chunk: the content id and the chunk's position
/// in that content's chunk list (as returned by `DedupStore::get_chunk_references`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkReference {
pub cid: String,
/// Zero-based position within `cid`'s chunk list.
pub chunk_index: u64,
}
/// Content-addressed chunk store that keeps a single copy of identical chunks
/// shared between contents.
///
/// Chunk files live at `base_path/chunks/<first 2 hex chars>/<hex hash>` and the
/// index is persisted as JSON at `base_path/meta/index.json`.
///
/// Writers acquire the locks in the order `index` → `content_chunks` → `stats`;
/// any method taking more than one of them should follow the same order, since
/// the tokio `RwLock`s are not reentrant.
pub struct DedupStore {
config: DedupConfig,
base_path: PathBuf,
// Chunk hash -> index entry (size, reference count, file path).
index: Arc<RwLock<HashMap<[u8; 32], ChunkRef>>>,
// Content id -> ordered list of chunk hashes making up that content.
content_chunks: Arc<RwLock<HashMap<String, Vec<[u8; 32]>>>>,
// Aggregate counters returned by `stats()` and persisted with the index.
stats: Arc<RwLock<DedupStats>>,
}
/// Aggregate counters for a [`DedupStore`], persisted alongside the index.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct DedupStats {
/// Number of distinct chunks currently in the index.
pub unique_chunks: u64,
/// Total chunk references across all contents.
pub total_references: u64,
/// Bytes not written to disk because an identical chunk already existed.
pub bytes_saved: u64,
/// Bytes of chunk data written to disk.
pub bytes_stored: u64,
/// `total_references / unique_chunks`, recomputed by `update`.
pub dedup_ratio: f64,
/// `bytes_saved / (bytes_stored + bytes_saved) * 100`, recomputed by `update`.
pub space_savings_percent: f64,
}
impl DedupStats {
    /// Recomputes the derived fields from the raw counters.
    ///
    /// * `dedup_ratio` = `total_references / unique_chunks`
    /// * `space_savings_percent` = `bytes_saved / (bytes_stored + bytes_saved) * 100`
    ///
    /// When a denominator is zero (for example after every chunk has been
    /// removed), the corresponding field is reset to `0.0`, its `Default` value.
    /// Otherwise it would keep reporting the figure from the last non-empty state.
    #[inline]
    pub fn update(&mut self) {
        self.dedup_ratio = if self.unique_chunks > 0 {
            self.total_references as f64 / self.unique_chunks as f64
        } else {
            0.0
        };
        // The sum is only used as a denominator, so clamping is preferable to overflowing.
        let total_logical = self.bytes_stored.saturating_add(self.bytes_saved);
        self.space_savings_percent = if total_logical > 0 {
            (self.bytes_saved as f64 / total_logical as f64) * 100.0
        } else {
            0.0
        };
    }
}
/// Outcome of [`DedupStore::store_chunk`].
#[derive(Debug, Clone)]
pub enum StoreResult {
/// The data was written as a new chunk. An all-zero `hash` means the chunk was
/// below `min_chunk_size` and was neither written nor indexed.
Stored { hash: [u8; 32], size: u64 },
/// An identical chunk already existed; only its reference count was increased.
Deduplicated { hash: [u8; 32], bytes_saved: u64 },
}
impl DedupStore {
    /// Opens (or creates) a dedup store rooted at `base_path`.
    ///
    /// Creates `base_path`, `base_path/chunks` and `base_path/meta` if missing,
    /// then loads `meta/index.json` when it exists.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if a directory cannot be created or the index file
    /// cannot be read. Returns `ErrorKind::InvalidData` if the index JSON is malformed.
    pub async fn new(base_path: PathBuf, config: DedupConfig) -> std::io::Result<Self> {
        fs::create_dir_all(&base_path).await?;
        fs::create_dir_all(base_path.join("chunks")).await?;
        fs::create_dir_all(base_path.join("meta")).await?;
        let store = Self {
            config,
            base_path,
            index: Arc::new(RwLock::new(HashMap::new())),
            content_chunks: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(DedupStats::default())),
        };
        store.load_index().await?;
        Ok(store)
    }

    /// Stores one chunk of content `cid`, deduplicating by content hash.
    ///
    /// If a chunk with the same hash is already indexed, its reference count is
    /// bumped and [`StoreResult::Deduplicated`] is returned. Otherwise the bytes
    /// are written under `chunks/` and [`StoreResult::Stored`] is returned. In
    /// both cases the hash is appended to `cid`'s chunk list and the index is
    /// persisted, so the new reference survives a restart.
    ///
    /// NOTE(review): `_chunk_index` is ignored; a chunk's position within `cid`
    /// is its insertion order. Chunks shorter than `config.min_chunk_size` are
    /// neither written nor recorded (the returned hash is all zeros). As a
    /// result they also shift the positions of later chunks seen by
    /// `get_content_chunk`.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if the chunk file or the index cannot be written.
    pub async fn store_chunk(
        &self,
        cid: &str,
        _chunk_index: u64,
        data: &[u8],
    ) -> std::io::Result<StoreResult> {
        if data.len() < self.config.min_chunk_size {
            return Ok(StoreResult::Stored {
                hash: [0u8; 32],
                size: data.len() as u64,
            });
        }
        let hash = chie_crypto::hash(data);
        let size = data.len() as u64;

        let mut index = self.index.write().await;
        let mut content_chunks = self.content_chunks.write().await;
        let mut stats = self.stats.write().await;

        let existing_refs = index.get_mut(&hash).map(|chunk_ref| {
            chunk_ref.ref_count += 1;
            chunk_ref.ref_count
        });
        let result = match existing_refs {
            Some(ref_count) => {
                stats.total_references += 1;
                stats.bytes_saved += size;
                debug!(
                    "Deduplicated chunk: {} refs for hash {:?}",
                    ref_count,
                    hex::encode(&hash[..8])
                );
                StoreResult::Deduplicated {
                    hash,
                    bytes_saved: size,
                }
            }
            None => {
                let storage_path = self.chunk_path(&hash);
                if let Some(parent) = storage_path.parent() {
                    fs::create_dir_all(parent).await?;
                }
                fs::write(&storage_path, data).await?;
                index.insert(
                    hash,
                    ChunkRef {
                        hash,
                        size,
                        ref_count: 1,
                        storage_path: storage_path.to_string_lossy().to_string(),
                    },
                );
                stats.unique_chunks += 1;
                stats.total_references += 1;
                stats.bytes_stored += size;
                StoreResult::Stored { hash, size }
            }
        };
        content_chunks
            .entry(cid.to_string())
            .or_default()
            .push(hash);
        stats.update();

        // `save_index` read-locks the same tokio RwLocks, which are not reentrant.
        drop(stats);
        drop(content_chunks);
        drop(index);
        self.save_index().await?;
        Ok(result)
    }

    /// Reads the bytes of the chunk with the given hash.
    ///
    /// Returns `Ok(None)` if the hash is not indexed or its file no longer exists.
    ///
    /// # Errors
    ///
    /// Returns any I/O error other than `NotFound` raised while reading the file.
    pub async fn get_chunk(&self, hash: &[u8; 32]) -> std::io::Result<Option<Vec<u8>>> {
        // Copy the path out so the index lock is not held across file I/O.
        let storage_path = {
            let index = self.index.read().await;
            match index.get(hash) {
                Some(chunk_ref) => PathBuf::from(&chunk_ref.storage_path),
                None => return Ok(None),
            }
        };
        // Reading directly (instead of checking `exists()` first) avoids a blocking
        // call and a race with concurrent deletion.
        match fs::read(&storage_path).await {
            Ok(data) => Ok(Some(data)),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(e) => Err(e),
        }
    }

    /// Reads the chunk at position `chunk_index` of content `cid`.
    ///
    /// Returns `Ok(None)` if the content, the position, or the chunk is unknown.
    ///
    /// # Errors
    ///
    /// Propagates read errors from [`Self::get_chunk`].
    pub async fn get_content_chunk(
        &self,
        cid: &str,
        chunk_index: u64,
    ) -> std::io::Result<Option<Vec<u8>>> {
        // Release `content_chunks` before `get_chunk` locks `index`. Holding them
        // in the opposite order to writers (index -> content_chunks) can deadlock.
        let hash = {
            let content_chunks = self.content_chunks.read().await;
            let found = content_chunks
                .get(cid)
                .and_then(|hashes| hashes.get(chunk_index as usize).copied());
            match found {
                Some(hash) => hash,
                None => return Ok(None),
            }
        };
        self.get_chunk(&hash).await
    }

    /// Drops every reference held by content `cid` and deletes chunks that end up
    /// unreferenced.
    ///
    /// Returns the number of bytes of chunk data released. A chunk that is still
    /// shared keeps its file, but `bytes_saved` shrinks by its size because one
    /// fewer duplicate is being avoided.
    ///
    /// # Errors
    ///
    /// Returns an error if the index cannot be saved, or if deleting a chunk file
    /// fails for a reason other than `NotFound`. In the latter case the in-memory
    /// and persisted index are still updated, and the file is left orphaned on disk.
    pub async fn remove_content(&self, cid: &str) -> std::io::Result<u64> {
        let mut index = self.index.write().await;
        let mut content_chunks = self.content_chunks.write().await;
        let mut stats = self.stats.write().await;
        let mut bytes_freed = 0u64;
        let mut unreferenced: Vec<PathBuf> = Vec::new();
        if let Some(hashes) = content_chunks.remove(cid) {
            for hash in hashes {
                if let Some(chunk_ref) = index.get_mut(&hash) {
                    // Saturating: a count already at 0 (e.g. after `repair_references`)
                    // must not wrap around.
                    chunk_ref.ref_count = chunk_ref.ref_count.saturating_sub(1);
                    stats.total_references = stats.total_references.saturating_sub(1);
                    if chunk_ref.ref_count > 0 {
                        stats.bytes_saved = stats.bytes_saved.saturating_sub(chunk_ref.size);
                    } else {
                        bytes_freed += chunk_ref.size;
                        stats.unique_chunks = stats.unique_chunks.saturating_sub(1);
                        stats.bytes_stored = stats.bytes_stored.saturating_sub(chunk_ref.size);
                        unreferenced.push(PathBuf::from(&chunk_ref.storage_path));
                        index.remove(&hash);
                    }
                }
            }
        }
        stats.update();
        // Delete while the index write lock is still held, so a concurrent
        // `store_chunk` cannot re-create a file at the same path in between.
        let delete_error = Self::remove_chunk_files(&unreferenced).await;
        drop(stats);
        drop(content_chunks);
        drop(index);
        self.save_index().await?;
        if let Some(e) = delete_error {
            return Err(e);
        }
        info!("Removed content {} - freed {} bytes", cid, bytes_freed);
        Ok(bytes_freed)
    }

    /// Returns a snapshot of the aggregate counters.
    #[must_use]
    #[inline]
    pub async fn stats(&self) -> DedupStats {
        self.stats.read().await.clone()
    }

    /// Whether a chunk with this hash is present in the index.
    #[must_use]
    #[inline]
    pub async fn contains(&self, hash: &[u8; 32]) -> bool {
        let index = self.index.read().await;
        index.contains_key(hash)
    }

    /// The stored reference count of a chunk, or `None` if it is not indexed.
    #[must_use]
    #[inline]
    pub async fn ref_count(&self, hash: &[u8; 32]) -> Option<u32> {
        let index = self.index.read().await;
        index.get(hash).map(|r| r.ref_count)
    }

    /// Ids of every content that has a chunk list (in arbitrary order).
    #[must_use]
    #[inline]
    pub async fn list_content(&self) -> Vec<String> {
        let content_chunks = self.content_chunks.read().await;
        content_chunks.keys().cloned().collect()
    }

    /// Summarises how content `cid`'s chunks are shared.
    ///
    /// A slot counts as unique when its chunk's `ref_count` is 1, and as shared
    /// otherwise (including chunks repeated inside this same content).
    /// `total_size` sums the chunk size per slot. `total_chunks` also counts
    /// slots whose hash is missing from the index.
    #[must_use]
    #[inline]
    pub async fn content_info(&self, cid: &str) -> Option<ContentDedupInfo> {
        let index = self.index.read().await;
        let content_chunks = self.content_chunks.read().await;
        if let Some(hashes) = content_chunks.get(cid) {
            let mut total_size = 0u64;
            let mut unique_chunks = 0u64;
            let mut shared_chunks = 0u64;
            for hash in hashes {
                if let Some(chunk_ref) = index.get(hash) {
                    total_size += chunk_ref.size;
                    if chunk_ref.ref_count == 1 {
                        unique_chunks += 1;
                    } else {
                        shared_chunks += 1;
                    }
                }
            }
            return Some(ContentDedupInfo {
                cid: cid.to_string(),
                total_chunks: hashes.len() as u64,
                unique_chunks,
                shared_chunks,
                total_size,
            });
        }
        None
    }

    /// Deletes every indexed chunk whose reference count is 0 and persists the
    /// updated index.
    ///
    /// # Errors
    ///
    /// Returns an error if the index cannot be saved, or if deleting a chunk file
    /// fails for a reason other than `NotFound`. In the latter case the chunk has
    /// still been dropped from the index.
    pub async fn gc(&self) -> std::io::Result<GcResult> {
        let mut index = self.index.write().await;
        let mut stats = self.stats.write().await;
        let orphaned: Vec<[u8; 32]> = index
            .iter()
            .filter(|(_, chunk_ref)| chunk_ref.ref_count == 0)
            .map(|(hash, _)| *hash)
            .collect();
        let mut bytes_freed = 0u64;
        let mut orphan_paths: Vec<PathBuf> = Vec::with_capacity(orphaned.len());
        for hash in &orphaned {
            if let Some(chunk_ref) = index.remove(hash) {
                bytes_freed += chunk_ref.size;
                stats.unique_chunks = stats.unique_chunks.saturating_sub(1);
                stats.bytes_stored = stats.bytes_stored.saturating_sub(chunk_ref.size);
                orphan_paths.push(PathBuf::from(chunk_ref.storage_path));
            }
        }
        stats.update();
        // Delete under the index write lock so `store_chunk` cannot race us.
        let delete_error = Self::remove_chunk_files(&orphan_paths).await;
        drop(stats);
        drop(index);
        // Without saving, the removed entries would come back on the next load.
        if !orphaned.is_empty() {
            self.save_index().await?;
        }
        if let Some(e) = delete_error {
            return Err(e);
        }
        info!(
            "GC completed: {} orphaned chunks removed, {} bytes freed",
            orphaned.len(),
            bytes_freed
        );
        Ok(GcResult {
            chunks_removed: orphaned.len() as u64,
            bytes_freed,
        })
    }

    /// Deletes each file in `paths`, treating already-missing files as success.
    ///
    /// Every path is attempted; the first unexpected error (if any) is returned.
    async fn remove_chunk_files(paths: &[PathBuf]) -> Option<std::io::Error> {
        let mut first_error = None;
        for path in paths {
            if let Err(e) = fs::remove_file(path).await {
                if e.kind() != std::io::ErrorKind::NotFound && first_error.is_none() {
                    first_error = Some(e);
                }
            }
        }
        first_error
    }

    /// Path of a chunk file: `base_path/chunks/<first 2 hex chars>/<full hex hash>`.
    fn chunk_path(&self, hash: &[u8; 32]) -> PathBuf {
        let hash_hex = hex::encode(hash);
        let subdir = &hash_hex[..2];
        self.base_path.join("chunks").join(subdir).join(&hash_hex)
    }

    /// Parses a 64-character hex string into a 32-byte hash.
    ///
    /// Returns `None` for invalid hex or a wrong length.
    fn decode_hash(hex_str: &str) -> Option<[u8; 32]> {
        let bytes = hex::decode(hex_str).ok()?;
        if bytes.len() != 32 {
            return None;
        }
        let mut hash = [0u8; 32];
        hash.copy_from_slice(&bytes);
        Some(hash)
    }

    /// Loads `meta/index.json` into memory; a missing file leaves the store empty.
    ///
    /// Entries whose hash is not valid 32-byte hex are skipped silently.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if the file cannot be read (other than `NotFound`).
    /// Returns `ErrorKind::InvalidData` if its JSON cannot be parsed.
    async fn load_index(&self) -> std::io::Result<()> {
        let index_path = self.base_path.join("meta").join("index.json");
        let data = match fs::read(&index_path).await {
            Ok(data) => data,
            // A fresh store has no index yet.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e),
        };
        let saved: SavedIndex = serde_json::from_slice(&data)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;

        let mut index = self.index.write().await;
        let mut content_chunks = self.content_chunks.write().await;
        let mut stats = self.stats.write().await;
        for (hex_key, value) in saved.chunks {
            if let Some(key) = Self::decode_hash(&hex_key) {
                index.insert(key, value);
            }
        }
        for (cid, hex_hashes) in saved.content_chunks {
            let hashes: Vec<[u8; 32]> = hex_hashes
                .iter()
                .filter_map(|h| Self::decode_hash(h))
                .collect();
            content_chunks.insert(cid, hashes);
        }
        *stats = saved.stats;
        Ok(())
    }

    /// Persists the index, content lists and stats to `meta/index.json`.
    ///
    /// The JSON is written to a uniquely named temp file and then renamed over
    /// `index.json`. A crash mid-write therefore never leaves a truncated index
    /// that `load_index` (and hence `new`) would reject.
    ///
    /// # Errors
    ///
    /// Returns an error if serialisation, the temp-file write, or the rename fails.
    async fn save_index(&self) -> std::io::Result<()> {
        static TMP_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

        // The read guards stay alive until the rename completes, so no writer can
        // change the state in between. Overlapping saves therefore publish the
        // same snapshot.
        let index = self.index.read().await;
        let content_chunks = self.content_chunks.read().await;
        let stats = self.stats.read().await;
        let chunks_hex: HashMap<String, ChunkRef> = index
            .iter()
            .map(|(k, v)| (hex::encode(k), v.clone()))
            .collect();
        let content_chunks_hex: HashMap<String, Vec<String>> = content_chunks
            .iter()
            .map(|(k, v)| (k.clone(), v.iter().map(hex::encode).collect()))
            .collect();
        let saved = SavedIndex {
            chunks: chunks_hex,
            content_chunks: content_chunks_hex,
            stats: stats.clone(),
        };
        let data = serde_json::to_vec_pretty(&saved)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;

        let meta_dir = self.base_path.join("meta");
        let index_path = meta_dir.join("index.json");
        let seq = TMP_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let tmp_path = meta_dir.join(format!("index.json.{}.{}.tmp", std::process::id(), seq));
        let written = match fs::write(&tmp_path, &data).await {
            Ok(()) => fs::rename(&tmp_path, &index_path).await,
            Err(e) => Err(e),
        };
        if written.is_err() {
            // Best-effort cleanup; the write/rename error is what the caller needs.
            let _ = fs::remove_file(&tmp_path).await;
        }
        written
    }
}
/// Sharing summary for one content, returned by `DedupStore::content_info`.
#[derive(Debug, Clone)]
pub struct ContentDedupInfo {
pub cid: String,
/// Length of the content's chunk list.
pub total_chunks: u64,
/// Slots whose chunk has a reference count of exactly 1.
pub unique_chunks: u64,
/// Slots whose chunk has any other reference count.
pub shared_chunks: u64,
/// Sum of chunk sizes over all indexed slots, in bytes.
pub total_size: u64,
}
/// Outcome of `DedupStore::gc`.
#[derive(Debug, Clone)]
pub struct GcResult {
/// Number of zero-reference chunks dropped from the index.
pub chunks_removed: u64,
/// Sum of their sizes, in bytes.
pub bytes_freed: u64,
}
/// On-disk JSON form of a `DedupStore`'s state (`meta/index.json`).
///
/// Hashes are stored as hex strings, because JSON object keys must be strings.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct SavedIndex {
// Hex-encoded chunk hash -> index entry.
chunks: HashMap<String, ChunkRef>,
// Content id -> ordered hex-encoded chunk hashes.
content_chunks: HashMap<String, Vec<String>>,
stats: DedupStats,
}
/// One reference to a chunk, as reported by `DedupStore::get_content_chunks_detailed`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReferenceEntry {
pub cid: String,
/// Position of the chunk within `cid`'s chunk list.
pub chunk_index: u64,
/// Seconds since the Unix epoch. NOTE(review): `get_content_chunks_detailed`
/// fills this with the time of the query, not when the reference was created.
pub created_at: u64,
}
/// A chunk's index data together with every reference to it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EnhancedChunkRef {
pub hash: [u8; 32],
/// Chunk length in bytes.
pub size: u64,
/// Every (content, position) slot that points at this chunk.
pub references: Vec<ReferenceEntry>,
/// Filesystem path of the chunk file.
pub storage_path: String,
}
impl EnhancedChunkRef {
    /// Number of recorded references (the length of `references`, cast to `u32`).
    #[must_use]
    #[inline]
    pub fn ref_count(&self) -> u32 {
        let count = self.references.len();
        count as u32
    }

    /// Whether any recorded reference belongs to content `cid`.
    #[must_use]
    #[inline]
    pub fn is_referenced_by(&self, cid: &str) -> bool {
        self.references
            .iter()
            .map(|entry| entry.cid.as_str())
            .any(|owner| owner == cid)
    }

    /// Content ids of all references, in order (duplicates kept).
    #[must_use]
    #[inline]
    pub fn get_referencing_cids(&self) -> Vec<String> {
        let mut cids = Vec::with_capacity(self.references.len());
        for entry in &self.references {
            cids.push(entry.cid.clone());
        }
        cids
    }
}
/// Report produced by `DedupStore::verify_integrity`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IntegrityCheckResult {
/// Number of index entries examined.
pub chunks_checked: u64,
/// Entries whose stored `ref_count` differs from the count derived from content lists.
pub mismatches_found: u64,
/// Entries that no content list references.
pub orphaned_chunks: u64,
/// Entries whose chunk file does not exist on disk.
pub missing_files: u64,
/// Total size of the orphaned entries, in bytes.
pub orphaned_bytes: u64,
}
impl DedupStore {
#[must_use]
pub async fn get_chunk_references(&self, hash: &[u8; 32]) -> Option<Vec<ChunkReference>> {
let content_chunks = self.content_chunks.read().await;
let mut references = Vec::new();
for (cid, hashes) in content_chunks.iter() {
for (index, chunk_hash) in hashes.iter().enumerate() {
if chunk_hash == hash {
references.push(ChunkReference {
cid: cid.clone(),
chunk_index: index as u64,
});
}
}
}
if references.is_empty() {
None
} else {
Some(references)
}
}
#[must_use]
pub async fn get_content_chunks_detailed(&self, cid: &str) -> Option<Vec<EnhancedChunkRef>> {
let chunk_data: Vec<([u8; 32], u64, String)> = {
let index = self.index.read().await;
let content_chunks = self.content_chunks.read().await;
if let Some(hashes) = content_chunks.get(cid) {
hashes
.iter()
.filter_map(|hash| {
index.get(hash).map(|chunk_ref| {
(*hash, chunk_ref.size, chunk_ref.storage_path.clone())
})
})
.collect()
} else {
return None;
}
};
let mut result = Vec::new();
for (hash, size, storage_path) in chunk_data {
let refs = self.get_chunk_references(&hash).await.unwrap_or_default();
let references: Vec<ReferenceEntry> = refs
.into_iter()
.map(|r| ReferenceEntry {
cid: r.cid,
chunk_index: r.chunk_index,
created_at: current_timestamp(),
})
.collect();
result.push(EnhancedChunkRef {
hash,
size,
references,
storage_path,
});
}
Some(result)
}
pub async fn verify_integrity(&self) -> std::io::Result<IntegrityCheckResult> {
let index = self.index.read().await;
let content_chunks = self.content_chunks.read().await;
let mut chunks_checked = 0u64;
let mut mismatches_found = 0u64;
let mut orphaned_chunks = 0u64;
let mut missing_files = 0u64;
let mut orphaned_bytes = 0u64;
let mut actual_refs: HashMap<[u8; 32], u32> = HashMap::new();
for hashes in content_chunks.values() {
for hash in hashes {
*actual_refs.entry(*hash).or_insert(0) += 1;
}
}
for (hash, chunk_ref) in index.iter() {
chunks_checked += 1;
let actual_count = actual_refs.get(hash).copied().unwrap_or(0);
if actual_count != chunk_ref.ref_count {
mismatches_found += 1;
tracing::warn!(
"Ref count mismatch for chunk {:?}: stored={}, actual={}",
hex::encode(&hash[..8]),
chunk_ref.ref_count,
actual_count
);
}
if actual_count == 0 {
orphaned_chunks += 1;
orphaned_bytes += chunk_ref.size;
}
let path = Path::new(&chunk_ref.storage_path);
if !path.exists() {
missing_files += 1;
tracing::warn!(
"Missing chunk file for hash {:?}: {}",
hex::encode(&hash[..8]),
chunk_ref.storage_path
);
}
}
Ok(IntegrityCheckResult {
chunks_checked,
mismatches_found,
orphaned_chunks,
missing_files,
orphaned_bytes,
})
}
pub async fn repair_references(&self) -> std::io::Result<u64> {
let mut index = self.index.write().await;
let content_chunks = self.content_chunks.read().await;
let mut actual_refs: HashMap<[u8; 32], u32> = HashMap::new();
for hashes in content_chunks.values() {
for hash in hashes {
*actual_refs.entry(*hash).or_insert(0) += 1;
}
}
let mut repaired = 0u64;
for (hash, chunk_ref) in index.iter_mut() {
let actual_count = actual_refs.get(hash).copied().unwrap_or(0);
if actual_count != chunk_ref.ref_count {
tracing::info!(
"Repairing chunk {:?}: {} -> {}",
hex::encode(&hash[..8]),
chunk_ref.ref_count,
actual_count
);
chunk_ref.ref_count = actual_count;
repaired += 1;
}
}
drop(index);
drop(content_chunks);
if repaired > 0 {
self.save_index().await?;
}
info!("Repaired {} reference counts", repaired);
Ok(repaired)
}
#[must_use]
pub async fn ref_count_distribution(&self) -> HashMap<u32, u64> {
let index = self.index.read().await;
let mut distribution: HashMap<u32, u64> = HashMap::new();
for chunk_ref in index.values() {
*distribution.entry(chunk_ref.ref_count).or_insert(0) += 1;
}
distribution
}
#[must_use]
pub async fn most_referenced_chunks(&self, limit: usize) -> Vec<([u8; 32], u32, u64)> {
let index = self.index.read().await;
let mut chunks: Vec<_> = index
.iter()
.map(|(hash, chunk_ref)| (*hash, chunk_ref.ref_count, chunk_ref.size))
.collect();
chunks.sort_by(|a, b| b.1.cmp(&a.1));
chunks.truncate(limit);
chunks
}
#[must_use]
pub async fn calculate_removal_impact(&self, cid: &str) -> Option<RemovalImpact> {
let index = self.index.read().await;
let content_chunks = self.content_chunks.read().await;
if let Some(hashes) = content_chunks.get(cid) {
let mut bytes_freed = 0u64;
let mut exclusive_chunks = 0u64;
let mut shared_chunks = 0u64;
for hash in hashes {
if let Some(chunk_ref) = index.get(hash) {
if chunk_ref.ref_count == 1 {
bytes_freed += chunk_ref.size;
exclusive_chunks += 1;
} else {
shared_chunks += 1;
}
}
}
Some(RemovalImpact {
cid: cid.to_string(),
bytes_freed,
exclusive_chunks,
shared_chunks,
total_chunks: hashes.len() as u64,
})
} else {
None
}
}
pub async fn add_references_batch(
&self,
cid: &str,
chunk_hashes: Vec<[u8; 32]>,
) -> std::io::Result<u64> {
let mut index = self.index.write().await;
let mut content_chunks = self.content_chunks.write().await;
let mut refs_added = 0u64;
for hash in &chunk_hashes {
if let Some(chunk_ref) = index.get_mut(hash) {
chunk_ref.ref_count += 1;
refs_added += 1;
}
}
content_chunks.insert(cid.to_string(), chunk_hashes);
drop(index);
drop(content_chunks);
self.save_index().await?;
Ok(refs_added)
}
}
/// Estimated effect of removing one content, from `DedupStore::calculate_removal_impact`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RemovalImpact {
pub cid: String,
/// Estimated bytes of chunk data that removal would free.
pub bytes_freed: u64,
/// Slots whose chunk would become unreferenced.
pub exclusive_chunks: u64,
/// Slots whose chunk is still referenced by other content.
pub shared_chunks: u64,
/// Length of the content's chunk list.
pub total_chunks: u64,
}
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Falls back to 0 if the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
/// Chunk hashes of content `cid2` that also appear in content `cid1`.
///
/// The result follows `cid2`'s chunk order and keeps repeats. It is empty if
/// either content is unknown.
pub async fn find_duplicates(store: &DedupStore, cid1: &str, cid2: &str) -> Vec<[u8; 32]> {
    let content_chunks = store.content_chunks.read().await;
    let (first, second) = match (content_chunks.get(cid1), content_chunks.get(cid2)) {
        (Some(first), Some(second)) => (first, second),
        _ => return Vec::new(),
    };
    let seen: std::collections::HashSet<&[u8; 32]> = first.iter().collect();
    second
        .iter()
        .copied()
        .filter(|hash| seen.contains(hash))
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Storing the same 8 KiB chunk under two content ids writes it once and
    /// deduplicates the second copy.
    #[tokio::test]
    async fn test_dedup_store() {
        let dir = std::env::temp_dir().join("chie_dedup_test");
        // Start from a clean directory; it not existing yet is fine.
        let _ = fs::remove_dir_all(&dir).await;
        let store = DedupStore::new(dir.clone(), DedupConfig::default())
            .await
            .unwrap();
        let payload = vec![0u8; 8192];

        let first = store.store_chunk("cid1", 0, &payload).await.unwrap();
        assert!(matches!(first, StoreResult::Stored { .. }));
        let second = store.store_chunk("cid2", 0, &payload).await.unwrap();
        assert!(matches!(second, StoreResult::Deduplicated { .. }));

        let DedupStats {
            unique_chunks,
            total_references,
            bytes_saved,
            ..
        } = store.stats().await;
        assert_eq!(unique_chunks, 1);
        assert_eq!(total_references, 2);
        assert!(bytes_saved > 0);

        let _ = fs::remove_dir_all(&dir).await;
    }
}