use std::collections::VecDeque;
use anyhow::Result;
use async_stream::try_stream;
use bytes::Bytes;
use cid::Cid;
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
use crate::builder::encode_unixfs_pb;
use crate::types::Block;
use crate::unixfs::{dag_pb, unixfs_pb, DataType, Node, UnixfsNode};
pub const DEFAULT_DEGREE: usize = 174;
#[derive(Debug, PartialEq, Eq)]
pub enum TreeBuilder {
Balanced { degree: usize },
}
impl TreeBuilder {
pub fn balanced_tree() -> Self {
Self::balanced_tree_with_degree(DEFAULT_DEGREE)
}
pub fn balanced_tree_with_degree(degree: usize) -> Self {
assert!(degree > 1);
TreeBuilder::Balanced { degree }
}
pub fn stream_tree(
&self,
chunks: impl Stream<Item = std::io::Result<Bytes>> + Send,
) -> impl Stream<Item = Result<Block>> {
match self {
TreeBuilder::Balanced { degree } => stream_balanced_tree(chunks, *degree),
}
}
}
#[derive(Clone, Debug, PartialEq)]
struct LinkInfo {
raw_data_len: u64,
encoded_len: u64,
}
fn stream_balanced_tree(
in_stream: impl Stream<Item = std::io::Result<Bytes>> + Send,
degree: usize,
) -> impl Stream<Item = Result<Block>> {
try_stream! {
let mut tree: VecDeque<Vec<(Cid, LinkInfo)>> = VecDeque::new();
tree.push_back(Vec::with_capacity(degree));
let hash_par: usize = 8;
let in_stream = in_stream.err_into::<anyhow::Error>().map(|chunk| {
tokio::task::spawn_blocking(|| {
chunk.and_then(|chunk| TreeNode::Leaf(chunk).encode())
}).err_into::<anyhow::Error>()
}).buffered(hash_par).map(|x| x.and_then(|x| x));
tokio::pin!(in_stream);
while let Some(chunk) = in_stream.next().await {
let ((block, link_info)) = chunk?;
let tree_len = tree.len();
if tree[0].len() == degree {
for i in 0..tree_len {
if tree[i].len() < degree {
break;
}
if i == tree_len - 1 {
tree.push_back(Vec::with_capacity(degree));
}
let links = std::mem::replace(&mut tree[i], Vec::with_capacity(degree));
let (block, link_info) = TreeNode::Stem(links).encode()?;
let cid = *block.cid();
yield block;
tree[i+1].push((cid, link_info));
}
}
tree[0].push((*block.cid(), link_info));
yield block;
}
if tree.len() == 1 && tree[0].len() == 1 {
return
}
while let Some(links) = tree.pop_front() {
let (block, link_info) = TreeNode::Stem(links).encode()?;
let cid = *block.cid();
yield block;
if let Some(front) = tree.front_mut() {
front.push((cid, link_info));
} else {
}
}
}
}
fn create_unixfs_node_from_links(links: Vec<(Cid, LinkInfo)>) -> Result<UnixfsNode> {
let blocksizes: Vec<u64> = links.iter().map(|l| l.1.raw_data_len).collect();
let filesize: u64 = blocksizes.iter().sum();
let links = links
.into_iter()
.map(|(cid, l)| dag_pb::PbLink {
hash: Some(cid.to_bytes()),
name: None,
tsize: Some(l.encoded_len),
})
.collect();
let inner = unixfs_pb::Data {
r#type: DataType::File as i32,
filesize: Some(filesize),
blocksizes,
..Default::default()
};
let outer = encode_unixfs_pb(&inner, links)?;
Ok(UnixfsNode::File(Node { inner, outer }))
}
enum TreeNode {
Leaf(Bytes),
Stem(Vec<(Cid, LinkInfo)>),
}
impl TreeNode {
fn encode(self) -> Result<(Block, LinkInfo)> {
match self {
TreeNode::Leaf(bytes) => {
let len = bytes.len();
let node = UnixfsNode::Raw(bytes);
let block = node.encode()?;
let link_info = LinkInfo {
raw_data_len: len as u64,
encoded_len: len as u64,
};
Ok((block, link_info))
}
TreeNode::Stem(links) => {
let mut encoded_len: u64 = links.iter().map(|(_, l)| l.encoded_len).sum();
let node = create_unixfs_node_from_links(links)?;
let block = node.encode()?;
encoded_len += block.data().len() as u64;
let raw_data_len = node
.filesize()
.expect("UnixfsNode::File will have a filesize");
Ok((
block,
LinkInfo {
raw_data_len,
encoded_len,
},
))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::BytesMut;
use futures::StreamExt;
const CHUNK_SIZE: u64 = std::mem::size_of::<usize>() as u64;
fn test_chunk_stream(num_chunks: usize) -> impl Stream<Item = std::io::Result<Bytes>> {
futures::stream::iter((0..num_chunks).map(|n| Ok(n.to_be_bytes().to_vec().into())))
}
async fn build_expect_tree(num_chunks: usize, degree: usize) -> Vec<Vec<Block>> {
let chunks = test_chunk_stream(num_chunks);
tokio::pin!(chunks);
let mut tree = vec![vec![]];
let mut links = vec![vec![]];
if num_chunks / degree == 0 {
let chunk = chunks.next().await.unwrap().unwrap();
let leaf = TreeNode::Leaf(chunk);
let (block, _) = leaf.encode().unwrap();
tree[0].push(block);
return tree;
}
while let Some(chunk) = chunks.next().await {
let chunk = chunk.unwrap();
let leaf = TreeNode::Leaf(chunk);
let (block, link_info) = leaf.encode().unwrap();
links[0].push((*block.cid(), link_info));
tree[0].push(block);
}
while tree.last().unwrap().len() > 1 {
let prev_layer = links.last().unwrap();
let count = prev_layer.len() / degree;
let mut tree_layer = Vec::with_capacity(count);
let mut links_layer = Vec::with_capacity(count);
for links in prev_layer.chunks(degree) {
let stem = TreeNode::Stem(links.to_vec());
let (block, link_info) = stem.encode().unwrap();
links_layer.push((*block.cid(), link_info));
tree_layer.push(block);
}
tree.push(tree_layer);
links.push(links_layer);
}
tree
}
async fn build_expect_vec_from_tree(
tree: Vec<Vec<Block>>,
num_chunks: usize,
degree: usize,
) -> Vec<Block> {
let mut out = vec![];
if num_chunks == 1 {
out.push(tree[0][0].clone());
return out;
}
let mut counts = vec![0; tree.len()];
for leaf in tree[0].iter() {
out.push(leaf.clone());
counts[0] += 1;
let mut push = counts[0] % degree == 0;
for (num_layer, count) in counts.iter_mut().enumerate() {
if num_layer == 0 {
continue;
}
if !push {
break;
}
out.push(tree[num_layer][*count].clone());
*count += 1;
if *count % degree != 0 {
push = false;
}
}
}
for (num_layer, count) in counts.into_iter().enumerate() {
if num_layer == 0 {
continue;
}
let layer = tree[num_layer].clone();
for node in layer.into_iter().skip(count) {
out.push(node);
}
}
out
}
async fn build_expect(num_chunks: usize, degree: usize) -> Vec<Block> {
let tree = build_expect_tree(num_chunks, degree).await;
println!("{tree:?}");
build_expect_vec_from_tree(tree, num_chunks, degree).await
}
fn make_leaf(data: usize) -> (Block, LinkInfo) {
TreeNode::Leaf(BytesMut::from(&data.to_be_bytes()[..]).freeze())
.encode()
.unwrap()
}
fn make_stem(links: Vec<(Cid, LinkInfo)>) -> (Block, LinkInfo) {
TreeNode::Stem(links).encode().unwrap()
}
#[tokio::test]
async fn test_build_expect() {
let (leaf_0, len_0) = make_leaf(0);
let (leaf_1, len_1) = make_leaf(1);
let (leaf_2, len_2) = make_leaf(2);
let (stem_0, stem_len_0) = make_stem(vec![
(*leaf_0.cid(), len_0),
(*leaf_1.cid(), len_1),
(*leaf_2.cid(), len_2),
]);
let (leaf_3, len_3) = make_leaf(3);
let (leaf_4, len_4) = make_leaf(4);
let (leaf_5, len_5) = make_leaf(5);
let (stem_1, stem_len_1) = make_stem(vec![
(*leaf_3.cid(), len_3),
(*leaf_4.cid(), len_4),
(*leaf_5.cid(), len_5),
]);
let (leaf_6, len_6) = make_leaf(6);
let (stem_2, stem_len_2) = make_stem(vec![(*leaf_6.cid(), len_6)]);
let (root, _root_len) = make_stem(vec![
(*stem_0.cid(), stem_len_0),
(*stem_1.cid(), stem_len_1),
(*stem_2.cid(), stem_len_2),
]);
let expect_tree = vec![
vec![
leaf_0.clone(),
leaf_1.clone(),
leaf_2.clone(),
leaf_3.clone(),
leaf_4.clone(),
leaf_5.clone(),
leaf_6.clone(),
],
vec![stem_0.clone(), stem_1.clone(), stem_2.clone()],
vec![root.clone()],
];
let got_tree = build_expect_tree(7, 3).await;
assert_eq!(expect_tree, got_tree);
let expect_vec = vec![
leaf_0, leaf_1, leaf_2, stem_0, leaf_3, leaf_4, leaf_5, stem_1, leaf_6, stem_2, root,
];
let got_vec = build_expect_vec_from_tree(got_tree, 7, 3).await;
assert_eq!(expect_vec, got_vec);
}
async fn ensure_equal(
expect: Vec<Block>,
got: impl Stream<Item = Result<Block>>,
expected_filesize: u64,
) {
let mut i = 0;
tokio::pin!(got);
let mut got_filesize = 0;
let mut expected_tsize = 0;
let mut got_tsize = 0;
while let Some(node) = got.next().await {
let (expect_cid, expect_bytes, _) = expect
.get(i)
.expect("too many nodes in balanced tree stream")
.clone()
.into_parts();
let node = node.expect("unexpected error in balanced tree stream");
let (got_cid, got_bytes, _) = node.into_parts();
let len = got_bytes.len() as u64;
println!("node index {i}");
assert_eq!(expect_cid, got_cid);
assert_eq!(expect_bytes, got_bytes);
i += 1;
let expect_node = UnixfsNode::decode(&expect_cid, expect_bytes.to_owned()).unwrap();
let got_node = UnixfsNode::decode(&got_cid, got_bytes.clone()).unwrap();
if let Some(DataType::File) = got_node.typ() {
assert_eq!(
got_node.filesize().unwrap(),
got_node.blocksizes().iter().sum::<u64>()
);
}
assert_eq!(expect_node, got_node);
if expect.len() == i {
let node = UnixfsNode::decode(&got_cid, got_bytes).unwrap();
got_tsize = node.links().map(|l| l.unwrap().tsize.unwrap()).sum();
got_filesize = got_node.filesize().unwrap();
} else {
expected_tsize += len;
}
}
if expect.len() != i {
panic!(
"expected at {} nodes of the stream, got {}",
expect.len(),
i
);
}
assert_eq!(expected_filesize, got_filesize);
assert_eq!(expected_tsize, got_tsize);
}
#[tokio::test]
async fn balanced_tree_test_leaf() {
let num_chunks = 1;
let expect = build_expect(num_chunks, 3).await;
let got = stream_balanced_tree(test_chunk_stream(1), 3);
tokio::pin!(got);
ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
}
#[tokio::test]
async fn balanced_tree_test_height_one() {
let num_chunks = 3;
let degrees = 3;
let expect = build_expect(num_chunks, degrees).await;
let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees);
tokio::pin!(got);
ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
}
#[tokio::test]
async fn balanced_tree_test_height_two_full() {
let degrees = 3;
let num_chunks = 9;
let expect = build_expect(num_chunks, degrees).await;
let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees);
tokio::pin!(got);
ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
}
#[tokio::test]
async fn balanced_tree_test_height_two_not_full() {
let degrees = 3;
let num_chunks = 10;
let expect = build_expect(num_chunks, degrees).await;
let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees);
tokio::pin!(got);
ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
}
#[tokio::test]
async fn balanced_tree_test_height_three() {
let num_chunks = 125;
let degrees = 5;
let expect = build_expect(num_chunks, degrees).await;
let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees);
tokio::pin!(got);
ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
}
#[tokio::test]
async fn balanced_tree_test_large() {
let num_chunks = 780;
let degrees = 11;
let expect = build_expect(num_chunks, degrees).await;
let got = stream_balanced_tree(test_chunk_stream(num_chunks), degrees);
tokio::pin!(got);
ensure_equal(expect, got, num_chunks as u64 * CHUNK_SIZE).await;
}
}