use std::{
io::{self, Read, Seek, Write},
result,
};
use bytes::BytesMut;
pub use positioned_io::{ReadAt, Size, WriteAt};
use smallvec::SmallVec;
use super::{combine_hash_pair, BaoContentItem, DecodeError};
pub use crate::rec::truncate_ranges;
use crate::{
blake3, hash_subtree,
io::{
error::EncodeError,
outboard::{parse_hash_pair, PostOrderOutboard, PreOrderOutboard},
Leaf, Parent,
},
iter::{BaoChunk, ResponseIterRef},
parent_cv,
rec::encode_selected_rec,
BaoTree, BlockSize, ChunkRangesRef, TreeNode,
};
pub trait Outboard {
fn root(&self) -> blake3::Hash;
fn tree(&self) -> BaoTree;
fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>>;
}
pub trait OutboardMut: Sized {
fn save(&mut self, node: TreeNode, hash_pair: &(blake3::Hash, blake3::Hash)) -> io::Result<()>;
fn sync(&mut self) -> io::Result<()>;
}
pub trait CreateOutboard {
fn create(mut data: impl Read + Seek, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized,
{
let size = data.seek(io::SeekFrom::End(0))?;
data.rewind()?;
Self::create_sized(data, size, block_size)
}
fn create_sized(data: impl Read, size: u64, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized;
fn init_from(&mut self, data: impl Read) -> io::Result<()>;
}
impl<O: OutboardMut> OutboardMut for &mut O {
fn save(&mut self, node: TreeNode, hash_pair: &(blake3::Hash, blake3::Hash)) -> io::Result<()> {
(**self).save(node, hash_pair)
}
fn sync(&mut self) -> io::Result<()> {
(**self).sync()
}
}
impl<O: Outboard> Outboard for &O {
fn root(&self) -> blake3::Hash {
(**self).root()
}
fn tree(&self) -> BaoTree {
(**self).tree()
}
fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
(**self).load(node)
}
}
impl<O: Outboard> Outboard for &mut O {
fn root(&self) -> blake3::Hash {
(**self).root()
}
fn tree(&self) -> BaoTree {
(**self).tree()
}
fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
(**self).load(node)
}
}
impl<R: ReadAt> Outboard for PreOrderOutboard<R> {
fn root(&self) -> blake3::Hash {
self.root
}
fn tree(&self) -> BaoTree {
self.tree
}
fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
let Some(offset) = self.tree.pre_order_offset(node) else {
return Ok(None);
};
let offset = offset * 64;
let mut content = [0u8; 64];
self.data.read_exact_at(offset, &mut content)?;
Ok(Some(parse_hash_pair(content)))
}
}
impl<W: WriteAt> OutboardMut for PreOrderOutboard<W> {
fn save(&mut self, node: TreeNode, hash_pair: &(blake3::Hash, blake3::Hash)) -> io::Result<()> {
let Some(offset) = self.tree.pre_order_offset(node) else {
return Ok(());
};
let offset = offset * 64;
let mut content = [0u8; 64];
content[0..32].copy_from_slice(hash_pair.0.as_bytes());
content[32..64].copy_from_slice(hash_pair.1.as_bytes());
self.data.write_all_at(offset, &content)?;
Ok(())
}
fn sync(&mut self) -> io::Result<()> {
self.data.flush()
}
}
impl<W: WriteAt> CreateOutboard for PreOrderOutboard<W> {
fn create_sized(data: impl Read, size: u64, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized,
{
let tree = BaoTree::new(size, block_size);
let mut res = Self {
tree,
..Default::default()
};
res.init_from(data)?;
res.sync()?;
Ok(res)
}
fn init_from(&mut self, data: impl Read) -> io::Result<()> {
let mut this = self;
let root = outboard(data, this.tree, &mut this)?;
this.root = root;
this.sync()?;
Ok(())
}
}
impl<W: WriteAt> CreateOutboard for PostOrderOutboard<W> {
fn create_sized(data: impl Read, size: u64, block_size: BlockSize) -> io::Result<Self>
where
Self: Default + Sized,
{
let tree = BaoTree::new(size, block_size);
let mut res = Self {
tree,
..Default::default()
};
res.init_from(data)?;
res.sync()?;
Ok(res)
}
fn init_from(&mut self, data: impl Read) -> io::Result<()> {
let mut this = self;
let root = outboard(data, this.tree, &mut this)?;
this.root = root;
this.sync()?;
Ok(())
}
}
impl<W: WriteAt> OutboardMut for PostOrderOutboard<W> {
fn save(&mut self, node: TreeNode, hash_pair: &(blake3::Hash, blake3::Hash)) -> io::Result<()> {
let Some(offset) = self.tree.post_order_offset(node) else {
return Ok(());
};
let offset = offset.value() * 64;
let mut content = [0u8; 64];
content[0..32].copy_from_slice(hash_pair.0.as_bytes());
content[32..64].copy_from_slice(hash_pair.1.as_bytes());
self.data.write_all_at(offset, &content)?;
Ok(())
}
fn sync(&mut self) -> io::Result<()> {
self.data.flush()
}
}
impl<R: ReadAt> Outboard for PostOrderOutboard<R> {
fn root(&self) -> blake3::Hash {
self.root
}
fn tree(&self) -> BaoTree {
self.tree
}
fn load(&self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
let Some(offset) = self.tree.post_order_offset(node) else {
return Ok(None);
};
let offset = offset.value() * 64;
let mut content = [0u8; 64];
self.data.read_exact_at(offset, &mut content)?;
Ok(Some(parse_hash_pair(content)))
}
}
#[derive(Debug)]
pub struct DecodeResponseIter<'a, R> {
inner: ResponseIterRef<'a>,
stack: SmallVec<[blake3::Hash; 10]>,
encoded: R,
buf: BytesMut,
}
impl<'a, R: Read> DecodeResponseIter<'a, R> {
pub fn new(root: blake3::Hash, tree: BaoTree, encoded: R, ranges: &'a ChunkRangesRef) -> Self {
let buf = BytesMut::with_capacity(tree.block_size().bytes());
Self::new_with_buffer(root, tree, encoded, ranges, buf)
}
pub fn new_with_buffer(
root: blake3::Hash,
tree: BaoTree,
encoded: R,
ranges: &'a ChunkRangesRef,
buf: BytesMut,
) -> Self {
let ranges = truncate_ranges(ranges, tree.size());
let mut stack = SmallVec::new();
stack.push(root);
Self {
stack,
inner: ResponseIterRef::new(tree, ranges),
encoded,
buf,
}
}
pub fn buffer(&self) -> &[u8] {
&self.buf
}
pub fn tree(&self) -> BaoTree {
self.inner.tree()
}
fn next0(&mut self) -> result::Result<Option<BaoContentItem>, DecodeError> {
match self.inner.next() {
Some(BaoChunk::Parent {
is_root,
left,
right,
node,
..
}) => {
let pair @ (l_hash, r_hash) = read_parent(&mut self.encoded)
.map_err(|e| DecodeError::maybe_parent_not_found(e, node))?;
let parent_hash = self.stack.pop().unwrap();
let actual = parent_cv(&l_hash, &r_hash, is_root);
if parent_hash != actual {
return Err(DecodeError::ParentHashMismatch(node));
}
if right {
self.stack.push(r_hash);
}
if left {
self.stack.push(l_hash);
}
Ok(Some(Parent { node, pair }.into()))
}
Some(BaoChunk::Leaf {
size,
is_root,
start_chunk,
..
}) => {
self.buf.resize(size, 0);
self.encoded
.read_exact(&mut self.buf)
.map_err(|e| DecodeError::maybe_leaf_not_found(e, start_chunk))?;
let actual = hash_subtree(start_chunk.0, &self.buf, is_root);
let leaf_hash = self.stack.pop().unwrap();
if leaf_hash != actual {
return Err(DecodeError::LeafHashMismatch(start_chunk));
}
Ok(Some(
Leaf {
offset: start_chunk.to_bytes(),
data: self.buf.split().freeze(),
}
.into(),
))
}
None => Ok(None),
}
}
}
impl<R: Read> Iterator for DecodeResponseIter<'_, R> {
type Item = result::Result<BaoContentItem, DecodeError>;
fn next(&mut self) -> Option<Self::Item> {
self.next0().transpose()
}
}
pub fn encode_ranges<D: ReadAt + Size, O: Outboard, W: Write>(
data: D,
outboard: O,
ranges: &ChunkRangesRef,
encoded: W,
) -> result::Result<(), EncodeError> {
let data = data;
let mut encoded = encoded;
let tree = outboard.tree();
let mut buffer = vec![0u8; tree.chunk_group_bytes()];
for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
match item {
BaoChunk::Parent { node, .. } => {
let (l_hash, r_hash) = outboard.load(node)?.unwrap();
let pair = combine_hash_pair(&l_hash, &r_hash);
encoded.write_all(&pair)?;
}
BaoChunk::Leaf {
start_chunk, size, ..
} => {
let start = start_chunk.to_bytes();
let buf = &mut buffer[..size];
data.read_exact_at(start, buf)?;
encoded.write_all(buf)?;
}
}
}
Ok(())
}
pub fn encode_ranges_validated<D: ReadAt, O: Outboard, W: Write>(
data: D,
outboard: O,
ranges: &ChunkRangesRef,
encoded: W,
) -> result::Result<(), EncodeError> {
if ranges.is_empty() {
return Ok(());
}
let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
stack.push(outboard.root());
let data = data;
let mut encoded = encoded;
let tree = outboard.tree();
let mut buffer = vec![0u8; tree.chunk_group_bytes()];
let mut out_buf = Vec::new();
let ranges = truncate_ranges(ranges, tree.size());
for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
match item {
BaoChunk::Parent {
is_root,
left,
right,
node,
..
} => {
let (l_hash, r_hash) = outboard.load(node)?.unwrap();
let actual = parent_cv(&l_hash, &r_hash, is_root);
let expected = stack.pop().unwrap();
if actual != expected {
return Err(EncodeError::ParentHashMismatch(node));
}
if right {
stack.push(r_hash);
}
if left {
stack.push(l_hash);
}
let pair = combine_hash_pair(&l_hash, &r_hash);
encoded.write_all(&pair)?;
}
BaoChunk::Leaf {
start_chunk,
size,
is_root,
ranges,
..
} => {
let expected = stack.pop().unwrap();
let start = start_chunk.to_bytes();
let buf = &mut buffer[..size];
data.read_exact_at(start, buf)?;
let (actual, to_write) = if !ranges.is_all() {
out_buf.clear();
let actual = encode_selected_rec(
start_chunk,
buf,
is_root,
ranges,
tree.block_size.to_u32(),
true,
&mut out_buf,
);
(actual, &out_buf[..])
} else {
let actual = hash_subtree(start_chunk.0, buf, is_root);
#[allow(clippy::redundant_slicing)]
(actual, &buf[..])
};
if actual != expected {
return Err(EncodeError::LeafHashMismatch(start_chunk));
}
encoded.write_all(to_write)?;
}
}
}
Ok(())
}
pub fn decode_ranges<R, O, W>(
encoded: R,
ranges: &ChunkRangesRef,
mut target: W,
mut outboard: O,
) -> std::result::Result<(), DecodeError>
where
O: OutboardMut + Outboard,
R: Read,
W: WriteAt,
{
let iter = DecodeResponseIter::new(outboard.root(), outboard.tree(), encoded, ranges);
for item in iter {
match item? {
BaoContentItem::Parent(Parent { node, pair }) => {
outboard.save(node, &pair)?;
}
BaoContentItem::Leaf(Leaf { offset, data }) => {
target.write_all_at(offset, &data)?;
}
}
}
Ok(())
}
pub fn outboard(
data: impl Read,
tree: BaoTree,
mut outboard: impl OutboardMut,
) -> io::Result<blake3::Hash> {
let mut buffer = vec![0u8; tree.chunk_group_bytes()];
let hash = outboard_impl(tree, data, &mut outboard, &mut buffer)?;
Ok(hash)
}
fn outboard_impl(
tree: BaoTree,
mut data: impl Read,
mut outboard: impl OutboardMut,
buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
debug_assert!(buffer.len() == tree.chunk_group_bytes());
for item in tree.post_order_chunks_iter() {
match item {
BaoChunk::Parent { is_root, node, .. } => {
let right_hash = stack.pop().unwrap();
let left_hash = stack.pop().unwrap();
outboard.save(node, &(left_hash, right_hash))?;
let parent = parent_cv(&left_hash, &right_hash, is_root);
stack.push(parent);
}
BaoChunk::Leaf {
size,
is_root,
start_chunk,
..
} => {
let buf = &mut buffer[..size];
data.read_exact(buf)?;
let hash = hash_subtree(start_chunk.0, buf, is_root);
stack.push(hash);
}
}
}
debug_assert_eq!(stack.len(), 1);
let hash = stack.pop().unwrap();
Ok(hash)
}
pub fn outboard_post_order(
data: impl Read,
tree: BaoTree,
mut outboard: impl Write,
) -> io::Result<blake3::Hash> {
let mut buffer = vec![0u8; tree.chunk_group_bytes()];
let hash = outboard_post_order_impl(tree, data, &mut outboard, &mut buffer)?;
Ok(hash)
}
fn outboard_post_order_impl(
tree: BaoTree,
mut data: impl Read,
mut outboard: impl Write,
buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
debug_assert!(buffer.len() == tree.chunk_group_bytes());
for item in tree.post_order_chunks_iter() {
match item {
BaoChunk::Parent { is_root, .. } => {
let right_hash = stack.pop().unwrap();
let left_hash = stack.pop().unwrap();
outboard.write_all(left_hash.as_bytes())?;
outboard.write_all(right_hash.as_bytes())?;
let parent = parent_cv(&left_hash, &right_hash, is_root);
stack.push(parent);
}
BaoChunk::Leaf {
size,
is_root,
start_chunk,
..
} => {
let buf = &mut buffer[..size];
data.read_exact(buf)?;
let hash = hash_subtree(start_chunk.0, buf, is_root);
stack.push(hash);
}
}
}
debug_assert_eq!(stack.len(), 1);
let hash = stack.pop().unwrap();
Ok(hash)
}
fn read_parent(mut from: impl Read) -> std::io::Result<(blake3::Hash, blake3::Hash)> {
let mut buf = [0; 64];
from.read_exact(&mut buf)?;
let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..]).unwrap());
Ok((l_hash, r_hash))
}
pub fn copy(from: impl Outboard, mut to: impl OutboardMut) -> io::Result<()> {
let tree = from.tree();
for node in tree.pre_order_nodes_iter() {
if let Some(hash_pair) = from.load(node)? {
to.save(node, &hash_pair)?;
}
}
Ok(())
}
#[cfg(feature = "validate")]
mod validate {
use std::{io, ops::Range};
use genawaiter::sync::{Co, Gen};
use positioned_io::ReadAt;
use super::Outboard;
use crate::{
blake3, hash_subtree, io::LocalBoxFuture, parent_cv, rec::truncate_ranges, split, BaoTree,
ChunkNum, ChunkRangesRef, TreeNode,
};
pub fn valid_ranges<'a, O, D>(
outboard: O,
data: D,
ranges: &'a ChunkRangesRef,
) -> impl IntoIterator<Item = io::Result<Range<ChunkNum>>> + 'a
where
O: Outboard + 'a,
D: ReadAt + 'a,
{
Gen::new(move |co| async move {
if let Err(cause) = RecursiveDataValidator::validate(outboard, data, ranges, &co).await
{
co.yield_(Err(cause)).await;
}
})
}
struct RecursiveDataValidator<'a, O: Outboard, D: ReadAt> {
tree: BaoTree,
shifted_filled_size: TreeNode,
outboard: O,
data: D,
buffer: Vec<u8>,
co: &'a Co<io::Result<Range<ChunkNum>>>,
}
impl<O: Outboard, D: ReadAt> RecursiveDataValidator<'_, O, D> {
async fn validate(
outboard: O,
data: D,
ranges: &ChunkRangesRef,
co: &Co<io::Result<Range<ChunkNum>>>,
) -> io::Result<()> {
let tree = outboard.tree();
let mut buffer = vec![0u8; tree.chunk_group_bytes()];
if tree.blocks() == 1 {
let tmp = &mut buffer[..tree.size().try_into().unwrap()];
data.read_exact_at(0, tmp)?;
let actual = hash_subtree(0, tmp, true);
if actual == outboard.root() {
co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
}
return Ok(());
}
let ranges = truncate_ranges(ranges, tree.size());
let root_hash = outboard.root();
let (shifted_root, shifted_filled_size) = tree.shifted();
let mut validator = RecursiveDataValidator {
tree,
shifted_filled_size,
outboard,
data,
buffer,
co,
};
validator
.validate_rec(&root_hash, shifted_root, true, ranges)
.await
}
async fn yield_if_valid(
&mut self,
range: Range<u64>,
hash: &blake3::Hash,
is_root: bool,
) -> io::Result<()> {
let len = (range.end - range.start).try_into().unwrap();
let tmp = &mut self.buffer[..len];
self.data.read_exact_at(range.start, tmp)?;
let actual = hash_subtree(ChunkNum::full_chunks(range.start).0, tmp, is_root);
if &actual == hash {
self.co
.yield_(Ok(
ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
))
.await;
}
io::Result::Ok(())
}
fn validate_rec<'b>(
&'b mut self,
parent_hash: &'b blake3::Hash,
shifted: TreeNode,
is_root: bool,
ranges: &'b ChunkRangesRef,
) -> LocalBoxFuture<'b, io::Result<()>> {
Box::pin(async move {
if ranges.is_empty() {
return Ok(());
}
let node = shifted.subtract_block_size(self.tree.block_size.0);
let (l, m, r) = self.tree.leaf_byte_ranges3(node);
if !self.tree.is_relevant_for_outboard(node) {
self.yield_if_valid(l..r, parent_hash, is_root).await?;
return Ok(());
}
let Some((l_hash, r_hash)) = self.outboard.load(node)? else {
return Ok(());
};
let actual = parent_cv(&l_hash, &r_hash, is_root);
if &actual != parent_hash {
return Ok(());
};
let (l_ranges, r_ranges) = split(ranges, node);
if shifted.is_leaf() {
if !l_ranges.is_empty() {
self.yield_if_valid(l..m, &l_hash, false).await?;
}
if !r_ranges.is_empty() {
self.yield_if_valid(m..r, &r_hash, false).await?;
}
} else {
let left = shifted.left_child().unwrap();
self.validate_rec(&l_hash, left, false, l_ranges).await?;
let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
self.validate_rec(&r_hash, right, false, r_ranges).await?;
}
Ok(())
})
}
}
pub fn valid_outboard_ranges<'a, O>(
outboard: O,
ranges: &'a ChunkRangesRef,
) -> impl IntoIterator<Item = io::Result<Range<ChunkNum>>> + 'a
where
O: Outboard + 'a,
{
Gen::new(move |co| async move {
if let Err(cause) = RecursiveOutboardValidator::validate(outboard, ranges, &co).await {
co.yield_(Err(cause)).await;
}
})
}
struct RecursiveOutboardValidator<'a, O: Outboard> {
tree: BaoTree,
shifted_filled_size: TreeNode,
outboard: O,
co: &'a Co<io::Result<Range<ChunkNum>>>,
}
impl<O: Outboard> RecursiveOutboardValidator<'_, O> {
async fn validate(
outboard: O,
ranges: &ChunkRangesRef,
co: &Co<io::Result<Range<ChunkNum>>>,
) -> io::Result<()> {
let tree = outboard.tree();
if tree.blocks() == 1 {
co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
return Ok(());
}
let ranges = truncate_ranges(ranges, tree.size());
let root_hash = outboard.root();
let (shifted_root, shifted_filled_size) = tree.shifted();
let mut validator = RecursiveOutboardValidator {
tree,
shifted_filled_size,
outboard,
co,
};
validator
.validate_rec(&root_hash, shifted_root, true, ranges)
.await
}
fn validate_rec<'b>(
&'b mut self,
parent_hash: &'b blake3::Hash,
shifted: TreeNode,
is_root: bool,
ranges: &'b ChunkRangesRef,
) -> LocalBoxFuture<'b, io::Result<()>> {
Box::pin(async move {
let yield_node_range = |range: Range<u64>| {
self.co.yield_(Ok(
ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
))
};
if ranges.is_empty() {
return Ok(());
}
let node = shifted.subtract_block_size(self.tree.block_size.0);
let (l, m, r) = self.tree.leaf_byte_ranges3(node);
if !self.tree.is_relevant_for_outboard(node) {
yield_node_range(l..r).await;
return Ok(());
}
let Some((l_hash, r_hash)) = self.outboard.load(node)? else {
return Ok(());
};
let actual = parent_cv(&l_hash, &r_hash, is_root);
if &actual != parent_hash {
return Ok(());
};
let (l_ranges, r_ranges) = split(ranges, node);
if shifted.is_leaf() {
if !l_ranges.is_empty() {
yield_node_range(l..m).await;
}
if !r_ranges.is_empty() {
yield_node_range(m..r).await;
}
} else {
let left = shifted.left_child().unwrap();
self.validate_rec(&l_hash, left, false, l_ranges).await?;
let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
self.validate_rec(&r_hash, right, false, r_ranges).await?;
}
Ok(())
})
}
}
}
#[cfg(feature = "validate")]
pub use validate::{valid_outboard_ranges, valid_ranges};