1use crate::traits::BlockStore;
14use async_trait::async_trait;
15use dashmap::DashMap;
16use ipfrs_core::{Block, Cid, Error, Result};
17use parking_lot::RwLock;
18use std::sync::Arc;
19
/// Tuning parameters for content-defined chunking.
#[derive(Debug, Clone)]
pub struct ChunkingConfig {
    /// Lower bound on chunk size; no cut point is considered before this many bytes.
    pub min_chunk_size: usize,
    /// Preferred (target) chunk size the boundary search is normalized around.
    pub target_chunk_size: usize,
    /// Hard upper bound; a chunk is force-cut at this size if no boundary is found.
    pub max_chunk_size: usize,
    /// Bitmask tested against the rolling hash at each position; a boundary is
    /// declared when `hash & mask == 0`, so more set bits mean rarer boundaries
    /// (larger chunks).
    pub hash_mask: u32,
}
32
33impl Default for ChunkingConfig {
34 fn default() -> Self {
35 Self {
36 min_chunk_size: 256 * 1024, target_chunk_size: 1024 * 1024, max_chunk_size: 4 * 1024 * 1024, hash_mask: 0xFFFF, }
41 }
42}
43
44impl ChunkingConfig {
45 pub fn small() -> Self {
47 Self {
48 min_chunk_size: 64 * 1024, target_chunk_size: 256 * 1024, max_chunk_size: 1024 * 1024, hash_mask: 0x3FFF, }
53 }
54
55 pub fn large() -> Self {
57 Self {
58 min_chunk_size: 1024 * 1024, target_chunk_size: 4 * 1024 * 1024, max_chunk_size: 16 * 1024 * 1024, hash_mask: 0x1FFFF, }
63 }
64}
65
/// Bookkeeping record for a single stored chunk.
#[derive(Debug, Clone)]
struct ChunkMeta {
    // Content identifier of the chunk.
    cid: Cid,
    // Number of block manifests currently referencing this chunk.
    ref_count: usize,
    // Chunk payload size in bytes.
    size: usize,
}
76
/// Recipe for reassembling a logical block from its stored chunks.
#[derive(Debug, Clone)]
struct BlockManifest {
    // Size in bytes of the original (unchunked) block payload.
    original_size: usize,
    // Chunk CIDs in reconstruction order; concatenating them yields the block.
    chunks: Vec<Cid>,
}
85
/// Aggregate counters describing deduplication effectiveness.
#[derive(Debug, Clone, Default)]
pub struct DedupStats {
    /// Number of logical blocks currently stored (manifests held).
    pub blocks_stored: usize,
    /// Sum of original (pre-chunking) block sizes, in bytes.
    pub bytes_original: usize,
    /// Sum of unique chunk sizes actually written to the inner store, in bytes.
    pub bytes_stored: usize,
    /// Number of distinct chunks currently held.
    pub unique_chunks: usize,
    /// Count of chunk writes skipped because an identical chunk already existed.
    pub duplicate_chunks_avoided: usize,
}
100
101impl DedupStats {
102 pub fn dedup_ratio(&self) -> f64 {
104 if self.bytes_original == 0 {
105 return 0.0;
106 }
107 1.0 - (self.bytes_stored as f64 / self.bytes_original as f64)
108 }
109
110 pub fn bytes_saved(&self) -> usize {
112 self.bytes_original.saturating_sub(self.bytes_stored)
113 }
114
115 pub fn avg_chunk_size(&self) -> usize {
117 if self.unique_chunks == 0 {
118 return 0;
119 }
120 self.bytes_stored / self.unique_chunks
121 }
122}
123
// Gear table for a gearhash-style rolling hash: one pseudo-random constant per
// possible byte value.
//
// NOTE(review): this table is currently unused — `Chunker::find_boundary` uses
// a multiply/xor hash instead. It is presumably kept for a future gear-based
// chunker; confirm before removing. Every entry fits in 32 bits even though
// the element type is u64.
#[allow(dead_code)]
const GEAR: [u64; 256] = [
    0x5c95c078, 0x22408989, 0x2d48a214, 0x12842087, 0x530f8afb, 0x2aaa3f86, 0x7f1bd89f, 0x62534467,
    0x22c4b83b, 0x3e36d3e7, 0x4c9fa05b, 0x0b20f0e3, 0x441c8a8c, 0x7cc27988, 0x5505c6c0, 0x3c9ae0da,
    0x153e46cd, 0x0d05f5b5, 0x51c9c3b5, 0x02e57b86, 0x74a8d4ba, 0x6f16cbb5, 0x2ffc27ea, 0x5fa83e0f,
    0x75ab67e2, 0x3ff15813, 0x2ec58ac7, 0x6f1f0520, 0x0c5d7dba, 0x4a9f5e76, 0x4ec58e64, 0x6a470c8e,
    0x40edf2ca, 0x1a1c0c8d, 0x4e32e5e4, 0x6c7a7fda, 0x4b3be9e4, 0x64d8e67b, 0x2ef8ad98, 0x34d9f7e5,
    0x7e7e4a36, 0x1a1c54d1, 0x5e2a9e7a, 0x3e5f0a8e, 0x0e01d1a0, 0x1f31aa27, 0x049c9e3e, 0x7c38f56e,
    0x4b8d9ef0, 0x0b9c4d05, 0x55f59f0d, 0x3e8e02ae, 0x25c46f84, 0x6e6fdc6f, 0x440ae4a7, 0x3e38a0e6,
    0x5b96c3d1, 0x72a06105, 0x52cd5e2d, 0x3d015fb3, 0x4d7c7064, 0x1c8c169c, 0x5c95e834, 0x0c4d9d42,
    0x3c9c8ea3, 0x10a5d9d6, 0x7dcb9d63, 0x3ecf9e96, 0x1f5c9e5f, 0x7e7854c5, 0x48a05ae3, 0x0c4e9419,
    0x6b5c9b6f, 0x7e1a6dc0, 0x3b8f9fe8, 0x6f6e8e3f, 0x39f48adb, 0x7b8d9e72, 0x29e18dc5, 0x7e6c3fc4,
    0x5d9c4ab8, 0x1f6e9dc2, 0x3e8f9fc3, 0x7d9c8ea6, 0x0e1f8d9c, 0x5f9d8e72, 0x3e9f8dcb, 0x7d8e9f72,
    0x2f9d8ea5, 0x6e8f9dc4, 0x3d9f8ec5, 0x7e8d9f63, 0x1f9e8dc3, 0x6d8f9ec4, 0x3e9d8fc5, 0x7d9e8f62,
    0x2e9f8dc4, 0x6f8d9ec5, 0x3d9e8fc3, 0x7e9d8f64, 0x1f8e9dc5, 0x6e9f8dc4, 0x3d8e9fc5, 0x7d9f8e63,
    0x2f8d9ec4, 0x6e8f9dc5, 0x3e9d8fc4, 0x7d8e9f65, 0x1f9d8ec5, 0x6d9f8dc4, 0x3e8d9fc5, 0x7e9f8d62,
    0x2d8e9fc4, 0x6f9d8ec5, 0x3d8f9dc4, 0x7e8d9f66, 0x1e9f8dc5, 0x6d8e9fc4, 0x3f9d8ec5, 0x7d9e8f61,
    0x2f9d8ec4, 0x6e8d9fc5, 0x3d9f8dc4, 0x7e8f9d67, 0x1f8d9ec5, 0x6e9d8fc4, 0x3d8e9fc5, 0x7f9d8e60,
    0x2e8f9dc4, 0x6f9e8dc5, 0x3d8d9fc4, 0x7e9f8d68, 0x1d9e8fc5, 0x6f8d9ec4, 0x3e9f8dc5, 0x7d8e9f5f,
    0x2f8e9dc4, 0x6d9f8ec5, 0x3e8d9fc4, 0x7f9e8d69, 0x1f9d8ec5, 0x6e8f9dc4, 0x3d9e8fc5, 0x7e8d9f5e,
    0x2d9f8ec4, 0x6f8e9dc5, 0x3d8f9fc4, 0x7e9d8e6a, 0x1e8f9dc5, 0x6d9e8fc4, 0x3f8d9ec5, 0x7d9f8e5d,
    0x2f8d9fc4, 0x6e9f8ec5, 0x3d8e9dc4, 0x7f8d9e6b, 0x1f8e9fc5, 0x6e8d9ec4, 0x3d9f8fc5, 0x7e9e8d5c,
    0x2e9d8fc4, 0x6f8e9dc5, 0x3e8f9ec4, 0x7d9e8f6c, 0x1f9e8dc5, 0x6d8f9fc4, 0x3e9d8ec5, 0x7d8f9e5b,
    0x2f9e8dc4, 0x6e8d9fc5, 0x3d9f8ec4, 0x7e8f9d6d, 0x1e9d8fc5, 0x6f8e9dc4, 0x3d8f9ec5, 0x7e9d8f5a,
    0x2d8f9ec4, 0x6e9d8fc5, 0x3f8e9dc4, 0x7d9f8e6e, 0x1f8d9fc5, 0x6e9e8dc4, 0x3d8f9fc5, 0x7f8e9d59,
    0x2e8d9fc4, 0x6f9e8dc5, 0x3d9f8ec4, 0x7e8d9f6f, 0x1d9f8ec5, 0x6f8d9dc4, 0x3e8e9fc5, 0x7d9f8e58,
    0x2f8e9fc4, 0x6d9f8dc5, 0x3e8d9ec4, 0x7f9e8d70, 0x1f8e9dc5, 0x6d8f9ec4, 0x3f9d8fc5, 0x7e8f9d57,
    0x2d9e8fc4, 0x6f8e9dc5, 0x3d8f9ec4, 0x7e9d8f71, 0x1e9f8dc5, 0x6f8d9ec4, 0x3d9e8fc5, 0x7f8d9e56,
    0x2f8d9ec4, 0x6e9f8dc5, 0x3e8d9fc4, 0x7d9e8f72, 0x1f9d8fc5, 0x6e8f9dc4, 0x3d8e9fc5, 0x7e9f8d55,
    0x2e8f9fc4, 0x6d9e8dc5, 0x3f8d9ec4, 0x7e8f9d73, 0x1d9f8fc5, 0x6f8e9dc4, 0x3e8d9fc5, 0x7d8f9e54,
    0x2f9e8dc4, 0x6e8f9fc5, 0x3d9d8ec4, 0x7f8e9d74, 0x1e8d9fc5, 0x6d9f8ec4, 0x3f8e9dc5, 0x7e9d8f53,
    0x2d8e9fc4, 0x6f9d8ec5, 0x3d8f9fc4, 0x7e9f8d75, 0x1f8d9ec5, 0x6e9d8fc4, 0x3d9f8ec5, 0x7f8e9d52,
    0x2e9f8dc4, 0x6d8e9fc5, 0x3f9d8ec4, 0x7d8f9e76, 0x1f9e8dc5, 0x6f8d9ec4, 0x3e9f8fc5, 0x7d9e8f51,
    0x2f8d9fc4, 0x6e9e8dc5, 0x3d8f9ec4, 0x7e8d9f77, 0x1e9f8dc5, 0x6d8f9fc4, 0x3f8e9dc5, 0x7e9d8e50,
];
161
/// Content-defined chunker parameterized by a `ChunkingConfig`.
struct Chunker {
    config: ChunkingConfig,
}
166
167impl Chunker {
168 fn new(config: ChunkingConfig) -> Self {
169 Self { config }
170 }
171
172 fn chunk(&self, data: &[u8]) -> Vec<Vec<u8>> {
174 if data.len() <= self.config.min_chunk_size {
175 return vec![data.to_vec()];
176 }
177
178 let mut chunks = Vec::new();
179 let mut start = 0;
180
181 while start < data.len() {
182 let remaining = data.len() - start;
183
184 if remaining <= self.config.min_chunk_size {
186 chunks.push(data[start..].to_vec());
187 break;
188 }
189
190 let boundary = self.find_boundary(&data[start..]);
192 let end = start + boundary;
193
194 chunks.push(data[start..end].to_vec());
195 start = end;
196 }
197
198 chunks
199 }
200
201 #[allow(clippy::needless_range_loop)]
203 fn find_boundary(&self, data: &[u8]) -> usize {
204 let max_scan = self.config.max_chunk_size.min(data.len());
205 let min_size = self.config.min_chunk_size.min(data.len());
206
207 let nc_level = min_size + (self.config.target_chunk_size - min_size) / 4;
209
210 let mut hash: u64 = 0;
211 const PRIME: u64 = 0x01000193; let mask_s = self.config.hash_mask as u64; let mask_l = (self.config.hash_mask >> 1) as u64; for idx in min_size..max_scan {
217 let byte = data[idx];
218
219 hash = hash.wrapping_mul(PRIME) ^ (byte as u64);
221
222 let mask = if idx < nc_level { mask_s } else { mask_l };
224
225 if (hash & mask) == 0 {
227 return idx + 1;
228 }
229 }
230
231 max_scan
233 }
234}
235
/// A block store decorator that splits each block into content-defined chunks
/// and writes only unique chunks to the wrapped store.
pub struct DedupBlockStore<S> {
    // The wrapped store that physically holds chunk data.
    inner: S,
    // Chunking parameters applied to every `put`.
    config: ChunkingConfig,
    // Chunk CID -> reference count and size.
    // NOTE(review): held only in memory — chunk accounting is lost on restart;
    // confirm whether persistence is intended.
    chunk_index: Arc<DashMap<Cid, ChunkMeta>>,
    // Block CID -> ordered chunk list needed to reconstruct the block.
    // NOTE(review): also in-memory only, so previously stored blocks are not
    // retrievable through this wrapper after a restart.
    manifests: Arc<DashMap<Cid, BlockManifest>>,
    // Aggregate deduplication counters.
    stats: Arc<RwLock<DedupStats>>,
}
247
248impl<S: BlockStore> DedupBlockStore<S> {
249 pub fn new(inner: S, config: ChunkingConfig) -> Self {
251 Self {
252 inner,
253 config,
254 chunk_index: Arc::new(DashMap::new()),
255 manifests: Arc::new(DashMap::new()),
256 stats: Arc::new(RwLock::new(DedupStats::default())),
257 }
258 }
259
260 pub fn with_defaults(inner: S) -> Self {
262 Self::new(inner, ChunkingConfig::default())
263 }
264
265 pub fn stats(&self) -> DedupStats {
267 self.stats.read().clone()
268 }
269
270 pub fn into_inner(self) -> S {
272 self.inner
273 }
274
275 pub fn inner(&self) -> &S {
277 &self.inner
278 }
279
280 async fn store_chunk(&self, chunk_data: &[u8]) -> Result<Cid> {
282 let chunk_block = Block::new(bytes::Bytes::copy_from_slice(chunk_data))?;
284 let chunk_cid = *chunk_block.cid();
285
286 if let Some(mut meta) = self.chunk_index.get_mut(&chunk_cid) {
288 meta.ref_count += 1;
290
291 let mut stats = self.stats.write();
293 stats.duplicate_chunks_avoided += 1;
294
295 return Ok(meta.cid);
296 }
297
298 self.inner.put(&chunk_block).await?;
300
301 self.chunk_index.insert(
303 chunk_cid,
304 ChunkMeta {
305 cid: chunk_cid,
306 ref_count: 1,
307 size: chunk_data.len(),
308 },
309 );
310
311 let mut stats = self.stats.write();
313 stats.unique_chunks += 1;
314 stats.bytes_stored += chunk_data.len();
315
316 Ok(chunk_cid)
317 }
318
319 async fn reconstruct_block(&self, manifest: &BlockManifest) -> Result<Block> {
321 let mut data = Vec::with_capacity(manifest.original_size);
322
323 for chunk_cid in &manifest.chunks {
324 let chunk_block = self
325 .inner
326 .get(chunk_cid)
327 .await?
328 .ok_or_else(|| Error::BlockNotFound(chunk_cid.to_string()))?;
329 data.extend_from_slice(chunk_block.data());
330 }
331
332 Block::new(bytes::Bytes::from(data))
333 }
334
335 async fn decrement_chunk_refs(&self, chunk_cids: &[Cid]) -> Result<()> {
337 let mut to_delete = Vec::new();
338
339 for cid in chunk_cids {
340 let should_delete = {
341 if let Some(mut entry) = self.chunk_index.get_mut(cid) {
342 entry.ref_count = entry.ref_count.saturating_sub(1);
343 entry.ref_count == 0
344 } else {
345 false
346 }
347 };
348
349 if should_delete {
350 to_delete.push(*cid);
351 }
352 }
353
354 for cid in to_delete {
356 if let Some((_, meta)) = self.chunk_index.remove(&cid) {
357 self.inner.delete(&cid).await?;
358
359 let mut stats = self.stats.write();
361 stats.unique_chunks = stats.unique_chunks.saturating_sub(1);
362 stats.bytes_stored = stats.bytes_stored.saturating_sub(meta.size);
363 }
364 }
365
366 Ok(())
367 }
368}
369
370#[async_trait]
371impl<S: BlockStore> BlockStore for DedupBlockStore<S> {
372 async fn put(&self, block: &Block) -> Result<()> {
373 let data = block.data();
374 let original_size = data.len();
375 let block_cid = *block.cid();
376
377 let is_new_block = !self.manifests.contains_key(&block_cid);
379
380 if !is_new_block {
383 return Ok(());
386 }
387
388 let chunker = Chunker::new(self.config.clone());
390 let chunks = chunker.chunk(data);
391
392 let mut chunk_cids = Vec::new();
394 for chunk in chunks {
395 let cid = self.store_chunk(&chunk).await?;
396 chunk_cids.push(cid);
397 }
398
399 let manifest = BlockManifest {
401 original_size,
402 chunks: chunk_cids,
403 };
404
405 self.manifests.insert(block_cid, manifest);
406
407 let mut stats = self.stats.write();
409 stats.blocks_stored += 1;
410 stats.bytes_original += original_size;
411
412 Ok(())
413 }
414
415 async fn get(&self, cid: &Cid) -> Result<Option<Block>> {
416 let manifest = match self.manifests.get(cid) {
418 Some(m) => m.clone(),
419 None => return Ok(None),
420 };
421
422 let block = self.reconstruct_block(&manifest).await?;
424 Ok(Some(block))
425 }
426
427 async fn has(&self, cid: &Cid) -> Result<bool> {
428 Ok(self.manifests.contains_key(cid))
429 }
430
431 async fn delete(&self, cid: &Cid) -> Result<()> {
432 let manifest = match self.manifests.remove(cid) {
434 Some((_, m)) => m,
435 None => return Ok(()),
436 };
437
438 self.decrement_chunk_refs(&manifest.chunks).await?;
440
441 let mut stats = self.stats.write();
443 stats.blocks_stored = stats.blocks_stored.saturating_sub(1);
444 stats.bytes_original = stats.bytes_original.saturating_sub(manifest.original_size);
445
446 Ok(())
447 }
448
449 fn list_cids(&self) -> Result<Vec<Cid>> {
450 let cids: Vec<Cid> = self.manifests.iter().map(|entry| *entry.key()).collect();
451 Ok(cids)
452 }
453
454 fn len(&self) -> usize {
455 self.manifests.len()
456 }
457
458 fn is_empty(&self) -> bool {
459 self.manifests.is_empty()
460 }
461
462 async fn flush(&self) -> Result<()> {
463 self.inner.flush().await
464 }
465
466 async fn close(&self) -> Result<()> {
467 self.inner.close().await
468 }
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use std::path::PathBuf;

    /// Builds a `SledBlockStore` rooted at `dir`, wiping any state left over
    /// from a previous test run.
    fn fresh_sled(dir: &str) -> SledBlockStore {
        let config = BlockStoreConfig {
            path: PathBuf::from(dir),
            cache_size: 1024 * 1024,
        };
        let _ = std::fs::remove_dir_all(&config.path);
        SledBlockStore::new(config).unwrap()
    }

    /// Deterministic test payload: `len` bytes cycling through 0..=255,
    /// shifted by `offset`.
    fn patterned(len: usize, offset: usize) -> Vec<u8> {
        (0..len).map(|i| ((i + offset) % 256) as u8).collect()
    }

    #[test]
    fn test_chunking_config() {
        let config = ChunkingConfig::default();
        assert_eq!(config.min_chunk_size, 256 * 1024);
        assert_eq!(config.target_chunk_size, 1024 * 1024);

        let small = ChunkingConfig::small();
        assert!(small.min_chunk_size < config.min_chunk_size);

        let large = ChunkingConfig::large();
        assert!(large.min_chunk_size > config.min_chunk_size);
    }

    #[test]
    fn test_chunker_basic() {
        let config = ChunkingConfig {
            min_chunk_size: 16 * 1024,
            target_chunk_size: 64 * 1024,
            max_chunk_size: 128 * 1024,
            hash_mask: 0xFFF,
        };
        let chunker = Chunker::new(config.clone());

        // Data below the minimum chunk size must come back as a single chunk.
        let small_data = patterned(10240, 0);
        let chunks = chunker.chunk(&small_data);
        assert_eq!(chunks.len(), 1, "10KB data should be 1 chunk (min is 16KB)");
        assert_eq!(chunks[0].len(), 10240);

        // Chunking must be deterministic for identical input...
        let small_data2 = patterned(10240, 0);
        let chunks2 = chunker.chunk(&small_data2);
        assert_eq!(chunks2.len(), 1);
        assert_eq!(
            chunks[0], chunks2[0],
            "Identical data should produce identical chunks"
        );

        // ...and identical chunks must hash to the same CID.
        let chunk_block1 = Block::new(bytes::Bytes::copy_from_slice(&chunks[0])).unwrap();
        let chunk_block2 = Block::new(bytes::Bytes::copy_from_slice(&chunks2[0])).unwrap();
        assert_eq!(
            chunk_block1.cid(),
            chunk_block2.cid(),
            "Identical chunks should have same CID"
        );
    }

    #[test]
    fn test_dedup_stats() {
        let stats = DedupStats {
            blocks_stored: 0,
            bytes_original: 1000,
            bytes_stored: 600,
            unique_chunks: 0,
            duplicate_chunks_avoided: 0,
        };

        // 600 of 1000 bytes stored => 40% saved.
        assert_eq!(stats.dedup_ratio(), 0.4);
        assert_eq!(stats.bytes_saved(), 400);
    }

    #[test]
    fn test_chunker() {
        let config = ChunkingConfig::small();
        let chunker = Chunker::new(config.clone());

        // 32KB of zeros is below the 64KB minimum: one chunk.
        let small_data = vec![0u8; 32 * 1024];
        let chunks = chunker.chunk(&small_data);
        assert_eq!(chunks.len(), 1);

        // 500KB of varied bytes should split into several chunks.
        let large_data = patterned(500 * 1024, 0);
        let chunks = chunker.chunk(&large_data);
        assert!(
            chunks.len() > 1,
            "Expected multiple chunks for 500KB of varied data"
        );

        // Every chunk except the final remainder must respect the size bounds.
        for (i, chunk) in chunks.iter().enumerate() {
            if i < chunks.len() - 1 {
                assert!(
                    chunk.len() >= config.min_chunk_size,
                    "Chunk {} size {} < min {}",
                    i,
                    chunk.len(),
                    config.min_chunk_size
                );
                assert!(
                    chunk.len() <= config.max_chunk_size,
                    "Chunk {} size {} > max {}",
                    i,
                    chunk.len(),
                    config.max_chunk_size
                );
            }
        }
    }

    #[tokio::test]
    async fn test_dedup_blockstore_basic() {
        let inner = fresh_sled("/tmp/ipfrs-test-dedup-basic");
        let store = DedupBlockStore::with_defaults(inner);

        let payload = bytes::Bytes::from(vec![1u8; 100 * 1024]);
        let block = Block::new(payload.clone()).unwrap();

        store.put(&block).await.unwrap();

        // Round-trip must reproduce the exact payload.
        let retrieved = store.get(block.cid()).await.unwrap().unwrap();
        assert_eq!(retrieved.data(), block.data());

        let stats = store.stats();
        assert_eq!(stats.blocks_stored, 1);
        assert_eq!(stats.bytes_original, 100 * 1024);
    }

    #[tokio::test]
    async fn test_dedup_duplicate_blocks() {
        let inner = fresh_sled("/tmp/ipfrs-test-dedup-duplicates");
        let chunk_config = ChunkingConfig {
            min_chunk_size: 32 * 1024,
            target_chunk_size: 64 * 1024,
            max_chunk_size: 128 * 1024,
            hash_mask: 0x1FFF,
        };
        let store = DedupBlockStore::new(inner, chunk_config);

        // block1 is 40KB of varied data; block2 is that data repeated twice,
        // so block2 should share chunks with block1.
        let chunk_data = patterned(40 * 1024, 0);
        let block1 = Block::new(bytes::Bytes::from(chunk_data.clone())).unwrap();

        let mut data2 = chunk_data.clone();
        data2.extend_from_slice(&chunk_data);
        let block2 = Block::new(bytes::Bytes::from(data2)).unwrap();

        store.put(&block1).await.unwrap();

        let stats_after_first = store.stats();
        let first_chunks = stats_after_first.unique_chunks;
        assert!(first_chunks >= 1, "Expected at least 1 chunk");

        store.put(&block2).await.unwrap();

        let stats = store.stats();
        assert_eq!(stats.blocks_stored, 2);
        assert!(
            stats.duplicate_chunks_avoided > 0,
            "Expected some duplicate chunks to be avoided"
        );

        // Both blocks must still round-trip intact.
        let retrieved1 = store.get(block1.cid()).await.unwrap().unwrap();
        let retrieved2 = store.get(block2.cid()).await.unwrap().unwrap();
        assert_eq!(retrieved1.data(), block1.data());
        assert_eq!(retrieved2.data(), block2.data());
    }

    #[tokio::test]
    async fn test_dedup_delete() {
        let inner = fresh_sled("/tmp/ipfrs-test-dedup-delete");
        let store = DedupBlockStore::with_defaults(inner);

        let block = Block::new(bytes::Bytes::from(vec![3u8; 200 * 1024])).unwrap();

        store.put(&block).await.unwrap();
        let stats_before = store.stats();
        assert_eq!(stats_before.blocks_stored, 1);

        store.delete(block.cid()).await.unwrap();
        let stats_after = store.stats();
        assert_eq!(stats_after.blocks_stored, 0);

        // After deletion the manifest is gone, so the block is unreachable.
        let retrieved = store.get(block.cid()).await.unwrap();
        assert!(retrieved.is_none());
    }

    #[tokio::test]
    async fn test_dedup_reference_counting() {
        let inner = fresh_sled("/tmp/ipfrs-test-dedup-refcount");
        let chunk_config = ChunkingConfig {
            min_chunk_size: 16 * 1024,
            target_chunk_size: 64 * 1024,
            max_chunk_size: 128 * 1024,
            hash_mask: 0xFFF,
        };
        let store = DedupBlockStore::new(inner, chunk_config);

        // block1 and block2 carry identical content (same CID); block3 differs.
        let block1 = Block::new(bytes::Bytes::from(patterned(10240, 0))).unwrap();
        let block2 = Block::new(bytes::Bytes::from(patterned(10240, 0))).unwrap();
        let block3 = Block::new(bytes::Bytes::from(patterned(10240, 100))).unwrap();

        assert_eq!(block1.cid(), block2.cid());
        assert_ne!(block1.cid(), block3.cid());

        store.put(&block1).await.unwrap();
        let stats1 = store.stats();
        assert_eq!(stats1.unique_chunks, 1, "block1 should be 1 chunk");
        assert_eq!(stats1.blocks_stored, 1);

        // Re-putting an identical block is a no-op (CID fast path).
        store.put(&block2).await.unwrap();
        let stats2 = store.stats();
        assert_eq!(
            stats2.unique_chunks, 1,
            "block2 is same as block1 (same CID)"
        );
        assert_eq!(stats2.blocks_stored, 1, "Still 1 block (same CID)");
        assert_eq!(
            stats2.duplicate_chunks_avoided, 0,
            "No chunking happened for duplicate CID"
        );

        store.put(&block3).await.unwrap();
        let stats3 = store.stats();
        assert_eq!(stats3.unique_chunks, 2, "block3 adds a new unique chunk");
        assert_eq!(stats3.blocks_stored, 2, "Now have 2 different blocks");

        let retrieved1 = store.get(block1.cid()).await.unwrap().unwrap();
        assert_eq!(retrieved1.data(), block1.data());

        let retrieved3 = store.get(block3.cid()).await.unwrap().unwrap();
        assert_eq!(retrieved3.data(), block3.data());

        // Deleting block1 must not disturb block3's chunk.
        store.delete(block1.cid()).await.unwrap();
        let stats_after_delete = store.stats();
        assert_eq!(
            stats_after_delete.unique_chunks, 1,
            "Only block3's chunk remains"
        );
        assert_eq!(stats_after_delete.blocks_stored, 1);

        // Deleting the last block drains every counter back to zero.
        store.delete(block3.cid()).await.unwrap();
        let stats_final = store.stats();
        assert_eq!(stats_final.unique_chunks, 0);
        assert_eq!(stats_final.bytes_stored, 0);
        assert_eq!(stats_final.blocks_stored, 0);
    }
}
798}