use crate::block::{Block, MAX_BLOCK_SIZE};
use crate::cid::{Cid, CidBuilder, SerializableCid};
use crate::error::{Error, Result};
use crate::ipld::Ipld;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

/// Default chunk size in bytes (256 KiB).
pub const DEFAULT_CHUNK_SIZE: usize = 256 * 1024;

/// Minimum allowed chunk size in bytes (1 KiB).
pub const MIN_CHUNK_SIZE: usize = 1024;

/// Maximum allowed chunk size in bytes (1 MiB).
pub const MAX_CHUNK_SIZE: usize = 1024 * 1024;

/// Maximum number of links in a single intermediate DAG node.
pub const MAX_LINKS_PER_NODE: usize = 174;

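/// Strategy used to split data into chunks.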
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ChunkingStrategy {
    /// Split data into fixed-size chunks of `chunk_size` bytes.
    #[default]
    FixedSize,
    /// Split data at content-defined boundaries found with a rolling hash.
    ContentDefined,
}

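/// Configuration controlling how data is chunked and linked into a DAG.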
#[derive(Debug, Clone)]
pub struct ChunkingConfig {
    /// Target chunk size in bytes.
    pub chunk_size: usize,
    /// Chunking strategy to use.
    pub strategy: ChunkingStrategy,
    /// Maximum number of links per intermediate DAG node.
    pub max_links_per_node: usize,
}

impl Default for ChunkingConfig {
    fn default() -> Self {
        Self {
            chunk_size: DEFAULT_CHUNK_SIZE,
            strategy: ChunkingStrategy::FixedSize,
            max_links_per_node: MAX_LINKS_PER_NODE,
        }
    }
}

impl ChunkingConfig {
    /// Returns a builder for constructing a custom configuration.
    pub fn builder() -> ChunkingConfigBuilder {
        ChunkingConfigBuilder::new()
    }

    /// Creates a fixed-size configuration with the given chunk size.
    ///
    /// Returns an error if the size is outside `[MIN_CHUNK_SIZE, MAX_CHUNK_SIZE]`.
    pub fn with_chunk_size(chunk_size: usize) -> Result<Self> {
        if chunk_size < MIN_CHUNK_SIZE {
            return Err(Error::InvalidInput(format!(
                "Chunk size {} is below minimum {}",
                chunk_size, MIN_CHUNK_SIZE
            )));
        }
        if chunk_size > MAX_CHUNK_SIZE {
            return Err(Error::InvalidInput(format!(
                "Chunk size {} exceeds maximum {}",
                chunk_size, MAX_CHUNK_SIZE
            )));
        }
        Ok(Self {
            chunk_size,
            ..Default::default()
        })
    }

    /// Creates a content-defined chunking configuration with the default target size.
    pub fn content_defined() -> Self {
        Self {
            chunk_size: DEFAULT_CHUNK_SIZE,
            strategy: ChunkingStrategy::ContentDefined,
            max_links_per_node: MAX_LINKS_PER_NODE,
        }
    }

    /// Creates a content-defined chunking configuration with a custom target size.
    ///
    /// Returns an error if the size is outside `[MIN_CHUNK_SIZE, MAX_CHUNK_SIZE]`.
    pub fn content_defined_with_size(target_size: usize) -> Result<Self> {
        if target_size < MIN_CHUNK_SIZE {
            return Err(Error::InvalidInput(format!(
                "Target chunk size {} is below minimum {}",
                target_size, MIN_CHUNK_SIZE
            )));
        }
        if target_size > MAX_CHUNK_SIZE {
            return Err(Error::InvalidInput(format!(
                "Target chunk size {} exceeds maximum {}",
                target_size, MAX_CHUNK_SIZE
            )));
        }
        Ok(Self {
            chunk_size: target_size,
            strategy: ChunkingStrategy::ContentDefined,
            max_links_per_node: MAX_LINKS_PER_NODE,
        })
    }
}

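/// Builder for [`ChunkingConfig`] that validates the chunk size at build time.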
#[derive(Debug, Clone, Default)]
pub struct ChunkingConfigBuilder {
    chunk_size: Option<usize>,
    strategy: Option<ChunkingStrategy>,
    max_links_per_node: Option<usize>,
}

impl ChunkingConfigBuilder {
    /// Creates a new builder with no options set.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the chunk size in bytes.
    pub fn chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = Some(size);
        self
    }

    /// Sets the chunking strategy.
    pub fn strategy(mut self, strategy: ChunkingStrategy) -> Self {
        self.strategy = Some(strategy);
        self
    }

    /// Sets the maximum number of links per intermediate DAG node.
    pub fn max_links_per_node(mut self, max_links: usize) -> Self {
        self.max_links_per_node = Some(max_links);
        self
    }

    /// Builds the configuration, validating the chunk size.
    pub fn build(self) -> Result<ChunkingConfig> {
        let chunk_size = self.chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
        let strategy = self.strategy.unwrap_or(ChunkingStrategy::FixedSize);
        let max_links_per_node = self.max_links_per_node.unwrap_or(MAX_LINKS_PER_NODE);

        if chunk_size < MIN_CHUNK_SIZE {
            return Err(Error::InvalidInput(format!(
                "Chunk size {} is below minimum {}",
                chunk_size, MIN_CHUNK_SIZE
            )));
        }
        if chunk_size > MAX_CHUNK_SIZE {
            return Err(Error::InvalidInput(format!(
                "Chunk size {} exceeds maximum {}",
                chunk_size, MAX_CHUNK_SIZE
            )));
        }

        Ok(ChunkingConfig {
            chunk_size,
            strategy,
            max_links_per_node,
        })
    }
}

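/// Simplified Rabin-style rolling-hash chunker used for content-defined
/// chunking. A boundary is placed wherever the rolling hash, masked to
/// roughly `log2(target_size)` bits, is zero, subject to minimum and maximum
/// chunk-size limits derived from the target size.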
struct RabinChunker {
    /// Target average chunk size in bytes.
    #[allow(dead_code)]
    target_size: usize,
    /// Minimum chunk size (target_size / 4).
    min_size: usize,
    /// Maximum chunk size (target_size * 4).
    max_size: usize,
    /// Polynomial constant mixed into the rolling hash.
    polynomial: u64,
    /// Size of the sliding window in bytes.
    window_size: usize,
    /// Bit mask that determines how often a boundary is produced.
    mask: u64,
}

impl RabinChunker {
    fn new(target_size: usize) -> Self {
        const POLYNOMIAL: u64 = 0x3DA3358B4DC173;
        const WINDOW_SIZE: usize = 48;

        // Use roughly log2(target_size) mask bits so that, on random data, a
        // boundary is expected about once every `target_size` bytes.
        let mask_bits = (target_size as f64).log2().floor() as u32;
        let mask = (1u64 << mask_bits) - 1;

        Self {
            target_size,
            min_size: target_size / 4,
            max_size: target_size * 4,
            polynomial: POLYNOMIAL,
            window_size: WINDOW_SIZE,
            mask,
        }
    }

    /// Returns the chunk boundaries (exclusive end offsets) for `data`.
    fn find_boundaries(&self, data: &[u8]) -> Vec<usize> {
        if data.len() <= self.min_size {
            return vec![data.len()];
        }

        let mut boundaries = Vec::new();
        let mut hash: u64 = 0;
        let mut window = vec![0u8; self.window_size];
        let mut window_pos = 0;
        let mut last_boundary = 0;

        for (i, &byte) in data.iter().enumerate() {
            // Slide the window: remember the departing byte and store the new one.
            let out_byte = window[window_pos];
            window[window_pos] = byte;
            window_pos = (window_pos + 1) % self.window_size;

            // Update the rolling hash: rotate, fold out the departing byte,
            // fold in the incoming byte.
            hash = hash.rotate_left(1);
            hash ^= self.out_table(out_byte);
            hash ^= self.in_table(byte);

            let chunk_size = i - last_boundary;

            if chunk_size >= self.min_size {
                if chunk_size >= self.max_size {
                    // Force a boundary and reset the hash state.
                    boundaries.push(i);
                    last_boundary = i;
                    hash = 0;
                    window.fill(0);
                    window_pos = 0;
                } else if (hash & self.mask) == 0 {
                    boundaries.push(i);
                    last_boundary = i;
                }
            }
        }

        // The final chunk always ends at the end of the data.
        if last_boundary < data.len() {
            boundaries.push(data.len());
        }

        boundaries
    }

    /// Contribution of a byte leaving the window, after `window_size` rotations.
    #[inline]
    fn out_table(&self, byte: u8) -> u64 {
        let mut val = byte as u64;
        for _ in 0..self.window_size {
            val = val.rotate_left(1) ^ self.polynomial;
        }
        val
    }

    /// Contribution of a byte entering the window.
    #[inline]
    fn in_table(&self, byte: u8) -> u64 {
        byte as u64
    }
}

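/// Statistics about duplicate chunks discovered while chunking.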
#[derive(Debug, Clone, Default)]
pub struct DeduplicationStats {
    /// Total number of chunks produced.
    pub total_chunks: usize,
    /// Number of distinct chunks (unique CIDs).
    pub unique_chunks: usize,
    /// Number of chunks that duplicated an earlier chunk.
    pub reused_chunks: usize,
    /// Percentage of space saved by storing each unique chunk once.
    pub space_savings_percent: f64,
    /// Size of the original data in bytes.
    pub total_data_size: u64,
    /// Combined size of the unique chunks in bytes.
    pub deduplicated_size: u64,
}

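/// A link from a DAG node to a child block: the child's CID, its size, and an
/// optional name (used for directory entries).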
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DagLink {
    /// CID of the linked block.
    pub cid: SerializableCid,
    /// Size of the linked data in bytes.
    pub size: u64,
    /// Optional entry name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
}

impl DagLink {
    /// Creates an unnamed link.
    pub fn new(cid: Cid, size: u64) -> Self {
        Self {
            cid: SerializableCid(cid),
            size,
            name: None,
        }
    }

    /// Creates a named link, e.g. for a directory entry.
    pub fn with_name(cid: Cid, size: u64, name: impl Into<String>) -> Self {
        Self {
            cid: SerializableCid(cid),
            size,
            name: Some(name.into()),
        }
    }
}

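/// A node in the file DAG: a leaf carrying raw data, or an intermediate node
/// carrying links to children.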
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DagNode {
    /// Links to child nodes (empty for leaves).
    pub links: Vec<DagLink>,
    /// Total size in bytes of the data reachable from this node.
    pub total_size: u64,
    /// Raw data carried by a leaf node.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<Vec<u8>>,
}

impl DagNode {
    /// Creates a leaf node carrying raw data.
    pub fn leaf(data: Vec<u8>) -> Self {
        let size = data.len() as u64;
        Self {
            links: Vec::new(),
            total_size: size,
            data: Some(data),
        }
    }

    /// Creates an intermediate node from a set of links.
    pub fn intermediate(links: Vec<DagLink>) -> Self {
        let total_size = links.iter().map(|l| l.size).sum();
        Self {
            links,
            total_size,
            data: None,
        }
    }

    /// Returns `true` if this node has no links.
    #[must_use]
    pub fn is_leaf(&self) -> bool {
        self.links.is_empty()
    }

    /// Returns the number of links in this node.
    pub fn link_count(&self) -> usize {
        self.links.len()
    }

    /// Converts the node to an IPLD map with `links`, `totalSize`, and an
    /// optional `data` entry.
    pub fn to_ipld(&self) -> Ipld {
        let mut map = BTreeMap::new();

        let links: Vec<Ipld> = self
            .links
            .iter()
            .map(|link| {
                let mut link_map = BTreeMap::new();
                link_map.insert("cid".to_string(), Ipld::Link(link.cid));
                link_map.insert("size".to_string(), Ipld::Integer(link.size as i128));
                if let Some(name) = &link.name {
                    link_map.insert("name".to_string(), Ipld::String(name.clone()));
                }
                Ipld::Map(link_map)
            })
            .collect();
        map.insert("links".to_string(), Ipld::List(links));

        map.insert(
            "totalSize".to_string(),
            Ipld::Integer(self.total_size as i128),
        );

        if let Some(data) = &self.data {
            map.insert("data".to_string(), Ipld::Bytes(data.clone()));
        }

        Ipld::Map(map)
    }

    /// Serializes the node to DAG-CBOR bytes.
    pub fn to_dag_cbor(&self) -> Result<Vec<u8>> {
        self.to_ipld().to_dag_cbor()
    }
}

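/// The result of chunking: the root CID, every block in the DAG, and summary
/// statistics.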
#[derive(Debug, Clone)]
pub struct ChunkedFile {
    /// CID of the DAG root.
    pub root_cid: Cid,
    /// All blocks produced for the file (leaf chunks and intermediate nodes).
    pub blocks: Vec<Block>,
    /// Total size of the original data in bytes.
    pub total_size: u64,
    /// Number of leaf chunks the data was split into.
    pub chunk_count: usize,
    /// Deduplication statistics, if collected.
    pub dedup_stats: Option<DeduplicationStats>,
}

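/// Splits data into content-addressed blocks and links them into a DAG.
///
/// # Example
///
/// A minimal usage sketch (error handling elided):
///
/// ```ignore
/// let chunker = Chunker::new();
/// let file = chunker.chunk(&data)?;
/// println!("{} chunks, root {:?}", file.chunk_count, file.root_cid);
/// ```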
#[derive(Default)]
pub struct Chunker {
    config: ChunkingConfig,
}

impl Chunker {
    /// Creates a chunker with the default configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a chunker with a custom configuration.
    pub fn with_config(config: ChunkingConfig) -> Self {
        Self { config }
    }

    /// Chunks `data` into blocks and builds a DAG over them.
    ///
    /// Data that fits in a single chunk becomes one block; larger data is
    /// split according to the configured strategy and linked together by
    /// intermediate DAG-CBOR nodes.
    pub fn chunk(&self, data: &[u8]) -> Result<ChunkedFile> {
        if data.is_empty() {
            return Err(Error::InvalidInput("Cannot chunk empty data".to_string()));
        }

        // Small data fits in a single block; no DAG is needed.
        if data.len() <= self.config.chunk_size {
            let block = Block::new(Bytes::copy_from_slice(data))?;
            let dedup_stats = DeduplicationStats {
                total_chunks: 1,
                unique_chunks: 1,
                reused_chunks: 0,
                space_savings_percent: 0.0,
                total_data_size: data.len() as u64,
                deduplicated_size: data.len() as u64,
            };
            return Ok(ChunkedFile {
                root_cid: *block.cid(),
                blocks: vec![block],
                total_size: data.len() as u64,
                chunk_count: 1,
                dedup_stats: Some(dedup_stats),
            });
        }

        // Split the data into chunk slices according to the strategy.
        let chunk_slices: Vec<&[u8]> = match self.config.strategy {
            ChunkingStrategy::FixedSize => data.chunks(self.config.chunk_size).collect(),
            ChunkingStrategy::ContentDefined => {
                let rabin = RabinChunker::new(self.config.chunk_size);
                let boundaries = rabin.find_boundaries(data);
                let mut chunks = Vec::with_capacity(boundaries.len());
                let mut start = 0;
                for &end in &boundaries {
                    chunks.push(&data[start..end]);
                    start = end;
                }
                chunks
            }
        };

        let chunk_count = chunk_slices.len();

        // Create leaf blocks and track unique CIDs for deduplication stats.
        let mut leaf_blocks: Vec<Block> = Vec::with_capacity(chunk_slices.len());
        let mut leaf_links: Vec<DagLink> = Vec::with_capacity(chunk_slices.len());
        let mut seen_cids = std::collections::HashMap::new();

        for chunk in chunk_slices {
            let block = Block::new(Bytes::copy_from_slice(chunk))?;
            let cid = *block.cid();

            seen_cids.entry(cid).or_insert(chunk.len());

            leaf_links.push(DagLink::new(cid, chunk.len() as u64));
            leaf_blocks.push(block);
        }

        let total_data_size = data.len() as u64;
        let deduplicated_size: u64 = seen_cids.values().map(|&size| size as u64).sum();
        let reused_chunks = chunk_count.saturating_sub(seen_cids.len());
        let space_savings_percent = if total_data_size > 0 {
            ((total_data_size - deduplicated_size) as f64 / total_data_size as f64) * 100.0
        } else {
            0.0
        };

        let dedup_stats = DeduplicationStats {
            total_chunks: chunk_count,
            unique_chunks: seen_cids.len(),
            reused_chunks,
            space_savings_percent,
            total_data_size,
            deduplicated_size,
        };

        // Build the DAG bottom-up: group links into intermediate nodes until a
        // single root remains.
        let mut all_blocks = leaf_blocks;
        let mut current_links = leaf_links;

        while current_links.len() > 1 {
            let mut next_level_links = Vec::new();
            let mut next_level_blocks = Vec::new();

            for link_chunk in current_links.chunks(self.config.max_links_per_node) {
                let node = DagNode::intermediate(link_chunk.to_vec());
                let node_bytes = node.to_dag_cbor()?;

                if node_bytes.len() > MAX_BLOCK_SIZE {
                    return Err(Error::Internal(
                        "DAG node too large, reduce max_links_per_node".to_string(),
                    ));
                }

                let block = Block::builder()
                    .codec(crate::cid::codec::DAG_CBOR)
                    .build(Bytes::from(node_bytes))?;

                next_level_links.push(DagLink::new(*block.cid(), node.total_size));
                next_level_blocks.push(block);
            }

            all_blocks.extend(next_level_blocks);
            current_links = next_level_links;
        }

        let root_cid = current_links[0].cid.0;

        Ok(ChunkedFile {
            root_cid,
            blocks: all_blocks,
            total_size: data.len() as u64,
            chunk_count,
            dedup_stats: Some(dedup_stats),
        })
    }

    /// Returns `true` if `data_len` exceeds the configured chunk size.
    #[must_use]
    pub fn needs_chunking(&self, data_len: usize) -> bool {
        data_len > self.config.chunk_size
    }

    /// Estimates how many fixed-size chunks `data_len` bytes would produce.
    pub fn estimate_chunk_count(&self, data_len: usize) -> usize {
        if data_len == 0 {
            return 0;
        }
        data_len.div_ceil(self.config.chunk_size)
    }
}

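/// Builds DAG structures such as directories and chunked files.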
pub struct DagBuilder {
    config: ChunkingConfig,
    #[allow(dead_code)]
    cid_builder: CidBuilder,
}

impl Default for DagBuilder {
    fn default() -> Self {
        Self {
            config: ChunkingConfig::default(),
            cid_builder: CidBuilder::new(),
        }
    }
}

impl DagBuilder {
    /// Creates a builder with the default configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the chunking configuration to use.
    pub fn with_config(mut self, config: ChunkingConfig) -> Self {
        self.config = config;
        self
    }

    /// Creates a directory node from `(name, cid, size)` entries and returns
    /// the node together with its serialized DAG-CBOR block.
    pub fn create_directory(&self, entries: Vec<(String, Cid, u64)>) -> Result<(DagNode, Block)> {
        let links: Vec<DagLink> = entries
            .into_iter()
            .map(|(name, cid, size)| DagLink::with_name(cid, size, name))
            .collect();

        let node = DagNode::intermediate(links);
        let node_bytes = node.to_dag_cbor()?;
        let block = Block::builder()
            .codec(crate::cid::codec::DAG_CBOR)
            .build(Bytes::from(node_bytes))?;

        Ok((node, block))
    }

    /// Chunks file data using the configured chunking settings.
    pub fn create_file(&self, data: &[u8]) -> Result<ChunkedFile> {
        Chunker::with_config(self.config.clone()).chunk(data)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_small_file_single_block() {
        let chunker = Chunker::new();
        let data = b"Hello, IPFS!";

        let result = chunker.chunk(data).unwrap();

        assert_eq!(result.chunk_count, 1);
        assert_eq!(result.blocks.len(), 1);
        assert_eq!(result.total_size, data.len() as u64);
    }

    #[test]
    fn test_large_file_chunking() {
        let config = ChunkingConfig::with_chunk_size(1024).unwrap();
        let chunker = Chunker::with_config(config);

        let data: Vec<u8> = (0..5000).map(|i| (i % 256) as u8).collect();

        let result = chunker.chunk(&data).unwrap();

        assert_eq!(result.chunk_count, 5);
        assert!(result.blocks.len() >= 5);
        assert_eq!(result.total_size, 5000);
    }

    #[test]
    fn test_estimate_chunk_count() {
        let config = ChunkingConfig::with_chunk_size(1024).unwrap();
        let chunker = Chunker::with_config(config);

        assert_eq!(chunker.estimate_chunk_count(0), 0);
        assert_eq!(chunker.estimate_chunk_count(512), 1);
        assert_eq!(chunker.estimate_chunk_count(1024), 1);
        assert_eq!(chunker.estimate_chunk_count(1025), 2);
        assert_eq!(chunker.estimate_chunk_count(3000), 3);
    }

    #[test]
    fn test_needs_chunking() {
        let config = ChunkingConfig::with_chunk_size(1024).unwrap();
        let chunker = Chunker::with_config(config);

        assert!(!chunker.needs_chunking(512));
        assert!(!chunker.needs_chunking(1024));
        assert!(chunker.needs_chunking(1025));
    }

    #[test]
    fn test_dag_node_to_ipld() {
        let cid = CidBuilder::new().build(b"test").unwrap();
        let link = DagLink::with_name(cid, 100, "test.txt");
        let node = DagNode::intermediate(vec![link]);

        let ipld = node.to_ipld();
        assert!(matches!(ipld, Ipld::Map(_)));

        let cbor = node.to_dag_cbor().unwrap();
        assert!(!cbor.is_empty());
    }

    #[test]
    fn test_directory_creation() {
        let builder = DagBuilder::new();
        let cid1 = CidBuilder::new().build(b"file1").unwrap();
        let cid2 = CidBuilder::new().build(b"file2").unwrap();

        let entries = vec![
            ("file1.txt".to_string(), cid1, 100),
            ("file2.txt".to_string(), cid2, 200),
        ];

        let (node, block) = builder.create_directory(entries).unwrap();

        assert_eq!(node.link_count(), 2);
        assert_eq!(node.total_size, 300);
        assert!(block.size() > 0);
    }

    #[test]
    fn test_content_defined_chunking() {
        let config = ChunkingConfig::content_defined();
        let chunker = Chunker::with_config(config);

        let mut data = Vec::with_capacity(1_000_000);
        for i in 0..1_000_000 {
            data.push((i % 256) as u8);
        }

        let result = chunker.chunk(&data).unwrap();

        assert!(result.chunk_count > 0);
        assert!(!result.blocks.is_empty());
        assert_eq!(result.total_size, 1_000_000);

        assert!(result.dedup_stats.is_some());
        let stats = result.dedup_stats.unwrap();
        assert_eq!(stats.total_chunks, result.chunk_count);
    }

    #[test]
    fn test_cdc_deduplication() {
        let config = ChunkingConfig::content_defined_with_size(4096).unwrap();
        let chunker = Chunker::with_config(config);

        let pattern = b"Hello, IPFS! This is a test pattern. ".repeat(100);
        let mut data = pattern.clone();
        data.extend_from_slice(&pattern);

        let result = chunker.chunk(&data).unwrap();

        let stats = result.dedup_stats.unwrap();
        assert!(stats.total_chunks > 0);

        assert_eq!(
            stats.total_chunks,
            stats.unique_chunks + stats.reused_chunks
        );
    }

    #[test]
    fn test_cdc_deterministic() {
        let config = ChunkingConfig::content_defined();
        let chunker = Chunker::with_config(config);

        let data: Vec<u8> = (0..100_000).map(|i| (i % 256) as u8).collect();

        let result1 = chunker.chunk(&data).unwrap();
        let result2 = chunker.chunk(&data).unwrap();

        assert_eq!(result1.root_cid, result2.root_cid);
        assert_eq!(result1.chunk_count, result2.chunk_count);
        assert_eq!(result1.total_size, result2.total_size);
    }

    #[test]
    fn test_cdc_vs_fixed_size() {
        let data: Vec<u8> = (0..100_000).map(|i| (i % 256) as u8).collect();

        let fixed_config = ChunkingConfig::with_chunk_size(8192).unwrap();
        let fixed_chunker = Chunker::with_config(fixed_config);
        let fixed_result = fixed_chunker.chunk(&data).unwrap();

        let cdc_config = ChunkingConfig::content_defined_with_size(8192).unwrap();
        let cdc_chunker = Chunker::with_config(cdc_config);
        let cdc_result = cdc_chunker.chunk(&data).unwrap();

        assert!(fixed_result.chunk_count > 0);
        assert!(cdc_result.chunk_count > 0);

        assert_eq!(fixed_result.chunk_count, 100_000 / 8192 + 1);

        assert!(cdc_result.chunk_count >= 1);
        assert!(cdc_result.chunk_count < 200);
        assert_eq!(fixed_result.total_size, cdc_result.total_size);
    }

    #[test]
    fn test_rabin_chunker_boundaries() {
        let rabin = RabinChunker::new(8192);
        let data: Vec<u8> = (0..50_000).map(|i| (i % 256) as u8).collect();

        let boundaries = rabin.find_boundaries(&data);

        assert!(!boundaries.is_empty());

        assert_eq!(*boundaries.last().unwrap(), data.len());

        for &boundary in &boundaries {
            assert!(boundary <= data.len());
        }

        for i in 1..boundaries.len() {
            assert!(boundaries[i] > boundaries[i - 1]);
        }
    }

    #[test]
    fn test_rabin_min_max_chunk_size() {
        let rabin = RabinChunker::new(8192);
        let data: Vec<u8> = (0..500_000).map(|i| (i % 256) as u8).collect();

        let boundaries = rabin.find_boundaries(&data);

        let mut start = 0;
        for &end in &boundaries {
            let chunk_size = end - start;

            if end < data.len() {
                assert!(
                    chunk_size >= rabin.min_size,
                    "Chunk size {} is below min {}",
                    chunk_size,
                    rabin.min_size
                );
                assert!(
                    chunk_size <= rabin.max_size,
                    "Chunk size {} exceeds max {}",
                    chunk_size,
                    rabin.max_size
                );
            }

            start = end;
        }
    }

    #[test]
    fn test_deduplication_stats_calculation() {
        let config = ChunkingConfig::content_defined();
        let chunker = Chunker::with_config(config);

        let data: Vec<u8> = (0..50_000).map(|i| (i % 256) as u8).collect();
        let result = chunker.chunk(&data).unwrap();

        let stats = result.dedup_stats.unwrap();

        assert_eq!(
            stats.total_chunks,
            stats.unique_chunks + stats.reused_chunks
        );
        assert_eq!(stats.total_data_size, 50_000);
        assert!(stats.deduplicated_size <= stats.total_data_size);
        assert!(stats.space_savings_percent >= 0.0);
        assert!(stats.space_savings_percent <= 100.0);
    }
}