wnfs_unixfs_file/balanced_tree.rs

use crate::{
    builder::encode_unixfs_pb,
    protobufs,
    types::Block,
    unixfs::{DataType, Node, UnixFsFile},
};
use anyhow::Result;
use async_stream::try_stream;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use libipld::Cid;
use std::collections::VecDeque;
use wnfs_common::{utils::CondSend, BlockStore};

/// Default degree (maximum number of links per node) for a balanced tree, taken
/// from the UnixFS spec:
/// <https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout>
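///
/// With the 256 KiB chunks commonly used by UnixFS importers, a single full node
/// therefore references roughly 174 * 256 KiB ≈ 43.5 MiB of raw file data.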
pub const DEFAULT_DEGREE: usize = 174;

#[derive(Debug, PartialEq, Eq)]
pub enum TreeBuilder {
    /// TreeBuilder that builds a "balanced tree" with at most `degree`
    /// links per node
    Balanced { degree: usize },
}

impl TreeBuilder {
    pub fn balanced_tree() -> Self {
        Self::balanced_tree_with_degree(DEFAULT_DEGREE)
    }

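    /// Creates a balanced tree builder with a custom maximum of `degree` links
    /// per node. Panics if `degree` is less than 2.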
    pub fn balanced_tree_with_degree(degree: usize) -> Self {
        assert!(degree > 1);
        TreeBuilder::Balanced { degree }
    }

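    /// Streams the blocks of the file tree built from `chunks`, storing each block
    /// in `store` and yielding `(Cid, Block)` pairs as they are produced. Leaf and
    /// stem blocks are yielded as soon as they are complete; the root block is
    /// yielded last.
    ///
    /// A minimal usage sketch (illustrative only, assuming an async context and an
    /// in-memory store):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    /// use wnfs_common::MemoryBlockStore;
    ///
    /// let store = MemoryBlockStore::new();
    /// let chunks = futures::stream::iter(vec![Ok::<_, std::io::Error>(Bytes::from("hello world"))]);
    /// let tree = TreeBuilder::balanced_tree();
    /// let blocks = tree.stream_tree(chunks, &store);
    /// tokio::pin!(blocks);
    /// while let Some(result) = blocks.next().await {
    ///     let (cid, _block) = result.unwrap();
    ///     // the last `cid` yielded is the root of the file
    /// }
    /// ```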
    pub fn stream_tree<'a>(
        &self,
        chunks: impl Stream<Item = std::io::Result<Bytes>> + CondSend + 'a,
        store: &'a impl BlockStore,
    ) -> impl Stream<Item = Result<(Cid, Block)>> + 'a {
        match self {
            TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, *degree, store),
        }
    }
}

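/// Bookkeeping carried alongside each stored block's [`Cid`]: `raw_data_len` is the
/// amount of raw file data reachable through the block (used for `blocksizes` and
/// `filesize`), while `encoded_len` is the total encoded size of the subtree rooted
/// at the block (used for `tsize`).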
#[derive(Clone, Debug, PartialEq)]
struct LinkInfo {
    raw_data_len: u64,
    encoded_len: u64,
}

fn stream_balanced_tree<'a>(
    in_stream: impl Stream<Item = std::io::Result<Bytes>> + CondSend + 'a,
    degree: usize,
    store: &'a impl BlockStore,
) -> impl Stream<Item = Result<(Cid, Block)>> + 'a {
    try_stream! {
        // degree = 8
        // VecDeque![ vec![] ]
        // ..
        // VecDeque![ vec![0, 1, 2, 3, 4, 5, 6, 7] ]
        // VecDeque![ vec![8], vec![p0] ]

        // ..

        // VecDeque![ vec![0, 1, 2, 3, 4, 5, 6, 7], vec![p0] ]
        // VecDeque![ vec![], vec![p0, p1] ]

        // ..

        // VecDeque![ vec![0, 1, 2, 3, 4, 5, 6, 7], vec![p0, p1, p2, p3, p4, p5, p6] ]
        // VecDeque![ vec![], vec![p0, p1, p2, p3, p4, p5, p6, p7], vec![] ]
        // VecDeque![ vec![8], vec![], vec![pp0] ]
        //
        // A VecDeque of Vecs: the first Vec collects the links that will go into the
        // lowest layer of stem nodes, and the last Vec collects the links for the
        // current root node.
        // Since we emit leaf and stem nodes as we go, we only need to keep track of the
        // most "recent" branch, storing the links to each pending node's children and
        // yielding a stem node as soon as it has `degree` links.
        let mut tree: VecDeque<Vec<(Cid, LinkInfo)>> = VecDeque::new();
        tree.push_back(Vec::with_capacity(degree));

        let in_stream = in_stream.map(|chunk| TreeNode::Leaf(chunk?).encode());

        tokio::pin!(in_stream);

        while let Some(chunk) = in_stream.next().await {
            let (block, link_info) = chunk?;
            let tree_len = tree.len();

            // check if the leaf node of the tree is full
            if tree[0].len() == degree {
                // if so, iterate through nodes
                for i in 0..tree_len {
                    // if we encounter any nodes that are not full, break
                    if tree[i].len() < degree {
                        break;
                    }

                    // in this case we have a full set of links & we are
                    // at the top of the tree. Time to make a new layer.
                    if i == tree_len - 1 {
                        tree.push_back(Vec::with_capacity(degree));
                    }

                    // create node, keeping the cid
                    let links = std::mem::replace(&mut tree[i], Vec::with_capacity(degree));
                    let (block, link_info) = TreeNode::Stem(links).encode()?;
                    let cid = block.store(store).await?;
                    yield (cid, block);

                    // add link_info to parent node
                    tree[i+1].push((cid, link_info));
                }
                // at this point the tree will be able to receive new links
                // without "overflowing", i.e. the leaf node and all stem nodes
                // have fewer than `degree` links
            }

            // now that we know the tree is in a "healthy" state to
            // receive more links, add the link to the tree
            let cid = block.store(store).await?;
            tree[0].push((cid, link_info));
            yield (cid, block);
            // at this point, the leaf node may have `degree` number of
            // links, but no other stem node will
        }

        // if our stream had only a single chunk, we have already yielded it above
        // as the whole file, so there are no stem nodes to build
        if tree.len() == 1 && tree[0].len() == 1 {
            return
        }

        // clean up, i.e. yield the rest of the stem nodes
        // since all the stem nodes are able to receive links
        // we don't have to worry about "overflow"
        while let Some(links) = tree.pop_front() {
            let (block, link_info) = TreeNode::Stem(links).encode()?;
            let cid = block.store(store).await?;
            yield (cid, block);

            if let Some(front) = tree.front_mut() {
                front.push((cid, link_info));
            } else {
                // final root, nothing to do
            }
        }
    }
}

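/// Builds a UnixFS `File` stem node pointing at `links`, recording each child's raw
/// data size in `blocksizes` (and their sum as `filesize`) and each child's encoded
/// subtree size as the link's `tsize`.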
fn create_unixfs_node_from_links(links: Vec<(Cid, LinkInfo)>) -> Result<UnixFsFile> {
    let blocksizes: Vec<u64> = links.iter().map(|l| l.1.raw_data_len).collect();
    let filesize: u64 = blocksizes.iter().sum();
    let links = links
        .into_iter()
        .map(|(cid, l)| protobufs::PbLink {
            hash: Some(cid.to_bytes()),
            // ALL "stem" nodes have `name: None`.
            // In kubo, nodes that have links to `leaf` nodes have `name: Some("".to_string())`
            name: None,
            // tsize has no strict definition.
            // Iroh's definition of `tsize` is "the cumulative size of the encoded tree
            // pointed to by this link", so not just the size of the raw content, but
            // including all encoded dag nodes as well.
            // In the `go-merkledag` package, the `merkledag.proto` file states that tsize
            // is the "cumulative size of the target object"
            // (https://github.com/ipfs/go-merkledag/blob/8335efd4765ed5a512baa7e522c3552d067cf966/pb/merkledag.proto#L29)
            tsize: Some(l.encoded_len),
        })
        .collect();

    // PBNode.Data
    let inner = protobufs::Data {
        r#type: DataType::File as i32,
        // total size of the raw data this node points to
        filesize: Some(filesize),
        // sizes of the raw data pointed to by each link in this node
        blocksizes,
        ..Default::default()
    };

    // create PBNode
    let outer = encode_unixfs_pb(&inner, links)?;

    // create the UnixFsFile
    Ok(UnixFsFile::Node(Node { inner, outer }))
}

// Leaf and Stem nodes are the two types of nodes that can exist in the tree.
// Leaf nodes encode to `UnixFsFile::Raw`.
// Stem nodes encode to `UnixFsFile::Node` with `DataType::File`.
enum TreeNode {
    Leaf(Bytes),
    Stem(Vec<(Cid, LinkInfo)>),
}

impl TreeNode {
    fn encode(self) -> Result<(Block, LinkInfo)> {
        match self {
            TreeNode::Leaf(bytes) => {
                let len = bytes.len();
                let node = UnixFsFile::Raw(bytes);
                let block = node.encode()?;
                let link_info = LinkInfo {
                    // in a leaf the raw data len and encoded len are the same since our leaf
                    // nodes are raw unixfs nodes
                    raw_data_len: len as u64,
                    encoded_len: len as u64,
                };
                Ok((block, link_info))
            }
            TreeNode::Stem(links) => {
                let mut encoded_len: u64 = links.iter().map(|(_, l)| l.encoded_len).sum();
                let node = create_unixfs_node_from_links(links)?;
                let block = node.encode()?;
                encoded_len += block.data().len() as u64;
                let raw_data_len = node
                    .filesize()
                    .expect("UnixfsNode::File will have a filesize");
                Ok((
                    block,
                    LinkInfo {
                        raw_data_len,
                        encoded_len,
                    },
                ))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use futures::StreamExt;
    use wnfs_common::MemoryBlockStore;

    // each chunk is just a single usize, encoded big-endian
    const CHUNK_SIZE: u64 = std::mem::size_of::<usize>() as u64;

    fn test_chunk_stream(num_chunks: usize) -> impl Stream<Item = std::io::Result<Bytes>> {
        futures::stream::iter((0..num_chunks).map(|n| Ok(n.to_be_bytes().to_vec().into())))
    }

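    // builds the expected tree layer by layer, bottom-up: layer 0 holds the leaf
    // blocks, each higher layer holds the stems over `degree`-sized chunks of the
    // layer below, and the final layer holds the single root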
    async fn build_expect_tree(num_chunks: usize, degree: usize) -> Vec<Vec<(Cid, Block)>> {
        let store = &MemoryBlockStore::new();
        let chunks = test_chunk_stream(num_chunks);
        tokio::pin!(chunks);
        let mut tree = vec![vec![]];
        let mut links = vec![vec![]];

        if num_chunks == 1 {
            let chunk = chunks.next().await.unwrap().unwrap();
            let leaf = TreeNode::Leaf(chunk);
            let (block, _) = leaf.encode().unwrap();
            let cid = block.store(store).await.unwrap();
            tree[0].push((cid, block));
            return tree;
        }

        while let Some(chunk) = chunks.next().await {
            let chunk = chunk.unwrap();
            let leaf = TreeNode::Leaf(chunk);
            let (block, link_info) = leaf.encode().unwrap();
            let cid = block.store(store).await.unwrap();
            links[0].push((cid, link_info));
            tree[0].push((cid, block));
        }

        while tree.last().unwrap().len() > 1 {
            let prev_layer = links.last().unwrap();
            let count = prev_layer.len() / degree;
            let mut tree_layer = Vec::with_capacity(count);
            let mut links_layer = Vec::with_capacity(count);
            for links in prev_layer.chunks(degree) {
                let stem = TreeNode::Stem(links.to_vec());
                let (block, link_info) = stem.encode().unwrap();
                let cid = block.store(store).await.unwrap();
                links_layer.push((cid, link_info));
                tree_layer.push((cid, block));
            }
            tree.push(tree_layer);
            links.push(links_layer);
        }
        tree
    }

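    // flattens the layered tree into the exact order the streaming builder yields
    // blocks: leaves in order, each full stem emitted immediately after its
    // `degree`-th child, and any remaining stems plus the root emitted at the end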
    async fn build_expect_vec_from_tree(
        tree: Vec<Vec<(Cid, Block)>>,
        num_chunks: usize,
        degree: usize,
    ) -> Vec<(Cid, Block)> {
        let mut out = vec![];

        if num_chunks == 1 {
            out.push(tree[0][0].clone());
            return out;
        }

        let mut counts = vec![0; tree.len()];

        for leaf in tree[0].iter() {
            out.push(leaf.clone());
            counts[0] += 1;
            let mut push = counts[0] % degree == 0;
            for (num_layer, count) in counts.iter_mut().enumerate() {
                if num_layer == 0 {
                    continue;
                }
                if !push {
                    break;
                }
                out.push(tree[num_layer][*count].clone());
                *count += 1;
                if *count % degree != 0 {
                    push = false;
                }
            }
        }

        for (num_layer, count) in counts.into_iter().enumerate() {
            if num_layer == 0 {
                continue;
            }
            let layer = tree[num_layer].clone();
            for node in layer.into_iter().skip(count) {
                out.push(node);
            }
        }

        out
    }

    async fn build_expect(num_chunks: usize, degree: usize) -> Vec<(Cid, Block)> {
        let tree = build_expect_tree(num_chunks, degree).await;
        println!("{tree:?}");
        build_expect_vec_from_tree(tree, num_chunks, degree).await
    }

    async fn make_leaf(data: usize, store: &impl BlockStore) -> (Cid, Block, LinkInfo) {
        let (block, link_info) = TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze())
            .encode()
            .unwrap();
        let cid = block.store(store).await.unwrap();
        (cid, block, link_info)
    }

    async fn make_stem(
        links: Vec<(Cid, LinkInfo)>,
        store: &impl BlockStore,
    ) -> (Cid, Block, LinkInfo) {
        let (block, link_info) = TreeNode::Stem(links).encode().unwrap();
        let cid = block.store(store).await.unwrap();
        (cid, block, link_info)
    }

    #[tokio::test]
    async fn test_build_expect() {
        let store = &MemoryBlockStore::new();
        // manually build a tree made of 7 chunks (11 total nodes)
        let (leaf_0_cid, leaf_0, len_0) = make_leaf(0, store).await;
        let (leaf_1_cid, leaf_1, len_1) = make_leaf(1, store).await;
        let (leaf_2_cid, leaf_2, len_2) = make_leaf(2, store).await;
        let (stem_0_cid, stem_0, stem_len_0) = make_stem(
            vec![
                (leaf_0_cid, len_0),
                (leaf_1_cid, len_1),
                (leaf_2_cid, len_2),
            ],
            store,
        )
        .await;
        let (leaf_3_cid, leaf_3, len_3) = make_leaf(3, store).await;
        let (leaf_4_cid, leaf_4, len_4) = make_leaf(4, store).await;
        let (leaf_5_cid, leaf_5, len_5) = make_leaf(5, store).await;
        let (stem_1_cid, stem_1, stem_len_1) = make_stem(
            vec![
                (leaf_3_cid, len_3),
                (leaf_4_cid, len_4),
                (leaf_5_cid, len_5),
            ],
            store,
        )
        .await;
        let (leaf_6_cid, leaf_6, len_6) = make_leaf(6, store).await;
        let (stem_2_cid, stem_2, stem_len_2) = make_stem(vec![(leaf_6_cid, len_6)], store).await;
        let (root_cid, root, _root_len) = make_stem(
            vec![
                (stem_0_cid, stem_len_0),
                (stem_1_cid, stem_len_1),
                (stem_2_cid, stem_len_2),
            ],
            store,
        )
        .await;

        let expect_tree = vec![
            vec![
                (leaf_0_cid, leaf_0.clone()),
                (leaf_1_cid, leaf_1.clone()),
                (leaf_2_cid, leaf_2.clone()),
                (leaf_3_cid, leaf_3.clone()),
                (leaf_4_cid, leaf_4.clone()),
                (leaf_5_cid, leaf_5.clone()),
                (leaf_6_cid, leaf_6.clone()),
            ],
            vec![
                (stem_0_cid, stem_0.clone()),
                (stem_1_cid, stem_1.clone()),
                (stem_2_cid, stem_2.clone()),
            ],
            vec![(root_cid, root.clone())],
        ];
        let got_tree = build_expect_tree(7, 3).await;
        assert_eq!(expect_tree, got_tree);

        let expect_vec = vec![
            (leaf_0_cid, leaf_0),
            (leaf_1_cid, leaf_1),
            (leaf_2_cid, leaf_2),
            (stem_0_cid, stem_0),
            (leaf_3_cid, leaf_3),
            (leaf_4_cid, leaf_4),
            (leaf_5_cid, leaf_5),
            (stem_1_cid, stem_1),
            (leaf_6_cid, leaf_6),
            (stem_2_cid, stem_2),
            (root_cid, root),
        ];
        let got_vec = build_expect_vec_from_tree(got_tree, 7, 3).await;
        assert_eq!(expect_vec, got_vec);
    }

    async fn ensure_equal(
        expect: Vec<(Cid, Block)>,
        got: impl Stream<Item = Result<(Cid, Block)>>,
        expected_filesize: u64,
    ) {
        let mut i = 0;
        tokio::pin!(got);
        let mut got_filesize = 0;
        let mut expected_tsize = 0;
        let mut got_tsize = 0;
        while let Some(node) = got.next().await {
            let (expect_cid, expect_block) = expect
                .get(i)
                .expect("too many nodes in balanced tree stream")
                .clone();
            let (got_cid, got_block) = node.expect("unexpected error in balanced tree stream");
            let len = got_block.data().len() as u64;
            println!("node index {i}");
            assert_eq!(expect_cid, got_cid);
            assert_eq!(expect_block.data(), got_block.data());
            i += 1;
            let expect_node = UnixFsFile::decode(&expect_cid, expect_block.data().clone()).unwrap();
            let got_node = UnixFsFile::decode(&got_cid, got_block.data().clone()).unwrap();
            if let Some(DataType::File) = got_node.typ() {
                assert_eq!(
                    got_node.filesize().unwrap(),
                    got_node.blocksizes().iter().sum::<u64>()
                );
            }
            assert_eq!(expect_node, got_node);
            if expect.len() == i {
                let node = UnixFsFile::decode(&got_cid, got_block.data().clone()).unwrap();
                got_tsize = node.links().map(|l| l.unwrap().tsize.unwrap()).sum();
                got_filesize = got_node.filesize().unwrap();
            } else {
                expected_tsize += len;
            }
        }
        if expect.len() != i {
            panic!(
                "expected {} nodes in the stream, got {}",
                expect.len(),
                i
            );
        }
        assert_eq!(expected_filesize, got_filesize);
        assert_eq!(expected_tsize, got_tsize);
    }

    #[tokio::test]
    async fn balanced_tree_test_leaf() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 1;
        let expect = build_expect(num_chunks, 3).await;
        let got = stream_balanced_tree(test_chunk_stream(1), 3, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_one() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 3;
        let degrees = 3;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_two_full() {
        let store = &MemoryBlockStore::new();
        let degrees = 3;
        let num_chunks = 9;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_two_not_full() {
        let store = &MemoryBlockStore::new();
        let degrees = 3;
        let num_chunks = 10;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_three() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 125;
        let degrees = 5;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_large() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 780;
        let degrees = 11;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }
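
    // An end-to-end sketch (added for illustration) exercising the public
    // `TreeBuilder` API: the root block is yielded last, and its `filesize`
    // covers all of the raw chunk data.
    #[tokio::test]
    async fn balanced_tree_test_builder_root() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 10;
        let tree = TreeBuilder::balanced_tree_with_degree(3);
        let stream = tree.stream_tree(test_chunk_stream(num_chunks), store);
        tokio::pin!(stream);

        let mut last = None;
        while let Some(item) = stream.next().await {
            last = Some(item.expect("unexpected error in balanced tree stream"));
        }

        let (root_cid, root_block) = last.expect("stream yielded no blocks");
        let root = UnixFsFile::decode(&root_cid, root_block.data().clone()).unwrap();
        assert_eq!(root.filesize(), Some(num_chunks as u64 * CHUNK_SIZE));
    }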
}