// ipfrs_storage/dedup.rs

1//! Block deduplication using content-defined chunking.
2//!
3//! This module provides transparent deduplication for block storage using
4//! FastCDC (Fast Content-Defined Chunking) algorithm.
5//!
6//! # Features
7//! - FastCDC algorithm for reliable boundary detection
8//! - Chunk-level deduplication with reference counting
9//! - Deduplication statistics and savings tracking
10//! - Transparent block reconstruction
11//! - Automatic garbage collection of unreferenced chunks
12
13use crate::traits::BlockStore;
14use async_trait::async_trait;
15use dashmap::DashMap;
16use ipfrs_core::{Block, Cid, Error, Result};
17use parking_lot::RwLock;
18use std::sync::Arc;
19
/// Chunk configuration for content-defined chunking.
///
/// The chunker only looks for boundaries between `min_chunk_size` and
/// `max_chunk_size`, so every emitted chunk except possibly the final
/// short tail falls inside that window (see `Chunker::chunk`).
#[derive(Debug, Clone)]
pub struct ChunkingConfig {
    /// Minimum chunk size (default: 256KB)
    pub min_chunk_size: usize,
    /// Target chunk size (default: 1MB); used as the normalization
    /// point for FastCDC's two-mask scheme in `find_boundary`
    pub target_chunk_size: usize,
    /// Maximum chunk size (default: 4MB); hard cut point when no hash
    /// boundary fires
    pub max_chunk_size: usize,
    /// Rolling hash mask (determines avg chunk size).
    /// A boundary fires when `hash & hash_mask == 0`, so more set bits
    /// make boundaries rarer and chunks larger.
    pub hash_mask: u32,
}
32
33impl Default for ChunkingConfig {
34    fn default() -> Self {
35        Self {
36            min_chunk_size: 256 * 1024,      // 256KB
37            target_chunk_size: 1024 * 1024,  // 1MB
38            max_chunk_size: 4 * 1024 * 1024, // 4MB
39            hash_mask: 0xFFFF,               // ~64KB avg chunks
40        }
41    }
42}
43
44impl ChunkingConfig {
45    /// Create config optimized for small blocks
46    pub fn small() -> Self {
47        Self {
48            min_chunk_size: 64 * 1024,     // 64KB
49            target_chunk_size: 256 * 1024, // 256KB
50            max_chunk_size: 1024 * 1024,   // 1MB
51            hash_mask: 0x3FFF,             // ~16KB avg chunks
52        }
53    }
54
55    /// Create config optimized for large blocks
56    pub fn large() -> Self {
57        Self {
58            min_chunk_size: 1024 * 1024,        // 1MB
59            target_chunk_size: 4 * 1024 * 1024, // 4MB
60            max_chunk_size: 16 * 1024 * 1024,   // 16MB
61            hash_mask: 0x1FFFF,                 // ~128KB avg chunks
62        }
63    }
64}
65
/// Chunk metadata stored in the dedup index.
///
/// One entry exists per unique chunk CID; `ref_count` counts every
/// occurrence across (and within) stored blocks.
#[derive(Debug, Clone)]
struct ChunkMeta {
    /// CID of the chunk data
    cid: Cid,
    /// Reference count; when it drops to zero the chunk is removed from
    /// the inner store (see `decrement_chunk_refs`)
    ref_count: usize,
    /// Chunk size in bytes, used to keep the `bytes_stored` stat accurate
    size: usize,
}
76
/// Block manifest mapping a block to its chunks.
///
/// Concatenating the chunk payloads in listed order reproduces the
/// original block bytes (see `reconstruct_block`).
#[derive(Debug, Clone)]
struct BlockManifest {
    /// Original block size, used to pre-size the reassembly buffer
    original_size: usize,
    /// List of chunk CIDs that make up this block, in order
    chunks: Vec<Cid>,
}
85
/// Deduplication statistics
#[derive(Debug, Clone, Default)]
pub struct DedupStats {
    /// Total number of blocks stored
    pub blocks_stored: usize,
    /// Total bytes before deduplication
    pub bytes_original: usize,
    /// Total bytes after deduplication (unique chunks)
    pub bytes_stored: usize,
    /// Number of unique chunks
    pub unique_chunks: usize,
    /// Number of duplicate chunks avoided
    pub duplicate_chunks_avoided: usize,
}

impl DedupStats {
    /// Fraction of original bytes saved by deduplication, in `[0, 1]`.
    /// Returns 0.0 when nothing has been stored yet.
    pub fn dedup_ratio(&self) -> f64 {
        match self.bytes_original {
            0 => 0.0,
            total => 1.0 - self.bytes_stored as f64 / total as f64,
        }
    }

    /// Absolute number of bytes saved (never underflows).
    pub fn bytes_saved(&self) -> usize {
        self.bytes_original.saturating_sub(self.bytes_stored)
    }

    /// Mean size of a unique chunk, or 0 when no chunks exist.
    pub fn avg_chunk_size(&self) -> usize {
        self.bytes_stored
            .checked_div(self.unique_chunks)
            .unwrap_or(0)
    }
}
123
/// FastCDC gear hash table (precomputed random values for each byte).
///
/// Reserved for future use with gear-based hashing; the current
/// `Chunker::find_boundary` uses an FNV-style hash and never reads this
/// table (hence the `dead_code` allow).
///
/// NOTE(review): the element type is `u64` but every entry fits in 32
/// bits — confirm whether full 64-bit gear values were intended before
/// wiring this into the chunker.
#[allow(dead_code)]
const GEAR: [u64; 256] = [
    0x5c95c078, 0x22408989, 0x2d48a214, 0x12842087, 0x530f8afb, 0x2aaa3f86, 0x7f1bd89f, 0x62534467,
    0x22c4b83b, 0x3e36d3e7, 0x4c9fa05b, 0x0b20f0e3, 0x441c8a8c, 0x7cc27988, 0x5505c6c0, 0x3c9ae0da,
    0x153e46cd, 0x0d05f5b5, 0x51c9c3b5, 0x02e57b86, 0x74a8d4ba, 0x6f16cbb5, 0x2ffc27ea, 0x5fa83e0f,
    0x75ab67e2, 0x3ff15813, 0x2ec58ac7, 0x6f1f0520, 0x0c5d7dba, 0x4a9f5e76, 0x4ec58e64, 0x6a470c8e,
    0x40edf2ca, 0x1a1c0c8d, 0x4e32e5e4, 0x6c7a7fda, 0x4b3be9e4, 0x64d8e67b, 0x2ef8ad98, 0x34d9f7e5,
    0x7e7e4a36, 0x1a1c54d1, 0x5e2a9e7a, 0x3e5f0a8e, 0x0e01d1a0, 0x1f31aa27, 0x049c9e3e, 0x7c38f56e,
    0x4b8d9ef0, 0x0b9c4d05, 0x55f59f0d, 0x3e8e02ae, 0x25c46f84, 0x6e6fdc6f, 0x440ae4a7, 0x3e38a0e6,
    0x5b96c3d1, 0x72a06105, 0x52cd5e2d, 0x3d015fb3, 0x4d7c7064, 0x1c8c169c, 0x5c95e834, 0x0c4d9d42,
    0x3c9c8ea3, 0x10a5d9d6, 0x7dcb9d63, 0x3ecf9e96, 0x1f5c9e5f, 0x7e7854c5, 0x48a05ae3, 0x0c4e9419,
    0x6b5c9b6f, 0x7e1a6dc0, 0x3b8f9fe8, 0x6f6e8e3f, 0x39f48adb, 0x7b8d9e72, 0x29e18dc5, 0x7e6c3fc4,
    0x5d9c4ab8, 0x1f6e9dc2, 0x3e8f9fc3, 0x7d9c8ea6, 0x0e1f8d9c, 0x5f9d8e72, 0x3e9f8dcb, 0x7d8e9f72,
    0x2f9d8ea5, 0x6e8f9dc4, 0x3d9f8ec5, 0x7e8d9f63, 0x1f9e8dc3, 0x6d8f9ec4, 0x3e9d8fc5, 0x7d9e8f62,
    0x2e9f8dc4, 0x6f8d9ec5, 0x3d9e8fc3, 0x7e9d8f64, 0x1f8e9dc5, 0x6e9f8dc4, 0x3d8e9fc5, 0x7d9f8e63,
    0x2f8d9ec4, 0x6e8f9dc5, 0x3e9d8fc4, 0x7d8e9f65, 0x1f9d8ec5, 0x6d9f8dc4, 0x3e8d9fc5, 0x7e9f8d62,
    0x2d8e9fc4, 0x6f9d8ec5, 0x3d8f9dc4, 0x7e8d9f66, 0x1e9f8dc5, 0x6d8e9fc4, 0x3f9d8ec5, 0x7d9e8f61,
    0x2f9d8ec4, 0x6e8d9fc5, 0x3d9f8dc4, 0x7e8f9d67, 0x1f8d9ec5, 0x6e9d8fc4, 0x3d8e9fc5, 0x7f9d8e60,
    0x2e8f9dc4, 0x6f9e8dc5, 0x3d8d9fc4, 0x7e9f8d68, 0x1d9e8fc5, 0x6f8d9ec4, 0x3e9f8dc5, 0x7d8e9f5f,
    0x2f8e9dc4, 0x6d9f8ec5, 0x3e8d9fc4, 0x7f9e8d69, 0x1f9d8ec5, 0x6e8f9dc4, 0x3d9e8fc5, 0x7e8d9f5e,
    0x2d9f8ec4, 0x6f8e9dc5, 0x3d8f9fc4, 0x7e9d8e6a, 0x1e8f9dc5, 0x6d9e8fc4, 0x3f8d9ec5, 0x7d9f8e5d,
    0x2f8d9fc4, 0x6e9f8ec5, 0x3d8e9dc4, 0x7f8d9e6b, 0x1f8e9fc5, 0x6e8d9ec4, 0x3d9f8fc5, 0x7e9e8d5c,
    0x2e9d8fc4, 0x6f8e9dc5, 0x3e8f9ec4, 0x7d9e8f6c, 0x1f9e8dc5, 0x6d8f9fc4, 0x3e9d8ec5, 0x7d8f9e5b,
    0x2f9e8dc4, 0x6e8d9fc5, 0x3d9f8ec4, 0x7e8f9d6d, 0x1e9d8fc5, 0x6f8e9dc4, 0x3d8f9ec5, 0x7e9d8f5a,
    0x2d8f9ec4, 0x6e9d8fc5, 0x3f8e9dc4, 0x7d9f8e6e, 0x1f8d9fc5, 0x6e9e8dc4, 0x3d8f9fc5, 0x7f8e9d59,
    0x2e8d9fc4, 0x6f9e8dc5, 0x3d9f8ec4, 0x7e8d9f6f, 0x1d9f8ec5, 0x6f8d9dc4, 0x3e8e9fc5, 0x7d9f8e58,
    0x2f8e9fc4, 0x6d9f8dc5, 0x3e8d9ec4, 0x7f9e8d70, 0x1f8e9dc5, 0x6d8f9ec4, 0x3f9d8fc5, 0x7e8f9d57,
    0x2d9e8fc4, 0x6f8e9dc5, 0x3d8f9ec4, 0x7e9d8f71, 0x1e9f8dc5, 0x6f8d9ec4, 0x3d9e8fc5, 0x7f8d9e56,
    0x2f8d9ec4, 0x6e9f8dc5, 0x3e8d9fc4, 0x7d9e8f72, 0x1f9d8fc5, 0x6e8f9dc4, 0x3d8e9fc5, 0x7e9f8d55,
    0x2e8f9fc4, 0x6d9e8dc5, 0x3f8d9ec4, 0x7e8f9d73, 0x1d9f8fc5, 0x6f8e9dc4, 0x3e8d9fc5, 0x7d8f9e54,
    0x2f9e8dc4, 0x6e8f9fc5, 0x3d9d8ec4, 0x7f8e9d74, 0x1e8d9fc5, 0x6d9f8ec4, 0x3f8e9dc5, 0x7e9d8f53,
    0x2d8e9fc4, 0x6f9d8ec5, 0x3d8f9fc4, 0x7e9f8d75, 0x1f8d9ec5, 0x6e9d8fc4, 0x3d9f8ec5, 0x7f8e9d52,
    0x2e9f8dc4, 0x6d8e9fc5, 0x3f9d8ec4, 0x7d8f9e76, 0x1f9e8dc5, 0x6f8d9ec4, 0x3e9f8fc5, 0x7d9e8f51,
    0x2f8d9fc4, 0x6e9e8dc5, 0x3d8f9ec4, 0x7e8d9f77, 0x1e9f8dc5, 0x6d8f9fc4, 0x3f8e9dc5, 0x7e9d8e50,
];
161
/// Content-defined chunking engine using FastCDC algorithm.
///
/// Stateless apart from its configuration; `chunk` may be called with
/// any number of independent inputs.
struct Chunker {
    // Size limits and boundary mask driving `chunk`/`find_boundary`.
    config: ChunkingConfig,
}
166
167impl Chunker {
168    fn new(config: ChunkingConfig) -> Self {
169        Self { config }
170    }
171
172    /// Split data into content-defined chunks using FastCDC
173    fn chunk(&self, data: &[u8]) -> Vec<Vec<u8>> {
174        if data.len() <= self.config.min_chunk_size {
175            return vec![data.to_vec()];
176        }
177
178        let mut chunks = Vec::new();
179        let mut start = 0;
180
181        while start < data.len() {
182            let remaining = data.len() - start;
183
184            // If remaining is less than min, add it as final chunk
185            if remaining <= self.config.min_chunk_size {
186                chunks.push(data[start..].to_vec());
187                break;
188            }
189
190            // Find chunk boundary using FastCDC
191            let boundary = self.find_boundary(&data[start..]);
192            let end = start + boundary;
193
194            chunks.push(data[start..end].to_vec());
195            start = end;
196        }
197
198        chunks
199    }
200
201    /// Find chunk boundary using FastCDC algorithm with normalized chunking
202    #[allow(clippy::needless_range_loop)]
203    fn find_boundary(&self, data: &[u8]) -> usize {
204        let max_scan = self.config.max_chunk_size.min(data.len());
205        let min_size = self.config.min_chunk_size.min(data.len());
206
207        // FastCDC uses normalized chunking with two levels
208        let nc_level = min_size + (self.config.target_chunk_size - min_size) / 4;
209
210        let mut hash: u64 = 0;
211        const PRIME: u64 = 0x01000193; // FNV prime
212        let mask_s = self.config.hash_mask as u64; // Mask for smaller chunks
213        let mask_l = (self.config.hash_mask >> 1) as u64; // Mask for larger chunks
214
215        // Start from minimum chunk size (range needed for offset calculation)
216        for idx in min_size..max_scan {
217            let byte = data[idx];
218
219            // Update rolling hash using FNV-like hash for better distribution
220            hash = hash.wrapping_mul(PRIME) ^ (byte as u64);
221
222            // Use different mask based on position (normalized chunking)
223            let mask = if idx < nc_level { mask_s } else { mask_l };
224
225            // Check if we hit a boundary
226            if (hash & mask) == 0 {
227                return idx + 1;
228            }
229        }
230
231        // Return max chunk size if no boundary found
232        max_scan
233    }
234}
235
/// Deduplicating block store wrapper.
///
/// Blocks handed to `put` are split into content-defined chunks and
/// only chunks not already indexed are written to `inner`.
///
/// NOTE(review): the chunk index, manifests, and stats are built fresh
/// in `new` and live in memory only — they are not persisted with the
/// inner store; confirm whether recovery after restart is expected.
pub struct DedupBlockStore<S> {
    /// Underlying store holding the chunk-level blocks.
    inner: S,
    /// Chunking parameters applied to every `put`.
    config: ChunkingConfig,
    /// Chunk index: chunk_cid -> ChunkMeta
    chunk_index: Arc<DashMap<Cid, ChunkMeta>>,
    /// Block manifests: block_cid -> BlockManifest
    manifests: Arc<DashMap<Cid, BlockManifest>>,
    /// Statistics
    stats: Arc<RwLock<DedupStats>>,
}
247
impl<S: BlockStore> DedupBlockStore<S> {
    /// Create a new deduplicating block store with empty in-memory
    /// chunk index, manifests, and stats.
    pub fn new(inner: S, config: ChunkingConfig) -> Self {
        Self {
            inner,
            config,
            chunk_index: Arc::new(DashMap::new()),
            manifests: Arc::new(DashMap::new()),
            stats: Arc::new(RwLock::new(DedupStats::default())),
        }
    }

    /// Create with default configuration
    pub fn with_defaults(inner: S) -> Self {
        Self::new(inner, ChunkingConfig::default())
    }

    /// Get deduplication statistics (a point-in-time clone of the counters).
    pub fn stats(&self) -> DedupStats {
        self.stats.read().clone()
    }

    /// Get the underlying store.
    ///
    /// NOTE(review): this drops the in-memory index and manifests, so
    /// the chunk blocks left in the returned store can no longer be
    /// reassembled into original blocks through this wrapper.
    pub fn into_inner(self) -> S {
        self.inner
    }

    /// Get a reference to the underlying store
    pub fn inner(&self) -> &S {
        &self.inner
    }

    /// Store a chunk and update the dedup index.
    ///
    /// Dedup hit: bump the chunk's ref count and the duplicate counter,
    /// skipping the inner write. Miss: write the chunk to the inner
    /// store, index it with ref_count 1, and grow the stored-bytes stats.
    async fn store_chunk(&self, chunk_data: &[u8]) -> Result<Cid> {
        // Create chunk block to get its CID (content-addressed lookup key)
        let chunk_block = Block::new(bytes::Bytes::copy_from_slice(chunk_data))?;
        let chunk_cid = *chunk_block.cid();

        // Check if chunk already exists
        if let Some(mut meta) = self.chunk_index.get_mut(&chunk_cid) {
            // Increment reference count
            meta.ref_count += 1;

            // Update stats for duplicate.
            // The DashMap guard `meta` is still held here; no other code
            // path in this file takes `stats` and then `chunk_index`, so
            // this lock ordering cannot deadlock within this module.
            let mut stats = self.stats.write();
            stats.duplicate_chunks_avoided += 1;

            return Ok(meta.cid);
        }

        // New chunk - store it
        self.inner.put(&chunk_block).await?;

        // Add to index
        self.chunk_index.insert(
            chunk_cid,
            ChunkMeta {
                cid: chunk_cid,
                ref_count: 1,
                size: chunk_data.len(),
            },
        );

        // Update stats
        let mut stats = self.stats.write();
        stats.unique_chunks += 1;
        stats.bytes_stored += chunk_data.len();

        Ok(chunk_cid)
    }

    /// Retrieve chunks and reconstruct the original block.
    ///
    /// Fetches every chunk in manifest order and concatenates the
    /// payloads; fails with `BlockNotFound` if any chunk is missing
    /// from the inner store.
    async fn reconstruct_block(&self, manifest: &BlockManifest) -> Result<Block> {
        // Pre-size to the known original length to avoid regrowth.
        let mut data = Vec::with_capacity(manifest.original_size);

        for chunk_cid in &manifest.chunks {
            let chunk_block = self
                .inner
                .get(chunk_cid)
                .await?
                .ok_or_else(|| Error::BlockNotFound(chunk_cid.to_string()))?;
            data.extend_from_slice(chunk_block.data());
        }

        Block::new(bytes::Bytes::from(data))
    }

    /// Decrement chunk reference counts, garbage-collecting chunks that
    /// reach zero.
    ///
    /// Two phases: first decrement all counts (each DashMap guard is
    /// scoped so it drops before any `.await`), then delete zero-ref
    /// chunks from the inner store and roll the stats back.
    async fn decrement_chunk_refs(&self, chunk_cids: &[Cid]) -> Result<()> {
        let mut to_delete = Vec::new();

        for cid in chunk_cids {
            // Scope the map guard so it is released before the async
            // deletes below.
            let should_delete = {
                if let Some(mut entry) = self.chunk_index.get_mut(cid) {
                    entry.ref_count = entry.ref_count.saturating_sub(1);
                    entry.ref_count == 0
                } else {
                    false
                }
            };

            if should_delete {
                to_delete.push(*cid);
            }
        }

        // Delete unreferenced chunks
        for cid in to_delete {
            if let Some((_, meta)) = self.chunk_index.remove(&cid) {
                self.inner.delete(&cid).await?;

                // Update stats
                let mut stats = self.stats.write();
                stats.unique_chunks = stats.unique_chunks.saturating_sub(1);
                stats.bytes_stored = stats.bytes_stored.saturating_sub(meta.size);
            }
        }

        Ok(())
    }
}
369
#[async_trait]
impl<S: BlockStore> BlockStore for DedupBlockStore<S> {
    /// Store a block, transparently chunking and deduplicating it.
    ///
    /// Putting a CID that already has a manifest is a no-op: identical
    /// CID implies identical bytes, so the chunks already match.
    ///
    /// NOTE(review): if `store_chunk` fails partway through, the chunks
    /// stored so far keep their bumped ref counts with no manifest
    /// referencing them — confirm whether rollback is required.
    async fn put(&self, block: &Block) -> Result<()> {
        let data = block.data();
        let original_size = data.len();
        let block_cid = *block.cid();

        // Check if block already exists
        let is_new_block = !self.manifests.contains_key(&block_cid);

        // If block exists with same CID, it's the same data - no need to re-store
        // Just update the manifest (idempotent operation)
        if !is_new_block {
            // Same CID means same data, chunks will be identical
            // Just ensure manifest exists (it already does)
            return Ok(());
        }

        // Chunk the data
        let chunker = Chunker::new(self.config.clone());
        let chunks = chunker.chunk(data);

        // Store each chunk
        let mut chunk_cids = Vec::new();
        for chunk in chunks {
            let cid = self.store_chunk(&chunk).await?;
            chunk_cids.push(cid);
        }

        // Create and store manifest
        let manifest = BlockManifest {
            original_size,
            chunks: chunk_cids,
        };

        self.manifests.insert(block_cid, manifest);

        // Update stats for new block
        let mut stats = self.stats.write();
        stats.blocks_stored += 1;
        stats.bytes_original += original_size;

        Ok(())
    }

    /// Fetch a block by reassembling its chunks from the manifest;
    /// returns `None` for CIDs this wrapper never stored.
    async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
        // Get manifest (cloned so the DashMap guard drops before awaiting)
        let manifest = match self.manifests.get(cid) {
            Some(m) => m.clone(),
            None => return Ok(None),
        };

        // Reconstruct block from chunks
        let block = self.reconstruct_block(&manifest).await?;
        Ok(Some(block))
    }

    /// A block "exists" iff a manifest for it is indexed in memory.
    async fn has(&self, cid: &Cid) -> Result<bool> {
        Ok(self.manifests.contains_key(cid))
    }

    /// Remove a block's manifest and release its chunk references;
    /// chunks whose ref count hits zero are deleted from the inner
    /// store. Deleting an unknown CID is a silent no-op.
    async fn delete(&self, cid: &Cid) -> Result<()> {
        // Get and remove manifest
        let manifest = match self.manifests.remove(cid) {
            Some((_, m)) => m,
            None => return Ok(()),
        };

        // Decrement chunk reference counts
        self.decrement_chunk_refs(&manifest.chunks).await?;

        // Update stats
        let mut stats = self.stats.write();
        stats.blocks_stored = stats.blocks_stored.saturating_sub(1);
        stats.bytes_original = stats.bytes_original.saturating_sub(manifest.original_size);

        Ok(())
    }

    /// List the CIDs of stored blocks (not the internal chunk CIDs).
    fn list_cids(&self) -> Result<Vec<Cid>> {
        let cids: Vec<Cid> = self.manifests.iter().map(|entry| *entry.key()).collect();
        Ok(cids)
    }

    /// Number of stored blocks (manifests), not chunks.
    fn len(&self) -> usize {
        self.manifests.len()
    }

    /// True when no blocks are stored.
    fn is_empty(&self) -> bool {
        self.manifests.is_empty()
    }

    /// Flush the inner store; the in-memory indexes need no flushing.
    async fn flush(&self) -> Result<()> {
        self.inner.flush().await
    }

    /// Close the inner store.
    async fn close(&self) -> Result<()> {
        self.inner.close().await
    }
}
470
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use std::path::PathBuf;

    // NOTE(review): the async tests below write to fixed /tmp paths.
    // Each test uses a distinct path, so they do not collide with each
    // other, but concurrent runs of the same test binary would.

    /// Sanity-check the three config presets relative to each other.
    #[test]
    fn test_chunking_config() {
        let config = ChunkingConfig::default();
        assert_eq!(config.min_chunk_size, 256 * 1024);
        assert_eq!(config.target_chunk_size, 1024 * 1024);

        let small = ChunkingConfig::small();
        assert!(small.min_chunk_size < config.min_chunk_size);

        let large = ChunkingConfig::large();
        assert!(large.min_chunk_size > config.min_chunk_size);
    }

    /// Sub-minimum inputs become a single chunk, and chunking is
    /// deterministic (identical input -> identical chunks and CIDs).
    #[test]
    fn test_chunker_basic() {
        let config = ChunkingConfig {
            min_chunk_size: 16 * 1024,
            target_chunk_size: 64 * 1024,
            max_chunk_size: 128 * 1024,
            hash_mask: 0xFFF,
        };
        let chunker = Chunker::new(config.clone());

        // Data smaller than min should be single chunk
        let small_data: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect(); // 10KB
        let chunks = chunker.chunk(&small_data);
        assert_eq!(chunks.len(), 1, "10KB data should be 1 chunk (min is 16KB)");
        assert_eq!(chunks[0].len(), 10240);

        // Identical data should produce identical chunks
        let small_data2: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect(); // 10KB
        let chunks2 = chunker.chunk(&small_data2);
        assert_eq!(chunks2.len(), 1);
        assert_eq!(
            chunks[0], chunks2[0],
            "Identical data should produce identical chunks"
        );

        // Check that chunk CIDs would be the same
        let chunk_block1 = Block::new(bytes::Bytes::copy_from_slice(&chunks[0])).unwrap();
        let chunk_block2 = Block::new(bytes::Bytes::copy_from_slice(&chunks2[0])).unwrap();
        assert_eq!(
            chunk_block1.cid(),
            chunk_block2.cid(),
            "Identical chunks should have same CID"
        );
    }

    /// The derived stats (ratio / saved bytes) follow from the counters.
    #[test]
    fn test_dedup_stats() {
        let stats = DedupStats {
            blocks_stored: 0,
            bytes_original: 1000,
            bytes_stored: 600,
            unique_chunks: 0,
            duplicate_chunks_avoided: 0,
        };

        assert_eq!(stats.dedup_ratio(), 0.4); // 40% savings
        assert_eq!(stats.bytes_saved(), 400);
    }

    /// Varied 500KB input yields multiple chunks, all within the
    /// configured size window (except possibly the last).
    #[test]
    fn test_chunker() {
        let config = ChunkingConfig::small();
        let chunker = Chunker::new(config.clone());

        // Small data should be single chunk
        let small_data = vec![0u8; 32 * 1024]; // 32KB
        let chunks = chunker.chunk(&small_data);
        assert_eq!(chunks.len(), 1);

        // Larger data with varied content should be chunked
        // FastCDC works best with non-uniform data
        let mut large_data = Vec::new();
        for i in 0..500 {
            // Create 1KB blocks of varying data
            let block: Vec<u8> = (0..1024).map(|j| ((i * 1024 + j) % 256) as u8).collect();
            large_data.extend_from_slice(&block);
        }
        let chunks = chunker.chunk(&large_data);

        // With varied data, FastCDC should find boundaries
        // The exact number depends on content, but should be > 1 for 500KB
        assert!(
            chunks.len() > 1,
            "Expected multiple chunks for 500KB of varied data"
        );

        // Verify chunks respect size constraints
        for (i, chunk) in chunks.iter().enumerate() {
            if i < chunks.len() - 1 {
                // Not the last chunk
                assert!(
                    chunk.len() >= config.min_chunk_size,
                    "Chunk {} size {} < min {}",
                    i,
                    chunk.len(),
                    config.min_chunk_size
                );
                assert!(
                    chunk.len() <= config.max_chunk_size,
                    "Chunk {} size {} > max {}",
                    i,
                    chunk.len(),
                    config.max_chunk_size
                );
            }
        }
    }

    /// Round-trip a single block through the dedup store and verify the
    /// stats counters.
    #[tokio::test]
    async fn test_dedup_blockstore_basic() {
        let config = BlockStoreConfig {
            path: PathBuf::from("/tmp/ipfrs-test-dedup-basic"),
            cache_size: 1024 * 1024,
        };

        // Clean up
        let _ = std::fs::remove_dir_all(&config.path);

        let inner = SledBlockStore::new(config).unwrap();
        let store = DedupBlockStore::with_defaults(inner);

        // Store a block
        let data = bytes::Bytes::from(vec![1u8; 100 * 1024]); // 100KB
        let block = Block::new(data.clone()).unwrap();

        store.put(&block).await.unwrap();

        // Retrieve it
        let retrieved = store.get(block.cid()).await.unwrap().unwrap();
        assert_eq!(retrieved.data(), block.data());

        // Check stats
        let stats = store.stats();
        assert_eq!(stats.blocks_stored, 1);
        assert_eq!(stats.bytes_original, 100 * 1024);
    }

    /// Two distinct blocks sharing content should reuse chunks
    /// (duplicate_chunks_avoided > 0) yet both reconstruct exactly.
    #[tokio::test]
    async fn test_dedup_duplicate_blocks() {
        let config = BlockStoreConfig {
            path: PathBuf::from("/tmp/ipfrs-test-dedup-duplicates"),
            cache_size: 1024 * 1024,
        };

        // Clean up
        let _ = std::fs::remove_dir_all(&config.path);

        let inner = SledBlockStore::new(config).unwrap();
        // Use custom config optimized for dedup testing
        let chunk_config = ChunkingConfig {
            min_chunk_size: 32 * 1024,    // 32KB min
            target_chunk_size: 64 * 1024, // 64KB target
            max_chunk_size: 128 * 1024,   // 128KB max
            hash_mask: 0x1FFF,            // ~8KB avg for boundary detection
        };
        let store = DedupBlockStore::new(inner, chunk_config);

        // Create varied data patterns that FastCDC can chunk consistently
        // Use a repeating pattern that will create natural boundaries
        let mut chunk_data = Vec::new();
        for i in 0..40 {
            let pattern: Vec<u8> = (0..1024).map(|j| ((i * 1024 + j) % 256) as u8).collect();
            chunk_data.extend_from_slice(&pattern);
        }
        // chunk_data is now 40KB of patterned data

        let block1 = Block::new(bytes::Bytes::from(chunk_data.clone())).unwrap();

        // Create block2 with the same pattern repeated - FastCDC should find same chunks
        let mut data2 = chunk_data.clone();
        data2.extend_from_slice(&chunk_data); // 80KB total
        let block2 = Block::new(bytes::Bytes::from(data2)).unwrap();

        // Store block1
        store.put(&block1).await.unwrap();

        let stats_after_first = store.stats();
        let first_chunks = stats_after_first.unique_chunks;
        assert!(first_chunks >= 1, "Expected at least 1 chunk");

        // Store block2 - should reuse chunks from block1 where content matches
        store.put(&block2).await.unwrap();

        let stats = store.stats();
        assert_eq!(stats.blocks_stored, 2);

        // With identical content patterns, block2 should reuse at least some chunks
        // The exact number depends on where FastCDC finds boundaries
        assert!(
            stats.duplicate_chunks_avoided > 0,
            "Expected some duplicate chunks to be avoided"
        );

        // Verify both blocks can be retrieved correctly
        let retrieved1 = store.get(block1.cid()).await.unwrap().unwrap();
        let retrieved2 = store.get(block2.cid()).await.unwrap().unwrap();

        assert_eq!(retrieved1.data(), block1.data());
        assert_eq!(retrieved2.data(), block2.data());
    }

    /// Deleting a block removes it from the store and rolls back stats.
    #[tokio::test]
    async fn test_dedup_delete() {
        let config = BlockStoreConfig {
            path: PathBuf::from("/tmp/ipfrs-test-dedup-delete"),
            cache_size: 1024 * 1024,
        };

        // Clean up
        let _ = std::fs::remove_dir_all(&config.path);

        let inner = SledBlockStore::new(config).unwrap();
        let store = DedupBlockStore::with_defaults(inner);

        // Store a block
        let data = bytes::Bytes::from(vec![3u8; 200 * 1024]);
        let block = Block::new(data).unwrap();

        store.put(&block).await.unwrap();

        let stats_before = store.stats();
        assert_eq!(stats_before.blocks_stored, 1);

        // Delete it
        store.delete(block.cid()).await.unwrap();

        let stats_after = store.stats();
        assert_eq!(stats_after.blocks_stored, 0);

        // Should not be retrievable
        let retrieved = store.get(block.cid()).await.unwrap();
        assert!(retrieved.is_none());
    }

    /// Exercises idempotent put (same CID), independent blocks, and
    /// chunk garbage collection on delete via reference counting.
    #[tokio::test]
    async fn test_dedup_reference_counting() {
        let config = BlockStoreConfig {
            path: PathBuf::from("/tmp/ipfrs-test-dedup-refcount"),
            cache_size: 1024 * 1024,
        };

        // Clean up
        let _ = std::fs::remove_dir_all(&config.path);

        let inner = SledBlockStore::new(config).unwrap();
        let chunk_config = ChunkingConfig {
            min_chunk_size: 16 * 1024,
            target_chunk_size: 64 * 1024,
            max_chunk_size: 128 * 1024,
            hash_mask: 0xFFF,
        };
        let store = DedupBlockStore::new(inner, chunk_config);

        // Create blocks that are under min_chunk_size (will be single chunks)
        // Use patterned data to avoid issues with uniform data
        let data1: Vec<u8> = (0..10240).map(|i| (i % 256) as u8).collect(); // 10KB varied
        let data2 = data1.clone(); // Same content
        let data3: Vec<u8> = (0..10240).map(|i| ((i + 100) % 256) as u8).collect(); // 10KB different

        let block1 = Block::new(bytes::Bytes::from(data1)).unwrap();
        let block2 = Block::new(bytes::Bytes::from(data2)).unwrap();
        let block3 = Block::new(bytes::Bytes::from(data3)).unwrap();

        // block1 and block2 have same content, so same CID
        assert_eq!(block1.cid(), block2.cid());
        // block3 is different
        assert_ne!(block1.cid(), block3.cid());

        // Store block1
        store.put(&block1).await.unwrap();
        let stats1 = store.stats();
        assert_eq!(stats1.unique_chunks, 1, "block1 should be 1 chunk");
        assert_eq!(stats1.blocks_stored, 1);

        // Store block2 (same CID as block1) - idempotent, no-op
        store.put(&block2).await.unwrap();
        let stats2 = store.stats();
        // Same CID means same data - put() is idempotent, no changes
        assert_eq!(
            stats2.unique_chunks, 1,
            "block2 is same as block1 (same CID)"
        );
        assert_eq!(stats2.blocks_stored, 1, "Still 1 block (same CID)");
        assert_eq!(
            stats2.duplicate_chunks_avoided, 0,
            "No chunking happened for duplicate CID"
        );

        // Store block3 (different) - should create new chunk
        store.put(&block3).await.unwrap();
        let stats3 = store.stats();
        assert_eq!(stats3.unique_chunks, 2, "block3 adds a new unique chunk");
        assert_eq!(stats3.blocks_stored, 2, "Now have 2 different blocks");

        // Verify retrieval
        let retrieved1 = store.get(block1.cid()).await.unwrap().unwrap();
        assert_eq!(retrieved1.data(), block1.data());

        let retrieved3 = store.get(block3.cid()).await.unwrap().unwrap();
        assert_eq!(retrieved3.data(), block3.data());

        // Delete block1/block2 (same CID) - should free its chunk
        store.delete(block1.cid()).await.unwrap();
        let stats_after_delete = store.stats();
        assert_eq!(
            stats_after_delete.unique_chunks, 1,
            "Only block3's chunk remains"
        );
        assert_eq!(stats_after_delete.blocks_stored, 1);

        // Delete block3 - should free remaining chunk
        store.delete(block3.cid()).await.unwrap();

        let stats_final = store.stats();
        assert_eq!(stats_final.unique_chunks, 0);
        assert_eq!(stats_final.bytes_stored, 0);
        assert_eq!(stats_final.blocks_stored, 0);
    }
}
798}