wnfs_unixfs_file/
balanced_tree.rs

1use crate::{
2    builder::encode_unixfs_pb,
3    protobufs,
4    types::Block,
5    unixfs::{DataType, Node, UnixFsFile},
6};
7use anyhow::Result;
8use async_stream::try_stream;
9use bytes::Bytes;
10use futures::{Stream, StreamExt};
11use std::collections::VecDeque;
12use wnfs_common::{BlockStore, Cid, utils::CondSend};
13
/// Maximum number of links per stem node used by [`TreeBuilder::balanced_tree`].
///
/// The value follows the UnixFS specification's layout section:
/// <https://github.com/ipfs/specs/blob/main/UNIXFS.md#layout>
pub const DEFAULT_DEGREE: usize = 174;
17
/// Selects how `stream_tree` arranges file chunks into a UnixFS DAG.
#[derive(PartialEq, Eq, Debug)]
pub enum TreeBuilder {
    /// Balanced layout in which every stem node links to at most `degree` children.
    Balanced { degree: usize },
}
24
25impl TreeBuilder {
26    pub fn balanced_tree() -> Self {
27        Self::balanced_tree_with_degree(DEFAULT_DEGREE)
28    }
29
30    pub fn balanced_tree_with_degree(degree: usize) -> Self {
31        assert!(degree > 1);
32        TreeBuilder::Balanced { degree }
33    }
34
35    pub fn stream_tree<'a>(
36        self,
37        chunks: impl Stream<Item = std::io::Result<Bytes>> + CondSend + 'a,
38        store: &'a impl BlockStore,
39    ) -> impl Stream<Item = Result<(Cid, Block)>> + 'a {
40        match self {
41            TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, degree, store),
42        }
43    }
44}
45
/// Size bookkeeping for a link from a stem node to one of its children.
#[derive(Debug, Clone, PartialEq)]
struct LinkInfo {
    /// Bytes of file content beneath the child; summed into the parent's
    /// `blocksizes` / `filesize`.
    raw_data_len: u64,
    /// Encoded size of the child's whole subtree, including the child block itself;
    /// written as the link's `tsize`.
    encoded_len: u64,
}
51
52fn stream_balanced_tree<'a>(
53    in_stream: impl Stream<Item = std::io::Result<Bytes>> + CondSend + 'a,
54    degree: usize,
55    store: &'a impl BlockStore,
56) -> impl Stream<Item = Result<(Cid, Block)>> + 'a {
57    try_stream! {
58        // degree = 8
59        // VecDeque![ vec![] ]
60        // ..
61        // VecDeque![ vec![0, 1, 2, 3, 4, 5, 6, 7] ]
62        // VecDeque![ vec![8], vec![p0] ]
63
64        // ..
65
66        // VecDeque![ vec![0, 1, 2, 3, 4, 5, 6, 7] vec![p0] ]
67        // VecDeque![ vec![], vec![p0, p1]]
68
69        // ..
70
71        // VecDeque![ vec![0, 1, 2, 3, 4, 5, 6, 7] vec![p0, p1, p2, p3, p4, p5, p6, p7], ]
72        // VecDeque![ vec![], vec![p0, p1, p2, p3, p4, p5, p6, p7], vec![] ]
73        // VecDeque![ vec![8], vec![p8], vec![pp0] ]
74        //
75        // A vecdeque of vecs, the first vec representing the lowest layer of stem nodes
76        // and the last vec representing the root node
77        // Since we emit leaf and stem nodes as we go, we only need to keep track of the
78        // most "recent" branch, storing the links to that node's children & yielding them
79        // when each node reaches `degree` number of links
80        let mut tree: VecDeque<Vec<(Cid, LinkInfo)>> = VecDeque::new();
81        tree.push_back(Vec::with_capacity(degree));
82
83        let in_stream = in_stream.map(|chunk| TreeNode::Leaf(chunk?).encode());
84
85        tokio::pin!(in_stream);
86
87        while let Some(chunk) = in_stream.next().await {
88            let (block, link_info) = chunk?;
89            let tree_len = tree.len();
90
91            // check if the leaf node of the tree is full
92            if tree[0].len() == degree {
93                // if so, iterate through nodes
94                for i in 0..tree_len {
95                    // if we encounter any nodes that are not full, break
96                    if tree[i].len() < degree {
97                        break;
98                    }
99
100                    // in this case we have a full set of links & we are
101                    // at the top of the tree. Time to make a new layer.
102                    if i == tree_len - 1 {
103                        tree.push_back(Vec::with_capacity(degree));
104                    }
105
106                    // create node, keeping the cid
107                    let links = std::mem::replace(&mut tree[i], Vec::with_capacity(degree));
108                    let (block, link_info) = TreeNode::Stem(links).encode()?;
109                    let cid = block.store(store).await?;
110                    yield (cid, block);
111
112                    // add link_info to parent node
113                    tree[i+1].push((cid, link_info));
114                }
115                // at this point the tree will be able to recieve new links
116                // without "overflowing", aka the leaf node and stem nodes
117                // have fewer than `degree` number of links
118            }
119
120            // now that we know the tree is in a "healthy" state to
121            // recieve more links, add the link to the tree
122            let cid = block.store(store).await?;
123            tree[0].push((cid, link_info));
124            yield (cid, block);
125            // at this point, the leaf node may have `degree` number of
126            // links, but no other stem node will
127        }
128
129        // our stream had 1 chunk that we have already yielded
130        if tree.len() == 1 && tree[0].len() == 1 {
131            return
132        }
133
134        // clean up, aka yield the rest of the stem nodes
135        // since all the stem nodes are able to recieve links
136        // we don't have to worry about "overflow"
137        while let Some(links) = tree.pop_front() {
138            let (block, link_info) = TreeNode::Stem(links).encode()?;
139            let cid = block.store(store).await?;
140            yield (cid, block);
141
142            if let Some(front) = tree.front_mut() {
143                front.push((cid, link_info));
144            } else {
145                // final root, nothing to do
146            }
147        }
148    }
149}
150
151fn create_unixfs_node_from_links(links: Vec<(Cid, LinkInfo)>) -> Result<UnixFsFile> {
152    let blocksizes: Vec<u64> = links.iter().map(|l| l.1.raw_data_len).collect();
153    let filesize: u64 = blocksizes.iter().sum();
154    let links = links
155        .into_iter()
156        .map(|(cid, l)| protobufs::PbLink {
157            hash: Some(cid.to_bytes()),
158            // ALL "stem" nodes have `name: None`.
159            // In kubo, nodes that have links to `leaf` nodes have `name: Some("".to_string())`
160            name: None,
161            // tsize has no strict definition
162            // Iroh's definiton of `tsize` is "the cumulative size of the encoded tree
163            // pointed to by this link", so not just the size of the raw content, but including
164            // all encoded dag nodes as well.
165            // In the `go-merkledag` package, the `merkledag.proto` file, states that tsize
166            // is the "cumulative size of the target object"
167            // (https://github.com/ipfs/go-merkledag/blob/8335efd4765ed5a512baa7e522c3552d067cf966/pb/merkledag.proto#L29)
168            tsize: Some(l.encoded_len),
169        })
170        .collect();
171
172    // PBNode.Data
173    let inner = protobufs::Data {
174        r#type: DataType::File as i32,
175        // total size of the raw data this node points to
176        filesize: Some(filesize),
177        // sizes of the raw data pointed to by each link in this node
178        blocksizes,
179        ..Default::default()
180    };
181
182    // create PBNode
183    let outer = encode_unixfs_pb(&inner, links)?;
184
185    // create UnixfsNode
186    Ok(UnixFsFile::Node(Node { inner, outer }))
187}
188
189// Leaf and Stem nodes are the two types of nodes that can exist in the tree
190// Leaf nodes encode to `UnixfsNode::Raw`
191// Stem nodes encode to `UnixfsNode::File`
192enum TreeNode {
193    Leaf(Bytes),
194    Stem(Vec<(Cid, LinkInfo)>),
195}
196
197impl TreeNode {
198    fn encode(self) -> Result<(Block, LinkInfo)> {
199        match self {
200            TreeNode::Leaf(bytes) => {
201                let len = bytes.len();
202                let node = UnixFsFile::Raw(bytes);
203                let block = node.encode()?;
204                let link_info = LinkInfo {
205                    // in a leaf the raw data len and encoded len are the same since our leaf
206                    // nodes are raw unixfs nodes
207                    raw_data_len: len as u64,
208                    encoded_len: len as u64,
209                };
210                Ok((block, link_info))
211            }
212            TreeNode::Stem(links) => {
213                let mut encoded_len: u64 = links.iter().map(|(_, l)| l.encoded_len).sum();
214                let node = create_unixfs_node_from_links(links)?;
215                let block = node.encode()?;
216                encoded_len += block.data().len() as u64;
217                let raw_data_len = node
218                    .filesize()
219                    .expect("UnixfsNode::File will have a filesize");
220                Ok((
221                    block,
222                    LinkInfo {
223                        raw_data_len,
224                        encoded_len,
225                    },
226                ))
227            }
228        }
229    }
230}
231
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use wnfs_common::MemoryBlockStore;

    // chunks are just a single usize integer
    const CHUNK_SIZE: u64 = std::mem::size_of::<usize>() as u64;

    /// Yields `num_chunks` chunks; chunk `n` holds the big-endian bytes of `n`.
    fn test_chunk_stream(num_chunks: usize) -> impl Stream<Item = std::io::Result<Bytes>> {
        futures::stream::iter((0..num_chunks).map(|n| Ok(n.to_be_bytes().to_vec().into())))
    }

    /// Builds the expected tree layer by layer, independently of the streaming code.
    ///
    /// `tree[0]` holds the leaves and the last layer holds the root.
    async fn build_expect_tree(num_chunks: usize, degree: usize) -> Vec<Vec<(Cid, Block)>> {
        let store = &MemoryBlockStore::new();
        let chunks = test_chunk_stream(num_chunks);
        tokio::pin!(chunks);
        let mut tree = vec![vec![]];
        let mut links = vec![vec![]];

        // A single chunk is its own root, with no stem above it. Any other count,
        // including counts below `degree`, still needs a stem over the leaves and is
        // handled by the general path below.
        if num_chunks == 1 {
            let chunk = chunks.next().await.unwrap().unwrap();
            let leaf = TreeNode::Leaf(chunk);
            let (block, _) = leaf.encode().unwrap();
            let cid = block.store(store).await.unwrap();
            tree[0].push((cid, block));
            return tree;
        }

        while let Some(chunk) = chunks.next().await {
            let chunk = chunk.unwrap();
            let leaf = TreeNode::Leaf(chunk);
            let (block, link_info) = leaf.encode().unwrap();
            let cid = block.store(store).await.unwrap();
            links[0].push((cid, link_info));
            tree[0].push((cid, block));
        }

        while tree.last().unwrap().len() > 1 {
            let prev_layer = links.last().unwrap();
            let count = prev_layer.len() / degree;
            let mut tree_layer = Vec::with_capacity(count);
            let mut links_layer = Vec::with_capacity(count);
            for links in prev_layer.chunks(degree) {
                let stem = TreeNode::Stem(links.to_vec());
                let (block, link_info) = stem.encode().unwrap();
                let cid = block.store(store).await.unwrap();
                links_layer.push((cid, link_info));
                tree_layer.push((cid, block));
            }
            tree.push(tree_layer);
            links.push(links_layer);
        }
        tree
    }

    /// Flattens a layered tree into the order the streaming builder emits nodes.
    ///
    /// Each completed stem follows the last of its children. Any stems that are still
    /// open are emitted bottom-up at the end.
    async fn build_expect_vec_from_tree(
        tree: Vec<Vec<(Cid, Block)>>,
        num_chunks: usize,
        degree: usize,
    ) -> Vec<(Cid, Block)> {
        let mut out = vec![];

        if num_chunks == 1 {
            out.push(tree[0][0].clone());
            return out;
        }

        let mut counts = vec![0; tree.len()];

        for leaf in tree[0].iter() {
            out.push(leaf.clone());
            counts[0] += 1;
            let mut push = counts[0] % degree == 0;
            for (num_layer, count) in counts.iter_mut().enumerate() {
                if num_layer == 0 {
                    continue;
                }
                if !push {
                    break;
                }
                out.push(tree[num_layer][*count].clone());
                *count += 1;
                if *count % degree != 0 {
                    push = false;
                }
            }
        }

        for (num_layer, count) in counts.into_iter().enumerate() {
            if num_layer == 0 {
                continue;
            }
            let layer = tree[num_layer].clone();
            for node in layer.into_iter().skip(count) {
                out.push(node);
            }
        }

        out
    }

    async fn build_expect(num_chunks: usize, degree: usize) -> Vec<(Cid, Block)> {
        let tree = build_expect_tree(num_chunks, degree).await;
        println!("{tree:?}");
        build_expect_vec_from_tree(tree, num_chunks, degree).await
    }

    async fn make_leaf(data: usize, store: &impl BlockStore) -> (Cid, Block, LinkInfo) {
        let (block, link_info) = TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze())
            .encode()
            .unwrap();
        let cid = block.store(store).await.unwrap();
        (cid, block, link_info)
    }

    async fn make_stem(
        links: Vec<(Cid, LinkInfo)>,
        store: &impl BlockStore,
    ) -> (Cid, Block, LinkInfo) {
        let (block, link_info) = TreeNode::Stem(links).encode().unwrap();
        let cid = block.store(store).await.unwrap();
        (cid, block, link_info)
    }

    #[tokio::test]
    async fn test_build_expect() {
        let store = &MemoryBlockStore::new();
        // manually build tree made of 7 chunks (11 total nodes)
        let (leaf_0_cid, leaf_0, len_0) = make_leaf(0, store).await;
        let (leaf_1_cid, leaf_1, len_1) = make_leaf(1, store).await;
        let (leaf_2_cid, leaf_2, len_2) = make_leaf(2, store).await;
        let (stem_0_cid, stem_0, stem_len_0) = make_stem(
            vec![
                (leaf_0_cid, len_0),
                (leaf_1_cid, len_1),
                (leaf_2_cid, len_2),
            ],
            store,
        )
        .await;
        let (leaf_3_cid, leaf_3, len_3) = make_leaf(3, store).await;
        let (leaf_4_cid, leaf_4, len_4) = make_leaf(4, store).await;
        let (leaf_5_cid, leaf_5, len_5) = make_leaf(5, store).await;
        let (stem_1_cid, stem_1, stem_len_1) = make_stem(
            vec![
                (leaf_3_cid, len_3),
                (leaf_4_cid, len_4),
                (leaf_5_cid, len_5),
            ],
            store,
        )
        .await;
        let (leaf_6_cid, leaf_6, len_6) = make_leaf(6, store).await;
        let (stem_2_cid, stem_2, stem_len_2) = make_stem(vec![(leaf_6_cid, len_6)], store).await;
        let (root_cid, root, _root_len) = make_stem(
            vec![
                (stem_0_cid, stem_len_0),
                (stem_1_cid, stem_len_1),
                (stem_2_cid, stem_len_2),
            ],
            store,
        )
        .await;

        let expect_tree = vec![
            vec![
                (leaf_0_cid, leaf_0.clone()),
                (leaf_1_cid, leaf_1.clone()),
                (leaf_2_cid, leaf_2.clone()),
                (leaf_3_cid, leaf_3.clone()),
                (leaf_4_cid, leaf_4.clone()),
                (leaf_5_cid, leaf_5.clone()),
                (leaf_6_cid, leaf_6.clone()),
            ],
            vec![
                (stem_0_cid, stem_0.clone()),
                (stem_1_cid, stem_1.clone()),
                (stem_2_cid, stem_2.clone()),
            ],
            vec![(root_cid, root.clone())],
        ];
        let got_tree = build_expect_tree(7, 3).await;
        assert_eq!(expect_tree, got_tree);

        let expect_vec = vec![
            (leaf_0_cid, leaf_0),
            (leaf_1_cid, leaf_1),
            (leaf_2_cid, leaf_2),
            (stem_0_cid, stem_0),
            (leaf_3_cid, leaf_3),
            (leaf_4_cid, leaf_4),
            (leaf_5_cid, leaf_5),
            (stem_1_cid, stem_1),
            (leaf_6_cid, leaf_6),
            (stem_2_cid, stem_2),
            (root_cid, root),
        ];
        let got_vec = build_expect_vec_from_tree(got_tree, 7, 3).await;
        assert_eq!(expect_vec, got_vec);
    }

    /// Asserts that `got` yields exactly the nodes in `expect`, in order. Also checks
    /// the root's `filesize` and that its link `tsize`s sum to the encoded size of
    /// every node emitted before it.
    async fn ensure_equal(
        expect: Vec<(Cid, Block)>,
        got: impl Stream<Item = Result<(Cid, Block)>>,
        expected_filesize: u64,
    ) {
        let mut i = 0;
        tokio::pin!(got);
        let mut got_filesize = 0;
        let mut expected_tsize = 0;
        let mut got_tsize = 0;
        while let Some(node) = got.next().await {
            let (expect_cid, expect_block) = expect
                .get(i)
                .expect("too many nodes in balanced tree stream")
                .clone();
            let (got_cid, got_block) = node.expect("unexpected error in balanced tree stream");
            let len = got_block.data().len() as u64;
            println!("node index {i}");
            assert_eq!(expect_cid, got_cid);
            assert_eq!(expect_block.data(), got_block.data());
            i += 1;
            let expect_node = UnixFsFile::decode(&expect_cid, expect_block.data().clone()).unwrap();
            let got_node = UnixFsFile::decode(&got_cid, got_block.data().clone()).unwrap();
            if let Some(DataType::File) = got_node.typ() {
                assert_eq!(
                    got_node.filesize().unwrap(),
                    got_node.blocksizes().iter().sum::<u64>()
                );
            }
            assert_eq!(expect_node, got_node);
            if expect.len() == i {
                // The last node is the root.
                got_tsize = got_node.links().map(|l| l.unwrap().tsize.unwrap()).sum();
                got_filesize = got_node.filesize().unwrap();
            } else {
                expected_tsize += len;
            }
        }
        if expect.len() != i {
            panic!(
                "expected {} nodes from the balanced tree stream, got {}",
                expect.len(),
                i
            );
        }
        assert_eq!(expected_filesize, got_filesize);
        assert_eq!(expected_tsize, got_tsize);
    }

    #[tokio::test]
    async fn balanced_tree_test_leaf() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 1;
        let expect = build_expect(num_chunks, 3).await;
        let got = stream_balanced_tree(test_chunk_stream(1), 3, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_one_not_full() {
        // Fewer chunks than `degree` (but more than one) still need a stem root.
        let store = &MemoryBlockStore::new();
        let num_chunks = 2;
        let degrees = 3;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_one() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 3;
        let degrees = 3;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_two_full() {
        let store = &MemoryBlockStore::new();
        let degrees = 3;
        let num_chunks = 9;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_two_not_full() {
        let store = &MemoryBlockStore::new();
        let degrees = 3;
        let num_chunks = 10;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_three() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 125;
        let degrees = 5;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_large() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 780;
        let degrees = 11;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }
}