// chie_core/dedup.rs

1//! Content deduplication for storage efficiency.
2//!
3//! This module provides chunk-level deduplication using BLAKE3 hashes
4//! to identify identical chunks across different content.
5
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::fs;
10use tokio::sync::RwLock;
11use tracing::{debug, info};
12
/// Deduplication configuration.
///
/// NOTE(review): within this module only `min_chunk_size` is consulted
/// (by `DedupStore::store_chunk`); the other knobs are presumably read by
/// higher-level pipeline code — confirm before relying on them here.
#[derive(Debug, Clone)]
pub struct DedupConfig {
    /// Minimum reference count before a chunk is eligible for dedup.
    pub min_ref_count: u32,
    /// Enable inline deduplication (during storage).
    pub enable_inline_dedup: bool,
    /// Enable background deduplication.
    pub enable_background_dedup: bool,
    /// Minimum chunk size for deduplication (bytes); smaller chunks skip
    /// the dedup path entirely in `store_chunk`.
    pub min_chunk_size: usize,
}
25
26impl Default for DedupConfig {
27    fn default() -> Self {
28        Self {
29            min_ref_count: 2,
30            enable_inline_dedup: true,
31            enable_background_dedup: true,
32            min_chunk_size: 4096, // 4 KB minimum
33        }
34    }
35}
36
/// Reference to a deduplicated chunk.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkRef {
    /// BLAKE3 hash of the chunk (content-addressable key).
    pub hash: [u8; 32],
    /// Size of the chunk in bytes.
    pub size: u64,
    /// Reference count (how many content items reference this chunk).
    pub ref_count: u32,
    /// Path to the actual chunk data.
    /// NOTE(review): `store_chunk` stores the full (lossy) path produced by
    /// `chunk_path`, i.e. prefixed with the store's base path — not a path
    /// relative to the dedup store as previously documented.
    pub storage_path: String,
}

/// Deduplication entry for tracking chunk usage.
/// NOTE(review): not constructed anywhere in this module — confirm it is
/// used elsewhere before depending on it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DedupEntry {
    /// Chunk hash.
    pub hash: [u8; 32],
    /// Content CIDs that reference this chunk.
    pub references: Vec<ChunkReference>,
    /// Total size saved by deduplication (bytes).
    pub bytes_saved: u64,
    /// When this entry was created.
    pub created_at: chrono::DateTime<chrono::Utc>,
}

/// Reference from content to a chunk.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ChunkReference {
    /// Content CID.
    pub cid: String,
    /// Chunk index within the content.
    pub chunk_index: u64,
}
71
/// Deduplication store for managing deduplicated chunks.
///
/// All state is behind `tokio::sync::RwLock`s so the store can be shared
/// across tasks via `Arc`. On-disk layout: chunk data lives under
/// `<base>/chunks/<first-2-hex>/<hash-hex>` and the persisted index at
/// `<base>/meta/index.json`.
pub struct DedupStore {
    config: DedupConfig,
    /// Base path for dedup storage.
    base_path: PathBuf,
    /// In-memory index of chunk hashes to refs.
    index: Arc<RwLock<HashMap<[u8; 32], ChunkRef>>>,
    /// Reverse index: content CID -> chunk hashes (in chunk order).
    content_chunks: Arc<RwLock<HashMap<String, Vec<[u8; 32]>>>>,
    /// Deduplication statistics.
    stats: Arc<RwLock<DedupStats>>,
}

/// Deduplication statistics.
///
/// The raw counters are maintained by the store's mutating operations; the
/// derived fields (`dedup_ratio`, `space_savings_percent`) are recomputed
/// by [`DedupStats::update`].
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct DedupStats {
    /// Total unique chunks stored.
    pub unique_chunks: u64,
    /// Total chunk references (including duplicates).
    pub total_references: u64,
    /// Total bytes saved by deduplication.
    pub bytes_saved: u64,
    /// Total bytes stored (after dedup).
    pub bytes_stored: u64,
    /// Deduplication ratio (total_references / unique_chunks).
    pub dedup_ratio: f64,
    /// Space savings percentage.
    pub space_savings_percent: f64,
}
101
102impl DedupStats {
103    /// Update derived statistics.
104    #[inline]
105    pub fn update(&mut self) {
106        if self.unique_chunks > 0 {
107            self.dedup_ratio = self.total_references as f64 / self.unique_chunks as f64;
108        }
109        let total_logical = self.bytes_stored + self.bytes_saved;
110        if total_logical > 0 {
111            self.space_savings_percent = (self.bytes_saved as f64 / total_logical as f64) * 100.0;
112        }
113    }
114}
115
/// Result of storing a chunk with deduplication.
#[derive(Debug, Clone)]
pub enum StoreResult {
    /// Chunk was new and stored.
    /// NOTE(review): `store_chunk` also returns this variant with an
    /// all-zero `hash` for chunks below `min_chunk_size`, which are not
    /// tracked by the store at all.
    Stored { hash: [u8; 32], size: u64 },
    /// Chunk was a duplicate, reference added.
    Deduplicated { hash: [u8; 32], bytes_saved: u64 },
}
124
125impl DedupStore {
126    /// Create a new deduplication store.
127    pub async fn new(base_path: PathBuf, config: DedupConfig) -> std::io::Result<Self> {
128        // Create directories
129        fs::create_dir_all(&base_path).await?;
130        fs::create_dir_all(base_path.join("chunks")).await?;
131        fs::create_dir_all(base_path.join("meta")).await?;
132
133        let store = Self {
134            config,
135            base_path,
136            index: Arc::new(RwLock::new(HashMap::new())),
137            content_chunks: Arc::new(RwLock::new(HashMap::new())),
138            stats: Arc::new(RwLock::new(DedupStats::default())),
139        };
140
141        // Load existing index
142        store.load_index().await?;
143
144        Ok(store)
145    }
146
    /// Store a chunk with deduplication.
    ///
    /// Hashes `data` (BLAKE3 via `chie_crypto::hash`) and either persists a
    /// new chunk file under the content-addressed path, or — if the hash is
    /// already indexed — bumps the existing chunk's reference count instead
    /// of writing a second copy.
    ///
    /// # Errors
    /// Propagates filesystem errors from writing the chunk file or saving
    /// the index.
    pub async fn store_chunk(
        &self,
        cid: &str,
        _chunk_index: u64,
        data: &[u8],
    ) -> std::io::Result<StoreResult> {
        // Chunks below the configured threshold bypass dedup entirely.
        // NOTE(review): this early return writes nothing and records
        // nothing — the returned hash is all zeros and the chunk never
        // appears in `content_chunks`. Presumably the caller persists small
        // chunks itself; confirm against call sites.
        if data.len() < self.config.min_chunk_size {
            return Ok(StoreResult::Stored {
                hash: [0u8; 32],
                size: data.len() as u64,
            });
        }

        // Content-addressable key for the chunk.
        let hash = chie_crypto::hash(data);

        // Take all three write locks up front so the index, the reverse
        // index, and the stats are updated atomically with respect to other
        // store/remove operations.
        let mut index = self.index.write().await;
        let mut content_chunks = self.content_chunks.write().await;
        let mut stats = self.stats.write().await;

        // Fast path: chunk already stored — just add a reference.
        if let Some(chunk_ref) = index.get_mut(&hash) {
            chunk_ref.ref_count += 1;
            let bytes_saved = data.len() as u64;

            // Record the reference in the content's ordered chunk list.
            content_chunks
                .entry(cid.to_string())
                .or_default()
                .push(hash);

            // A dedup hit saves the full chunk size.
            stats.total_references += 1;
            stats.bytes_saved += bytes_saved;
            stats.update();

            debug!(
                "Deduplicated chunk: {} refs for hash {:?}",
                chunk_ref.ref_count,
                hex::encode(&hash[..8])
            );

            return Ok(StoreResult::Deduplicated { hash, bytes_saved });
        }

        // Slow path: new chunk — write it to its content-addressed path.
        let storage_path = self.chunk_path(&hash);
        // Ensure the 2-hex-char fan-out directory exists.
        if let Some(parent) = storage_path.parent() {
            fs::create_dir_all(parent).await?;
        }
        fs::write(&storage_path, data).await?;

        let chunk_ref = ChunkRef {
            hash,
            size: data.len() as u64,
            ref_count: 1,
            storage_path: storage_path.to_string_lossy().to_string(),
        };

        index.insert(hash, chunk_ref);

        // Record the reference in the content's ordered chunk list.
        content_chunks
            .entry(cid.to_string())
            .or_default()
            .push(hash);

        // First reference to a brand-new chunk: physical bytes grow.
        stats.unique_chunks += 1;
        stats.total_references += 1;
        stats.bytes_stored += data.len() as u64;
        stats.update();

        // Persist the index. NOTE(review): despite the original "save
        // periodically" intent, this runs after every new chunk — a write
        // amplification hotspot for large ingests.
        drop(index);
        drop(content_chunks);
        drop(stats);
        self.save_index().await?;

        Ok(StoreResult::Stored {
            hash,
            size: data.len() as u64,
        })
    }
235
236    /// Retrieve a chunk by hash.
237    pub async fn get_chunk(&self, hash: &[u8; 32]) -> std::io::Result<Option<Vec<u8>>> {
238        let index = self.index.read().await;
239
240        if let Some(chunk_ref) = index.get(hash) {
241            let path = Path::new(&chunk_ref.storage_path);
242            if path.exists() {
243                let data = fs::read(path).await?;
244                return Ok(Some(data));
245            }
246        }
247
248        Ok(None)
249    }
250
251    /// Get a chunk by content CID and chunk index.
252    pub async fn get_content_chunk(
253        &self,
254        cid: &str,
255        chunk_index: u64,
256    ) -> std::io::Result<Option<Vec<u8>>> {
257        let content_chunks = self.content_chunks.read().await;
258
259        if let Some(hashes) = content_chunks.get(cid) {
260            if let Some(hash) = hashes.get(chunk_index as usize) {
261                return self.get_chunk(hash).await;
262            }
263        }
264
265        Ok(None)
266    }
267
268    /// Remove references for a content item.
269    pub async fn remove_content(&self, cid: &str) -> std::io::Result<u64> {
270        let mut index = self.index.write().await;
271        let mut content_chunks = self.content_chunks.write().await;
272        let mut stats = self.stats.write().await;
273
274        let mut bytes_freed = 0u64;
275
276        if let Some(hashes) = content_chunks.remove(cid) {
277            for hash in hashes {
278                if let Some(chunk_ref) = index.get_mut(&hash) {
279                    chunk_ref.ref_count -= 1;
280                    stats.total_references -= 1;
281
282                    if chunk_ref.ref_count == 0 {
283                        // No more references, delete the chunk
284                        let path = Path::new(&chunk_ref.storage_path);
285                        if path.exists() {
286                            fs::remove_file(path).await?;
287                        }
288                        bytes_freed += chunk_ref.size;
289                        stats.unique_chunks -= 1;
290                        stats.bytes_stored -= chunk_ref.size;
291                        index.remove(&hash);
292                    }
293                }
294            }
295        }
296
297        stats.update();
298
299        drop(index);
300        drop(content_chunks);
301        drop(stats);
302        self.save_index().await?;
303
304        info!("Removed content {} - freed {} bytes", cid, bytes_freed);
305        Ok(bytes_freed)
306    }
307
    /// Get deduplication statistics.
    ///
    /// Returns a point-in-time clone; the underlying stats may change as
    /// soon as the read lock is released.
    #[must_use]
    #[inline]
    pub async fn stats(&self) -> DedupStats {
        self.stats.read().await.clone()
    }
314
315    /// Check if a chunk hash exists.
316    #[must_use]
317    #[inline]
318    pub async fn contains(&self, hash: &[u8; 32]) -> bool {
319        let index = self.index.read().await;
320        index.contains_key(hash)
321    }
322
323    /// Get chunk reference count.
324    #[must_use]
325    #[inline]
326    pub async fn ref_count(&self, hash: &[u8; 32]) -> Option<u32> {
327        let index = self.index.read().await;
328        index.get(hash).map(|r| r.ref_count)
329    }
330
331    /// List all content CIDs in the store.
332    #[must_use]
333    #[inline]
334    pub async fn list_content(&self) -> Vec<String> {
335        let content_chunks = self.content_chunks.read().await;
336        content_chunks.keys().cloned().collect()
337    }
338
339    /// Get content info.
340    #[must_use]
341    #[inline]
342    pub async fn content_info(&self, cid: &str) -> Option<ContentDedupInfo> {
343        let index = self.index.read().await;
344        let content_chunks = self.content_chunks.read().await;
345
346        if let Some(hashes) = content_chunks.get(cid) {
347            let mut total_size = 0u64;
348            let mut unique_chunks = 0u64;
349            let mut shared_chunks = 0u64;
350
351            for hash in hashes {
352                if let Some(chunk_ref) = index.get(hash) {
353                    total_size += chunk_ref.size;
354                    if chunk_ref.ref_count == 1 {
355                        unique_chunks += 1;
356                    } else {
357                        shared_chunks += 1;
358                    }
359                }
360            }
361
362            return Some(ContentDedupInfo {
363                cid: cid.to_string(),
364                total_chunks: hashes.len() as u64,
365                unique_chunks,
366                shared_chunks,
367                total_size,
368            });
369        }
370
371        None
372    }
373
374    /// Run garbage collection to remove orphaned chunks.
375    pub async fn gc(&self) -> std::io::Result<GcResult> {
376        let mut index = self.index.write().await;
377        let mut stats = self.stats.write().await;
378
379        let mut orphaned: Vec<[u8; 32]> = Vec::new();
380        let mut bytes_freed = 0u64;
381
382        for (hash, chunk_ref) in index.iter() {
383            if chunk_ref.ref_count == 0 {
384                orphaned.push(*hash);
385            }
386        }
387
388        for hash in &orphaned {
389            if let Some(chunk_ref) = index.remove(hash) {
390                let path = Path::new(&chunk_ref.storage_path);
391                if path.exists() {
392                    fs::remove_file(path).await?;
393                }
394                bytes_freed += chunk_ref.size;
395                stats.unique_chunks -= 1;
396                stats.bytes_stored -= chunk_ref.size;
397            }
398        }
399
400        stats.update();
401
402        info!(
403            "GC completed: {} orphaned chunks removed, {} bytes freed",
404            orphaned.len(),
405            bytes_freed
406        );
407
408        Ok(GcResult {
409            chunks_removed: orphaned.len() as u64,
410            bytes_freed,
411        })
412    }
413
414    // Internal methods
415
416    fn chunk_path(&self, hash: &[u8; 32]) -> PathBuf {
417        let hash_hex = hex::encode(hash);
418        // Use first 2 chars as subdirectory for better filesystem performance
419        let subdir = &hash_hex[..2];
420        self.base_path.join("chunks").join(subdir).join(&hash_hex)
421    }
422
423    async fn load_index(&self) -> std::io::Result<()> {
424        let index_path = self.base_path.join("meta").join("index.json");
425        if !index_path.exists() {
426            return Ok(());
427        }
428
429        let data = fs::read(&index_path).await?;
430        let saved: SavedIndex = serde_json::from_slice(&data)
431            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
432
433        // Convert hex strings back to byte arrays
434        let mut index = self.index.write().await;
435        for (hex_key, value) in saved.chunks {
436            if let Ok(bytes) = hex::decode(&hex_key) {
437                if bytes.len() == 32 {
438                    let mut key = [0u8; 32];
439                    key.copy_from_slice(&bytes);
440                    index.insert(key, value);
441                }
442            }
443        }
444
445        let mut content_chunks = self.content_chunks.write().await;
446        for (cid, hex_hashes) in saved.content_chunks {
447            let hashes: Vec<[u8; 32]> = hex_hashes
448                .iter()
449                .filter_map(|h| {
450                    hex::decode(h).ok().and_then(|bytes| {
451                        if bytes.len() == 32 {
452                            let mut arr = [0u8; 32];
453                            arr.copy_from_slice(&bytes);
454                            Some(arr)
455                        } else {
456                            None
457                        }
458                    })
459                })
460                .collect();
461            content_chunks.insert(cid, hashes);
462        }
463
464        let mut stats = self.stats.write().await;
465        *stats = saved.stats;
466
467        Ok(())
468    }
469
470    async fn save_index(&self) -> std::io::Result<()> {
471        let index = self.index.read().await;
472        let content_chunks = self.content_chunks.read().await;
473        let stats = self.stats.read().await;
474
475        // Convert byte array keys to hex strings for JSON
476        let chunks_hex: HashMap<String, ChunkRef> = index
477            .iter()
478            .map(|(k, v)| (hex::encode(k), v.clone()))
479            .collect();
480
481        let content_chunks_hex: HashMap<String, Vec<String>> = content_chunks
482            .iter()
483            .map(|(k, v)| (k.clone(), v.iter().map(hex::encode).collect()))
484            .collect();
485
486        let saved = SavedIndex {
487            chunks: chunks_hex,
488            content_chunks: content_chunks_hex,
489            stats: stats.clone(),
490        };
491
492        let data = serde_json::to_vec_pretty(&saved)
493            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
494
495        let index_path = self.base_path.join("meta").join("index.json");
496        fs::write(&index_path, data).await?;
497
498        Ok(())
499    }
500}
501
/// Content deduplication info.
///
/// Produced by [`DedupStore::content_info`].
#[derive(Debug, Clone)]
pub struct ContentDedupInfo {
    /// Content CID.
    pub cid: String,
    /// Total number of chunks.
    pub total_chunks: u64,
    /// Chunks unique to this content (ref_count == 1).
    pub unique_chunks: u64,
    /// Chunks shared with other content (ref_count > 1).
    pub shared_chunks: u64,
    /// Total logical size (sum of chunk sizes, before dedup).
    pub total_size: u64,
}

/// Garbage collection result.
///
/// Produced by [`DedupStore::gc`].
#[derive(Debug, Clone)]
pub struct GcResult {
    /// Number of chunks removed.
    pub chunks_removed: u64,
    /// Bytes freed.
    pub bytes_freed: u64,
}

/// Saved index structure (uses hex strings for JSON compatibility, since
/// JSON object keys cannot be byte arrays).
#[derive(Debug, serde::Serialize, serde::Deserialize)]
struct SavedIndex {
    chunks: HashMap<String, ChunkRef>,
    content_chunks: HashMap<String, Vec<String>>,
    stats: DedupStats,
}
533
/// Reference tracking entry for detailed auditing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ReferenceEntry {
    /// Content CID that references this chunk.
    pub cid: String,
    /// Chunk index within the content.
    pub chunk_index: u64,
    /// When this reference was created (Unix seconds).
    /// NOTE(review): `get_content_chunks_detailed` fills this with the
    /// time of the query, not the original reference creation time.
    pub created_at: u64,
}

/// Enhanced chunk reference with detailed tracking.
///
/// Unlike [`ChunkRef`], the reference count is derived from the explicit
/// `references` list rather than stored as a counter.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EnhancedChunkRef {
    /// BLAKE3 hash of the chunk.
    pub hash: [u8; 32],
    /// Size of the chunk in bytes.
    pub size: u64,
    /// List of all references to this chunk.
    pub references: Vec<ReferenceEntry>,
    /// Path to the actual chunk data.
    pub storage_path: String,
}
557
558impl EnhancedChunkRef {
559    /// Get the reference count.
560    #[must_use]
561    #[inline]
562    pub fn ref_count(&self) -> u32 {
563        self.references.len() as u32
564    }
565
566    /// Check if a specific content references this chunk.
567    #[must_use]
568    #[inline]
569    pub fn is_referenced_by(&self, cid: &str) -> bool {
570        self.references.iter().any(|r| r.cid == cid)
571    }
572
573    /// Get all CIDs that reference this chunk.
574    #[must_use]
575    #[inline]
576    pub fn get_referencing_cids(&self) -> Vec<String> {
577        self.references.iter().map(|r| r.cid.clone()).collect()
578    }
579}
580
/// Result of a reference integrity check.
///
/// Produced by [`DedupStore::verify_integrity`]; the check is read-only
/// and reports inconsistencies without repairing them (see
/// [`DedupStore::repair_references`]).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IntegrityCheckResult {
    /// Number of chunks checked.
    pub chunks_checked: u64,
    /// Number of chunks with mismatched ref counts.
    pub mismatches_found: u64,
    /// Number of orphaned chunks (zero actual references).
    pub orphaned_chunks: u64,
    /// Number of missing chunk files on disk.
    pub missing_files: u64,
    /// Total bytes in orphaned chunks.
    pub orphaned_bytes: u64,
}
595
596impl DedupStore {
597    /// Get all references for a specific chunk hash.
598    #[must_use]
599    pub async fn get_chunk_references(&self, hash: &[u8; 32]) -> Option<Vec<ChunkReference>> {
600        let content_chunks = self.content_chunks.read().await;
601
602        let mut references = Vec::new();
603        for (cid, hashes) in content_chunks.iter() {
604            for (index, chunk_hash) in hashes.iter().enumerate() {
605                if chunk_hash == hash {
606                    references.push(ChunkReference {
607                        cid: cid.clone(),
608                        chunk_index: index as u64,
609                    });
610                }
611            }
612        }
613
614        if references.is_empty() {
615            None
616        } else {
617            Some(references)
618        }
619    }
620
    /// List all chunks referenced by a specific content.
    ///
    /// Returns one [`EnhancedChunkRef`] per chunk of `cid`, each carrying
    /// the full cross-content reference list. Returns `None` if `cid` is
    /// unknown.
    #[must_use]
    pub async fn get_content_chunks_detailed(&self, cid: &str) -> Option<Vec<EnhancedChunkRef>> {
        // First, collect the chunk data we need while holding both read
        // locks, so the snapshot is internally consistent.
        let chunk_data: Vec<([u8; 32], u64, String)> = {
            let index = self.index.read().await;
            let content_chunks = self.content_chunks.read().await;

            if let Some(hashes) = content_chunks.get(cid) {
                hashes
                    .iter()
                    .filter_map(|hash| {
                        index.get(hash).map(|chunk_ref| {
                            (*hash, chunk_ref.size, chunk_ref.storage_path.clone())
                        })
                    })
                    .collect()
            } else {
                return None;
            }
        };

        // Now get references for each chunk. The locks above are released,
        // so `get_chunk_references` can re-acquire them without deadlock.
        let mut result = Vec::new();
        for (hash, size, storage_path) in chunk_data {
            let refs = self.get_chunk_references(&hash).await.unwrap_or_default();
            let references: Vec<ReferenceEntry> = refs
                .into_iter()
                .map(|r| ReferenceEntry {
                    cid: r.cid,
                    chunk_index: r.chunk_index,
                    // NOTE(review): this is the time of this query, not the
                    // time the reference was originally created.
                    created_at: current_timestamp(),
                })
                .collect();

            result.push(EnhancedChunkRef {
                hash,
                size,
                references,
                storage_path,
            });
        }

        Some(result)
    }
666
667    /// Verify reference count integrity across the store.
668    pub async fn verify_integrity(&self) -> std::io::Result<IntegrityCheckResult> {
669        let index = self.index.read().await;
670        let content_chunks = self.content_chunks.read().await;
671
672        let mut chunks_checked = 0u64;
673        let mut mismatches_found = 0u64;
674        let mut orphaned_chunks = 0u64;
675        let mut missing_files = 0u64;
676        let mut orphaned_bytes = 0u64;
677
678        // Count actual references for each chunk
679        let mut actual_refs: HashMap<[u8; 32], u32> = HashMap::new();
680        for hashes in content_chunks.values() {
681            for hash in hashes {
682                *actual_refs.entry(*hash).or_insert(0) += 1;
683            }
684        }
685
686        // Verify each chunk
687        for (hash, chunk_ref) in index.iter() {
688            chunks_checked += 1;
689
690            let actual_count = actual_refs.get(hash).copied().unwrap_or(0);
691
692            // Check if ref count matches
693            if actual_count != chunk_ref.ref_count {
694                mismatches_found += 1;
695                tracing::warn!(
696                    "Ref count mismatch for chunk {:?}: stored={}, actual={}",
697                    hex::encode(&hash[..8]),
698                    chunk_ref.ref_count,
699                    actual_count
700                );
701            }
702
703            // Check if orphaned
704            if actual_count == 0 {
705                orphaned_chunks += 1;
706                orphaned_bytes += chunk_ref.size;
707            }
708
709            // Check if file exists
710            let path = Path::new(&chunk_ref.storage_path);
711            if !path.exists() {
712                missing_files += 1;
713                tracing::warn!(
714                    "Missing chunk file for hash {:?}: {}",
715                    hex::encode(&hash[..8]),
716                    chunk_ref.storage_path
717                );
718            }
719        }
720
721        Ok(IntegrityCheckResult {
722            chunks_checked,
723            mismatches_found,
724            orphaned_chunks,
725            missing_files,
726            orphaned_bytes,
727        })
728    }
729
730    /// Repair reference counts based on actual references.
731    pub async fn repair_references(&self) -> std::io::Result<u64> {
732        let mut index = self.index.write().await;
733        let content_chunks = self.content_chunks.read().await;
734
735        // Count actual references for each chunk
736        let mut actual_refs: HashMap<[u8; 32], u32> = HashMap::new();
737        for hashes in content_chunks.values() {
738            for hash in hashes {
739                *actual_refs.entry(*hash).or_insert(0) += 1;
740            }
741        }
742
743        let mut repaired = 0u64;
744
745        // Update ref counts
746        for (hash, chunk_ref) in index.iter_mut() {
747            let actual_count = actual_refs.get(hash).copied().unwrap_or(0);
748
749            if actual_count != chunk_ref.ref_count {
750                tracing::info!(
751                    "Repairing chunk {:?}: {} -> {}",
752                    hex::encode(&hash[..8]),
753                    chunk_ref.ref_count,
754                    actual_count
755                );
756                chunk_ref.ref_count = actual_count;
757                repaired += 1;
758            }
759        }
760
761        drop(index);
762        drop(content_chunks);
763
764        if repaired > 0 {
765            self.save_index().await?;
766        }
767
768        info!("Repaired {} reference counts", repaired);
769        Ok(repaired)
770    }
771
772    /// Get reference count distribution statistics.
773    #[must_use]
774    pub async fn ref_count_distribution(&self) -> HashMap<u32, u64> {
775        let index = self.index.read().await;
776
777        let mut distribution: HashMap<u32, u64> = HashMap::new();
778        for chunk_ref in index.values() {
779            *distribution.entry(chunk_ref.ref_count).or_insert(0) += 1;
780        }
781
782        distribution
783    }
784
785    /// Find the most frequently referenced chunks.
786    #[must_use]
787    pub async fn most_referenced_chunks(&self, limit: usize) -> Vec<([u8; 32], u32, u64)> {
788        let index = self.index.read().await;
789
790        let mut chunks: Vec<_> = index
791            .iter()
792            .map(|(hash, chunk_ref)| (*hash, chunk_ref.ref_count, chunk_ref.size))
793            .collect();
794
795        chunks.sort_by(|a, b| b.1.cmp(&a.1));
796        chunks.truncate(limit);
797
798        chunks
799    }
800
801    /// Calculate potential savings if content were removed.
802    #[must_use]
803    pub async fn calculate_removal_impact(&self, cid: &str) -> Option<RemovalImpact> {
804        let index = self.index.read().await;
805        let content_chunks = self.content_chunks.read().await;
806
807        if let Some(hashes) = content_chunks.get(cid) {
808            let mut bytes_freed = 0u64;
809            let mut exclusive_chunks = 0u64;
810            let mut shared_chunks = 0u64;
811
812            for hash in hashes {
813                if let Some(chunk_ref) = index.get(hash) {
814                    if chunk_ref.ref_count == 1 {
815                        // This chunk would be deleted
816                        bytes_freed += chunk_ref.size;
817                        exclusive_chunks += 1;
818                    } else {
819                        // This chunk is shared
820                        shared_chunks += 1;
821                    }
822                }
823            }
824
825            Some(RemovalImpact {
826                cid: cid.to_string(),
827                bytes_freed,
828                exclusive_chunks,
829                shared_chunks,
830                total_chunks: hashes.len() as u64,
831            })
832        } else {
833            None
834        }
835    }
836
837    /// Batch add references for multiple chunks.
838    pub async fn add_references_batch(
839        &self,
840        cid: &str,
841        chunk_hashes: Vec<[u8; 32]>,
842    ) -> std::io::Result<u64> {
843        let mut index = self.index.write().await;
844        let mut content_chunks = self.content_chunks.write().await;
845
846        let mut refs_added = 0u64;
847
848        for hash in &chunk_hashes {
849            if let Some(chunk_ref) = index.get_mut(hash) {
850                chunk_ref.ref_count += 1;
851                refs_added += 1;
852            }
853        }
854
855        content_chunks.insert(cid.to_string(), chunk_hashes);
856
857        drop(index);
858        drop(content_chunks);
859        self.save_index().await?;
860
861        Ok(refs_added)
862    }
863}
864
/// Impact analysis for content removal.
///
/// Produced by [`DedupStore::calculate_removal_impact`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct RemovalImpact {
    /// Content CID.
    pub cid: String,
    /// Bytes that would be freed (sum of exclusive chunk sizes).
    pub bytes_freed: u64,
    /// Number of exclusive chunks (would be deleted).
    pub exclusive_chunks: u64,
    /// Number of shared chunks (would remain).
    pub shared_chunks: u64,
    /// Total chunks in content.
    pub total_chunks: u64,
}
879
/// Current Unix timestamp in whole seconds.
///
/// Returns 0 if the system clock reports a time before the Unix epoch.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
887
888/// Find duplicate chunks between two content items.
889pub async fn find_duplicates(store: &DedupStore, cid1: &str, cid2: &str) -> Vec<[u8; 32]> {
890    let content_chunks = store.content_chunks.read().await;
891
892    let hashes1 = content_chunks.get(cid1);
893    let hashes2 = content_chunks.get(cid2);
894
895    match (hashes1, hashes2) {
896        (Some(h1), Some(h2)) => {
897            let set1: std::collections::HashSet<_> = h1.iter().collect();
898            h2.iter().filter(|h| set1.contains(h)).copied().collect()
899        }
900        _ => Vec::new(),
901    }
902}
903
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end smoke test: storing the same 8 KiB chunk under two
    /// different CIDs must dedupe the second write and reflect it in the
    /// aggregate stats.
    #[tokio::test]
    async fn test_dedup_store() {
        // Fresh temp directory per run; ignore errors if it didn't exist.
        let temp_dir = std::env::temp_dir().join("chie_dedup_test");
        let _ = fs::remove_dir_all(&temp_dir).await;

        let store = DedupStore::new(temp_dir.clone(), DedupConfig::default())
            .await
            .unwrap();

        // Store same chunk twice for different content
        let data = vec![0u8; 8192]; // 8KB chunk, above the 4 KB dedup minimum

        let result1 = store.store_chunk("cid1", 0, &data).await.unwrap();
        assert!(matches!(result1, StoreResult::Stored { .. }));

        // Second store of identical bytes must hit the dedup fast path.
        let result2 = store.store_chunk("cid2", 0, &data).await.unwrap();
        assert!(matches!(result2, StoreResult::Deduplicated { .. }));

        let stats = store.stats().await;
        assert_eq!(stats.unique_chunks, 1);
        assert_eq!(stats.total_references, 2);
        assert!(stats.bytes_saved > 0);

        // Cleanup
        let _ = fs::remove_dir_all(&temp_dir).await;
    }
}