1use crate::{
2 builder::encode_unixfs_pb,
3 protobufs,
4 types::Block,
5 unixfs::{DataType, Node, UnixFsFile},
6};
7use anyhow::Result;
8use async_stream::try_stream;
9use bytes::Bytes;
10use futures::{Stream, StreamExt};
11use libipld::Cid;
12use std::collections::VecDeque;
13use wnfs_common::{utils::CondSend, BlockStore};
14
/// Default maximum number of links per stem node, used by [`TreeBuilder::balanced_tree`].
// NOTE(review): 174 appears to mirror the go-ipfs/kubo default links-per-block for the
// balanced layout; confirm before changing, since it affects the resulting CIDs.
pub const DEFAULT_DEGREE: usize = 174;
18
/// Strategy used to arrange a file's chunks into a UnixFS DAG.
///
/// Prefer constructing it through [`TreeBuilder::balanced_tree`] or
/// [`TreeBuilder::balanced_tree_with_degree`], which validate the degree.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TreeBuilder {
    /// A balanced tree whose stem nodes link to at most `degree` children.
    Balanced { degree: usize },
}
25
26impl TreeBuilder {
27 pub fn balanced_tree() -> Self {
28 Self::balanced_tree_with_degree(DEFAULT_DEGREE)
29 }
30
31 pub fn balanced_tree_with_degree(degree: usize) -> Self {
32 assert!(degree > 1);
33 TreeBuilder::Balanced { degree }
34 }
35
36 pub fn stream_tree<'a>(
37 &self,
38 chunks: impl Stream<Item = std::io::Result<Bytes>> + CondSend + 'a,
39 store: &'a impl BlockStore,
40 ) -> impl Stream<Item = Result<(Cid, Block)>> + 'a {
41 match self {
42 TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, *degree, store),
43 }
44 }
45}
46
/// Sizes recorded for a child node when a parent stem links to it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LinkInfo {
    /// Number of file-data bytes covered by this child and its descendants; becomes the
    /// child's entry in the parent's `blocksizes`.
    raw_data_len: u64,
    /// Encoded size of this child's block plus the blocks of all its descendants; becomes
    /// the parent link's `tsize`.
    encoded_len: u64,
}
52
/// Builds a balanced DAG over the chunks of `in_stream`.
///
/// Every node is stored in `store` as soon as it is produced, and the stream yields
/// `(cid, block)` pairs in the order they were stored. Each chunk becomes a raw leaf. A stem
/// node is emitted as soon as it has `degree` links and another link needs room. Children are
/// therefore always yielded before their parents, and the root is yielded last.
///
/// Special cases:
/// - A single-chunk input yields only that leaf; no stem wraps it.
/// - An empty input yields a single stem with no links (filesize 0).
///
/// NOTE(review): `degree` is expected to be greater than 1.
/// `TreeBuilder::balanced_tree_with_degree` asserts this, but this function does not check
/// it, and `TreeBuilder::Balanced` can also be constructed directly.
fn stream_balanced_tree<'a>(
    in_stream: impl Stream<Item = std::io::Result<Bytes>> + CondSend + 'a,
    degree: usize,
    store: &'a impl BlockStore,
) -> impl Stream<Item = Result<(Cid, Block)>> + 'a {
    try_stream! {
        // One pending link list per tree level.
        // - `tree[0]` collects links to leaves for the stem currently being filled.
        // - `tree[1]` collects links to completed stems one level up, and so on.
        // Only the right-most, still-open branch is kept in memory. Completed nodes are
        // encoded, stored and yielded immediately.
        let mut tree: VecDeque<Vec<(Cid, LinkInfo)>> = VecDeque::new();
        tree.push_back(Vec::with_capacity(degree));

        // Encode each incoming chunk as a raw leaf. I/O errors from the source are converted
        // here and surface when the item is unwrapped below.
        let in_stream = in_stream.map(|chunk| TreeNode::Leaf(chunk?).encode());

        tokio::pin!(in_stream);

        while let Some(chunk) = in_stream.next().await {
            let (block, link_info) = chunk?;
            let tree_len = tree.len();

            // The bottom level is full, so there is no room for the new leaf yet. Flush full
            // levels bottom-up into stems, carrying each new stem's link one level up.
            if tree[0].len() == degree {
                for i in 0..tree_len {
                    // This level still has room and absorbs the carried link; stop here.
                    if tree[i].len() < degree {
                        break;
                    }

                    // The topmost level is full: add a new level so the stem created below
                    // has somewhere to be linked from.
                    if i == tree_len - 1 {
                        tree.push_back(Vec::with_capacity(degree));
                    }

                    // Replace the full level with an empty one and turn its links into a
                    // stem, then store and yield that stem.
                    let links = std::mem::replace(&mut tree[i], Vec::with_capacity(degree));
                    let (block, link_info) = TreeNode::Stem(links).encode()?;
                    let cid = block.store(store).await?;
                    yield (cid, block);

                    // Link the new stem from the level above.
                    tree[i+1].push((cid, link_info));
                }
                // Now every level has room for at least one more link, and no level above
                // `tree[0]` is full.
            }

            let cid = block.store(store).await?;
            tree[0].push((cid, link_info));
            yield (cid, block);
        }

        // A single-chunk input is represented by its leaf alone, already yielded above.
        if tree.len() == 1 && tree[0].len() == 1 {
            return
        }

        // Close the remaining, partially filled levels bottom-up. Each level becomes a stem
        // linked from the next level. That level has room, because the loop above never
        // leaves a level above `tree[0]` full. The last stem produced is the root.
        while let Some(links) = tree.pop_front() {
            let (block, link_info) = TreeNode::Stem(links).encode()?;
            let cid = block.store(store).await?;
            yield (cid, block);

            if let Some(front) = tree.front_mut() {
                front.push((cid, link_info));
            } else {
                // No level left: the stem just yielded was the root.
            }
        }
    }
}
151
152fn create_unixfs_node_from_links(links: Vec<(Cid, LinkInfo)>) -> Result<UnixFsFile> {
153 let blocksizes: Vec<u64> = links.iter().map(|l| l.1.raw_data_len).collect();
154 let filesize: u64 = blocksizes.iter().sum();
155 let links = links
156 .into_iter()
157 .map(|(cid, l)| protobufs::PbLink {
158 hash: Some(cid.to_bytes()),
159 name: None,
162 tsize: Some(l.encoded_len),
170 })
171 .collect();
172
173 let inner = protobufs::Data {
175 r#type: DataType::File as i32,
176 filesize: Some(filesize),
178 blocksizes,
180 ..Default::default()
181 };
182
183 let outer = encode_unixfs_pb(&inner, links)?;
185
186 Ok(UnixFsFile::Node(Node { inner, outer }))
188}
189
190enum TreeNode {
194 Leaf(Bytes),
195 Stem(Vec<(Cid, LinkInfo)>),
196}
197
198impl TreeNode {
199 fn encode(self) -> Result<(Block, LinkInfo)> {
200 match self {
201 TreeNode::Leaf(bytes) => {
202 let len = bytes.len();
203 let node = UnixFsFile::Raw(bytes);
204 let block = node.encode()?;
205 let link_info = LinkInfo {
206 raw_data_len: len as u64,
209 encoded_len: len as u64,
210 };
211 Ok((block, link_info))
212 }
213 TreeNode::Stem(links) => {
214 let mut encoded_len: u64 = links.iter().map(|(_, l)| l.encoded_len).sum();
215 let node = create_unixfs_node_from_links(links)?;
216 let block = node.encode()?;
217 encoded_len += block.data().len() as u64;
218 let raw_data_len = node
219 .filesize()
220 .expect("UnixfsNode::File will have a filesize");
221 Ok((
222 block,
223 LinkInfo {
224 raw_data_len,
225 encoded_len,
226 },
227 ))
228 }
229 }
230 }
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use wnfs_common::MemoryBlockStore;

    /// Byte length of every chunk produced by `test_chunk_stream`.
    const CHUNK_SIZE: u64 = std::mem::size_of::<usize>() as u64;

    /// Returns a stream of `num_chunks` chunks; chunk `n` holds `n` as big-endian `usize` bytes.
    fn test_chunk_stream(num_chunks: usize) -> impl Stream<Item = std::io::Result<Bytes>> {
        futures::stream::iter((0..num_chunks).map(|n| Ok(n.to_be_bytes().to_vec().into())))
    }

    /// Builds the reference tree for `num_chunks` chunks one level at a time, without
    /// streaming. `tree[0]` holds the leaves and the last level holds the root. The result
    /// can be compared node-for-node (CID and bytes) with `stream_balanced_tree`'s output.
    async fn build_expect_tree(num_chunks: usize, degree: usize) -> Vec<Vec<(Cid, Block)>> {
        let store = &MemoryBlockStore::new();
        let chunks = test_chunk_stream(num_chunks);
        tokio::pin!(chunks);
        let mut tree = vec![vec![]];
        let mut links = vec![vec![]];

        // A single chunk is represented by its bare leaf. Any other non-empty input gets at
        // least a root stem from the level loop below, including one with fewer chunks than
        // `degree`. The previous check (`num_chunks / degree == 0`) wrongly dropped every
        // chunk after the first in that case.
        if num_chunks == 1 {
            let chunk = chunks.next().await.unwrap().unwrap();
            let leaf = TreeNode::Leaf(chunk);
            let (block, _) = leaf.encode().unwrap();
            let cid = block.store(store).await.unwrap();
            tree[0].push((cid, block));
            return tree;
        }

        while let Some(chunk) = chunks.next().await {
            let chunk = chunk.unwrap();
            let leaf = TreeNode::Leaf(chunk);
            let (block, link_info) = leaf.encode().unwrap();
            let cid = block.store(store).await.unwrap();
            links[0].push((cid, link_info));
            tree[0].push((cid, block));
        }

        // Group each level's links into stems of at most `degree` children until a single
        // root remains.
        while tree.last().unwrap().len() > 1 {
            let prev_layer = links.last().unwrap();
            let count = prev_layer.len() / degree;
            let mut tree_layer = Vec::with_capacity(count);
            let mut links_layer = Vec::with_capacity(count);
            for links in prev_layer.chunks(degree) {
                let stem = TreeNode::Stem(links.to_vec());
                let (block, link_info) = stem.encode().unwrap();
                let cid = block.store(store).await.unwrap();
                links_layer.push((cid, link_info));
                tree_layer.push((cid, block));
            }
            tree.push(tree_layer);
            links.push(links_layer);
        }
        tree
    }

    /// Flattens a reference tree into the order in which `stream_balanced_tree` emits nodes.
    ///
    /// - Each completed stem is placed right after the last leaf beneath it, cascading upward
    ///   through ancestors that become complete at the same time.
    /// - Stems still open at the end follow, lowest level first.
    async fn build_expect_vec_from_tree(
        tree: Vec<Vec<(Cid, Block)>>,
        num_chunks: usize,
        degree: usize,
    ) -> Vec<(Cid, Block)> {
        let mut out = vec![];

        if num_chunks == 1 {
            out.push(tree[0][0].clone());
            return out;
        }

        // Number of nodes already emitted from each level.
        let mut counts = vec![0; tree.len()];

        for leaf in tree[0].iter() {
            out.push(leaf.clone());
            counts[0] += 1;
            let mut push = counts[0] % degree == 0;
            for (num_layer, count) in counts.iter_mut().enumerate() {
                if num_layer == 0 {
                    continue;
                }
                if !push {
                    break;
                }
                out.push(tree[num_layer][*count].clone());
                *count += 1;
                if *count % degree != 0 {
                    push = false;
                }
            }
        }

        for (num_layer, count) in counts.into_iter().enumerate() {
            if num_layer == 0 {
                continue;
            }
            let layer = tree[num_layer].clone();
            for node in layer.into_iter().skip(count) {
                out.push(node);
            }
        }

        out
    }

    /// Returns the expected streaming output for `num_chunks` chunks at the given `degree`.
    async fn build_expect(num_chunks: usize, degree: usize) -> Vec<(Cid, Block)> {
        let tree = build_expect_tree(num_chunks, degree).await;
        build_expect_vec_from_tree(tree, num_chunks, degree).await
    }

    /// Encodes and stores a leaf holding `data` as big-endian bytes, like `test_chunk_stream`.
    async fn make_leaf(data: usize, store: &impl BlockStore) -> (Cid, Block, LinkInfo) {
        let (block, link_info) = TreeNode::Leaf(Bytes::copy_from_slice(&data.to_be_bytes()))
            .encode()
            .unwrap();
        let cid = block.store(store).await.unwrap();
        (cid, block, link_info)
    }

    /// Encodes and stores a stem linking to `links`.
    async fn make_stem(
        links: Vec<(Cid, LinkInfo)>,
        store: &impl BlockStore,
    ) -> (Cid, Block, LinkInfo) {
        let (block, link_info) = TreeNode::Stem(links).encode().unwrap();
        let cid = block.store(store).await.unwrap();
        (cid, block, link_info)
    }

    /// Checks the reference helpers against a hand-built tree of 7 chunks with degree 3.
    #[tokio::test]
    async fn test_build_expect() {
        let store = &MemoryBlockStore::new();
        let (leaf_0_cid, leaf_0, len_0) = make_leaf(0, store).await;
        let (leaf_1_cid, leaf_1, len_1) = make_leaf(1, store).await;
        let (leaf_2_cid, leaf_2, len_2) = make_leaf(2, store).await;
        let (stem_0_cid, stem_0, stem_len_0) = make_stem(
            vec![
                (leaf_0_cid, len_0),
                (leaf_1_cid, len_1),
                (leaf_2_cid, len_2),
            ],
            store,
        )
        .await;
        let (leaf_3_cid, leaf_3, len_3) = make_leaf(3, store).await;
        let (leaf_4_cid, leaf_4, len_4) = make_leaf(4, store).await;
        let (leaf_5_cid, leaf_5, len_5) = make_leaf(5, store).await;
        let (stem_1_cid, stem_1, stem_len_1) = make_stem(
            vec![
                (leaf_3_cid, len_3),
                (leaf_4_cid, len_4),
                (leaf_5_cid, len_5),
            ],
            store,
        )
        .await;
        let (leaf_6_cid, leaf_6, len_6) = make_leaf(6, store).await;
        let (stem_2_cid, stem_2, stem_len_2) = make_stem(vec![(leaf_6_cid, len_6)], store).await;
        let (root_cid, root, _root_len) = make_stem(
            vec![
                (stem_0_cid, stem_len_0),
                (stem_1_cid, stem_len_1),
                (stem_2_cid, stem_len_2),
            ],
            store,
        )
        .await;

        let expect_tree = vec![
            vec![
                (leaf_0_cid, leaf_0.clone()),
                (leaf_1_cid, leaf_1.clone()),
                (leaf_2_cid, leaf_2.clone()),
                (leaf_3_cid, leaf_3.clone()),
                (leaf_4_cid, leaf_4.clone()),
                (leaf_5_cid, leaf_5.clone()),
                (leaf_6_cid, leaf_6.clone()),
            ],
            vec![
                (stem_0_cid, stem_0.clone()),
                (stem_1_cid, stem_1.clone()),
                (stem_2_cid, stem_2.clone()),
            ],
            vec![(root_cid, root.clone())],
        ];
        let got_tree = build_expect_tree(7, 3).await;
        assert_eq!(expect_tree, got_tree);

        let expect_vec = vec![
            (leaf_0_cid, leaf_0),
            (leaf_1_cid, leaf_1),
            (leaf_2_cid, leaf_2),
            (stem_0_cid, stem_0),
            (leaf_3_cid, leaf_3),
            (leaf_4_cid, leaf_4),
            (leaf_5_cid, leaf_5),
            (stem_1_cid, stem_1),
            (leaf_6_cid, leaf_6),
            (stem_2_cid, stem_2),
            (root_cid, root),
        ];
        let got_vec = build_expect_vec_from_tree(got_tree, 7, 3).await;
        assert_eq!(expect_vec, got_vec);
    }

    /// Asserts that `got` yields exactly the nodes in `expect`, in order.
    ///
    /// Also checks, for the last node (the root):
    /// - its `filesize` equals `expected_filesize`;
    /// - the sum of its links' `tsize` equals the total size of all earlier blocks.
    async fn ensure_equal(
        expect: Vec<(Cid, Block)>,
        got: impl Stream<Item = Result<(Cid, Block)>>,
        expected_filesize: u64,
    ) {
        let mut i = 0;
        tokio::pin!(got);
        let mut got_filesize = 0;
        let mut expected_tsize = 0;
        let mut got_tsize = 0;
        while let Some(node) = got.next().await {
            let (expect_cid, expect_block) = expect
                .get(i)
                .expect("too many nodes in balanced tree stream")
                .clone();
            let (got_cid, got_block) = node.expect("unexpected error in balanced tree stream");
            let len = got_block.data().len() as u64;
            println!("node index {i}");
            assert_eq!(expect_cid, got_cid);
            assert_eq!(expect_block.data(), got_block.data());
            i += 1;
            let expect_node = UnixFsFile::decode(&expect_cid, expect_block.data().clone()).unwrap();
            let got_node = UnixFsFile::decode(&got_cid, got_block.data().clone()).unwrap();
            if let Some(DataType::File) = got_node.typ() {
                assert_eq!(
                    got_node.filesize().unwrap(),
                    got_node.blocksizes().iter().sum::<u64>()
                );
            }
            assert_eq!(expect_node, got_node);
            if expect.len() == i {
                // Root node: its link tsizes must account for every block streamed before it.
                let node = UnixFsFile::decode(&got_cid, got_block.data().clone()).unwrap();
                got_tsize = node.links().map(|l| l.unwrap().tsize.unwrap()).sum();
                got_filesize = got_node.filesize().unwrap();
            } else {
                expected_tsize += len;
            }
        }
        if expect.len() != i {
            panic!(
                "expected {} nodes in the stream, got {}",
                expect.len(),
                i
            );
        }
        assert_eq!(expected_filesize, got_filesize);
        assert_eq!(expected_tsize, got_tsize);
    }

    #[tokio::test]
    async fn balanced_tree_test_leaf() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 1;
        let expect = build_expect(num_chunks, 3).await;
        let got = stream_balanced_tree(test_chunk_stream(1), 3, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    /// More than one chunk but fewer than `degree`: every leaf is emitted, then a root stem.
    #[tokio::test]
    async fn balanced_tree_test_fewer_chunks_than_degree() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 2;
        let degrees = 3;
        let expect = build_expect(num_chunks, degrees).await;
        assert_eq!(expect.len(), num_chunks + 1);
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_one() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 3;
        let degrees = 3;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_two_full() {
        let store = &MemoryBlockStore::new();
        let degrees = 3;
        let num_chunks = 9;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_two_not_full() {
        let store = &MemoryBlockStore::new();
        let degrees = 3;
        let num_chunks = 10;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_height_three() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 125;
        let degrees = 5;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }

    #[tokio::test]
    async fn balanced_tree_test_large() {
        let store = &MemoryBlockStore::new();
        let num_chunks = 780;
        let degrees = 11;
        let expect = build_expect(num_chunks, degrees).await;
        let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
        tokio::pin!(got);
        ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
    }
}