use std::{
future::Future,
io::{self, Cursor},
result,
};
use bytes::Bytes;
pub use iroh_io::{AsyncSliceReader, AsyncSliceWriter};
use iroh_io::{AsyncStreamReader, AsyncStreamWriter};
use smallvec::SmallVec;
pub use super::BaoContentItem;
use super::{combine_hash_pair, DecodeError};
use crate::{
blake3, hash_subtree,
io::{
error::EncodeError,
outboard::{PostOrderOutboard, PreOrderOutboard},
Leaf, Parent,
},
iter::{BaoChunk, ResponseIter},
parent_cv,
rec::{encode_selected_rec, truncate_ranges, truncate_ranges_owned},
BaoTree, BlockSize, ChunkRanges, ChunkRangesRef, TreeNode,
};
/// Read access to an outboard, i.e. a store of pre-computed hash pairs for tree nodes.
pub trait Outboard {
    /// The root hash of the data this outboard belongs to.
    fn root(&self) -> blake3::Hash;
    /// The tree geometry (total size and block size) of this outboard.
    fn tree(&self) -> BaoTree;
    /// Load the `(left, right)` hash pair for `node`.
    ///
    /// Returns `Ok(None)` if the node is not stored in this outboard.
    fn load(
        &mut self,
        node: TreeNode,
    ) -> impl Future<Output = io::Result<Option<(blake3::Hash, blake3::Hash)>>>;
}
/// Write access to an outboard: store hash pairs and flush them.
pub trait OutboardMut: Sized {
    /// Save the `(left, right)` hash pair for `node`.
    fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> impl Future<Output = io::Result<()>>;
    /// Flush any pending writes to the underlying storage.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}
/// Creating an outboard from data.
pub trait CreateOutboard {
    /// Create an outboard from a seekable data source, querying the source for its size.
    #[allow(async_fn_in_trait)]
    async fn create(mut data: impl AsyncSliceReader, block_size: BlockSize) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        let size = data.size().await?;
        // wrap the slice reader in a cursor to get a stream reader
        Self::create_sized(Cursor::new(data), size, block_size).await
    }

    /// Create an outboard from a data stream with a known size.
    fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> impl Future<Output = io::Result<Self>>
    where
        Self: Default + Sized;

    /// Initialize this outboard (all hash pairs and the root hash) from the data stream.
    ///
    /// The tree geometry of the outboard is left unchanged, so `data` must match it.
    fn init_from(&mut self, data: impl AsyncStreamReader) -> impl Future<Output = io::Result<()>>;
}
// Forwarding impl so an outboard can be used through a mutable reference.
impl<O: Outboard> Outboard for &mut O {
    fn root(&self) -> blake3::Hash {
        (**self).root()
    }

    fn tree(&self) -> BaoTree {
        (**self).tree()
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        (**self).load(node).await
    }
}
impl<R: AsyncSliceReader> Outboard for PreOrderOutboard<R> {
    fn root(&self) -> blake3::Hash {
        self.root
    }

    fn tree(&self) -> BaoTree {
        self.tree
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        // Nodes without a pre-order offset are not stored in this outboard.
        let offset = match self.tree.pre_order_offset(node) {
            Some(offset) => offset * 64,
            None => return Ok(None),
        };
        // Each stored node is a 64 byte hash pair.
        let bytes = self.data.read_at(offset, 64).await?;
        let pair = if bytes.len() == 64 {
            parse_hash_pair(bytes)?
        } else {
            // Short read: the pair was never written, report it as all zeros.
            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
        };
        Ok(Some(pair))
    }
}
// Forwarding impl so a mutable outboard can be used through a mutable reference.
impl<O: OutboardMut> OutboardMut for &mut O {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        (**self).save(node, hash_pair).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}
impl<W: AsyncSliceWriter> OutboardMut for PreOrderOutboard<W> {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        // Nodes without a pre-order offset are not stored; saving them is a no-op.
        let Some(offset) = self.tree.pre_order_offset(node) else {
            return Ok(());
        };
        // Serialize the pair as left hash followed by right hash, 64 bytes total.
        let mut pair = [0u8; 64];
        let (left, right) = pair.split_at_mut(32);
        left.copy_from_slice(hash_pair.0.as_bytes());
        right.copy_from_slice(hash_pair.1.as_bytes());
        self.data.write_at(offset * 64, &pair).await?;
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.data.sync().await
    }
}
impl<W: AsyncSliceWriter> OutboardMut for PostOrderOutboard<W> {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        // Nodes without a post-order offset are not stored; saving them is a no-op.
        let Some(offset) = self.tree.post_order_offset(node) else {
            return Ok(());
        };
        // Serialize the pair as left hash followed by right hash, 64 bytes total.
        let mut pair = [0u8; 64];
        let (left, right) = pair.split_at_mut(32);
        left.copy_from_slice(hash_pair.0.as_bytes());
        right.copy_from_slice(hash_pair.1.as_bytes());
        self.data.write_at(offset.value() * 64, &pair).await?;
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.data.sync().await
    }
}
impl<W: AsyncSliceWriter> CreateOutboard for PreOrderOutboard<W> {
    async fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        // Fix the tree geometry up front, then fill hashes and root from the data.
        let mut ob = Self {
            tree: BaoTree::new(size, block_size),
            ..Default::default()
        };
        ob.init_from(data).await?;
        Ok(ob)
    }

    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
        let tree = self.tree;
        // `&mut *self` reborrows so self is usable again after the call.
        let root = outboard(data, tree, &mut *self).await?;
        self.root = root;
        self.sync().await?;
        Ok(())
    }
}
impl<W: AsyncSliceWriter> CreateOutboard for PostOrderOutboard<W> {
    async fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        // Fix the tree geometry up front, then fill hashes and root from the data.
        let mut ob = Self {
            tree: BaoTree::new(size, block_size),
            ..Default::default()
        };
        ob.init_from(data).await?;
        Ok(ob)
    }

    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
        let tree = self.tree;
        // `&mut *self` reborrows so self is usable again after the call.
        let root = outboard(data, tree, &mut *self).await?;
        self.root = root;
        self.sync().await?;
        Ok(())
    }
}
impl<R: AsyncSliceReader> Outboard for PostOrderOutboard<R> {
    fn root(&self) -> blake3::Hash {
        self.root
    }

    fn tree(&self) -> BaoTree {
        self.tree
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        // Nodes without a post-order offset are not stored in this outboard.
        let offset = match self.tree.post_order_offset(node) {
            Some(offset) => offset.value() * 64,
            None => return Ok(None),
        };
        // Each stored node is a 64 byte hash pair.
        let bytes = self.data.read_at(offset, 64).await?;
        let pair = if bytes.len() == 64 {
            parse_hash_pair(bytes)?
        } else {
            // Short read: the pair was never written, report it as all zeros.
            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
        };
        Ok(Some(pair))
    }
}
/// Parse a 64 byte buffer into a `(left, right)` hash pair.
///
/// Fails with `UnexpectedEof` if the buffer is not exactly 64 bytes.
pub(crate) fn parse_hash_pair(buf: Bytes) -> io::Result<(blake3::Hash, blake3::Hash)> {
    if buf.len() != 64 {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "hash pair must be 64 bytes",
        ));
    }
    let (left, right) = buf.split_at(32);
    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(left).unwrap());
    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(right).unwrap());
    Ok((l_hash, r_hash))
}
/// Shared state of a [`ResponseDecoder`].
#[derive(Debug)]
struct ResponseDecoderInner<R> {
    /// Iterator over the tree nodes selected by the requested ranges.
    iter: ResponseIter,
    /// Stack of expected hashes, seeded with the root hash.
    stack: SmallVec<[blake3::Hash; 10]>,
    /// The stream the encoded response is read from.
    encoded: R,
}
impl<R> ResponseDecoderInner<R> {
    fn new(tree: BaoTree, hash: blake3::Hash, ranges: ChunkRanges, encoded: R) -> Self {
        // Restrict the requested ranges to the actual size of the tree.
        let ranges = truncate_ranges_owned(ranges, tree.size());
        // Seed the hash stack with the expected root hash.
        let mut stack = SmallVec::new();
        stack.push(hash);
        Self {
            iter: ResponseIter::new(tree, ranges),
            stack,
            encoded,
        }
    }
}
/// Decoder for a bao response stream.
///
/// State is boxed so moving the decoder between steps is cheap.
#[derive(Debug)]
pub struct ResponseDecoder<R>(Box<ResponseDecoderInner<R>>);
/// The result of a single [`ResponseDecoder::next`] step.
#[derive(Debug)]
pub enum ResponseDecoderNext<R> {
    /// One more item: the decoder to continue with, and either the decoded
    /// content item or a decode error.
    More(
        (
            ResponseDecoder<R>,
            std::result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// The traversal is exhausted; yields back the underlying reader.
    Done(R),
}
impl<R: AsyncStreamReader> ResponseDecoder<R> {
    /// Create a new decoder.
    ///
    /// `hash` is the expected root hash, `ranges` the chunk ranges that were
    /// requested, `tree` the geometry of the data, and `encoded` the stream
    /// the response is read from.
    pub fn new(hash: blake3::Hash, ranges: ChunkRanges, tree: BaoTree, encoded: R) -> Self {
        Self(Box::new(ResponseDecoderInner::new(
            tree, hash, ranges, encoded,
        )))
    }

    /// Decode the next item, consuming the decoder.
    ///
    /// Returns either the decoder plus the next item, or the underlying
    /// reader once the traversal is exhausted.
    pub async fn next(mut self) -> ResponseDecoderNext<R> {
        if let Some(chunk) = self.0.iter.next() {
            let item = self.next0(chunk).await;
            ResponseDecoderNext::More((self, item))
        } else {
            ResponseDecoderNext::Done(self.0.encoded)
        }
    }

    /// Abort decoding and return the underlying reader.
    pub fn finish(self) -> R {
        self.0.encoded
    }

    /// The tree geometry this decoder was created with.
    pub fn tree(&self) -> BaoTree {
        self.0.iter.tree()
    }

    /// The hash at the bottom of the expected-hash stack.
    ///
    /// NOTE(review): this is the root hash only while decoding has not yet
    /// popped it — confirm callers use this before consuming items.
    pub fn hash(&self) -> &blake3::Hash {
        &self.0.stack[0]
    }

    // Decode and verify a single chunk of the response.
    async fn next0(&mut self, chunk: BaoChunk) -> std::result::Result<BaoContentItem, DecodeError> {
        Ok(match chunk {
            BaoChunk::Parent {
                is_root,
                right,
                left,
                node,
                ..
            } => {
                let this = &mut self.0;
                // a parent is always a 64 byte hash pair
                let buf = this
                    .encoded
                    .read::<64>()
                    .await
                    .map_err(|e| DecodeError::maybe_parent_not_found(e, node))?;
                let pair @ (l_hash, r_hash) = read_parent(&buf);
                // the expected hash for this node, pushed by its own parent
                let parent_hash = this.stack.pop().unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                // push the right hash first so the left child is popped first
                if right {
                    this.stack.push(r_hash);
                }
                if left {
                    this.stack.push(l_hash);
                }
                if parent_hash != actual {
                    return Err(DecodeError::ParentHashMismatch(node));
                }
                Parent { pair, node }.into()
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let this = &mut self.0;
                let data = this
                    .encoded
                    .read_bytes_exact(size)
                    .await
                    .map_err(|e| DecodeError::maybe_leaf_not_found(e, start_chunk))?;
                // verify the leaf data against the expected subtree hash
                let leaf_hash = this.stack.pop().unwrap();
                let actual = hash_subtree(start_chunk.0, &data, is_root);
                if leaf_hash != actual {
                    return Err(DecodeError::LeafHashMismatch(start_chunk));
                }
                Leaf {
                    offset: start_chunk.to_bytes(),
                    data,
                }
                .into()
            }
        })
    }
}
/// Encode the parts of `data` and `outboard` selected by `ranges` into `encoded`.
///
/// The hashes are taken from the outboard as-is, without validation.
pub async fn encode_ranges<D, O, W>(
    mut data: D,
    mut outboard: O,
    ranges: &ChunkRangesRef,
    mut encoded: W,
) -> result::Result<(), EncodeError>
where
    D: AsyncSliceReader,
    O: Outboard,
    W: AsyncStreamWriter,
{
    let tree = outboard.tree();
    for chunk in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
        match chunk {
            BaoChunk::Parent { node, .. } => {
                // every node yielded by the iterator must be present in the outboard
                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
                let pair = combine_hash_pair(&l_hash, &r_hash);
                encoded
                    .write(&pair)
                    .await
                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
            }
            BaoChunk::Leaf {
                start_chunk, size, ..
            } => {
                let offset = start_chunk.to_bytes();
                let bytes = data.read_exact_at(offset, size).await?;
                encoded
                    .write_bytes(bytes)
                    .await
                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
            }
        }
    }
    Ok(())
}
/// Encode the parts of `data` and `outboard` selected by `ranges` into `encoded`,
/// validating every hash against the root on the way.
///
/// Unlike [`encode_ranges`], this keeps a stack of expected hashes (seeded with
/// the root) and fails with a hash mismatch error instead of emitting data that
/// does not match the outboard.
pub async fn encode_ranges_validated<D, O, W>(
    mut data: D,
    mut outboard: O,
    ranges: &ChunkRangesRef,
    encoded: W,
) -> result::Result<(), EncodeError>
where
    D: AsyncSliceReader,
    O: Outboard,
    W: AsyncStreamWriter,
{
    // scratch buffer for re-encoding partially selected leaves
    let mut out_buf = Vec::new();
    // stack of expected hashes, seeded with the root
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    stack.push(outboard.root());
    let mut encoded = encoded;
    let tree = outboard.tree();
    // restrict the ranges to the size of the tree
    let ranges = truncate_ranges(ranges, tree.size());
    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
        match item {
            BaoChunk::Parent {
                is_root,
                left,
                right,
                node,
                ..
            } => {
                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                let expected = stack.pop().unwrap();
                if actual != expected {
                    return Err(EncodeError::ParentHashMismatch(node));
                }
                // push the right hash first so the left child is popped first
                if right {
                    stack.push(r_hash);
                }
                if left {
                    stack.push(l_hash);
                }
                let pair = combine_hash_pair(&l_hash, &r_hash);
                encoded
                    .write(&pair)
                    .await
                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
            }
            BaoChunk::Leaf {
                start_chunk,
                size,
                is_root,
                ranges,
                ..
            } => {
                let expected = stack.pop().unwrap();
                let start = start_chunk.to_bytes();
                let bytes = data.read_exact_at(start, size).await?;
                let (actual, to_write) = if !ranges.is_all() {
                    // only part of the leaf is selected: re-encode the selected
                    // parts into out_buf while computing the subtree hash
                    out_buf.clear();
                    let actual = encode_selected_rec(
                        start_chunk,
                        &bytes,
                        is_root,
                        ranges,
                        tree.block_size.to_u32(),
                        true,
                        &mut out_buf,
                    );
                    // out_buf is reused for the next leaf, so it has to be cloned
                    (actual, out_buf.clone().into())
                } else {
                    // the whole leaf is selected, write its bytes as-is
                    let actual = hash_subtree(start_chunk.0, &bytes, is_root);
                    (actual, bytes)
                };
                if actual != expected {
                    return Err(EncodeError::LeafHashMismatch(start_chunk));
                }
                encoded
                    .write_bytes(to_write)
                    .await
                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
            }
        }
    }
    Ok(())
}
/// Decode a response stream, writing leaf data to `target` and verified hash
/// pairs to `outboard`.
///
/// The root hash and tree geometry are taken from `outboard`.
pub async fn decode_ranges<R, O, W>(
    encoded: R,
    ranges: ChunkRanges,
    mut target: W,
    mut outboard: O,
) -> std::result::Result<(), DecodeError>
where
    O: OutboardMut + Outboard,
    R: AsyncStreamReader,
    W: AsyncSliceWriter,
{
    let mut decoder = ResponseDecoder::new(outboard.root(), ranges, outboard.tree(), encoded);
    loop {
        match decoder.next().await {
            ResponseDecoderNext::Done(_reader) => break,
            ResponseDecoderNext::More((rest, item)) => {
                decoder = rest;
                match item? {
                    BaoContentItem::Parent(Parent { node, pair }) => {
                        outboard.save(node, &pair).await?;
                    }
                    BaoContentItem::Leaf(Leaf { offset, data }) => {
                        target.write_bytes_at(offset, data).await?;
                    }
                }
            }
        }
    }
    Ok(())
}
/// Read a `(left, right)` hash pair from the first 64 bytes of `buf`.
///
/// `buf` must be at least 64 bytes long; extra bytes are ignored.
fn read_parent(buf: &[u8]) -> (blake3::Hash, blake3::Hash) {
    let l: [u8; 32] = buf[..32].try_into().unwrap();
    let r: [u8; 32] = buf[32..64].try_into().unwrap();
    (l.into(), r.into())
}
/// Compute all hash pairs for `data` and store them in `outboard`.
///
/// Returns the root hash. The tree geometry is given by `tree`, so `data`
/// must supply the bytes described by it.
pub async fn outboard(
    data: impl AsyncStreamReader,
    tree: BaoTree,
    mut outboard: impl OutboardMut,
) -> io::Result<blake3::Hash> {
    // Fix: the previous version allocated a `tree.chunk_group_bytes()` sized
    // buffer here and threaded it through, but it was only ever inspected in a
    // debug assertion — leaves are read via `read_bytes_exact`. The dead
    // allocation has been removed.
    outboard_impl(tree, data, &mut outboard).await
}

/// Internal helper for [`outboard`] performing the post-order traversal.
async fn outboard_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl OutboardMut,
) -> io::Result<blake3::Hash> {
    // stack of subtree hashes; a parent combines the two most recent entries
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, node, .. } => {
                // post-order guarantees both children were already hashed,
                // with the right child on top of the stack
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                outboard.save(node, &(left_hash, right_hash)).await?;
                stack.push(parent_cv(&left_hash, &right_hash, is_root));
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes_exact(size).await?;
                stack.push(hash_subtree(start_chunk.0, &buf, is_root));
            }
        }
    }
    // a complete traversal leaves exactly the root hash on the stack
    debug_assert_eq!(stack.len(), 1);
    Ok(stack.pop().unwrap())
}
/// Compute all hash pairs for `data` and write them to `outboard` as a
/// post-order stream of 64 byte pairs.
///
/// Returns the root hash. The tree geometry is given by `tree`, so `data`
/// must supply the bytes described by it.
pub async fn outboard_post_order(
    data: impl AsyncStreamReader,
    tree: BaoTree,
    mut outboard: impl AsyncStreamWriter,
) -> io::Result<blake3::Hash> {
    // Fix: the previous version allocated a `tree.chunk_group_bytes()` sized
    // buffer here and threaded it through, but it was only ever inspected in a
    // debug assertion — leaves are read via `read_bytes_exact`. The dead
    // allocation has been removed.
    outboard_post_order_impl(tree, data, &mut outboard).await
}

/// Internal helper for [`outboard_post_order`] performing the traversal.
async fn outboard_post_order_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl AsyncStreamWriter,
) -> io::Result<blake3::Hash> {
    // stack of subtree hashes; a parent combines the two most recent entries
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, .. } => {
                // post-order guarantees both children were already hashed,
                // with the right child on top of the stack
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                // the pair is written left hash first
                outboard.write(left_hash.as_bytes()).await?;
                outboard.write(right_hash.as_bytes()).await?;
                stack.push(parent_cv(&left_hash, &right_hash, is_root));
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes_exact(size).await?;
                stack.push(hash_subtree(start_chunk.0, &buf, is_root));
            }
        }
    }
    // a complete traversal leaves exactly the root hash on the stack
    debug_assert_eq!(stack.len(), 1);
    Ok(stack.pop().unwrap())
}
/// Copy all stored hash pairs from one outboard to another.
///
/// Nodes missing in the source are skipped.
pub async fn copy(mut from: impl Outboard, mut to: impl OutboardMut) -> io::Result<()> {
    for node in from.tree().pre_order_nodes_iter() {
        match from.load(node).await? {
            Some(pair) => to.save(node, &pair).await?,
            None => {}
        }
    }
    Ok(())
}
#[cfg(feature = "validate")]
mod validate {
use std::{io, ops::Range};
use futures_lite::{FutureExt, Stream};
use genawaiter::sync::{Co, Gen};
use iroh_io::AsyncSliceReader;
use super::Outboard;
use crate::{
blake3, hash_subtree, io::LocalBoxFuture, parent_cv, rec::truncate_ranges, split, BaoTree,
ChunkNum, ChunkRangesRef, TreeNode,
};
/// Given an outboard and a data source, stream the chunk ranges within
/// `ranges` whose data matches the hashes in the outboard.
pub fn valid_ranges<'a, O, D>(
    outboard: O,
    data: D,
    ranges: &'a ChunkRangesRef,
) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
where
    O: Outboard + 'a,
    D: AsyncSliceReader + 'a,
{
    Gen::new(move |co| async move {
        // an I/O error from the validator is surfaced as the final stream item
        if let Err(cause) = RecursiveDataValidator::validate(outboard, data, ranges, &co).await
        {
            co.yield_(Err(cause)).await;
        }
    })
}
/// State for the recursive data validation traversal.
struct RecursiveDataValidator<'a, O: Outboard, D: AsyncSliceReader> {
    /// Tree geometry of the outboard being validated against.
    tree: BaoTree,
    /// Filled size of the shifted tree (see `BaoTree::shifted`).
    shifted_filled_size: TreeNode,
    outboard: O,
    data: D,
    /// Coroutine handle used to yield validated ranges to the stream.
    co: &'a Co<io::Result<Range<ChunkNum>>>,
}
impl<O: Outboard, D: AsyncSliceReader> RecursiveDataValidator<'_, O, D> {
    /// Run validation, yielding valid chunk ranges to `co`.
    async fn validate(
        outboard: O,
        data: D,
        ranges: &ChunkRangesRef,
        co: &Co<io::Result<Range<ChunkNum>>>,
    ) -> io::Result<()> {
        let tree = outboard.tree();
        if tree.blocks() == 1 {
            // special case: a single block tree, the root hash covers all the data
            let mut data = data;
            let data = data
                .read_exact_at(0, tree.size().try_into().unwrap())
                .await?;
            let actual = hash_subtree(0, &data, true);
            if actual == outboard.root() {
                co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
            }
            return Ok(());
        }
        // restrict the ranges to the size of the tree
        let ranges = truncate_ranges(ranges, tree.size());
        let root_hash = outboard.root();
        let (shifted_root, shifted_filled_size) = tree.shifted();
        let mut validator = RecursiveDataValidator {
            tree,
            shifted_filled_size,
            outboard,
            data,
            co,
        };
        validator
            .validate_rec(&root_hash, shifted_root, true, ranges)
            .await
    }

    /// Read the byte `range` from the data and yield the corresponding chunk
    /// range if its subtree hash matches `hash`.
    async fn yield_if_valid(
        &mut self,
        range: Range<u64>,
        hash: &blake3::Hash,
        is_root: bool,
    ) -> io::Result<()> {
        let len = (range.end - range.start).try_into().unwrap();
        let data = self.data.read_exact_at(range.start, len).await?;
        // hash the entire range and compare against the expected hash
        let actual = hash_subtree(ChunkNum::full_chunks(range.start).0, &data, is_root);
        if &actual == hash {
            self.co
                .yield_(Ok(
                    ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                ))
                .await;
        }
        io::Result::Ok(())
    }

    /// Recursively validate the subtree rooted at `shifted`, restricted to `ranges`.
    ///
    /// Returns a boxed future because async recursion needs an indirection.
    fn validate_rec<'b>(
        &'b mut self,
        parent_hash: &'b blake3::Hash,
        shifted: TreeNode,
        is_root: bool,
        ranges: &'b ChunkRangesRef,
    ) -> LocalBoxFuture<'b, io::Result<()>> {
        async move {
            if ranges.is_empty() {
                // not requested, nothing to validate
                return Ok(());
            }
            let node = shifted.subtract_block_size(self.tree.block_size.0);
            let (l, m, r) = self.tree.leaf_byte_ranges3(node);
            if !self.tree.is_relevant_for_outboard(node) {
                // no hash pair stored for this node, validate the data directly
                self.yield_if_valid(l..r, parent_hash, is_root).await?;
                return Ok(());
            }
            let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                // node not present in the outboard, can't validate below here
                return Ok(());
            };
            let actual = parent_cv(&l_hash, &r_hash, is_root);
            if &actual != parent_hash {
                // hash mismatch, don't validate below here
                return Ok(());
            };
            let (l_ranges, r_ranges) = split(ranges, node);
            if shifted.is_leaf() {
                if !l_ranges.is_empty() {
                    self.yield_if_valid(l..m, &l_hash, false).await?;
                }
                if !r_ranges.is_empty() {
                    self.yield_if_valid(m..r, &r_hash, false).await?;
                }
            } else {
                let left = shifted.left_child().unwrap();
                self.validate_rec(&l_hash, left, false, l_ranges).await?;
                let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                self.validate_rec(&r_hash, right, false, r_ranges).await?;
            }
            Ok(())
        }
        .boxed_local()
    }
}
/// Given an outboard, stream the chunk ranges within `ranges` whose hash
/// pairs are present and consistent with the root hash.
pub fn valid_outboard_ranges<'a, O>(
    outboard: O,
    ranges: &'a ChunkRangesRef,
) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
where
    O: Outboard + 'a,
{
    Gen::new(move |co| async move {
        // an I/O error from the validator is surfaced as the final stream item
        if let Err(cause) = RecursiveOutboardValidator::validate(outboard, ranges, &co).await {
            co.yield_(Err(cause)).await;
        }
    })
}
/// State for the recursive outboard validation traversal.
struct RecursiveOutboardValidator<'a, O: Outboard> {
    /// Tree geometry of the outboard being validated.
    tree: BaoTree,
    /// Filled size of the shifted tree (see `BaoTree::shifted`).
    shifted_filled_size: TreeNode,
    outboard: O,
    /// Coroutine handle used to yield validated ranges to the stream.
    co: &'a Co<io::Result<Range<ChunkNum>>>,
}
impl<O: Outboard> RecursiveOutboardValidator<'_, O> {
    /// Run validation, yielding chunk ranges with consistent hash pairs to `co`.
    async fn validate(
        outboard: O,
        ranges: &ChunkRangesRef,
        co: &Co<io::Result<Range<ChunkNum>>>,
    ) -> io::Result<()> {
        let tree = outboard.tree();
        if tree.blocks() == 1 {
            // special case: a single block tree stores no hash pairs at all
            co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
            return Ok(());
        }
        // restrict the ranges to the size of the tree
        let ranges = truncate_ranges(ranges, tree.size());
        let root_hash = outboard.root();
        let (shifted_root, shifted_filled_size) = tree.shifted();
        let mut validator = RecursiveOutboardValidator {
            tree,
            shifted_filled_size,
            outboard,
            co,
        };
        validator
            .validate_rec(&root_hash, shifted_root, true, ranges)
            .await
    }

    /// Recursively validate hash pairs for the subtree rooted at `shifted`,
    /// restricted to `ranges`.
    ///
    /// Returns a boxed future because async recursion needs an indirection.
    fn validate_rec<'b>(
        &'b mut self,
        parent_hash: &'b blake3::Hash,
        shifted: TreeNode,
        is_root: bool,
        ranges: &'b ChunkRangesRef,
    ) -> LocalBoxFuture<'b, io::Result<()>> {
        Box::pin(async move {
            // helper that yields the chunk range corresponding to a byte range
            let yield_node_range = |range: Range<u64>| {
                self.co.yield_(Ok(
                    ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                ))
            };
            if ranges.is_empty() {
                // not requested, nothing to validate
                return Ok(());
            }
            let node = shifted.subtract_block_size(self.tree.block_size.0);
            let (l, m, r) = self.tree.leaf_byte_ranges3(node);
            if !self.tree.is_relevant_for_outboard(node) {
                // no hash pair stored for this node, nothing to check
                yield_node_range(l..r).await;
                return Ok(());
            }
            let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                // node not present in the outboard, can't validate below here
                return Ok(());
            };
            let actual = parent_cv(&l_hash, &r_hash, is_root);
            if &actual != parent_hash {
                // hash mismatch, don't validate below here
                return Ok(());
            };
            let (l_ranges, r_ranges) = split(ranges, node);
            if shifted.is_leaf() {
                if !l_ranges.is_empty() {
                    yield_node_range(l..m).await;
                }
                if !r_ranges.is_empty() {
                    yield_node_range(m..r).await;
                }
            } else {
                let left = shifted.left_child().unwrap();
                self.validate_rec(&l_hash, left, false, l_ranges).await?;
                let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                self.validate_rec(&r_hash, right, false, r_ranges).await?;
            }
            Ok(())
        })
    }
}
}
#[cfg(feature = "validate")]
pub use validate::{valid_outboard_ranges, valid_ranges};