use ipld_core::cid::Cid;
use multihash::{self, Multihash};

use crate::pb::{FlatUnixFs, PBLink, UnixFs, UnixFsType};
use alloc::borrow::Cow;
use core::fmt;
use quick_protobuf::{MessageWrite, Writer};

use sha2::{Digest, Sha256};

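/// Constructs a UnixFs file tree from the bytes pushed into it: the configured [`Chunker`] cuts
/// the input into leaf blocks and the configured [`Collector`] links them into a tree. Hashing
/// is fixed to sha2-256 and the produced links are CIDv0.
///
/// # Example
///
/// A minimal usage sketch; the `use` path assumes this module is exported as `file::adder`,
/// adjust it to the actual crate layout:
///
/// ```ignore
/// use ipfs_unixfs::file::adder::FileAdder;
///
/// let mut adder = FileAdder::default();
///
/// // `push` returns the blocks that became ready plus the number of bytes it accepted
/// let (blocks, consumed) = adder.push(b"foobar\n");
/// assert_eq!(consumed, 7);
/// // with the default 256 KiB chunker nothing is ready yet
/// assert_eq!(blocks.count(), 0);
///
/// // `finish` flushes the buffered bytes and the remaining links; the last block is the root
/// let blocks = adder.finish().collect::<Vec<_>>();
/// assert!(!blocks.is_empty());
/// ```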
#[derive(Default)]
pub struct FileAdder {
    chunker: Chunker,
    collector: Collector,
    block_buffer: Vec<u8>,
    // all not yet flushed links as a flat vector; `Link::depth` encodes which level of the
    // tree a link belongs to, with 0 being the leaves
    unflushed_links: Vec<Link>,
}

impl fmt::Debug for FileAdder {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            fmt,
            "FileAdder {{ chunker: {:?}, block_buffer: {}/{}, unflushed_links: {} }}",
            self.chunker,
            self.block_buffer.len(),
            self.block_buffer.capacity(),
            LinkFormatter(&self.unflushed_links),
        )
    }
}

struct LinkFormatter<'a>(&'a [Link]);

impl fmt::Display for LinkFormatter<'_> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut iter = self.0.iter().peekable();

        write!(fmt, "[")?;

        let mut current = match iter.peek() {
            Some(Link { depth, .. }) => depth,
            None => return write!(fmt, "]"),
        };

        let mut count = 0;

        for Link {
            depth: next_depth, ..
        } in iter
        {
            if current == next_depth {
                count += 1;
            } else {
                write!(fmt, "{current}: {count}/")?;

                let steps_between = if current > next_depth {
                    current - next_depth
                } else {
                    next_depth - current
                };

                for _ in 0..steps_between - 1 {
                    write!(fmt, "0/")?;
                }
                count = 1;
                current = next_depth;
            }
        }

        write!(fmt, "{current}: {count}]")
    }
}

/// A link to an already produced block: either a leaf holding file bytes or a collected
/// internal node.
struct Link {
    /// Depth of this link in the tree: `0` for leaves, incremented for every level of parent
    /// nodes built on top of them.
    depth: usize,
    /// The cid of the block this link points to.
    target: Cid,
    /// Combined size of the target block and all blocks reachable through it; becomes the
    /// `Tsize` of the PBLink.
    total_size: u64,
    /// Number of file bytes covered by the target block; becomes the `filesize`/`blocksizes`
    /// entry of the parent node.
    file_size: u64,
}

impl fmt::Debug for Link {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Link")
            .field("depth", &self.depth)
            .field("target", &format_args!("{}", self.target))
            .field("total_size", &self.total_size)
            .field("file_size", &self.file_size)
            .finish()
    }
}

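/// Builder for a [`FileAdder`] with a custom chunker and/or collector.
///
/// A configuration sketch; the `use` path is assumed as in the [`FileAdder`] example:
///
/// ```ignore
/// use ipfs_unixfs::file::adder::{BalancedCollector, Chunker, FileAdder};
///
/// // 16 KiB leaf chunks, at most 42 links per internal node
/// let adder = FileAdder::builder()
///     .with_chunker(Chunker::Size(16 * 1024))
///     .with_collector(BalancedCollector::with_branching_factor(42))
///     .build();
/// ```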
#[derive(Default)]
pub struct FileAdderBuilder {
    chunker: Chunker,
    collector: Collector,
}

impl FileAdderBuilder {
    /// Replaces the default chunker.
    pub fn with_chunker(self, chunker: Chunker) -> Self {
        FileAdderBuilder { chunker, ..self }
    }

    /// Replaces the default collector.
    pub fn with_collector(self, collector: impl Into<Collector>) -> Self {
        FileAdderBuilder {
            collector: collector.into(),
            ..self
        }
    }

    /// Returns a [`FileAdder`] using the configured chunker and collector.
    pub fn build(self) -> FileAdder {
        let FileAdderBuilder { chunker, collector } = self;

        FileAdder {
            chunker,
            collector,
            ..Default::default()
        }
    }
}

impl FileAdder {
    /// Returns a [`FileAdderBuilder`] for customizing the chunker and collector.
    pub fn builder() -> FileAdderBuilder {
        FileAdderBuilder::default()
    }

    /// Returns the likely amount of buffering the adder will need, i.e. the chunk size.
    pub fn size_hint(&self) -> usize {
        self.chunker.size_hint()
    }

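    /// Pushes more bytes of the file. Returns the blocks that became complete along with the
    /// number of bytes accepted from `input`; the chunker may not accept everything at once,
    /// so any remainder needs to be pushed again.
    ///
    /// A sketch of the expected calling pattern, where `adder` is a `FileAdder`, `input` is
    /// the file's bytes, and `store_block` is a hypothetical stand-in for whatever persists
    /// the returned blocks:
    ///
    /// ```ignore
    /// let mut written = 0;
    /// while written < input.len() {
    ///     let (blocks, consumed) = adder.push(&input[written..]);
    ///     for (cid, bytes) in blocks {
    ///         store_block(cid, bytes);
    ///     }
    ///     written += consumed;
    /// }
    /// ```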
    pub fn push(&mut self, input: &[u8]) -> (impl Iterator<Item = (Cid, Vec<u8>)>, usize) {
        let (accepted, ready) = self.chunker.accept(input, &self.block_buffer);

        if self.block_buffer.is_empty() && ready {
            // if nothing is buffered and the accepted bytes already make up a full chunk, the
            // leaf can be rendered straight from the input without copying it into the buffer
            let leaf = Self::flush_buffered_leaf(accepted, &mut self.unflushed_links, false);
            assert!(leaf.is_some(), "chunk completed, must produce a new block");
            self.block_buffer.clear();
            let links = self.flush_buffered_links(false);
            (leaf.into_iter().chain(links), accepted.len())
        } else {
            if self.block_buffer.capacity() == 0 {
                // delay the internal buffer allocation until bytes actually need buffering
                self.block_buffer.reserve(self.size_hint());
            }

            self.block_buffer.extend_from_slice(accepted);
            let written = accepted.len();

            let (leaf, links) = if !ready {
                (None, Vec::new())
            } else {
                let leaf = Self::flush_buffered_leaf(
                    self.block_buffer.as_slice(),
                    &mut self.unflushed_links,
                    false,
                );
                assert!(leaf.is_some(), "chunk completed, must produce a new block");
                self.block_buffer.clear();
                let links = self.flush_buffered_links(false);

                (leaf, links)
            };
            (leaf.into_iter().chain(links), written)
        }
    }

    /// Called after the last [`FileAdder::push`] to finish the tree. Returns the remaining
    /// blocks; the last block yielded is the root of the file.
    pub fn finish(mut self) -> impl Iterator<Item = (Cid, Vec<u8>)> {
        let last_leaf =
            Self::flush_buffered_leaf(&self.block_buffer, &mut self.unflushed_links, true);
        let root_links = self.flush_buffered_links(true);
        last_leaf.into_iter().chain(root_links)
    }

    fn flush_buffered_leaf(
        input: &[u8],
        unflushed_links: &mut Vec<Link>,
        finishing: bool,
    ) -> Option<(Cid, Vec<u8>)> {
        // an empty input only produces a block when finishing a file that produced no other
        // blocks, i.e. an empty file
        if input.is_empty() && (!finishing || !unflushed_links.is_empty()) {
            return None;
        }

        let data = if !input.is_empty() {
            Some(Cow::Borrowed(input))
        } else {
            None
        };

        let filesize = Some(input.len() as u64);

        let inner = FlatUnixFs {
            links: Vec::new(),
            data: UnixFs {
                Type: UnixFsType::File,
                Data: data,
                filesize,
                ..Default::default()
            },
        };

        let (cid, vec) = render_and_hash(&inner);

        let total_size = vec.len();

        let link = Link {
            depth: 0,
            target: cid,
            total_size: total_size as u64,
            file_size: input.len() as u64,
        };

        unflushed_links.push(link);

        Some((cid, vec))
    }

    fn flush_buffered_links(&mut self, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        self.collector
            .flush_links(&mut self.unflushed_links, finishing)
    }

    /// Test helper: pushes `all_content` in slices of at most `amt` bytes (or all at once when
    /// `amt` is zero) and collects every produced block.
    #[cfg(test)]
    fn collect_blocks(mut self, all_content: &[u8], mut amt: usize) -> Vec<(Cid, Vec<u8>)> {
        let mut written = 0;
        let mut blocks_received = Vec::new();

        if amt == 0 {
            amt = all_content.len();
        }

        while written < all_content.len() {
            let end = written + (all_content.len() - written).min(amt);
            let slice = &all_content[written..end];

            let (blocks, pushed) = self.push(slice);
            blocks_received.extend(blocks);
            written += pushed;
        }

        let last_blocks = self.finish();
        blocks_received.extend(last_blocks);

        blocks_received
    }
}

fn render_and_hash(flat: &FlatUnixFs<'_>) -> (Cid, Vec<u8>) {
    let mut out = Vec::with_capacity(flat.get_size());
    let mut writer = Writer::new(&mut out);
    flat.write_message(&mut writer)
        .expect("unsure how this could fail");
    let mh = Multihash::wrap(
        multihash_codetable::Code::Sha2_256.into(),
        &Sha256::digest(&out),
    )
    .unwrap();
    let cid = Cid::new_v0(mh).expect("sha2_256 is the correct multihash for cidv0");
    (cid, out)
}

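/// Strategy for cutting the pushed bytes into leaf blocks. Only fixed-size chunking is
/// implemented; the default is 256 KiB chunks.
///
/// A sketch of configuring a smaller chunk size (the `use` path is assumed as in the
/// [`FileAdder`] example):
///
/// ```ignore
/// use ipfs_unixfs::file::adder::{Chunker, FileAdder};
///
/// // every leaf block will hold at most 1024 bytes of file content
/// let adder = FileAdder::builder().with_chunker(Chunker::Size(1024)).build();
/// ```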
#[derive(Debug, Clone, Copy)]
pub enum Chunker {
    /// Size-based chunking into blocks of at most the given number of bytes.
    Size(usize),
}

impl Default for Chunker {
    fn default() -> Self {
        Chunker::Size(256 * 1024)
    }
}

impl Chunker {
    fn accept<'a>(&mut self, input: &'a [u8], buffered: &[u8]) -> (&'a [u8], bool) {
        use Chunker::*;

        match self {
            Size(max) => {
                let l = input.len().min(*max - buffered.len());
                let accepted = &input[..l];
                let ready = buffered.len() + l >= *max;
                (accepted, ready)
            }
        }
    }

    fn size_hint(&self) -> usize {
        use Chunker::*;

        match self {
            Size(max) => *max,
        }
    }
}

/// Strategy for collecting the leaf blocks into a tree. Currently only the balanced tree
/// layout is implemented.
#[derive(Debug, Clone)]
pub enum Collector {
    Balanced(BalancedCollector),
}

impl Default for Collector {
    fn default() -> Self {
        Collector::Balanced(Default::default())
    }
}

impl Collector {
    fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        use Collector::*;

        match self {
            Balanced(bc) => bc.flush_links(pending, finishing),
        }
    }
}

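/// Collector that builds a balanced tree: every internal node links to at most
/// `branching_factor` children, and new levels are added on top as needed. The default
/// branching factor is 174.
///
/// A sketch of using a custom branching factor (the `use` path is assumed as in the
/// [`FileAdder`] example):
///
/// ```ignore
/// use ipfs_unixfs::file::adder::{BalancedCollector, FileAdder};
///
/// let adder = FileAdder::builder()
///     .with_collector(BalancedCollector::with_branching_factor(8))
///     .build();
/// ```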
#[derive(Clone)]
pub struct BalancedCollector {
    branching_factor: usize,
    // the link and blocksize buffers are kept between flushes to avoid reallocating them for
    // every internal node that gets rendered
    reused_links: Vec<PBLink<'static>>,
    reused_blocksizes: Vec<u64>,
}

impl fmt::Debug for BalancedCollector {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            fmt,
            "BalancedCollector {{ branching_factor: {} }}",
            self.branching_factor
        )
    }
}

impl Default for BalancedCollector {
    fn default() -> Self {
        Self::with_branching_factor(174)
    }
}

impl From<BalancedCollector> for Collector {
    fn from(b: BalancedCollector) -> Self {
        Collector::Balanced(b)
    }
}

impl BalancedCollector {
    /// Creates a collector whose internal nodes link to at most `branching_factor` children.
    pub fn with_branching_factor(branching_factor: usize) -> Self {
        assert!(branching_factor > 0);

        Self {
            branching_factor,
            reused_links: Vec::new(),
            reused_blocksizes: Vec::new(),
        }
    }

    fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        let mut ret = Vec::new();

        let mut reused_links = core::mem::take(&mut self.reused_links);
        let mut reused_blocksizes = core::mem::take(&mut self.reused_blocksizes);

        if let Some(need) = self.branching_factor.checked_sub(reused_links.capacity()) {
            reused_links.reserve(need);
        }

        if let Some(need) = self
            .branching_factor
            .checked_sub(reused_blocksizes.capacity())
        {
            reused_blocksizes.reserve(need);
        }

        // process the pending links level by level, from the leaves upwards, packing every
        // full group of `branching_factor` links into a new parent node
        'outer: for level in 0.. {
            if pending.len() == 1 && finishing
                || pending.len() <= self.branching_factor && !finishing
            {
                // when finishing, a single remaining link is the root; when not finishing,
                // fewer than `branching_factor` links can still be joined by later input
                break;
            }

            // index into `pending` from which the search for links of this level continues
            let mut starting_point = 0;

            // the last slot in `pending` that was overwritten with a newly created parent link
            let mut last_overwrite = None;

            while let Some(mut first_at) = &pending[starting_point..]
                .iter()
                .position(|Link { depth, .. }| depth == &level)
            {
                // translate the position back into an index into `pending`
                first_at += starting_point;

                if !finishing && pending[first_at..].len() <= self.branching_factor {
                    if let Some(last_overwrite) = last_overwrite {
                        pending.drain((last_overwrite + 1)..first_at);
                    }
                    break 'outer;
                }

                reused_links.clear();
                reused_blocksizes.clear();

                let mut nested_size = 0;
                let mut nested_total_size = 0;

                let last = (first_at + self.branching_factor).min(pending.len());

                for (index, link) in pending[first_at..last].iter().enumerate() {
                    assert_eq!(
                        link.depth,
                        level,
                        "unexpected link depth {} when searching at level {} index {}",
                        link.depth,
                        level,
                        index + first_at
                    );

                    Self::partition_link(
                        link,
                        &mut reused_links,
                        &mut reused_blocksizes,
                        &mut nested_size,
                        &mut nested_total_size,
                    );
                }

                debug_assert_eq!(reused_links.len(), reused_blocksizes.len());

                let inner = FlatUnixFs {
                    links: reused_links,
                    data: UnixFs {
                        Type: UnixFsType::File,
                        filesize: Some(nested_size),
                        blocksizes: reused_blocksizes,
                        ..Default::default()
                    },
                };

                let (cid, vec) = render_and_hash(&inner);

                // the new parent link goes into the slot right after the previously created
                // one, or into the first slot of this group; stale entries in between are
                // removed by the drain above or the truncate below
                let index = last_overwrite.map(|i| i + 1).unwrap_or(first_at);
                pending[index] = Link {
                    depth: level + 1,
                    target: cid,
                    total_size: nested_total_size + vec.len() as u64,
                    file_size: nested_size,
                };

                ret.push((cid, vec));

                // take the buffers back from the rendered node so they can be reused
                reused_links = inner.links;
                reused_blocksizes = inner.data.blocksizes;

                starting_point = last;
                last_overwrite = Some(index);
            }

            if let Some(last_overwrite) = last_overwrite {
                pending.truncate(last_overwrite + 1);
            }

            debug_assert_eq!(
                pending.iter().position(|l| l.depth == level),
                None,
                "should have no more of depth {}: {}",
                level,
                LinkFormatter(pending.as_slice())
            );
        }

        self.reused_links = reused_links;
        self.reused_blocksizes = reused_blocksizes;

        ret
    }

    /// Appends `link` to the reused `links` and `blocksizes` buffers and accumulates its sizes
    /// into the totals of the parent node being built.
    fn partition_link(
        link: &Link,
        links: &mut Vec<PBLink<'static>>,
        blocksizes: &mut Vec<u64>,
        nested_size: &mut u64,
        nested_total_size: &mut u64,
    ) {
        links.push(PBLink {
            Hash: Some(link.target.to_bytes().into()),
            Name: Some("".into()),
            Tsize: Some(link.total_size),
        });
        blocksizes.push(link.file_size);
        *nested_size += link.file_size;
        *nested_total_size += link.total_size;
    }
}

#[cfg(test)]
mod tests {

    use super::{BalancedCollector, Chunker, FileAdder};
    use crate::test_support::FakeBlockstore;
    use core::convert::TryFrom;
    use hex_literal::hex;
    use ipld_core::cid::Cid;

    #[test]
    fn test_size_chunker() {
        assert_eq!(size_chunker_scenario(1, 4, 0), (1, true));
        assert_eq!(size_chunker_scenario(2, 4, 0), (2, true));
        assert_eq!(size_chunker_scenario(2, 1, 0), (1, false));
        assert_eq!(size_chunker_scenario(2, 1, 1), (1, true));
        assert_eq!(size_chunker_scenario(32, 3, 29), (3, true));
        assert_eq!(size_chunker_scenario(32, 4, 29), (3, true));
    }

    fn size_chunker_scenario(max: usize, input_len: usize, existing_len: usize) -> (usize, bool) {
        let input = vec![0; input_len];
        let existing = vec![0; existing_len];

        let (accepted, ready) = Chunker::Size(max).accept(&input, &existing);
        (accepted.len(), ready)
    }

    #[test]
    fn favourite_single_block_file() {
        let blocks = FakeBlockstore::with_fixtures();
        let content = b"foobar\n";

        let mut adder = FileAdder::default();

        {
            let (mut ready_blocks, bytes) = adder.push(content);
            assert!(ready_blocks.next().is_none());
            assert_eq!(bytes, content.len());
        }

        let (_, file_block) = adder
            .finish()
            .next()
            .expect("there must have been the root block");

        assert_eq!(
            blocks.get_by_str("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL"),
            file_block.as_slice()
        );
    }

    #[test]
    fn favourite_multi_block_file() {
        let blocks = FakeBlockstore::with_fixtures();
        let content = b"foobar\n";
        let adder = FileAdder::builder().with_chunker(Chunker::Size(2)).build();

        let blocks_received = adder.collect_blocks(content, 0);

        let expected = [
            "QmfVyMoStzTvdnUR7Uotzh82gmL427q9z3xW5Y8fUoszi4",
            "QmdPyW4CWE3QBkgjWfjM5f7Tjb3HukxVuBXZtkqAGwsMnm",
            "QmNhDQpphvMWhdCzP74taRzXDaEfPGq8vWfFRzD7mEgePM",
            "Qmc5m94Gu7z62RC8waSKkZUrCCBJPyHbkpmGzEePxy2oXJ",
            "QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6",
        ]
        .iter()
        .map(|key| {
            let cid = Cid::try_from(*key).unwrap();
            let block = blocks.get_by_str(key).to_vec();
            (cid, block)
        })
        .collect::<Vec<_>>();

        assert_eq!(blocks_received, expected);
    }

    #[test]
    fn three_layers() {
        let content = b"Lorem ipsum dolor sit amet, sit enim montes aliquam. Cras non lorem, \
            rhoncus condimentum, irure et ante. Pulvinar suscipit odio ante, et tellus a enim, \
            wisi ipsum, vel rhoncus eget faucibus varius, luctus turpis nibh vel odio nulla pede.";

        assert!(content.len() > 174 && content.len() < 2 * 174);

        let adder = FileAdder::builder().with_chunker(Chunker::Size(1)).build();

        let blocks_received = adder.collect_blocks(content, 0);

        assert_eq!(blocks_received.len(), 240);

        assert_eq!(
            blocks_received.last().unwrap().0.to_string(),
            "QmRQ6NZNUs4JrCT2y7tmCC1wUhjqYuTssB8VXbbN3rMffg"
        );
    }

    #[test]
    fn three_layers_all_subchunks() {
        let content = b"Lorem ipsum dolor sit amet, sit enim montes aliquam. Cras non lorem, \
            rhoncus condimentum, irure et ante. Pulvinar suscipit odio ante, et tellus a enim, \
            wisi ipsum, vel rhoncus eget faucibus varius, luctus turpis nibh vel odio nulla pede.";

        for amt in 1..32 {
            let adder = FileAdder::builder().with_chunker(Chunker::Size(32)).build();
            let blocks_received = adder.collect_blocks(content, amt);
            assert_eq!(
                blocks_received.last().unwrap().0.to_string(),
                "QmYSLcVQqxKygiq7x9w1XGYxU29EShB8ZemiaQ8GAAw17h",
                "amt: {amt}"
            );
        }
    }

    #[test]
    fn empty_file() {
        let blocks = FileAdder::default().collect_blocks(b"", 0);
        assert_eq!(blocks.len(), 1);
        assert_eq!(blocks[0].1.as_slice(), &hex!("0a 04 08 02 18 00"));
        assert_eq!(
            blocks[0].0.to_string(),
            "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH"
        );
    }

    #[test]
    fn full_link_block_and_a_byte() {
        let buf = vec![0u8; 2];

        let branching_factor = 174;

        let mut adder = FileAdder::builder()
            .with_chunker(Chunker::Size(2))
            .with_collector(BalancedCollector::with_branching_factor(branching_factor))
            .build();
        let mut blocks_count = 0;

        for _ in 0..branching_factor {
            let (blocks, written) = adder.push(buf.as_slice());
            assert_eq!(written, buf.len());

            blocks_count += blocks.count();
        }

        let (blocks, written) = adder.push(&buf[0..1]);
        assert_eq!(written, 1);
        blocks_count += blocks.count();

        let last_blocks = adder.finish().collect::<Vec<_>>();
        blocks_count += last_blocks.len();

        assert_eq!(blocks_count, branching_factor + 1 + 1 + 1 + 1);

        assert_eq!(
            last_blocks.last().unwrap().0.to_string(),
            "QmcHNWF1d56uCDSfJPA7t9fadZRV9we5HGSTGSmwuqmMP9"
        );
    }

    #[test]
    fn full_link_block() {
        let buf = vec![0u8; 1];

        let branching_factor = 174;

        let mut adder = FileAdder::builder()
            .with_chunker(Chunker::Size(1))
            .with_collector(BalancedCollector::with_branching_factor(branching_factor))
            .build();
        let mut blocks_count = 0;

        for _ in 0..branching_factor {
            let (blocks, written) = adder.push(buf.as_slice());
            assert_eq!(written, buf.len());

            blocks_count += blocks.count();
        }

        let mut last_blocks = adder.finish();

        let last_block = last_blocks.next().expect("must not have flushed yet");
        blocks_count += 1;

        assert_eq!(last_blocks.next(), None);

        assert_eq!(
            last_block.0.to_string(),
            "QmdgQac8c6Bo3MP5bHAg2yQ25KebFUsmkZFvyByYzf8UCB"
        );

        assert_eq!(blocks_count, 175);
    }
}