use crate::pb::{FlatUnixFs, PBLink, UnixFs, UnixFsType, DAG_PB};
use alloc::borrow::Cow;
use core::fmt;
use libipld::multihash::{Code, MultihashDigest};
use libipld::Cid;
use quick_protobuf::{MessageWrite, Writer};

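/// Builds a UnixFS file tree from pushed bytes: the configured [`Chunker`] splits the content
/// into leaf blocks and the configured [`Collector`] combines the produced links into parent
/// blocks. Feed bytes with [`FileAdder::push`] and complete the tree with [`FileAdder::finish`].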
#[derive(Default)]
pub struct FileAdder {
    chunker: Chunker,
    collector: Collector,
    /// Bytes buffered towards the next leaf block.
    block_buffer: Vec<u8>,
    /// Links to already produced blocks which have not yet been written into a parent block.
    unflushed_links: Vec<Link>,
}

impl fmt::Debug for FileAdder {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            fmt,
            "FileAdder {{ chunker: {:?}, block_buffer: {}/{}, unflushed_links: {} }}",
            self.chunker,
            self.block_buffer.len(),
            self.block_buffer.capacity(),
            LinkFormatter(&self.unflushed_links),
        )
    }
}

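/// Formats a slice of [`Link`]s as `depth: count` pairs for [`FileAdder`]'s `Debug` output.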
struct LinkFormatter<'a>(&'a [Link]);

impl fmt::Display for LinkFormatter<'_> {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut iter = self.0.iter().peekable();

        write!(fmt, "[")?;

        let mut current = match iter.peek() {
            Some(Link { depth, .. }) => depth,
            None => return write!(fmt, "]"),
        };

        let mut count = 0;

        for Link {
            depth: next_depth, ..
        } in iter
        {
            if current == next_depth {
                count += 1;
            } else {
                write!(fmt, "{}: {}/", current, count)?;

                let steps_between = if current > next_depth {
                    current - next_depth
                } else {
                    next_depth - current
                };

                for _ in 0..steps_between - 1 {
                    write!(fmt, "0/")?;
                }
                count = 1;
                current = next_depth;
            }
        }

        write!(fmt, "{}: {}]", current, count)
    }
}

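/// A link to a completed block, not yet committed into a parent block.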
struct Link {
    /// Depth of the subtree this link points to: 0 for leaf blocks, 1 for their parents, and so
    /// on.
    depth: usize,
    /// Cid of the already rendered block.
    target: Cid,
    /// Combined size of the block and all blocks it links to, as rendered into dag-pb `Tsize`.
    total_size: u64,
    /// Bytes of file content carried by this subtree, as rendered into the UnixFS `blocksizes`.
    file_size: u64,
}

impl fmt::Debug for Link {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Link")
            .field("depth", &self.depth)
            .field("target", &format_args!("{}", self.target))
            .field("total_size", &self.total_size)
            .field("file_size", &self.file_size)
            .finish()
    }
}

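/// Convenience builder for configuring the [`Chunker`] and [`Collector`] of a [`FileAdder`].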
#[derive(Default)]
pub struct FileAdderBuilder {
    chunker: Chunker,
    collector: Collector,
}

impl FileAdderBuilder {
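    /// Configures the builder to split pushed bytes with the given [`Chunker`].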
    pub fn with_chunker(self, chunker: Chunker) -> Self {
        FileAdderBuilder { chunker, ..self }
    }

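    /// Configures the builder to combine the produced links into parent blocks with the given
    /// [`Collector`].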
    pub fn with_collector(self, collector: impl Into<Collector>) -> Self {
        FileAdderBuilder {
            collector: collector.into(),
            ..self
        }
    }

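    /// Returns a new [`FileAdder`] using the configured chunker and collector.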
    pub fn build(self) -> FileAdder {
        let FileAdderBuilder { chunker, collector } = self;

        FileAdder {
            chunker,
            collector,
            ..Default::default()
        }
    }
}

impl FileAdder {
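    /// Returns a [`FileAdderBuilder`] for constructing a non-default [`FileAdder`].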
    pub fn builder() -> FileAdderBuilder {
        FileAdderBuilder::default()
    }

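    /// Returns the likely amount of buffering the file adding will need, which equals the
    /// maximum chunk size of the configured chunker.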
    pub fn size_hint(&self) -> usize {
        self.chunker.size_hint()
    }

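    /// Pushes new file bytes into the tree builder. Returns an iterator over the blocks completed
    /// by this call as `(Cid, encoded block)` pairs, and the number of bytes of `input` consumed.
    /// Any bytes not consumed need to be pushed again.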
    pub fn push(&mut self, input: &[u8]) -> (impl Iterator<Item = (Cid, Vec<u8>)>, usize) {
        let (accepted, ready) = self.chunker.accept(input, &self.block_buffer);

        if self.block_buffer.is_empty() && ready {
            // the accepted slice alone completes a chunk: render the leaf straight from the
            // input without copying it into the buffer first
            let leaf = Self::flush_buffered_leaf(accepted, &mut self.unflushed_links, false);
            assert!(leaf.is_some(), "chunk completed, must produce a new block");
            self.block_buffer.clear();
            let links = self.flush_buffered_links(false);
            (leaf.into_iter().chain(links.into_iter()), accepted.len())
        } else {
            if self.block_buffer.capacity() == 0 {
                // delay the allocation until bytes actually need to be buffered
                self.block_buffer.reserve(self.size_hint());
            }

            self.block_buffer.extend_from_slice(accepted);
            let written = accepted.len();

            let (leaf, links) = if !ready {
                // a chunk did not complete with this push
                (None, Vec::new())
            } else {
                let leaf = Self::flush_buffered_leaf(
                    self.block_buffer.as_slice(),
                    &mut self.unflushed_links,
                    false,
                );
                assert!(leaf.is_some(), "chunk completed, must produce a new block");
                self.block_buffer.clear();
                let links = self.flush_buffered_links(false);

                (leaf, links)
            };
            (leaf.into_iter().chain(links.into_iter()), written)
        }
    }

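    /// Called after the last [`FileAdder::push`] to finish the tree. Returns an iterator over the
    /// remaining blocks, the last of which is the root block of the file.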
    pub fn finish(mut self) -> impl Iterator<Item = (Cid, Vec<u8>)> {
        let last_leaf = Self::flush_buffered_leaf(
            self.block_buffer.as_slice(),
            &mut self.unflushed_links,
            true,
        );
        let root_links = self.flush_buffered_links(true);
        last_leaf.into_iter().chain(root_links.into_iter())
    }

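    /// Renders a leaf block from `input` and records a depth 0 link to it. Returns `None` for
    /// empty input, except on the final flush of an empty file, which still needs its single
    /// empty block.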
    fn flush_buffered_leaf(
        input: &[u8],
        unflushed_links: &mut Vec<Link>,
        finishing: bool,
    ) -> Option<(Cid, Vec<u8>)> {
        if input.is_empty() && (!finishing || !unflushed_links.is_empty()) {
            return None;
        }

        let data = if !input.is_empty() {
            Some(Cow::Borrowed(input))
        } else {
            None
        };

        let filesize = Some(input.len() as u64);

        let inner = FlatUnixFs {
            links: Vec::new(),
            data: UnixFs {
                Type: UnixFsType::File,
                Data: data,
                filesize,
                ..Default::default()
            },
        };

        let (cid, vec) = render_and_hash(&inner);

        let total_size = vec.len();

        let link = Link {
            depth: 0,
            target: cid,
            total_size: total_size as u64,
            file_size: input.len() as u64,
        };

        unflushed_links.push(link);

        Some((cid, vec))
    }

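    /// Asks the collector to combine any completed subtrees into new parent blocks.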
    fn flush_buffered_links(&mut self, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        self.collector
            .flush_links(&mut self.unflushed_links, finishing)
    }

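    /// Test helper: drives `all_content` through `push` in `amt` sized slices (or all at once
    /// when `amt` is zero) and collects every produced block, including those from `finish`.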
    #[cfg(test)]
    fn collect_blocks(mut self, all_content: &[u8], mut amt: usize) -> Vec<(Cid, Vec<u8>)> {
        let mut written = 0;
        let mut blocks_received = Vec::new();

        if amt == 0 {
            amt = all_content.len();
        }

        while written < all_content.len() {
            let end = written + (all_content.len() - written).min(amt);
            let slice = &all_content[written..end];

            let (blocks, pushed) = self.push(slice);
            blocks_received.extend(blocks);
            written += pushed;
        }

        let last_blocks = self.finish();
        blocks_received.extend(last_blocks);

        blocks_received
    }
}

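/// Serializes the node into its dag-pb bytes and derives the CIDv1 from the SHA2-256 multihash of
/// those bytes.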
fn render_and_hash(flat: &FlatUnixFs<'_>) -> (Cid, Vec<u8>) {
    let mut out = Vec::with_capacity(flat.get_size());
    let mut writer = Writer::new(&mut out);
    flat.write_message(&mut writer)
        .expect("unsure how this could fail");
    let mh = Code::Sha2_256.digest(&out);
    let cid = Cid::new_v1(DAG_PB, mh);
    (cid, out)
}

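/// Strategy for splitting pushed bytes into leaf blocks.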
#[derive(Debug, Clone)]
pub enum Chunker {
    /// Size based chunking: each leaf block carries at most this many bytes of content.
    Size(usize),
}

impl Default for Chunker {
    fn default() -> Self {
        Chunker::Size(256 * 1024)
    }
}

impl Chunker {
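    /// Returns the prefix of `input` which fits on top of the already `buffered` bytes, and
    /// whether accepting it completes a chunk.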
    fn accept<'a>(&mut self, input: &'a [u8], buffered: &[u8]) -> (&'a [u8], bool) {
        use Chunker::*;

        match self {
            Size(max) => {
                let l = input.len().min(*max - buffered.len());
                let accepted = &input[..l];
                let ready = buffered.len() + l >= *max;
                (accepted, ready)
            }
        }
    }

    fn size_hint(&self) -> usize {
        use Chunker::*;

        match self {
            Size(max) => *max,
        }
    }
}

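/// Strategy for gathering the produced blocks into a tree; currently only the balanced layout is
/// implemented.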
#[derive(Debug, Clone)]
pub enum Collector {
    Balanced(BalancedCollector),
}

impl Default for Collector {
    fn default() -> Self {
        Collector::Balanced(Default::default())
    }
}

impl Collector {
    fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        use Collector::*;

        match self {
            Balanced(bc) => bc.flush_links(pending, finishing),
        }
    }
}

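/// Collector which builds a balanced tree: every internal block links to at most
/// `branching_factor` blocks of the level below it.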
#[derive(Clone)]
pub struct BalancedCollector {
    branching_factor: usize,
    /// Scratch buffer for links, reused between the internal blocks this collector renders.
    reused_links: Vec<PBLink<'static>>,
    /// Scratch buffer for blocksizes, reused between the internal blocks this collector renders.
    reused_blocksizes: Vec<u64>,
}

impl fmt::Debug for BalancedCollector {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            fmt,
            "BalancedCollector {{ branching_factor: {} }}",
            self.branching_factor
        )
    }
}

impl Default for BalancedCollector {
    fn default() -> Self {
        // 174 links per block; reportedly the same default as go-ipfs' balanced layout
        Self::with_branching_factor(174)
    }
}

impl From<BalancedCollector> for Collector {
    fn from(b: BalancedCollector) -> Self {
        Collector::Balanced(b)
    }
}

impl BalancedCollector {
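    /// Creates a new [`BalancedCollector`] with the given maximum number of links per internal
    /// block. Panics if the branching factor is zero.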
    pub fn with_branching_factor(branching_factor: usize) -> Self {
        assert!(branching_factor > 0);

        Self {
            branching_factor,
            reused_links: Vec::new(),
            reused_blocksizes: Vec::new(),
        }
    }

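    /// Combines pending links into new internal blocks, level by level. While the file is still
    /// being pushed (`finishing == false`) only full groups of `branching_factor` links are
    /// collapsed; on the final flush everything is collapsed until a single root link remains.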
    fn flush_links(&mut self, pending: &mut Vec<Link>, finishing: bool) -> Vec<(Cid, Vec<u8>)> {
        let mut ret = Vec::new();

        // reuse the scratch buffers between calls, growing them towards `branching_factor`
        // capacity to avoid reallocation
        let mut reused_links = core::mem::take(&mut self.reused_links);
        let mut reused_blocksizes = core::mem::take(&mut self.reused_blocksizes);

        if let Some(need) = self.branching_factor.checked_sub(reused_links.capacity()) {
            reused_links.reserve(need);
        }

        if let Some(need) = self
            .branching_factor
            .checked_sub(reused_blocksizes.capacity())
        {
            reused_blocksizes.reserve(need);
        }

        'outer: for level in 0.. {
            if pending.len() == 1 && finishing
                || pending.len() <= self.branching_factor && !finishing
            {
                // when finishing, a single pending link is the already rendered root; while
                // pushing, links which still fit into one block are left for later calls
                break;
            }

            // index into `pending` from which the search for this level's links continues
            let mut starting_point = 0;

            // the last index in `pending` which was overwritten with a newly created higher
            // level link
            let mut last_overwrite = None;

            while let Some(mut first_at) = &pending[starting_point..]
                .iter()
                .position(|Link { depth, .. }| depth == &level)
            {
                // translate the index from the slice back into the full `pending` vec
                first_at += starting_point;

                if !finishing && pending[first_at..].len() <= self.branching_factor {
                    if let Some(last_overwrite) = last_overwrite {
                        // drop the links which have already been compacted into higher level links
                        pending.drain((last_overwrite + 1)..first_at);
                    }
                    break 'outer;
                }

                reused_links.clear();
                reused_blocksizes.clear();

                let mut nested_size = 0;
                let mut nested_total_size = 0;

                let last = (first_at + self.branching_factor).min(pending.len());

                for (index, link) in pending[first_at..last].iter().enumerate() {
                    assert_eq!(
                        link.depth,
                        level,
                        "unexpected link depth {} when searching at level {} index {}",
                        link.depth,
                        level,
                        index + first_at
                    );

                    Self::partition_link(
                        link,
                        &mut reused_links,
                        &mut reused_blocksizes,
                        &mut nested_size,
                        &mut nested_total_size,
                    );
                }

                debug_assert_eq!(reused_links.len(), reused_blocksizes.len());

                let inner = FlatUnixFs {
                    links: reused_links,
                    data: UnixFs {
                        Type: UnixFsType::File,
                        filesize: Some(nested_size),
                        blocksizes: reused_blocksizes,
                        ..Default::default()
                    },
                };

                let (cid, vec) = render_and_hash(&inner);

                // overwrite a consumed link with the newly created higher level link
                let index = last_overwrite.map(|i| i + 1).unwrap_or(first_at);
                pending[index] = Link {
                    depth: level + 1,
                    target: cid,
                    total_size: nested_total_size + vec.len() as u64,
                    file_size: nested_size,
                };

                ret.push((cid, vec));

                // take the scratch buffers back for the next iteration
                reused_links = inner.links;
                reused_blocksizes = inner.data.blocksizes;

                starting_point = last;
                last_overwrite = Some(index);
            }

            if let Some(last_overwrite) = last_overwrite {
                pending.truncate(last_overwrite + 1);
            }

            debug_assert_eq!(
                pending.iter().position(|l| l.depth == level),
                None,
                "should have no more of depth {}: {}",
                level,
                LinkFormatter(pending.as_slice())
            );
        }

        self.reused_links = reused_links;
        self.reused_blocksizes = reused_blocksizes;

        ret
    }

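    /// Appends the `PBLink` and blocksize for a single child `link`, accumulating the running
    /// file size and total size of the block being built.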
    fn partition_link(
        link: &Link,
        links: &mut Vec<PBLink<'static>>,
        blocksizes: &mut Vec<u64>,
        nested_size: &mut u64,
        nested_total_size: &mut u64,
    ) {
        links.push(PBLink {
            Hash: Some(link.target.to_bytes().into()),
            Name: Some("".into()),
            Tsize: Some(link.total_size),
        });
        blocksizes.push(link.file_size);
        *nested_size += link.file_size;
        *nested_total_size += link.total_size;
    }
}

#[cfg(test)]
mod tests {

    use super::{BalancedCollector, Chunker, FileAdder};
    use crate::test_support::FakeBlockstore;
    use core::convert::TryFrom;
    use hex_literal::hex;
    use libipld::Cid;

    #[test]
    fn test_size_chunker() {
        assert_eq!(size_chunker_scenario(1, 4, 0), (1, true));
        assert_eq!(size_chunker_scenario(2, 4, 0), (2, true));
        assert_eq!(size_chunker_scenario(2, 1, 0), (1, false));
        assert_eq!(size_chunker_scenario(2, 1, 1), (1, true));
        assert_eq!(size_chunker_scenario(32, 3, 29), (3, true));
        // only the bytes which fit the remaining space in the chunk are accepted
        assert_eq!(size_chunker_scenario(32, 4, 29), (3, true));
    }

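    /// Returns how many of `input_len` zero bytes a `Size(max)` chunker accepts when
    /// `existing_len` bytes are already buffered, and whether that completes a chunk.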
    fn size_chunker_scenario(max: usize, input_len: usize, existing_len: usize) -> (usize, bool) {
        let input = vec![0; input_len];
        let existing = vec![0; existing_len];

        let (accepted, ready) = Chunker::Size(max).accept(&input, &existing);
        (accepted.len(), ready)
    }

    #[test]
    fn favourite_single_block_file() {
        let blocks = FakeBlockstore::with_fixtures();
        let content = b"foobar\n";

        let mut adder = FileAdder::default();

        {
            let (mut ready_blocks, bytes) = adder.push(content);
            assert!(ready_blocks.next().is_none());
            assert_eq!(bytes, content.len());
        }

        let (_, file_block) = adder
            .finish()
            .next()
            .expect("there must have been the root block");

        assert_eq!(
            blocks.get_by_str("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL"),
            file_block.as_slice()
        );
    }

    #[test]
    fn favourite_multi_block_file() {
        let blocks = FakeBlockstore::with_fixtures();
        let content = b"foobar\n";
        let adder = FileAdder::builder().with_chunker(Chunker::Size(2)).build();

        let blocks_received = adder.collect_blocks(content, 0);

        // the leaf blocks for "fo", "ob", "ar", "\n" followed by the root block
        let expected = [
            "bafybeih67h7bqbeufm26dhqulib7tsovzkojs7o2bkkbn47vcwss6gz44e",
            "bafybeig7xffxllfsbd6uq46yjbzk6wf5mxdtc5ykpvga33vubchiooil7y",
            "bafybeiafisl24tujqewigj3kjdr6m6ibhj4iw7aowatrfxyvbfoafvwnfq",
            "bafybeigmgmwown66u7j5pqancojrc5ry2pwzmnlvqnwg2rfcjfi6irgplu",
            "bafybeicbvfcf3ys43v7u4obvdypbdzdsddiqvagpwldjvlj7kyrkwkpm3u",
        ]
        .iter()
        .map(|key| {
            let cid = Cid::try_from(*key).unwrap();
            let block = blocks.get_by_str(key).to_vec();
            (cid, block)
        })
        .collect::<Vec<_>>();

        if blocks_received != expected {
            for ((actual_cid, actual_block), (expected_cid, expected_block)) in
                blocks_received.into_iter().zip(expected.into_iter())
            {
                assert_eq!(
                    actual_cid, expected_cid,
                    "Expected\n\t{}\n\t{:02x?}\nActual\n\t{}\n\t{:02x?}",
                    expected_cid, expected_block, actual_cid, actual_block
                );
            }
        }
    }

    #[test]
    fn three_layers() {
        let content = b"Lorem ipsum dolor sit amet, sit enim montes aliquam. Cras non lorem, \
            rhoncus condimentum, irure et ante. Pulvinar suscipit odio ante, et tellus a enim, \
            wisi ipsum, vel rhoncus eget faucibus varius, luctus turpis nibh vel odio nulla pede.";

        assert!(content.len() > 174 && content.len() < 2 * 174);

        // chunking by single bytes forces every byte into its own leaf block, which with the
        // default branching factor of 174 produces a three layer tree
        let adder = FileAdder::builder().with_chunker(Chunker::Size(1)).build();

        let blocks_received = adder.collect_blocks(content, 0);

        assert_eq!(blocks_received.len(), 240);

        assert_eq!(
            blocks_received.last().unwrap().0.to_string(),
            "bafybeiedgegddvmfxlnikgwgxet6am5xkrhforeqb6wxyga3p7jmmge27u"
        );
    }

    #[test]
    fn three_layers_all_subchunks() {
        let content = b"Lorem ipsum dolor sit amet, sit enim montes aliquam. Cras non lorem, \
            rhoncus condimentum, irure et ante. Pulvinar suscipit odio ante, et tellus a enim, \
            wisi ipsum, vel rhoncus eget faucibus varius, luctus turpis nibh vel odio nulla pede.";

        for amt in 1..32 {
            let adder = FileAdder::builder().with_chunker(Chunker::Size(32)).build();
            let blocks_received = adder.collect_blocks(content, amt);
            assert_eq!(
                blocks_received.last().unwrap().0.to_string(),
                "bafybeiedxvsh3dwkublmenfbir3li4jf52opwcf5dgpi62h6yv4gd7klku",
                "amt: {}",
                amt
            );
        }
    }

    #[test]
    fn empty_file() {
        let blocks = FileAdder::default().collect_blocks(b"", 0);
        assert_eq!(blocks.len(), 1);
        // 0a 04: dag-pb Data field of 4 bytes; 08 02: UnixFS Type = File; 18 00: filesize = 0
        assert_eq!(blocks[0].1.as_slice(), &hex!("0a 04 08 02 18 00"));
        assert_eq!(
            blocks[0].0.to_string(),
            "bafybeif7ztnhq65lumvvtr4ekcwd2ifwgm3awq4zfr3srh462rwyinlb4y"
        );
    }

    #[test]
    fn full_link_block_and_a_byte() {
        let buf = vec![0u8; 2];

        // with a chunk size of 2, each push of `buf` produces exactly one leaf block; pushing
        // `branching_factor` of them fills one link block, and the extra byte forces a second one
        let branching_factor = 174;

        let mut adder = FileAdder::builder()
            .with_chunker(Chunker::Size(2))
            .with_collector(BalancedCollector::with_branching_factor(branching_factor))
            .build();
        let mut blocks_count = 0;

        for _ in 0..branching_factor {
            let (blocks, written) = adder.push(buf.as_slice());
            assert_eq!(written, buf.len());

            blocks_count += blocks.count();
        }

        let (blocks, written) = adder.push(&buf[0..1]);
        assert_eq!(written, 1);
        blocks_count += blocks.count();

        let last_blocks = adder.finish().collect::<Vec<_>>();
        blocks_count += last_blocks.len();

        // leaves of the full link block + the trailing one byte leaf + the full link block
        // + the link block for the trailing leaf + the root
        assert_eq!(blocks_count, branching_factor + 1 + 1 + 1 + 1);

        assert_eq!(
            last_blocks.last().unwrap().0.to_string(),
            "bafybeiai6dvlhomgaargeb677jd7bugp7pscqeakbvtapdlpvsfuedojqa"
        );
    }

    #[test]
    fn full_link_block() {
        let buf = vec![0u8; 1];

        let branching_factor = 174;

        let mut adder = FileAdder::builder()
            .with_chunker(Chunker::Size(1))
            .with_collector(BalancedCollector::with_branching_factor(branching_factor))
            .build();
        let mut blocks_count = 0;

        for _ in 0..branching_factor {
            let (blocks, written) = adder.push(buf.as_slice());
            assert_eq!(written, buf.len());

            blocks_count += blocks.count();
        }

        let mut last_blocks = adder.finish();

        // the link block is only produced by `finish`: a full group of links is kept unflushed
        // while more bytes could still be pushed
        let last_block = last_blocks.next().expect("must not have flushed yet");
        blocks_count += 1;

        assert_eq!(last_blocks.next(), None);

        assert_eq!(
            last_block.0.to_string(),
            "bafybeihkyby6yehjk25b3havpq4jxsfqxf54jizpjqodovxiuwav62uzpy"
        );

        assert_eq!(blocks_count, 175);
    }
}