1use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::fs;
10use tokio::sync::RwLock;
11use tracing::{debug, info};
12
/// Tuning parameters for the deduplicating chunk store.
#[derive(Debug, Clone)]
pub struct DedupConfig {
    /// Minimum reference count threshold.
    /// NOTE(review): not read anywhere in this file — confirm external use.
    pub min_ref_count: u32,
    /// Whether to deduplicate at write time.
    /// NOTE(review): `store_chunk` does not consult this flag — confirm intent.
    pub enable_inline_dedup: bool,
    /// Whether background dedup passes are enabled.
    /// NOTE(review): unused in this file — presumably consumed by a scheduler elsewhere.
    pub enable_background_dedup: bool,
    /// Chunks smaller than this (bytes) bypass dedup entirely (see `store_chunk`).
    pub min_chunk_size: usize,
}
25
26impl Default for DedupConfig {
27 fn default() -> Self {
28 Self {
29 min_ref_count: 2,
30 enable_inline_dedup: true,
31 enable_background_dedup: true,
32 min_chunk_size: 4096, }
34 }
35}
36
/// Index entry for one stored, content-addressed chunk.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkRef {
    /// Content hash of the chunk (produced by `chie_crypto::hash`).
    pub hash: [u8; 32],
    /// Chunk size in bytes.
    pub size: u64,
    /// Number of content items currently referencing this chunk.
    pub ref_count: u32,
    /// Filesystem path where the chunk bytes live (see `DedupStore::chunk_path`).
    pub storage_path: String,
}
49
/// Aggregated dedup record for a single chunk hash.
/// NOTE(review): not constructed or consumed anywhere in this file —
/// confirm it is used by another module or consider removing it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DedupEntry {
    /// Content hash this entry describes.
    pub hash: [u8; 32],
    /// All (cid, chunk index) positions that reference the chunk.
    pub references: Vec<ChunkReference>,
    /// Bytes avoided on disk thanks to dedup of this chunk.
    pub bytes_saved: u64,
    /// When the entry was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
}
62
/// A single (content, position) reference to a chunk.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkReference {
    /// Content identifier holding the reference.
    pub cid: String,
    /// Zero-based position of the chunk within that content.
    pub chunk_index: u64,
}
71
/// Content-addressed chunk store with reference counting and a JSON-persisted
/// index (`meta/index.json` under `base_path`).
pub struct DedupStore {
    /// Store configuration (only `min_chunk_size` is consulted in this file).
    config: DedupConfig,
    /// Root directory; chunks live under `chunks/`, metadata under `meta/`.
    base_path: PathBuf,
    /// hash -> chunk metadata (size, ref count, storage path).
    index: Arc<RwLock<HashMap<[u8; 32], ChunkRef>>>,
    /// cid -> ordered list of chunk hashes making up that content.
    content_chunks: Arc<RwLock<HashMap<String, Vec<[u8; 32]>>>>,
    /// Running dedup statistics; derived fields maintained via `DedupStats::update`.
    stats: Arc<RwLock<DedupStats>>,
}
84
/// Store-wide deduplication statistics.
///
/// The first four fields are raw counters; the last two are derived and
/// recomputed by [`DedupStats::update`].
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct DedupStats {
    /// Number of distinct chunks physically stored.
    pub unique_chunks: u64,
    /// Total references across all content (>= unique_chunks).
    pub total_references: u64,
    /// Bytes not written to disk because the chunk already existed.
    pub bytes_saved: u64,
    /// Bytes physically stored on disk.
    pub bytes_stored: u64,
    /// Derived: total_references / unique_chunks (1.0 = no dedup benefit).
    pub dedup_ratio: f64,
    /// Derived: bytes_saved as a percentage of logical (stored + saved) bytes.
    pub space_savings_percent: f64,
}
101
impl DedupStats {
    /// Recompute the derived fields (`dedup_ratio`, `space_savings_percent`)
    /// from the raw counters. Call after mutating any counter.
    #[inline]
    pub fn update(&mut self) {
        // References per unique chunk; 1.0 means no sharing at all.
        // NOTE(review): when unique_chunks drops to 0 the previous ratio is
        // retained rather than reset to 0 — confirm that is intended.
        if self.unique_chunks > 0 {
            self.dedup_ratio = self.total_references as f64 / self.unique_chunks as f64;
        }
        // Logical size = bytes physically stored + bytes avoided via dedup.
        let total_logical = self.bytes_stored + self.bytes_saved;
        if total_logical > 0 {
            self.space_savings_percent = (self.bytes_saved as f64 / total_logical as f64) * 100.0;
        }
    }
}
115
/// Outcome of a `store_chunk` call.
#[derive(Debug, Clone)]
pub enum StoreResult {
    /// Chunk was new (or below the dedup size threshold) and recorded as stored.
    /// A zero hash signals the below-threshold path where no dedup occurred.
    Stored { hash: [u8; 32], size: u64 },
    /// Chunk already existed; only a reference was added.
    Deduplicated { hash: [u8; 32], bytes_saved: u64 },
}
124
125impl DedupStore {
    /// Create a store rooted at `base_path`, creating the `chunks/` and
    /// `meta/` subdirectories and loading any previously persisted index.
    ///
    /// # Errors
    /// Returns any I/O error from directory creation or index loading.
    pub async fn new(base_path: PathBuf, config: DedupConfig) -> std::io::Result<Self> {
        fs::create_dir_all(&base_path).await?;
        fs::create_dir_all(base_path.join("chunks")).await?;
        fs::create_dir_all(base_path.join("meta")).await?;

        let store = Self {
            config,
            base_path,
            index: Arc::new(RwLock::new(HashMap::new())),
            content_chunks: Arc::new(RwLock::new(HashMap::new())),
            stats: Arc::new(RwLock::new(DedupStats::default())),
        };

        // Restore index / content map / stats from meta/index.json if present.
        store.load_index().await?;

        Ok(store)
    }
146
    /// Store one chunk of content `cid`, deduplicating against existing chunks.
    ///
    /// Chunks smaller than `min_chunk_size` bypass dedup and are reported as
    /// `Stored` with an all-zero sentinel hash. NOTE(review): on that path the
    /// data is NOT written to disk here — presumably the caller stores small
    /// chunks inline elsewhere; confirm, otherwise small chunks are lost.
    ///
    /// `_chunk_index` is currently unused; a chunk's position is implied by
    /// its append order into the content's hash list.
    ///
    /// # Errors
    /// Returns any I/O error from writing the chunk file or saving the index.
    pub async fn store_chunk(
        &self,
        cid: &str,
        _chunk_index: u64,
        data: &[u8],
    ) -> std::io::Result<StoreResult> {
        if data.len() < self.config.min_chunk_size {
            return Ok(StoreResult::Stored {
                hash: [0u8; 32], // sentinel: chunk was not content-addressed
                size: data.len() as u64,
            });
        }

        let hash = chie_crypto::hash(data);

        // Take all three locks up front (same order as remove_content) so the
        // index, content map, and stats mutate atomically w.r.t. other writers.
        let mut index = self.index.write().await;
        let mut content_chunks = self.content_chunks.write().await;
        let mut stats = self.stats.write().await;

        // Duplicate chunk: bump the refcount and record the new reference.
        if let Some(chunk_ref) = index.get_mut(&hash) {
            chunk_ref.ref_count += 1;
            let bytes_saved = data.len() as u64;

            content_chunks
                .entry(cid.to_string())
                .or_default()
                .push(hash);

            stats.total_references += 1;
            stats.bytes_saved += bytes_saved;
            stats.update();

            debug!(
                "Deduplicated chunk: {} refs for hash {:?}",
                chunk_ref.ref_count,
                hex::encode(&hash[..8])
            );

            // NOTE(review): this early return skips save_index, so the bumped
            // refcount is only persisted on the next new-chunk write — a crash
            // before then loses it. Confirm this is an accepted trade-off.
            return Ok(StoreResult::Deduplicated { hash, bytes_saved });
        }

        // New chunk: write it to its content-addressed path (chunks/<aa>/<hash>).
        let storage_path = self.chunk_path(&hash);
        if let Some(parent) = storage_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        fs::write(&storage_path, data).await?;

        let chunk_ref = ChunkRef {
            hash,
            size: data.len() as u64,
            ref_count: 1,
            storage_path: storage_path.to_string_lossy().to_string(),
        };

        index.insert(hash, chunk_ref);

        content_chunks
            .entry(cid.to_string())
            .or_default()
            .push(hash);

        stats.unique_chunks += 1;
        stats.total_references += 1;
        stats.bytes_stored += data.len() as u64;
        stats.update();

        // Locks must be released before save_index, which re-takes read locks.
        drop(index);
        drop(content_chunks);
        drop(stats);
        // NOTE(review): persisting the whole index per new chunk is O(index)
        // JSON serialization on every write — consider batching if hot.
        self.save_index().await?;

        Ok(StoreResult::Stored {
            hash,
            size: data.len() as u64,
        })
    }
235
236 pub async fn get_chunk(&self, hash: &[u8; 32]) -> std::io::Result<Option<Vec<u8>>> {
238 let index = self.index.read().await;
239
240 if let Some(chunk_ref) = index.get(hash) {
241 let path = Path::new(&chunk_ref.storage_path);
242 if path.exists() {
243 let data = fs::read(path).await?;
244 return Ok(Some(data));
245 }
246 }
247
248 Ok(None)
249 }
250
251 pub async fn get_content_chunk(
253 &self,
254 cid: &str,
255 chunk_index: u64,
256 ) -> std::io::Result<Option<Vec<u8>>> {
257 let content_chunks = self.content_chunks.read().await;
258
259 if let Some(hashes) = content_chunks.get(cid) {
260 if let Some(hash) = hashes.get(chunk_index as usize) {
261 return self.get_chunk(hash).await;
262 }
263 }
264
265 Ok(None)
266 }
267
    /// Drop all of `cid`'s chunk references, deleting chunk files whose
    /// reference count reaches zero. Returns the bytes freed on disk.
    ///
    /// # Errors
    /// Returns any I/O error from deleting chunk files or saving the index.
    pub async fn remove_content(&self, cid: &str) -> std::io::Result<u64> {
        // Same lock order as store_chunk: index, content map, stats.
        let mut index = self.index.write().await;
        let mut content_chunks = self.content_chunks.write().await;
        let mut stats = self.stats.write().await;

        let mut bytes_freed = 0u64;

        if let Some(hashes) = content_chunks.remove(cid) {
            for hash in hashes {
                if let Some(chunk_ref) = index.get_mut(&hash) {
                    // NOTE(review): underflows (panics in debug builds) if the
                    // stored count is already 0 from a corrupted index —
                    // confirm whether saturating_sub is wanted here.
                    chunk_ref.ref_count -= 1;
                    stats.total_references -= 1;

                    if chunk_ref.ref_count == 0 {
                        // Last reference gone: reclaim the on-disk chunk.
                        let path = Path::new(&chunk_ref.storage_path);
                        if path.exists() {
                            fs::remove_file(path).await?;
                        }
                        bytes_freed += chunk_ref.size;
                        stats.unique_chunks -= 1;
                        stats.bytes_stored -= chunk_ref.size;
                        index.remove(&hash);
                    }
                }
            }
        }

        // NOTE(review): stats.bytes_saved is never decremented on removal, so
        // it reflects lifetime savings, not current — confirm intended.
        stats.update();

        // Release locks before save_index, which takes its own read locks.
        drop(index);
        drop(content_chunks);
        drop(stats);
        self.save_index().await?;

        info!("Removed content {} - freed {} bytes", cid, bytes_freed);
        Ok(bytes_freed)
    }
307
    /// Snapshot (clone) of the current dedup statistics.
    #[must_use]
    #[inline]
    pub async fn stats(&self) -> DedupStats {
        self.stats.read().await.clone()
    }
314
315 #[must_use]
317 #[inline]
318 pub async fn contains(&self, hash: &[u8; 32]) -> bool {
319 let index = self.index.read().await;
320 index.contains_key(hash)
321 }
322
323 #[must_use]
325 #[inline]
326 pub async fn ref_count(&self, hash: &[u8; 32]) -> Option<u32> {
327 let index = self.index.read().await;
328 index.get(hash).map(|r| r.ref_count)
329 }
330
331 #[must_use]
333 #[inline]
334 pub async fn list_content(&self) -> Vec<String> {
335 let content_chunks = self.content_chunks.read().await;
336 content_chunks.keys().cloned().collect()
337 }
338
339 #[must_use]
341 #[inline]
342 pub async fn content_info(&self, cid: &str) -> Option<ContentDedupInfo> {
343 let index = self.index.read().await;
344 let content_chunks = self.content_chunks.read().await;
345
346 if let Some(hashes) = content_chunks.get(cid) {
347 let mut total_size = 0u64;
348 let mut unique_chunks = 0u64;
349 let mut shared_chunks = 0u64;
350
351 for hash in hashes {
352 if let Some(chunk_ref) = index.get(hash) {
353 total_size += chunk_ref.size;
354 if chunk_ref.ref_count == 1 {
355 unique_chunks += 1;
356 } else {
357 shared_chunks += 1;
358 }
359 }
360 }
361
362 return Some(ContentDedupInfo {
363 cid: cid.to_string(),
364 total_chunks: hashes.len() as u64,
365 unique_chunks,
366 shared_chunks,
367 total_size,
368 });
369 }
370
371 None
372 }
373
374 pub async fn gc(&self) -> std::io::Result<GcResult> {
376 let mut index = self.index.write().await;
377 let mut stats = self.stats.write().await;
378
379 let mut orphaned: Vec<[u8; 32]> = Vec::new();
380 let mut bytes_freed = 0u64;
381
382 for (hash, chunk_ref) in index.iter() {
383 if chunk_ref.ref_count == 0 {
384 orphaned.push(*hash);
385 }
386 }
387
388 for hash in &orphaned {
389 if let Some(chunk_ref) = index.remove(hash) {
390 let path = Path::new(&chunk_ref.storage_path);
391 if path.exists() {
392 fs::remove_file(path).await?;
393 }
394 bytes_freed += chunk_ref.size;
395 stats.unique_chunks -= 1;
396 stats.bytes_stored -= chunk_ref.size;
397 }
398 }
399
400 stats.update();
401
402 info!(
403 "GC completed: {} orphaned chunks removed, {} bytes freed",
404 orphaned.len(),
405 bytes_freed
406 );
407
408 Ok(GcResult {
409 chunks_removed: orphaned.len() as u64,
410 bytes_freed,
411 })
412 }
413
414 fn chunk_path(&self, hash: &[u8; 32]) -> PathBuf {
417 let hash_hex = hex::encode(hash);
418 let subdir = &hash_hex[..2];
420 self.base_path.join("chunks").join(subdir).join(&hash_hex)
421 }
422
423 async fn load_index(&self) -> std::io::Result<()> {
424 let index_path = self.base_path.join("meta").join("index.json");
425 if !index_path.exists() {
426 return Ok(());
427 }
428
429 let data = fs::read(&index_path).await?;
430 let saved: SavedIndex = serde_json::from_slice(&data)
431 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
432
433 let mut index = self.index.write().await;
435 for (hex_key, value) in saved.chunks {
436 if let Ok(bytes) = hex::decode(&hex_key) {
437 if bytes.len() == 32 {
438 let mut key = [0u8; 32];
439 key.copy_from_slice(&bytes);
440 index.insert(key, value);
441 }
442 }
443 }
444
445 let mut content_chunks = self.content_chunks.write().await;
446 for (cid, hex_hashes) in saved.content_chunks {
447 let hashes: Vec<[u8; 32]> = hex_hashes
448 .iter()
449 .filter_map(|h| {
450 hex::decode(h).ok().and_then(|bytes| {
451 if bytes.len() == 32 {
452 let mut arr = [0u8; 32];
453 arr.copy_from_slice(&bytes);
454 Some(arr)
455 } else {
456 None
457 }
458 })
459 })
460 .collect();
461 content_chunks.insert(cid, hashes);
462 }
463
464 let mut stats = self.stats.write().await;
465 *stats = saved.stats;
466
467 Ok(())
468 }
469
470 async fn save_index(&self) -> std::io::Result<()> {
471 let index = self.index.read().await;
472 let content_chunks = self.content_chunks.read().await;
473 let stats = self.stats.read().await;
474
475 let chunks_hex: HashMap<String, ChunkRef> = index
477 .iter()
478 .map(|(k, v)| (hex::encode(k), v.clone()))
479 .collect();
480
481 let content_chunks_hex: HashMap<String, Vec<String>> = content_chunks
482 .iter()
483 .map(|(k, v)| (k.clone(), v.iter().map(hex::encode).collect()))
484 .collect();
485
486 let saved = SavedIndex {
487 chunks: chunks_hex,
488 content_chunks: content_chunks_hex,
489 stats: stats.clone(),
490 };
491
492 let data = serde_json::to_vec_pretty(&saved)
493 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
494
495 let index_path = self.base_path.join("meta").join("index.json");
496 fs::write(&index_path, data).await?;
497
498 Ok(())
499 }
500}
501
/// Dedup summary for a single content item (see `DedupStore::content_info`).
#[derive(Debug, Clone)]
pub struct ContentDedupInfo {
    /// Content identifier.
    pub cid: String,
    /// Number of chunks making up this content.
    pub total_chunks: u64,
    /// Chunks referenced only by this content (ref_count == 1).
    pub unique_chunks: u64,
    /// Chunks also referenced by other content (ref_count > 1).
    pub shared_chunks: u64,
    /// Sum of this content's chunk sizes in bytes (logical size).
    pub total_size: u64,
}
516
/// Outcome of a `DedupStore::gc` pass.
#[derive(Debug, Clone)]
pub struct GcResult {
    /// Orphaned (zero-reference) chunks deleted.
    pub chunks_removed: u64,
    /// Bytes reclaimed on disk.
    pub bytes_freed: u64,
}
525
/// On-disk JSON schema of `meta/index.json`.
/// Hashes are hex-encoded strings because JSON object keys must be strings.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct SavedIndex {
    // hex(hash) -> chunk metadata.
    chunks: HashMap<String, ChunkRef>,
    // cid -> ordered list of hex(hash) chunk identifiers.
    content_chunks: HashMap<String, Vec<String>>,
    // Stats snapshot taken at save time.
    stats: DedupStats,
}
533
/// A (content, position) reference with a timestamp, used in `EnhancedChunkRef`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReferenceEntry {
    /// Content identifier holding the reference.
    pub cid: String,
    /// Zero-based chunk position within the content.
    pub chunk_index: u64,
    /// Unix timestamp (seconds). NOTE(review): populated with the query time
    /// in `get_content_chunks_detailed`, not the original creation time.
    pub created_at: u64,
}
544
/// Chunk metadata enriched with the full list of references, as returned by
/// `DedupStore::get_content_chunks_detailed`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EnhancedChunkRef {
    /// Content hash of the chunk.
    pub hash: [u8; 32],
    /// Chunk size in bytes.
    pub size: u64,
    /// Every (cid, index) position referencing this chunk.
    pub references: Vec<ReferenceEntry>,
    /// Filesystem path where the chunk bytes live.
    pub storage_path: String,
}
557
558impl EnhancedChunkRef {
559 #[must_use]
561 #[inline]
562 pub fn ref_count(&self) -> u32 {
563 self.references.len() as u32
564 }
565
566 #[must_use]
568 #[inline]
569 pub fn is_referenced_by(&self, cid: &str) -> bool {
570 self.references.iter().any(|r| r.cid == cid)
571 }
572
573 #[must_use]
575 #[inline]
576 pub fn get_referencing_cids(&self) -> Vec<String> {
577 self.references.iter().map(|r| r.cid.clone()).collect()
578 }
579}
580
/// Report produced by `DedupStore::verify_integrity`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IntegrityCheckResult {
    /// Index entries examined.
    pub chunks_checked: u64,
    /// Entries whose stored ref_count disagrees with a recount from the content map.
    pub mismatches_found: u64,
    /// Entries referenced by no content (candidates for `gc`).
    pub orphaned_chunks: u64,
    /// Entries whose backing file is missing on disk.
    pub missing_files: u64,
    /// Total size of the orphaned entries.
    pub orphaned_bytes: u64,
}
595
596impl DedupStore {
597 #[must_use]
599 pub async fn get_chunk_references(&self, hash: &[u8; 32]) -> Option<Vec<ChunkReference>> {
600 let content_chunks = self.content_chunks.read().await;
601
602 let mut references = Vec::new();
603 for (cid, hashes) in content_chunks.iter() {
604 for (index, chunk_hash) in hashes.iter().enumerate() {
605 if chunk_hash == hash {
606 references.push(ChunkReference {
607 cid: cid.clone(),
608 chunk_index: index as u64,
609 });
610 }
611 }
612 }
613
614 if references.is_empty() {
615 None
616 } else {
617 Some(references)
618 }
619 }
620
    /// Detailed per-chunk view for `cid`: hash, size, storage path, and the
    /// full reference list for each chunk.
    ///
    /// Returns `None` if `cid` is unknown. Chunks listed in the content map
    /// but absent from the index are silently skipped.
    #[must_use]
    pub async fn get_content_chunks_detailed(&self, cid: &str) -> Option<Vec<EnhancedChunkRef>> {
        // Copy the needed fields out under the locks, then release them so
        // the per-chunk get_chunk_references calls below can re-lock freely.
        let chunk_data: Vec<([u8; 32], u64, String)> = {
            let index = self.index.read().await;
            let content_chunks = self.content_chunks.read().await;

            if let Some(hashes) = content_chunks.get(cid) {
                hashes
                    .iter()
                    .filter_map(|hash| {
                        index.get(hash).map(|chunk_ref| {
                            (*hash, chunk_ref.size, chunk_ref.storage_path.clone())
                        })
                    })
                    .collect()
            } else {
                return None;
            }
        };

        // NOTE(review): one full content-map scan per chunk (via
        // get_chunk_references) — O(chunks × total hashes). Fine for small
        // stores; revisit if profiling shows it hot.
        let mut result = Vec::new();
        for (hash, size, storage_path) in chunk_data {
            let refs = self.get_chunk_references(&hash).await.unwrap_or_default();
            let references: Vec<ReferenceEntry> = refs
                .into_iter()
                .map(|r| ReferenceEntry {
                    cid: r.cid,
                    chunk_index: r.chunk_index,
                    // Query time, not the reference's original creation time.
                    created_at: current_timestamp(),
                })
                .collect();

            result.push(EnhancedChunkRef {
                hash,
                size,
                references,
                storage_path,
            });
        }

        Some(result)
    }
666
    /// Cross-check the index against the content map and on-disk chunk files
    /// without mutating anything: reports ref-count mismatches, chunks no
    /// content references, and entries whose backing file is missing.
    ///
    /// # Errors
    /// Currently never returns `Err`; the `Result` keeps the signature open
    /// for future fallible checks.
    pub async fn verify_integrity(&self) -> std::io::Result<IntegrityCheckResult> {
        let index = self.index.read().await;
        let content_chunks = self.content_chunks.read().await;

        let mut chunks_checked = 0u64;
        let mut mismatches_found = 0u64;
        let mut orphaned_chunks = 0u64;
        let mut missing_files = 0u64;
        let mut orphaned_bytes = 0u64;

        // Recount references from the content map — the ground truth.
        let mut actual_refs: HashMap<[u8; 32], u32> = HashMap::new();
        for hashes in content_chunks.values() {
            for hash in hashes {
                *actual_refs.entry(*hash).or_insert(0) += 1;
            }
        }

        for (hash, chunk_ref) in index.iter() {
            chunks_checked += 1;

            let actual_count = actual_refs.get(hash).copied().unwrap_or(0);

            // Stored ref_count disagrees with the recount.
            if actual_count != chunk_ref.ref_count {
                mismatches_found += 1;
                tracing::warn!(
                    "Ref count mismatch for chunk {:?}: stored={}, actual={}",
                    hex::encode(&hash[..8]),
                    chunk_ref.ref_count,
                    actual_count
                );
            }

            // Indexed but referenced by no content: a gc() candidate.
            if actual_count == 0 {
                orphaned_chunks += 1;
                orphaned_bytes += chunk_ref.size;
            }

            // Index entry whose backing file has vanished from disk.
            let path = Path::new(&chunk_ref.storage_path);
            if !path.exists() {
                missing_files += 1;
                tracing::warn!(
                    "Missing chunk file for hash {:?}: {}",
                    hex::encode(&hash[..8]),
                    chunk_ref.storage_path
                );
            }
        }

        Ok(IntegrityCheckResult {
            chunks_checked,
            mismatches_found,
            orphaned_chunks,
            missing_files,
            orphaned_bytes,
        })
    }
729
    /// Rewrite each chunk's stored `ref_count` to match a fresh recount from
    /// the content map, persisting the index only if something changed.
    /// Returns the number of entries corrected.
    ///
    /// NOTE(review): derived stats (total_references, dedup_ratio) are not
    /// recomputed here and may remain stale after a repair — confirm whether
    /// that is acceptable.
    ///
    /// # Errors
    /// Returns any I/O error from persisting the index.
    pub async fn repair_references(&self) -> std::io::Result<u64> {
        let mut index = self.index.write().await;
        let content_chunks = self.content_chunks.read().await;

        // Recount references from the content map — the ground truth.
        let mut actual_refs: HashMap<[u8; 32], u32> = HashMap::new();
        for hashes in content_chunks.values() {
            for hash in hashes {
                *actual_refs.entry(*hash).or_insert(0) += 1;
            }
        }

        let mut repaired = 0u64;

        for (hash, chunk_ref) in index.iter_mut() {
            let actual_count = actual_refs.get(hash).copied().unwrap_or(0);

            if actual_count != chunk_ref.ref_count {
                tracing::info!(
                    "Repairing chunk {:?}: {} -> {}",
                    hex::encode(&hash[..8]),
                    chunk_ref.ref_count,
                    actual_count
                );
                chunk_ref.ref_count = actual_count;
                repaired += 1;
            }
        }

        // Release locks before save_index, which takes its own read locks.
        drop(index);
        drop(content_chunks);

        if repaired > 0 {
            self.save_index().await?;
        }

        info!("Repaired {} reference counts", repaired);
        Ok(repaired)
    }
771
772 #[must_use]
774 pub async fn ref_count_distribution(&self) -> HashMap<u32, u64> {
775 let index = self.index.read().await;
776
777 let mut distribution: HashMap<u32, u64> = HashMap::new();
778 for chunk_ref in index.values() {
779 *distribution.entry(chunk_ref.ref_count).or_insert(0) += 1;
780 }
781
782 distribution
783 }
784
785 #[must_use]
787 pub async fn most_referenced_chunks(&self, limit: usize) -> Vec<([u8; 32], u32, u64)> {
788 let index = self.index.read().await;
789
790 let mut chunks: Vec<_> = index
791 .iter()
792 .map(|(hash, chunk_ref)| (*hash, chunk_ref.ref_count, chunk_ref.size))
793 .collect();
794
795 chunks.sort_by(|a, b| b.1.cmp(&a.1));
796 chunks.truncate(limit);
797
798 chunks
799 }
800
801 #[must_use]
803 pub async fn calculate_removal_impact(&self, cid: &str) -> Option<RemovalImpact> {
804 let index = self.index.read().await;
805 let content_chunks = self.content_chunks.read().await;
806
807 if let Some(hashes) = content_chunks.get(cid) {
808 let mut bytes_freed = 0u64;
809 let mut exclusive_chunks = 0u64;
810 let mut shared_chunks = 0u64;
811
812 for hash in hashes {
813 if let Some(chunk_ref) = index.get(hash) {
814 if chunk_ref.ref_count == 1 {
815 bytes_freed += chunk_ref.size;
817 exclusive_chunks += 1;
818 } else {
819 shared_chunks += 1;
821 }
822 }
823 }
824
825 Some(RemovalImpact {
826 cid: cid.to_string(),
827 bytes_freed,
828 exclusive_chunks,
829 shared_chunks,
830 total_chunks: hashes.len() as u64,
831 })
832 } else {
833 None
834 }
835 }
836
    /// Register `cid` as referencing each of `chunk_hashes`, bumping ref
    /// counts for hashes already present in the index. Returns the number of
    /// reference counts actually incremented.
    ///
    /// NOTE(review): this *replaces* any existing chunk list for `cid`
    /// without decrementing the old chunks' counts, and records hashes that
    /// are absent from the index without bumping anything — both can leave
    /// counts inconsistent with the content map (see `repair_references`).
    /// Confirm callers only use this for brand-new CIDs with known hashes.
    ///
    /// # Errors
    /// Returns any I/O error from persisting the index.
    pub async fn add_references_batch(
        &self,
        cid: &str,
        chunk_hashes: Vec<[u8; 32]>,
    ) -> std::io::Result<u64> {
        let mut index = self.index.write().await;
        let mut content_chunks = self.content_chunks.write().await;

        let mut refs_added = 0u64;

        for hash in &chunk_hashes {
            if let Some(chunk_ref) = index.get_mut(hash) {
                chunk_ref.ref_count += 1;
                refs_added += 1;
            }
        }

        content_chunks.insert(cid.to_string(), chunk_hashes);

        // Release locks before save_index, which takes its own read locks.
        drop(index);
        drop(content_chunks);
        self.save_index().await?;

        Ok(refs_added)
    }
863}
864
/// Predicted effect of removing one content item
/// (see `DedupStore::calculate_removal_impact`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RemovalImpact {
    /// Content identifier evaluated.
    pub cid: String,
    /// Disk bytes that would be reclaimed (exclusive chunks only).
    pub bytes_freed: u64,
    /// Chunks referenced only by this content (would be deleted).
    pub exclusive_chunks: u64,
    /// Chunks shared with other content (would survive).
    pub shared_chunks: u64,
    /// Total chunks making up this content.
    pub total_chunks: u64,
}
879
/// Seconds since the Unix epoch; 0 if the system clock reads before the epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
887
888pub async fn find_duplicates(store: &DedupStore, cid1: &str, cid2: &str) -> Vec<[u8; 32]> {
890 let content_chunks = store.content_chunks.read().await;
891
892 let hashes1 = content_chunks.get(cid1);
893 let hashes2 = content_chunks.get(cid2);
894
895 match (hashes1, hashes2) {
896 (Some(h1), Some(h2)) => {
897 let set1: std::collections::HashSet<_> = h1.iter().collect();
898 h2.iter().filter(|h| set1.contains(h)).copied().collect()
899 }
900 _ => Vec::new(),
901 }
902}
903
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_dedup_store() {
        // Fresh scratch directory for this run; ignore cleanup failures.
        let temp_dir = std::env::temp_dir().join("chie_dedup_test");
        let _ = fs::remove_dir_all(&temp_dir).await;

        let store = DedupStore::new(temp_dir.clone(), DedupConfig::default())
            .await
            .unwrap();

        // 8 KiB is above the default min_chunk_size, so it goes through dedup.
        let data = vec![0u8; 8192];

        // First write of the payload stores a new chunk...
        let first = store.store_chunk("cid1", 0, &data).await.unwrap();
        assert!(matches!(first, StoreResult::Stored { .. }));

        // ...and an identical second write is deduplicated.
        let second = store.store_chunk("cid2", 0, &data).await.unwrap();
        assert!(matches!(second, StoreResult::Deduplicated { .. }));

        let stats = store.stats().await;
        assert_eq!(stats.unique_chunks, 1);
        assert_eq!(stats.total_references, 2);
        assert!(stats.bytes_saved > 0);

        let _ = fs::remove_dir_all(&temp_dir).await;
    }
}