1use crate::{
2 builder::encode_unixfs_pb,
3 protobufs,
4 types::Block,
5 unixfs::{DataType, Node, UnixFsFile},
6};
7use anyhow::Result;
8use async_stream::try_stream;
9use bytes::Bytes;
10use futures::{Stream, StreamExt};
11use std::collections::VecDeque;
12use wnfs_common::{BlockStore, Cid, utils::CondSend};
13
/// Default `degree` (maximum number of links held by one stem node) used by
/// [`TreeBuilder::balanced_tree`].
///
/// NOTE(review): the value 174 presumably mirrors the default used by other
/// UnixFS implementations — confirm against the UnixFS specification.
pub const DEFAULT_DEGREE: usize = 174;
17
/// Strategy used to arrange a stream of file chunks into a tree of blocks.
#[derive(Debug, PartialEq, Eq)]
pub enum TreeBuilder {
    /// Balanced tree: a stem node is emitted once `degree` links have been
    /// collected for it, so no stem holds more than `degree` links.
    Balanced { degree: usize },
}
24
25impl TreeBuilder {
26 pub fn balanced_tree() -> Self {
27 Self::balanced_tree_with_degree(DEFAULT_DEGREE)
28 }
29
30 pub fn balanced_tree_with_degree(degree: usize) -> Self {
31 assert!(degree > 1);
32 TreeBuilder::Balanced { degree }
33 }
34
35 pub fn stream_tree<'a>(
36 self,
37 chunks: impl Stream<Item = std::io::Result<Bytes>> + CondSend + 'a,
38 store: &'a impl BlockStore,
39 ) -> impl Stream<Item = Result<(Cid, Block)>> + 'a {
40 match self {
41 TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, degree, store),
42 }
43 }
44}
45
/// Size bookkeeping for one link in the tree, kept alongside the child's `Cid`.
#[derive(Clone, Debug, PartialEq)]
struct LinkInfo {
    /// Number of raw file bytes reachable through the link: the chunk length
    /// for a leaf, the node's `filesize` for a stem.
    raw_data_len: u64,
    /// Cumulative encoded size: the chunk length for a leaf; for a stem, the sum
    /// of its children's `encoded_len` plus the length of its own block data.
    /// Written into the parent's link as `tsize`.
    encoded_len: u64,
}
51
52fn stream_balanced_tree<'a>(
53 in_stream: impl Stream<Item = std::io::Result<Bytes>> + CondSend + 'a,
54 degree: usize,
55 store: &'a impl BlockStore,
56) -> impl Stream<Item = Result<(Cid, Block)>> + 'a {
57 try_stream! {
58 let mut tree: VecDeque<Vec<(Cid, LinkInfo)>> = VecDeque::new();
81 tree.push_back(Vec::with_capacity(degree));
82
83 let in_stream = in_stream.map(|chunk| TreeNode::Leaf(chunk?).encode());
84
85 tokio::pin!(in_stream);
86
87 while let Some(chunk) = in_stream.next().await {
88 let (block, link_info) = chunk?;
89 let tree_len = tree.len();
90
91 if tree[0].len() == degree {
93 for i in 0..tree_len {
95 if tree[i].len() < degree {
97 break;
98 }
99
100 if i == tree_len - 1 {
103 tree.push_back(Vec::with_capacity(degree));
104 }
105
106 let links = std::mem::replace(&mut tree[i], Vec::with_capacity(degree));
108 let (block, link_info) = TreeNode::Stem(links).encode()?;
109 let cid = block.store(store).await?;
110 yield (cid, block);
111
112 tree[i+1].push((cid, link_info));
114 }
115 }
119
120 let cid = block.store(store).await?;
123 tree[0].push((cid, link_info));
124 yield (cid, block);
125 }
128
129 if tree.len() == 1 && tree[0].len() == 1 {
131 return
132 }
133
134 while let Some(links) = tree.pop_front() {
138 let (block, link_info) = TreeNode::Stem(links).encode()?;
139 let cid = block.store(store).await?;
140 yield (cid, block);
141
142 if let Some(front) = tree.front_mut() {
143 front.push((cid, link_info));
144 } else {
145 }
147 }
148 }
149}
150
151fn create_unixfs_node_from_links(links: Vec<(Cid, LinkInfo)>) -> Result<UnixFsFile> {
152 let blocksizes: Vec<u64> = links.iter().map(|l| l.1.raw_data_len).collect();
153 let filesize: u64 = blocksizes.iter().sum();
154 let links = links
155 .into_iter()
156 .map(|(cid, l)| protobufs::PbLink {
157 hash: Some(cid.to_bytes()),
158 name: None,
161 tsize: Some(l.encoded_len),
169 })
170 .collect();
171
172 let inner = protobufs::Data {
174 r#type: DataType::File as i32,
175 filesize: Some(filesize),
177 blocksizes,
179 ..Default::default()
180 };
181
182 let outer = encode_unixfs_pb(&inner, links)?;
184
185 Ok(UnixFsFile::Node(Node { inner, outer }))
187}
188
/// A not-yet-encoded node of the tree being built.
enum TreeNode {
    /// A chunk of file content; encoded as `UnixFsFile::Raw`.
    Leaf(Bytes),
    /// An inner node linking to already-encoded children, given as each
    /// child's `Cid` together with its size bookkeeping.
    Stem(Vec<(Cid, LinkInfo)>),
}
196
197impl TreeNode {
198 fn encode(self) -> Result<(Block, LinkInfo)> {
199 match self {
200 TreeNode::Leaf(bytes) => {
201 let len = bytes.len();
202 let node = UnixFsFile::Raw(bytes);
203 let block = node.encode()?;
204 let link_info = LinkInfo {
205 raw_data_len: len as u64,
208 encoded_len: len as u64,
209 };
210 Ok((block, link_info))
211 }
212 TreeNode::Stem(links) => {
213 let mut encoded_len: u64 = links.iter().map(|(_, l)| l.encoded_len).sum();
214 let node = create_unixfs_node_from_links(links)?;
215 let block = node.encode()?;
216 encoded_len += block.data().len() as u64;
217 let raw_data_len = node
218 .filesize()
219 .expect("UnixfsNode::File will have a filesize");
220 Ok((
221 block,
222 LinkInfo {
223 raw_data_len,
224 encoded_len,
225 },
226 ))
227 }
228 }
229 }
230}
231
232#[cfg(test)]
233mod tests {
234 use super::*;
235 use bytes::BytesMut;
236 use wnfs_common::MemoryBlockStore;
237
    /// Size in bytes of every chunk produced by `test_chunk_stream`: each chunk
    /// is the big-endian encoding of one `usize`.
    const CHUNK_SIZE: u64 = std::mem::size_of::<usize>() as u64;
240
241 fn test_chunk_stream(num_chunks: usize) -> impl Stream<Item = std::io::Result<Bytes>> {
242 futures::stream::iter((0..num_chunks).map(|n| Ok(n.to_be_bytes().to_vec().into())))
243 }
244
245 async fn build_expect_tree(num_chunks: usize, degree: usize) -> Vec<Vec<(Cid, Block)>> {
246 let store = &MemoryBlockStore::new();
247 let chunks = test_chunk_stream(num_chunks);
248 tokio::pin!(chunks);
249 let mut tree = vec![vec![]];
250 let mut links = vec![vec![]];
251
252 if num_chunks / degree == 0 {
253 let chunk = chunks.next().await.unwrap().unwrap();
254 let leaf = TreeNode::Leaf(chunk);
255 let (block, _) = leaf.encode().unwrap();
256 let cid = block.store(store).await.unwrap();
257 tree[0].push((cid, block));
258 return tree;
259 }
260
261 while let Some(chunk) = chunks.next().await {
262 let chunk = chunk.unwrap();
263 let leaf = TreeNode::Leaf(chunk);
264 let (block, link_info) = leaf.encode().unwrap();
265 let cid = block.store(store).await.unwrap();
266 links[0].push((cid, link_info));
267 tree[0].push((cid, block));
268 }
269
270 while tree.last().unwrap().len() > 1 {
271 let prev_layer = links.last().unwrap();
272 let count = prev_layer.len() / degree;
273 let mut tree_layer = Vec::with_capacity(count);
274 let mut links_layer = Vec::with_capacity(count);
275 for links in prev_layer.chunks(degree) {
276 let stem = TreeNode::Stem(links.to_vec());
277 let (block, link_info) = stem.encode().unwrap();
278 let cid = block.store(store).await.unwrap();
279 links_layer.push((cid, link_info));
280 tree_layer.push((cid, block));
281 }
282 tree.push(tree_layer);
283 links.push(links_layer);
284 }
285 tree
286 }
287
288 async fn build_expect_vec_from_tree(
289 tree: Vec<Vec<(Cid, Block)>>,
290 num_chunks: usize,
291 degree: usize,
292 ) -> Vec<(Cid, Block)> {
293 let mut out = vec![];
294
295 if num_chunks == 1 {
296 out.push(tree[0][0].clone());
297 return out;
298 }
299
300 let mut counts = vec![0; tree.len()];
301
302 for leaf in tree[0].iter() {
303 out.push(leaf.clone());
304 counts[0] += 1;
305 let mut push = counts[0] % degree == 0;
306 for (num_layer, count) in counts.iter_mut().enumerate() {
307 if num_layer == 0 {
308 continue;
309 }
310 if !push {
311 break;
312 }
313 out.push(tree[num_layer][*count].clone());
314 *count += 1;
315 if *count % degree != 0 {
316 push = false;
317 }
318 }
319 }
320
321 for (num_layer, count) in counts.into_iter().enumerate() {
322 if num_layer == 0 {
323 continue;
324 }
325 let layer = tree[num_layer].clone();
326 for node in layer.into_iter().skip(count) {
327 out.push(node);
328 }
329 }
330
331 out
332 }
333
    /// Builds the expected block sequence for `num_chunks` test chunks and the
    /// given `degree`: constructs the layered reference tree, then flattens it
    /// into the expected stream order.
    async fn build_expect(num_chunks: usize, degree: usize) -> Vec<(Cid, Block)> {
        let tree = build_expect_tree(num_chunks, degree).await;
        // Printed so that a failing test shows the reference tree.
        println!("{tree:?}");
        build_expect_vec_from_tree(tree, num_chunks, degree).await
    }
339
340 async fn make_leaf(data: usize, store: &impl BlockStore) -> (Cid, Block, LinkInfo) {
341 let (block, link_info) = TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze())
342 .encode()
343 .unwrap();
344 let cid = block.store(store).await.unwrap();
345 (cid, block, link_info)
346 }
347
348 async fn make_stem(
349 links: Vec<(Cid, LinkInfo)>,
350 store: &impl BlockStore,
351 ) -> (Cid, Block, LinkInfo) {
352 let (block, link_info) = TreeNode::Stem(links).encode().unwrap();
353 let cid = block.store(store).await.unwrap();
354 (cid, block, link_info)
355 }
356
    /// Checks the reference builders themselves against a hand-built tree of
    /// 7 leaves with degree 3:
    ///
    /// ```text
    ///                 root
    ///        /         |        \
    ///     stem_0     stem_1    stem_2
    ///    /  |  \    /  |  \      |
    ///   0   1   2  3   4   5     6
    /// ```
    #[tokio::test]
    async fn test_build_expect() {
        let store = &MemoryBlockStore::new();
        // Nodes are created here in the order the stream is expected to yield them.
        let (leaf_0_cid, leaf_0, len_0) = make_leaf(0, store).await;
        let (leaf_1_cid, leaf_1, len_1) = make_leaf(1, store).await;
        let (leaf_2_cid, leaf_2, len_2) = make_leaf(2, store).await;
        let (stem_0_cid, stem_0, stem_len_0) = make_stem(
            vec![
                (leaf_0_cid, len_0),
                (leaf_1_cid, len_1),
                (leaf_2_cid, len_2),
            ],
            store,
        )
        .await;
        let (leaf_3_cid, leaf_3, len_3) = make_leaf(3, store).await;
        let (leaf_4_cid, leaf_4, len_4) = make_leaf(4, store).await;
        let (leaf_5_cid, leaf_5, len_5) = make_leaf(5, store).await;
        let (stem_1_cid, stem_1, stem_len_1) = make_stem(
            vec![
                (leaf_3_cid, len_3),
                (leaf_4_cid, len_4),
                (leaf_5_cid, len_5),
            ],
            store,
        )
        .await;
        let (leaf_6_cid, leaf_6, len_6) = make_leaf(6, store).await;
        // The last stem is only partially filled: it links to a single leaf.
        let (stem_2_cid, stem_2, stem_len_2) = make_stem(vec![(leaf_6_cid, len_6)], store).await;
        let (root_cid, root, _root_len) = make_stem(
            vec![
                (stem_0_cid, stem_len_0),
                (stem_1_cid, stem_len_1),
                (stem_2_cid, stem_len_2),
            ],
            store,
        )
        .await;

        // Layered view: leaves, then stems, then the root.
        let expect_tree = vec![
            vec![
                (leaf_0_cid, leaf_0.clone()),
                (leaf_1_cid, leaf_1.clone()),
                (leaf_2_cid, leaf_2.clone()),
                (leaf_3_cid, leaf_3.clone()),
                (leaf_4_cid, leaf_4.clone()),
                (leaf_5_cid, leaf_5.clone()),
                (leaf_6_cid, leaf_6.clone()),
            ],
            vec![
                (stem_0_cid, stem_0.clone()),
                (stem_1_cid, stem_1.clone()),
                (stem_2_cid, stem_2.clone()),
            ],
            vec![(root_cid, root.clone())],
        ];
        let got_tree = build_expect_tree(7, 3).await;
        assert_eq!(expect_tree, got_tree);

        // Flattened view: each full stem directly follows its third leaf; the
        // partially filled stem and the root come last.
        let expect_vec = vec![
            (leaf_0_cid, leaf_0),
            (leaf_1_cid, leaf_1),
            (leaf_2_cid, leaf_2),
            (stem_0_cid, stem_0),
            (leaf_3_cid, leaf_3),
            (leaf_4_cid, leaf_4),
            (leaf_5_cid, leaf_5),
            (stem_1_cid, stem_1),
            (leaf_6_cid, leaf_6),
            (stem_2_cid, stem_2),
            (root_cid, root),
        ];
        let got_vec = build_expect_vec_from_tree(got_tree, 7, 3).await;
        assert_eq!(expect_vec, got_vec);
    }
433
434 async fn ensure_equal(
435 expect: Vec<(Cid, Block)>,
436 got: impl Stream<Item = Result<(Cid, Block)>>,
437 expected_filesize: u64,
438 ) {
439 let mut i = 0;
440 tokio::pin!(got);
441 let mut got_filesize = 0;
442 let mut expected_tsize = 0;
443 let mut got_tsize = 0;
444 while let Some(node) = got.next().await {
445 let (expect_cid, expect_block) = expect
446 .get(i)
447 .expect("too many nodes in balanced tree stream")
448 .clone();
449 let (got_cid, got_block) = node.expect("unexpected error in balanced tree stream");
450 let len = got_block.data().len() as u64;
451 println!("node index {i}");
452 assert_eq!(expect_cid, got_cid);
453 assert_eq!(expect_block.data(), got_block.data());
454 i += 1;
455 let expect_node = UnixFsFile::decode(&expect_cid, expect_block.data().clone()).unwrap();
456 let got_node = UnixFsFile::decode(&got_cid, got_block.data().clone()).unwrap();
457 if let Some(DataType::File) = got_node.typ() {
458 assert_eq!(
459 got_node.filesize().unwrap(),
460 got_node.blocksizes().iter().sum::<u64>()
461 );
462 }
463 assert_eq!(expect_node, got_node);
464 if expect.len() == i {
465 let node = UnixFsFile::decode(&got_cid, got_block.data().clone()).unwrap();
466 got_tsize = node.links().map(|l| l.unwrap().tsize.unwrap()).sum();
467 got_filesize = got_node.filesize().unwrap();
468 } else {
469 expected_tsize += len;
470 }
471 }
472 if expect.len() != i {
473 panic!(
474 "expected at {} nodes of the stream, got {}",
475 expect.len(),
476 i
477 );
478 }
479 assert_eq!(expected_filesize, got_filesize);
480 assert_eq!(expected_tsize, got_tsize);
481 }
482
483 #[tokio::test]
484 async fn balanced_tree_test_leaf() {
485 let store = &MemoryBlockStore::new();
486 let num_chunks = 1;
487 let expect = build_expect(num_chunks, 3).await;
488 let got = stream_balanced_tree(test_chunk_stream(1), 3, store);
489 tokio::pin!(got);
490 ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
491 }
492
493 #[tokio::test]
494 async fn balanced_tree_test_height_one() {
495 let store = &MemoryBlockStore::new();
496 let num_chunks = 3;
497 let degrees = 3;
498 let expect = build_expect(num_chunks, degrees).await;
499 let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
500 tokio::pin!(got);
501 ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
502 }
503
504 #[tokio::test]
505 async fn balanced_tree_test_height_two_full() {
506 let store = &MemoryBlockStore::new();
507 let degrees = 3;
508 let num_chunks = 9;
509 let expect = build_expect(num_chunks, degrees).await;
510 let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
511 tokio::pin!(got);
512 ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
513 }
514
515 #[tokio::test]
516 async fn balanced_tree_test_height_two_not_full() {
517 let store = &MemoryBlockStore::new();
518 let degrees = 3;
519 let num_chunks = 10;
520 let expect = build_expect(num_chunks, degrees).await;
521 let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
522 tokio::pin!(got);
523 ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
524 }
525
526 #[tokio::test]
527 async fn balanced_tree_test_height_three() {
528 let store = &MemoryBlockStore::new();
529 let num_chunks = 125;
530 let degrees = 5;
531 let expect = build_expect(num_chunks, degrees).await;
532 let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
533 tokio::pin!(got);
534 ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
535 }
536
537 #[tokio::test]
538 async fn balanced_tree_test_large() {
539 let store = &MemoryBlockStore::new();
540 let num_chunks = 780;
541 let degrees = 11;
542 let expect = build_expect(num_chunks, degrees).await;
543 let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees, store);
544 tokio::pin!(got);
545 ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
546 }
547}