1use cid::Cid;
2
3use crate::pb::{FlatUnixFs, PBLink, UnixFs, UnixFsType};
4use alloc::borrow::Cow;
5use core::fmt;
6use quick_protobuf::{MessageWrite, Writer};
7
8use sha2::{Digest, Sha256};
9
10#[derive(Default)]
19pub struct FileAdder {
20 chunker: Chunker,
21 collector: Collector,
22 block_buffer: Vec<u8>,
23 unflushed_links: Vec<Link>,
29}
30
31impl fmt::Debug for FileAdder {
32 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
33 write!(
34 fmt,
35 "FileAdder {{ chunker: {:?}, block_buffer: {}/{}, unflushed_links: {} }}",
36 self.chunker,
37 self.block_buffer.len(),
38 self.block_buffer.capacity(),
39 LinkFormatter(&self.unflushed_links),
40 )
41 }
42}
43
44struct LinkFormatter<'a>(&'a [Link]);
45
46impl fmt::Display for LinkFormatter<'_> {
47 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
48 let mut iter = self.0.iter().peekable();
49
50 write!(fmt, "[")?;
51
52 let mut current = match iter.peek() {
53 Some(Link { depth, .. }) => depth,
54 None => return write!(fmt, "]"),
55 };
56
57 let mut count = 0;
58
59 for Link {
60 depth: next_depth, ..
61 } in iter
62 {
63 if current == next_depth {
64 count += 1;
65 } else {
66 write!(fmt, "{}: {}/", current, count)?;
67
68 let steps_between = if current > next_depth {
69 current - next_depth
70 } else {
71 next_depth - current
72 };
73
74 for _ in 0..steps_between - 1 {
75 write!(fmt, "0/")?;
76 }
77 count = 1;
78 current = next_depth;
79 }
80 }
81
82 write!(fmt, "{}: {}]", current, count)
83 }
84}
85
86struct Link {
89 depth: usize,
92 target: Cid,
94 total_size: u64,
96 file_size: u64,
99}
100
101impl fmt::Debug for Link {
102 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
103 fmt.debug_struct("Link")
104 .field("depth", &self.depth)
105 .field("target", &format_args!("{}", self.target))
106 .field("total_size", &self.total_size)
107 .field("file_size", &self.file_size)
108 .finish()
109 }
110}
111
112#[derive(Default)]
114pub struct FileAdderBuilder {
115 chunker: Chunker,
116 collector: Collector,
117}
118
119impl FileAdderBuilder {
120 pub fn with_chunker(self, chunker: Chunker) -> Self {
122 FileAdderBuilder { chunker, ..self }
123 }
124
125 pub fn with_collector(self, collector: impl Into<Collector>) -> Self {
127 FileAdderBuilder {
128 collector: collector.into(),
129 ..self
130 }
131 }
132
133 pub fn build(self) -> FileAdder {
135 let FileAdderBuilder { chunker, collector } = self;
136
137 FileAdder {
138 chunker,
139 collector,
140 ..Default::default()
141 }
142 }
143}
144
145impl FileAdder {
146 pub fn builder() -> FileAdderBuilder {
148 FileAdderBuilder::default()
149 }
150
151 pub fn size_hint(&self) -> usize {
156 self.chunker.size_hint()
157 }
158
159 pub fn push(&mut self, input: &[u8]) -> (impl Iterator<Item = (Cid, Vec<u8>)>, usize) {
164 let (accepted, ready) = self.chunker.accept(input, &self.block_buffer);
165
166 if self.block_buffer.is_empty() && ready {
167 let leaf = Self::flush_buffered_leaf(accepted, &mut self.unflushed_links, false);
175 assert!(leaf.is_some(), "chunk completed, must produce a new block");
176 self.block_buffer.clear();
177 let links = self.flush_buffered_links(false);
178 (leaf.into_iter().chain(links.into_iter()), accepted.len())
179 } else {
180 if self.block_buffer.capacity() == 0 {
183 self.block_buffer.reserve(self.size_hint());
186 }
187
188 self.block_buffer.extend_from_slice(accepted);
189 let written = accepted.len();
190
191 let (leaf, links) = if !ready {
192 (None, Vec::new())
194 } else {
195 let leaf = Self::flush_buffered_leaf(
197 self.block_buffer.as_slice(),
198 &mut self.unflushed_links,
199 false,
200 );
201 assert!(leaf.is_some(), "chunk completed, must produce a new block");
202 self.block_buffer.clear();
203 let links = self.flush_buffered_links(false);
204
205 (leaf, links)
206 };
207 (leaf.into_iter().chain(links.into_iter()), written)
208 }
209 }
210
211 pub fn finish(mut self) -> impl Iterator<Item = (Cid, Vec<u8>)> {
218 let last_leaf = Self::flush_buffered_leaf(
219 &self.block_buffer.as_slice(),
220 &mut self.unflushed_links,
221 true,
222 );
223 let root_links = self.flush_buffered_links(true);
224 last_leaf.into_iter().chain(root_links.into_iter())
226 }
227
228 fn flush_buffered_leaf(
231 input: &[u8],
232 unflushed_links: &mut Vec<Link>,
233 finishing: bool,
234 ) -> Option<(Cid, Vec<u8>)> {
235 if input.is_empty() && (!finishing || !unflushed_links.is_empty()) {
236 return None;
237 }
238
239 let data = if !input.is_empty() {
242 Some(Cow::Borrowed(input))
243 } else {
244 None
245 };
246
247 let filesize = Some(input.len() as u64);
248
249 let inner = FlatUnixFs {
250 links: Vec::new(),
251 data: UnixFs {
252 Type: UnixFsType::File,
253 Data: data,
254 filesize,
255 ..Default::default()
257 },
258 };
259
260 let (cid, vec) = render_and_hash(&inner);
261
262 let total_size = vec.len();
263
264 let link = Link {
265 depth: 0,
266 target: cid.clone(),
267 total_size: total_size as u64,
268 file_size: input.len() as u64,
269 };
270
271 unflushed_links.push(link);
272
273 Some((cid, vec))
274 }
275
276 fn flush_buffered_links(&mut self, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
277 self.collector
278 .flush_links(&mut self.unflushed_links, finishing)
279 }
280
281 #[cfg(test)]
286 fn collect_blocks(mut self, all_content: &[u8], mut amt: usize) -> Vec<(Cid, Vec<u8>)> {
287 let mut written = 0;
288 let mut blocks_received = Vec::new();
289
290 if amt == 0 {
291 amt = all_content.len();
292 }
293
294 while written < all_content.len() {
295 let end = written + (all_content.len() - written).min(amt);
296 let slice = &all_content[written..end];
297
298 let (blocks, pushed) = self.push(slice);
299 blocks_received.extend(blocks);
300 written += pushed;
301 }
302
303 let last_blocks = self.finish();
304 blocks_received.extend(last_blocks);
305
306 blocks_received
307 }
308}
309
310fn render_and_hash(flat: &FlatUnixFs<'_>) -> (Cid, Vec<u8>) {
311 let mut out = Vec::with_capacity(flat.get_size());
315 let mut writer = Writer::new(&mut out);
316 flat.write_message(&mut writer)
317 .expect("unsure how this could fail");
318 let mh = multihash::wrap(multihash::Code::Sha2_256, &Sha256::digest(&out));
319 let cid = Cid::new_v0(mh).expect("sha2_256 is the correct multihash for cidv0");
320 (cid, out)
321}
322
/// Strategy for splitting file contents into leaf blocks.
#[derive(Debug, Clone)]
pub enum Chunker {
    /// Fixed-size chunks: every leaf holds exactly this many bytes, except possibly the last.
    ///
    /// The size must be non-zero; `FileAdder::push` panics on `Size(0)`.
    Size(usize),
}
329
330impl Default for Chunker {
331 fn default() -> Self {
333 Chunker::Size(256 * 1024)
334 }
335}
336
337impl Chunker {
338 fn accept<'a>(&mut self, input: &'a [u8], buffered: &[u8]) -> (&'a [u8], bool) {
339 use Chunker::*;
340
341 match self {
342 Size(max) => {
343 let l = input.len().min(*max - buffered.len());
344 let accepted = &input[..l];
345 let ready = buffered.len() + l >= *max;
346 (accepted, ready)
347 }
348 }
349 }
350
351 fn size_hint(&self) -> usize {
352 use Chunker::*;
353
354 match self {
355 Size(max) => *max,
356 }
357 }
358}
359
360#[derive(Debug, Clone)]
365pub enum Collector {
366 Balanced(BalancedCollector),
368}
369
370impl Default for Collector {
371 fn default() -> Self {
372 Collector::Balanced(Default::default())
373 }
374}
375
376impl Collector {
377 fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
378 use Collector::*;
379
380 match self {
381 Balanced(bc) => bc.flush_links(pending, finishing),
382 }
383 }
384}
385
386#[derive(Clone)]
389pub struct BalancedCollector {
390 branching_factor: usize,
391 reused_links: Vec<PBLink<'static>>,
393 reused_blocksizes: Vec<u64>,
395}
396
397impl fmt::Debug for BalancedCollector {
398 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
399 write!(
400 fmt,
401 "BalancedCollector {{ branching_factor: {} }}",
402 self.branching_factor
403 )
404 }
405}
406
407impl Default for BalancedCollector {
408 fn default() -> Self {
416 Self::with_branching_factor(174)
417 }
418}
419
420impl From<BalancedCollector> for Collector {
421 fn from(b: BalancedCollector) -> Self {
422 Collector::Balanced(b)
423 }
424}
425
426impl BalancedCollector {
427 pub fn with_branching_factor(branching_factor: usize) -> Self {
429 assert!(branching_factor > 0);
430
431 Self {
432 branching_factor,
433 reused_links: Vec::new(),
434 reused_blocksizes: Vec::new(),
435 }
436 }
437
438 fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
442 let mut ret = Vec::new();
489
490 let mut reused_links = core::mem::take(&mut self.reused_links);
491 let mut reused_blocksizes = core::mem::take(&mut self.reused_blocksizes);
492
493 if let Some(need) = self.branching_factor.checked_sub(reused_links.capacity()) {
494 reused_links.reserve(need);
495 }
496
497 if let Some(need) = self
498 .branching_factor
499 .checked_sub(reused_blocksizes.capacity())
500 {
501 reused_blocksizes.reserve(need);
502 }
503
504 'outer: for level in 0.. {
505 if pending.len() == 1 && finishing
506 || pending.len() <= self.branching_factor && !finishing
507 {
508 break;
514 }
515
516 let mut starting_point = 0;
520
521 let mut last_overwrite = None;
525
526 while let Some(mut first_at) = &pending[starting_point..]
527 .iter()
528 .position(|Link { depth, .. }| depth == &level)
529 {
530 first_at += starting_point;
533
534 if !finishing && pending[first_at..].len() <= self.branching_factor {
535 if let Some(last_overwrite) = last_overwrite {
536 pending.drain((last_overwrite + 1)..first_at);
538 }
539 break 'outer;
540 }
541
542 reused_links.clear();
543 reused_blocksizes.clear();
544
545 let mut nested_size = 0;
546 let mut nested_total_size = 0;
547
548 let last = (first_at + self.branching_factor).min(pending.len());
549
550 for (index, link) in pending[first_at..last].iter().enumerate() {
551 assert_eq!(
552 link.depth,
553 level,
554 "unexpected link depth {} when searching at level {} index {}",
555 link.depth,
556 level,
557 index + first_at
558 );
559
560 Self::partition_link(
561 link,
562 &mut reused_links,
563 &mut reused_blocksizes,
564 &mut nested_size,
565 &mut nested_total_size,
566 );
567 }
568
569 debug_assert_eq!(reused_links.len(), reused_blocksizes.len());
570
571 let inner = FlatUnixFs {
572 links: reused_links,
573 data: UnixFs {
574 Type: UnixFsType::File,
575 filesize: Some(nested_size),
576 blocksizes: reused_blocksizes,
577 ..Default::default()
578 },
579 };
580
581 let (cid, vec) = render_and_hash(&inner);
582
583 let index = last_overwrite.map(|i| i + 1).unwrap_or(first_at);
586 pending[index] = Link {
587 depth: level + 1,
588 target: cid.clone(),
589 total_size: nested_total_size + vec.len() as u64,
590 file_size: nested_size,
591 };
592
593 ret.push((cid, vec));
594
595 reused_links = inner.links;
596 reused_blocksizes = inner.data.blocksizes;
597
598 starting_point = last;
599 last_overwrite = Some(index);
600 }
601
602 if let Some(last_overwrite) = last_overwrite {
603 pending.truncate(last_overwrite + 1);
604 }
605
606 debug_assert_eq!(
609 pending.iter().position(|l| l.depth == level),
610 None,
611 "should have no more of depth {}: {}",
612 level,
613 LinkFormatter(pending.as_slice())
614 );
615 }
616
617 self.reused_links = reused_links;
618 self.reused_blocksizes = reused_blocksizes;
619
620 ret
621 }
622
623 fn partition_link(
626 link: &Link,
627 links: &mut Vec<PBLink<'static>>,
628 blocksizes: &mut Vec<u64>,
629 nested_size: &mut u64,
630 nested_total_size: &mut u64,
631 ) {
632 links.push(PBLink {
633 Hash: Some(link.target.to_bytes().into()),
634 Name: Some("".into()),
635 Tsize: Some(link.total_size),
636 });
637 blocksizes.push(link.file_size);
638 *nested_size += link.file_size;
639 *nested_total_size += link.total_size;
640 }
641}
642
#[cfg(test)]
mod tests {

    use super::{BalancedCollector, Chunker, FileAdder};
    use crate::test_support::FakeBlockstore;
    use cid::Cid;
    use core::convert::TryFrom;
    use hex_literal::hex;

    /// `Chunker::Size` accepts at most the room left in the current chunk and reports
    /// readiness once the chunk is full.
    #[test]
    fn test_size_chunker() {
        assert_eq!(size_chunker_scenario(1, 4, 0), (1, true));
        assert_eq!(size_chunker_scenario(2, 4, 0), (2, true));
        assert_eq!(size_chunker_scenario(2, 1, 0), (1, false));
        assert_eq!(size_chunker_scenario(2, 1, 1), (1, true));
        assert_eq!(size_chunker_scenario(32, 3, 29), (3, true));
        assert_eq!(size_chunker_scenario(32, 4, 29), (3, true));
    }

    /// Runs `Chunker::Size(max).accept` with zero-filled input and buffer of the given
    /// lengths. Returns `(accepted length, ready)`.
    fn size_chunker_scenario(max: usize, input_len: usize, existing_len: usize) -> (usize, bool) {
        let input = vec![0; input_len];
        let existing = vec![0; existing_len];

        let (accepted, ready) = Chunker::Size(max).accept(&input, &existing);
        (accepted.len(), ready)
    }

    /// A content smaller than one chunk yields nothing on `push`; `finish` emits the single
    /// leaf, which must match the fixture block.
    #[test]
    fn favourite_single_block_file() {
        let blocks = FakeBlockstore::with_fixtures();
        let content = b"foobar\n";

        let mut adder = FileAdder::default();

        {
            let (mut ready_blocks, bytes) = adder.push(content);
            assert!(ready_blocks.next().is_none());
            assert_eq!(bytes, content.len());
        }

        let (_, file_block) = adder
            .finish()
            .next()
            .expect("there must have been the root block");

        assert_eq!(
            blocks.get_by_str("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL"),
            file_block.as_slice()
        );
    }

    /// With 2-byte chunks the 7-byte content becomes four leaves followed by one root link
    /// block; all five must match the fixtures, in emission order.
    #[test]
    fn favourite_multi_block_file() {
        let blocks = FakeBlockstore::with_fixtures();
        let content = b"foobar\n";
        let adder = FileAdder::builder().with_chunker(Chunker::Size(2)).build();

        let blocks_received = adder.collect_blocks(content, 0);

        let expected = [
            "QmfVyMoStzTvdnUR7Uotzh82gmL427q9z3xW5Y8fUoszi4",
            "QmdPyW4CWE3QBkgjWfjM5f7Tjb3HukxVuBXZtkqAGwsMnm",
            "QmNhDQpphvMWhdCzP74taRzXDaEfPGq8vWfFRzD7mEgePM",
            "Qmc5m94Gu7z62RC8waSKkZUrCCBJPyHbkpmGzEePxy2oXJ",
            "QmRJHYTNvC3hmd9gJQARxLR1QMEincccBV53bBw524yyq6",
        ]
        .iter()
        .map(|key| {
            let cid = Cid::try_from(*key).unwrap();
            let block = blocks.get_by_str(key).to_vec();
            (cid, block)
        })
        .collect::<Vec<_>>();

        assert_eq!(blocks_received, expected);
    }

    /// With 1-byte chunks the content produces more leaves than one link block can hold
    /// (174 with the default collector) but fewer than two full blocks' worth. This forces
    /// two depth-1 link blocks plus a root at depth 2.
    #[test]
    fn three_layers() {
        let content = b"Lorem ipsum dolor sit amet, sit enim montes aliquam. Cras non lorem, \
            rhoncus condimentum, irure et ante. Pulvinar suscipit odio ante, et tellus a enim, \
            wisi ipsum, vel rhoncus eget faucibus varius, luctus turpis nibh vel odio nulla pede.";

        assert!(content.len() > 174 && content.len() < 2 * 174);

        let adder = FileAdder::builder().with_chunker(Chunker::Size(1)).build();

        let blocks_received = adder.collect_blocks(content, 0);

        assert_eq!(blocks_received.len(), 240);

        assert_eq!(
            blocks_received.last().unwrap().0.to_string(),
            "QmRQ6NZNUs4JrCT2y7tmCC1wUhjqYuTssB8VXbbN3rMffg"
        );
    }

    /// The root CID must not depend on how the input is sliced across `push` calls: every
    /// push size from 1 to 31 bytes with 32-byte chunks must yield the same root.
    #[test]
    fn three_layers_all_subchunks() {
        let content = b"Lorem ipsum dolor sit amet, sit enim montes aliquam. Cras non lorem, \
            rhoncus condimentum, irure et ante. Pulvinar suscipit odio ante, et tellus a enim, \
            wisi ipsum, vel rhoncus eget faucibus varius, luctus turpis nibh vel odio nulla pede.";

        for amt in 1..32 {
            let adder = FileAdder::builder().with_chunker(Chunker::Size(32)).build();
            let blocks_received = adder.collect_blocks(content, amt);
            assert_eq!(
                blocks_received.last().unwrap().0.to_string(),
                "QmYSLcVQqxKygiq7x9w1XGYxU29EShB8ZemiaQ8GAAw17h",
                "amt: {}",
                amt
            );
        }
    }

    /// An empty file still produces exactly one block.
    #[test]
    fn empty_file() {
        let blocks = FileAdder::default().collect_blocks(b"", 0);
        assert_eq!(blocks.len(), 1);
        // dag-pb PBNode with only its Data field (0x0a, 4 bytes), holding a UnixFs message:
        // Type (0x08) = 2 (File) and filesize (0x18) = 0; no UnixFs Data field is present.
        assert_eq!(blocks[0].1.as_slice(), &hex!("0a 04 08 02 18 00"));
        assert_eq!(
            blocks[0].0.to_string(),
            "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH"
        );
    }

    /// Exactly `branching_factor` full leaves plus one extra 1-byte leaf.
    ///
    /// Expected blocks: 174 leaves, the trailing 1-byte leaf, two depth-1 link blocks (one
    /// full, one with a single link) and the root.
    #[test]
    fn full_link_block_and_a_byte() {
        let buf = vec![0u8; 2];

        let branching_factor = 174;

        let mut adder = FileAdder::builder()
            .with_chunker(Chunker::Size(2))
            .with_collector(BalancedCollector::with_branching_factor(branching_factor))
            .build();
        let mut blocks_count = 0;

        for _ in 0..branching_factor {
            let (blocks, written) = adder.push(buf.as_slice());
            assert_eq!(written, buf.len());

            blocks_count += blocks.count();
        }

        // Half a chunk: buffered, so no block is produced until `finish`.
        let (blocks, written) = adder.push(&buf[0..1]);
        assert_eq!(written, 1);
        blocks_count += blocks.count();

        let last_blocks = adder.finish().collect::<Vec<_>>();
        blocks_count += last_blocks.len();

        assert_eq!(blocks_count, branching_factor + 1 + 1 + 1 + 1);

        assert_eq!(
            last_blocks.last().unwrap().0.to_string(),
            "QmcHNWF1d56uCDSfJPA7t9fadZRV9we5HGSTGSmwuqmMP9"
        );
    }

    /// Exactly `branching_factor` leaves fit into one link block.
    ///
    /// No link block is emitted while pushing, and `finish` emits only the root.
    #[test]
    fn full_link_block() {
        let buf = vec![0u8; 1];

        let branching_factor = 174;

        let mut adder = FileAdder::builder()
            .with_chunker(Chunker::Size(1))
            .with_collector(BalancedCollector::with_branching_factor(branching_factor))
            .build();
        let mut blocks_count = 0;

        for _ in 0..branching_factor {
            let (blocks, written) = adder.push(buf.as_slice());
            assert_eq!(written, buf.len());

            blocks_count += blocks.count();
        }

        let mut last_blocks = adder.finish();

        // Nothing is buffered, so `finish` yields no leaf, only the root link block.
        let last_block = last_blocks.next().expect("must not have flushed yet");
        blocks_count += 1;

        assert_eq!(last_blocks.next(), None);

        assert_eq!(
            last_block.0.to_string(),
            "QmdgQac8c6Bo3MP5bHAg2yQ25KebFUsmkZFvyByYzf8UCB"
        );

        assert_eq!(blocks_count, 175);
    }
}