bao_tree/io/
fsm.rs

1//! Async io, written in fsm style
2//!
3//! IO ops are written as async state machines that thread the state through the
4//! futures to avoid being encumbered by lifetimes.
5//!
6//! This makes them occasionally a bit verbose to use, but allows being generic
7//! without having to box the futures.
8//!
9//! The traits to perform async io are re-exported from
10//! [iroh-io](https://crates.io/crates/iroh-io).
11use std::{
12    future::Future,
13    io::{self, Cursor},
14    result,
15};
16
17use bytes::Bytes;
18pub use iroh_io::{AsyncSliceReader, AsyncSliceWriter};
19use iroh_io::{AsyncStreamReader, AsyncStreamWriter};
20use smallvec::SmallVec;
21
22pub use super::BaoContentItem;
23use super::{combine_hash_pair, DecodeError};
24use crate::{
25    blake3, hash_subtree,
26    io::{
27        error::EncodeError,
28        outboard::{PostOrderOutboard, PreOrderOutboard},
29        Leaf, Parent,
30    },
31    iter::{BaoChunk, ResponseIter},
32    parent_cv,
33    rec::{encode_selected_rec, truncate_ranges, truncate_ranges_owned},
34    BaoTree, BlockSize, ChunkRanges, ChunkRangesRef, TreeNode,
35};
36
37/// A binary merkle tree for blake3 hashes of a blob.
38///
39/// This trait contains information about the geometry of the tree, the root hash,
40/// and a method to load the hashes at a given node.
41///
42/// It is up to the implementor to decide how to store the hashes.
43///
44/// In the original bao crate, the hashes are stored in a file in pre order.
45/// This is implemented for a generic io object in [super::outboard::PreOrderOutboard]
46/// and for a memory region in [super::outboard::PreOrderMemOutboard].
47///
48/// For files that grow over time, it is more efficient to store the hashes in post order.
49/// This is implemented for a generic io object in [super::outboard::PostOrderOutboard]
50/// and for a memory region in [super::outboard::PostOrderMemOutboard].
51///
52/// If you use a different storage engine, you can implement this trait for it. E.g.
53/// you could store the hashes in a database and use the node number as the key.
54///
55/// The async version takes a mutable reference to load, not because it mutates
56/// the outboard (it doesn't), but to ensure that there is at most one outstanding
57/// load at a time.
58///
59/// Dropping the load future without polling it to completion is safe, but will
60/// possibly render the outboard unusable.
pub trait Outboard {
    /// The root hash
    fn root(&self) -> blake3::Hash;
    /// The tree. This contains the information about the size of the file and the block size.
    fn tree(&self) -> BaoTree;
    /// load the hash pair for a node
    ///
    /// Returns `Ok(None)` when the outboard has no hash pair stored for this
    /// node (the file based implementations do this for nodes that have no
    /// offset in the tree).
    ///
    /// This takes a &mut self not because it mutates the outboard (it doesn't),
    /// but to ensure that there is only one outstanding load at a time.
    fn load(
        &mut self,
        node: TreeNode,
    ) -> impl Future<Output = io::Result<Option<(blake3::Hash, blake3::Hash)>>>;
}
75
76/// A mutable outboard.
77///
78/// This trait provides a way to save a hash pair for a node and to set the
79/// length of the data file.
80///
81/// This trait can be used to incrementally save an outboard when receiving data.
82/// If you want to just ignore outboard data, there is a special placeholder outboard
83/// implementation [super::outboard::EmptyOutboard].
pub trait OutboardMut: Sized {
    /// Save a hash pair for a node
    ///
    /// Nodes that are not stored by the implementation may be silently ignored
    /// (the file based implementations in this module do so).
    fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> impl Future<Output = io::Result<()>>;

    /// sync to disk
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}
95
96/// Convenience trait to initialize an outboard from a data source.
97///
98/// In complex real applications, you might want to do this manually.
pub trait CreateOutboard {
    /// Create an outboard from a seekable data source, measuring the size first.
    ///
    /// This requires the outboard to have a default implementation, which is
    /// the case for the memory implementations.
    #[allow(async_fn_in_trait)]
    async fn create(mut data: impl AsyncSliceReader, block_size: BlockSize) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        // measure the size, then delegate to the sized variant
        let size = data.size().await?;
        Self::create_sized(Cursor::new(data), size, block_size).await
    }

    /// create an outboard from a data source. This requires the outboard to
    /// have a default implementation, which is the case for the memory
    /// implementations.
    fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> impl Future<Output = io::Result<Self>>
    where
        Self: Default + Sized;

    /// Init the outboard from a data source. This will use the existing
    /// tree and only init the data and set the root hash.
    ///
    /// So this can be used to initialize an outboard that does not have a default,
    /// such as a file based one.
    ///
    /// It will only include data up to the current tree size.
    fn init_from(&mut self, data: impl AsyncStreamReader) -> impl Future<Output = io::Result<()>>;
}
133
// Blanket impl: a mutable reference to an outboard is itself an outboard,
// forwarding all calls to the wrapped value.
impl<O: Outboard> Outboard for &mut O {
    fn root(&self) -> blake3::Hash {
        (**self).root()
    }

    fn tree(&self) -> BaoTree {
        (**self).tree()
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        (**self).load(node).await
    }
}
147
148impl<R: AsyncSliceReader> Outboard for PreOrderOutboard<R> {
149    fn root(&self) -> blake3::Hash {
150        self.root
151    }
152
153    fn tree(&self) -> BaoTree {
154        self.tree
155    }
156
157    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
158        let Some(offset) = self.tree.pre_order_offset(node) else {
159            return Ok(None);
160        };
161        let offset = offset * 64;
162        let content = self.data.read_at(offset, 64).await?;
163        Ok(Some(if content.len() != 64 {
164            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
165        } else {
166            parse_hash_pair(content)?
167        }))
168    }
169}
170
// Blanket impl: a mutable reference to a mutable outboard is itself a
// mutable outboard, forwarding all calls to the wrapped value.
impl<O: OutboardMut> OutboardMut for &mut O {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        (**self).save(node, hash_pair).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}
184
185impl<W: AsyncSliceWriter> OutboardMut for PreOrderOutboard<W> {
186    async fn save(
187        &mut self,
188        node: TreeNode,
189        hash_pair: &(blake3::Hash, blake3::Hash),
190    ) -> io::Result<()> {
191        let Some(offset) = self.tree.pre_order_offset(node) else {
192            return Ok(());
193        };
194        let offset = offset * 64;
195        let mut buf = [0u8; 64];
196        buf[..32].copy_from_slice(hash_pair.0.as_bytes());
197        buf[32..].copy_from_slice(hash_pair.1.as_bytes());
198        self.data.write_at(offset, &buf).await?;
199        Ok(())
200    }
201
202    async fn sync(&mut self) -> io::Result<()> {
203        self.data.sync().await
204    }
205}
206
207impl<W: AsyncSliceWriter> OutboardMut for PostOrderOutboard<W> {
208    async fn save(
209        &mut self,
210        node: TreeNode,
211        hash_pair: &(blake3::Hash, blake3::Hash),
212    ) -> io::Result<()> {
213        let Some(offset) = self.tree.post_order_offset(node) else {
214            return Ok(());
215        };
216        let offset = offset.value() * 64;
217        let mut buf = [0u8; 64];
218        buf[..32].copy_from_slice(hash_pair.0.as_bytes());
219        buf[32..].copy_from_slice(hash_pair.1.as_bytes());
220        self.data.write_at(offset, &buf).await?;
221        Ok(())
222    }
223
224    async fn sync(&mut self) -> io::Result<()> {
225        self.data.sync().await
226    }
227}
228
229impl<W: AsyncSliceWriter> CreateOutboard for PreOrderOutboard<W> {
230    async fn create_sized(
231        data: impl AsyncStreamReader,
232        size: u64,
233        block_size: BlockSize,
234    ) -> io::Result<Self>
235    where
236        Self: Default + Sized,
237    {
238        let mut res = Self {
239            tree: BaoTree::new(size, block_size),
240            ..Self::default()
241        };
242        res.init_from(data).await?;
243        Ok(res)
244    }
245
246    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
247        let mut this = self;
248        let root = outboard(data, this.tree, &mut this).await?;
249        this.root = root;
250        this.sync().await?;
251        Ok(())
252    }
253}
254
255impl<W: AsyncSliceWriter> CreateOutboard for PostOrderOutboard<W> {
256    async fn create_sized(
257        data: impl AsyncStreamReader,
258        size: u64,
259        block_size: BlockSize,
260    ) -> io::Result<Self>
261    where
262        Self: Default + Sized,
263    {
264        let mut res = Self {
265            tree: BaoTree::new(size, block_size),
266            ..Self::default()
267        };
268        res.init_from(data).await?;
269        Ok(res)
270    }
271
272    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
273        let mut this = self;
274        let root = outboard(data, this.tree, &mut this).await?;
275        this.root = root;
276        this.sync().await?;
277        Ok(())
278    }
279}
280
281impl<R: AsyncSliceReader> Outboard for PostOrderOutboard<R> {
282    fn root(&self) -> blake3::Hash {
283        self.root
284    }
285
286    fn tree(&self) -> BaoTree {
287        self.tree
288    }
289
290    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
291        let Some(offset) = self.tree.post_order_offset(node) else {
292            return Ok(None);
293        };
294        let offset = offset.value() * 64;
295        let content = self.data.read_at(offset, 64).await?;
296        Ok(Some(if content.len() != 64 {
297            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
298        } else {
299            parse_hash_pair(content)?
300        }))
301    }
302}
303
304pub(crate) fn parse_hash_pair(buf: Bytes) -> io::Result<(blake3::Hash, blake3::Hash)> {
305    if buf.len() != 64 {
306        return Err(io::Error::new(
307            io::ErrorKind::UnexpectedEof,
308            "hash pair must be 64 bytes",
309        ));
310    }
311    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
312    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..]).unwrap());
313    Ok((l_hash, r_hash))
314}
315
/// Inner state of [ResponseDecoder], kept behind a box.
#[derive(Debug)]
struct ResponseDecoderInner<R> {
    /// Iterator over the chunks (parents and leaves) of the response.
    iter: ResponseIter,
    /// Stack of expected hashes. The top is popped to verify the next item;
    /// the root hash is pushed first and sits at the bottom.
    stack: SmallVec<[blake3::Hash; 10]>,
    /// The reader the encoded response is read from.
    encoded: R,
}
322
323impl<R> ResponseDecoderInner<R> {
324    fn new(tree: BaoTree, hash: blake3::Hash, ranges: ChunkRanges, encoded: R) -> Self {
325        // now that we know the size, we can canonicalize the ranges
326        let ranges = truncate_ranges_owned(ranges, tree.size());
327        let mut res = Self {
328            iter: ResponseIter::new(tree, ranges),
329            stack: SmallVec::new(),
330            encoded,
331        };
332        res.stack.push(hash);
333        res
334    }
335}
336
/// Response decoder
///
/// The state lives in a [`Box`], so the decoder value itself stays pointer sized
/// while it is moved through the state machine.
#[derive(Debug)]
pub struct ResponseDecoder<R>(Box<ResponseDecoderInner<R>>);
340
/// Next type for ResponseDecoder.
///
/// Returned by [ResponseDecoder::next]: either one more item together with the
/// decoder in its next state, or the underlying reader once the stream is done.
#[derive(Debug)]
pub enum ResponseDecoderNext<R> {
    /// One more item, and you get back the state machine in the next state
    More(
        (
            ResponseDecoder<R>,
            std::result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// The stream is done, you get back the underlying reader
    Done(R),
}
354
impl<R: AsyncStreamReader> ResponseDecoder<R> {
    /// Create a new response decoder state machine, when you have already read the size.
    ///
    /// The size as well as the chunk size is given in the `tree` parameter.
    pub fn new(hash: blake3::Hash, ranges: ChunkRanges, tree: BaoTree, encoded: R) -> Self {
        Self(Box::new(ResponseDecoderInner::new(
            tree, hash, ranges, encoded,
        )))
    }

    /// Proceed to the next state by reading the next chunk from the stream.
    pub async fn next(mut self) -> ResponseDecoderNext<R> {
        if let Some(chunk) = self.0.iter.next() {
            let item = self.next0(chunk).await;
            ResponseDecoderNext::More((self, item))
        } else {
            // iterator exhausted, hand the reader back to the caller
            ResponseDecoderNext::Done(self.0.encoded)
        }
    }

    /// Immediately return the underlying reader
    pub fn finish(self) -> R {
        self.0.encoded
    }

    /// The tree geometry
    pub fn tree(&self) -> BaoTree {
        self.0.iter.tree()
    }

    /// Hash of the blob we are currently getting
    pub fn hash(&self) -> &blake3::Hash {
        // the root hash is the first entry pushed onto the stack
        &self.0.stack[0]
    }

    /// Read and verify a single parent or leaf from the stream.
    ///
    /// The expected hash is popped from the stack; for parents, the child
    /// hashes are pushed so the following items can be verified in turn.
    async fn next0(&mut self, chunk: BaoChunk) -> std::result::Result<BaoContentItem, DecodeError> {
        Ok(match chunk {
            BaoChunk::Parent {
                is_root,
                right,
                left,
                node,
                ..
            } => {
                let this = &mut self.0;
                // a parent is exactly 64 bytes: two 32 byte child hashes
                let buf = this
                    .encoded
                    .read::<64>()
                    .await
                    .map_err(|e| DecodeError::maybe_parent_not_found(e, node))?;
                let pair @ (l_hash, r_hash) = read_parent(&buf);
                let parent_hash = this.stack.pop().unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                // Push the children in reverse order so they are popped in the correct order
                // only push right if the range intersects with the right child
                if right {
                    this.stack.push(r_hash);
                }
                // only push left if the range intersects with the left child
                if left {
                    this.stack.push(l_hash);
                }
                // Validate after pushing the children so that we could in principle continue
                if parent_hash != actual {
                    return Err(DecodeError::ParentHashMismatch(node));
                }
                Parent { pair, node }.into()
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                // this will resize always to chunk group size, except for the last chunk
                let this = &mut self.0;
                let data = this
                    .encoded
                    .read_bytes_exact(size)
                    .await
                    .map_err(|e| DecodeError::maybe_leaf_not_found(e, start_chunk))?;
                // recompute the subtree hash and compare with the expected one
                let leaf_hash = this.stack.pop().unwrap();
                let actual = hash_subtree(start_chunk.0, &data, is_root);
                if leaf_hash != actual {
                    return Err(DecodeError::LeafHashMismatch(start_chunk));
                }
                Leaf {
                    offset: start_chunk.to_bytes(),
                    data,
                }
                .into()
            }
        })
    }
}
450
451/// Encode ranges relevant to a query from a reader and outboard to a writer
452///
453/// This will not validate on writing, so data corruption will be detected on reading
454///
455/// It is possible to encode ranges from a partial file and outboard.
456/// This will either succeed if the requested ranges are all present, or fail
457/// as soon as a range is missing.
458pub async fn encode_ranges<D, O, W>(
459    mut data: D,
460    mut outboard: O,
461    ranges: &ChunkRangesRef,
462    encoded: W,
463) -> result::Result<(), EncodeError>
464where
465    D: AsyncSliceReader,
466    O: Outboard,
467    W: AsyncStreamWriter,
468{
469    let mut encoded = encoded;
470    let tree = outboard.tree();
471    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
472        match item {
473            BaoChunk::Parent { node, .. } => {
474                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
475                let pair = combine_hash_pair(&l_hash, &r_hash);
476                encoded
477                    .write(&pair)
478                    .await
479                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
480            }
481            BaoChunk::Leaf {
482                start_chunk, size, ..
483            } => {
484                let start = start_chunk.to_bytes();
485                let bytes = data.read_exact_at(start, size).await?;
486                encoded
487                    .write_bytes(bytes)
488                    .await
489                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
490            }
491        }
492    }
493    Ok(())
494}
495
/// Encode ranges relevant to a query from a reader and outboard to a writer
///
/// This function validates the data before writing
///
/// It is possible to encode ranges from a partial file and outboard.
/// This will either succeed if the requested ranges are all present, or fail
/// as soon as a range is missing.
pub async fn encode_ranges_validated<D, O, W>(
    mut data: D,
    mut outboard: O,
    ranges: &ChunkRangesRef,
    encoded: W,
) -> result::Result<(), EncodeError>
where
    D: AsyncSliceReader,
    O: Outboard,
    W: AsyncStreamWriter,
{
    // buffer for writing incomplete subtrees.
    // for queries that don't have incomplete subtrees, this will never be used.
    let mut out_buf = Vec::new();
    // stack of expected hashes, seeded with the root hash
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    stack.push(outboard.root());
    let mut encoded = encoded;
    let tree = outboard.tree();
    // canonicalize the ranges to the size of the tree
    let ranges = truncate_ranges(ranges, tree.size());
    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
        match item {
            BaoChunk::Parent {
                is_root,
                left,
                right,
                node,
                ..
            } => {
                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                // verify the parent against the expected hash before writing it
                let expected = stack.pop().unwrap();
                if actual != expected {
                    return Err(EncodeError::ParentHashMismatch(node));
                }
                // push children in reverse order so they are popped in pre order;
                // only children whose ranges are non empty will be visited
                if right {
                    stack.push(r_hash);
                }
                if left {
                    stack.push(l_hash);
                }
                let pair = combine_hash_pair(&l_hash, &r_hash);
                encoded
                    .write(&pair)
                    .await
                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
            }
            BaoChunk::Leaf {
                start_chunk,
                size,
                is_root,
                ranges,
                ..
            } => {
                let expected = stack.pop().unwrap();
                let start = start_chunk.to_bytes();
                let bytes = data.read_exact_at(start, size).await?;
                let (actual, to_write) = if !ranges.is_all() {
                    // we need to encode just a part of the data
                    //
                    // write into an out buffer to ensure we detect mismatches
                    // before writing to the output.
                    out_buf.clear();
                    let actual = encode_selected_rec(
                        start_chunk,
                        &bytes,
                        is_root,
                        ranges,
                        tree.block_size.to_u32(),
                        true,
                        &mut out_buf,
                    );
                    (actual, out_buf.clone().into())
                } else {
                    let actual = hash_subtree(start_chunk.0, &bytes, is_root);
                    (actual, bytes)
                };
                // only write the leaf data if it matches the expected hash
                if actual != expected {
                    return Err(EncodeError::LeafHashMismatch(start_chunk));
                }
                encoded
                    .write_bytes(to_write)
                    .await
                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
            }
        }
    }
    Ok(())
}
591
592/// Decode a response into a file while updating an outboard.
593///
594/// If you do not want to update an outboard, use [super::outboard::EmptyOutboard] as
595/// the outboard.
596pub async fn decode_ranges<R, O, W>(
597    encoded: R,
598    ranges: ChunkRanges,
599    mut target: W,
600    mut outboard: O,
601) -> std::result::Result<(), DecodeError>
602where
603    O: OutboardMut + Outboard,
604    R: AsyncStreamReader,
605    W: AsyncSliceWriter,
606{
607    let mut reading = ResponseDecoder::new(outboard.root(), ranges, outboard.tree(), encoded);
608    loop {
609        let item = match reading.next().await {
610            ResponseDecoderNext::Done(_reader) => break,
611            ResponseDecoderNext::More((next, item)) => {
612                reading = next;
613                item?
614            }
615        };
616        match item {
617            BaoContentItem::Parent(Parent { node, pair }) => {
618                outboard.save(node, &pair).await?;
619            }
620            BaoContentItem::Leaf(Leaf { offset, data }) => {
621                target.write_bytes_at(offset, data).await?;
622            }
623        }
624    }
625    Ok(())
626}
627fn read_parent(buf: &[u8]) -> (blake3::Hash, blake3::Hash) {
628    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
629    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..64]).unwrap());
630    (l_hash, r_hash)
631}
632
633/// Compute the outboard for the given data.
634///
635/// Unlike [outboard_post_order], this will work with any outboard
636/// implementation, but it is not guaranteed that writes are sequential.
637pub async fn outboard(
638    data: impl AsyncStreamReader,
639    tree: BaoTree,
640    mut outboard: impl OutboardMut,
641) -> io::Result<blake3::Hash> {
642    let mut buffer = vec![0u8; tree.chunk_group_bytes()];
643    let hash = outboard_impl(tree, data, &mut outboard, &mut buffer).await?;
644    Ok(hash)
645}
646
/// Internal helper for [outboard]. This takes a buffer of the chunk group size.
///
/// NOTE(review): `buffer` is currently only used by the debug assertion below;
/// leaf data is read via `read_bytes_exact` instead.
async fn outboard_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl OutboardMut,
    buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
    // do not allocate for small trees
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    debug_assert!(buffer.len() == tree.chunk_group_bytes());
    // post order traversal: children are visited before their parent, so both
    // child hashes are on the stack when the parent is processed
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, node, .. } => {
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                // store the hash pair, then push the combined parent hash
                outboard.save(node, &(left_hash, right_hash)).await?;
                let parent = parent_cv(&left_hash, &right_hash, is_root);
                stack.push(parent);
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes_exact(size).await?;
                let hash = hash_subtree(start_chunk.0, &buf, is_root);
                stack.push(hash);
            }
        }
    }
    // after the traversal only the root hash remains on the stack
    debug_assert_eq!(stack.len(), 1);
    let hash = stack.pop().unwrap();
    Ok(hash)
}
682
683/// Compute the post order outboard for the given data, writing into a io::Write
684///
685/// For the post order outboard, writes to the target are sequential.
686///
687/// This will not add the size to the output. You need to store it somewhere else
688/// or append it yourself.
689pub async fn outboard_post_order(
690    data: impl AsyncStreamReader,
691    tree: BaoTree,
692    mut outboard: impl AsyncStreamWriter,
693) -> io::Result<blake3::Hash> {
694    let mut buffer = vec![0u8; tree.chunk_group_bytes()];
695    let hash = outboard_post_order_impl(tree, data, &mut outboard, &mut buffer).await?;
696    Ok(hash)
697}
698
/// Internal helper for [outboard_post_order]. This takes a buffer of the chunk group size.
///
/// NOTE(review): `buffer` is currently only used by the debug assertion below;
/// leaf data is read via `read_bytes_exact` instead.
async fn outboard_post_order_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl AsyncStreamWriter,
    buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
    // do not allocate for small trees
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    debug_assert!(buffer.len() == tree.chunk_group_bytes());
    // post order traversal: children are visited before their parent, which
    // also makes all writes to the target sequential
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, .. } => {
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                // write the pair as left hash followed by right hash
                outboard.write(left_hash.as_bytes()).await?;
                outboard.write(right_hash.as_bytes()).await?;
                let parent = parent_cv(&left_hash, &right_hash, is_root);
                stack.push(parent);
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes_exact(size).await?;
                let hash = hash_subtree(start_chunk.0, &buf, is_root);
                stack.push(hash);
            }
        }
    }
    // after the traversal only the root hash remains on the stack
    debug_assert_eq!(stack.len(), 1);
    let hash = stack.pop().unwrap();
    Ok(hash)
}
735
736/// Copy an outboard to another outboard.
737///
738/// This can be used to persist an in memory outboard or to change from
739/// pre-order to post-order.
740pub async fn copy(mut from: impl Outboard, mut to: impl OutboardMut) -> io::Result<()> {
741    let tree = from.tree();
742    for node in tree.pre_order_nodes_iter() {
743        if let Some(hash_pair) = from.load(node).await? {
744            to.save(node, &hash_pair).await?;
745        }
746    }
747    Ok(())
748}
749
750#[cfg(feature = "validate")]
751mod validate {
752    use std::{io, ops::Range};
753
754    use futures_lite::{FutureExt, Stream};
755    use genawaiter::sync::{Co, Gen};
756    use iroh_io::AsyncSliceReader;
757
758    use super::Outboard;
759    use crate::{
760        blake3, hash_subtree, io::LocalBoxFuture, parent_cv, rec::truncate_ranges, split, BaoTree,
761        ChunkNum, ChunkRangesRef, TreeNode,
762    };
763
    /// Given a data file and an outboard, compute all valid ranges.
    ///
    /// This is not cheap since it recomputes the hashes for all chunks.
    ///
    /// To reduce the amount of work, you can specify a range you are interested in.
    ///
    /// Io errors are yielded as the last item of the returned stream.
    pub fn valid_ranges<'a, O, D>(
        outboard: O,
        data: D,
        ranges: &'a ChunkRangesRef,
    ) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
    where
        O: Outboard + 'a,
        D: AsyncSliceReader + 'a,
    {
        Gen::new(move |co| async move {
            // run the validator; a failure terminates the stream with an error item
            if let Err(cause) = RecursiveDataValidator::validate(outboard, data, ranges, &co).await
            {
                co.yield_(Err(cause)).await;
            }
        })
    }
785
    /// State for the recursive validation of a data file against an outboard.
    struct RecursiveDataValidator<'a, O: Outboard, D: AsyncSliceReader> {
        /// The tree geometry of the outboard.
        tree: BaoTree,
        /// Filled size of the shifted tree, needed to find right descendants.
        shifted_filled_size: TreeNode,
        /// The outboard providing the expected hashes.
        outboard: O,
        /// The data being validated.
        data: D,
        /// Coroutine handle used to yield validated chunk ranges.
        co: &'a Co<io::Result<Range<ChunkNum>>>,
    }
793
    impl<O: Outboard, D: AsyncSliceReader> RecursiveDataValidator<'_, O, D> {
        /// Entry point: validate `data` against `outboard`, yielding valid
        /// chunk ranges (restricted to `ranges`) through `co`.
        async fn validate(
            outboard: O,
            data: D,
            ranges: &ChunkRangesRef,
            co: &Co<io::Result<Range<ChunkNum>>>,
        ) -> io::Result<()> {
            let tree = outboard.tree();
            if tree.blocks() == 1 {
                // special case for a tree that fits in one block / chunk group:
                // there are no parent nodes, just compare the blob hash directly
                let mut data = data;
                let data = data
                    .read_exact_at(0, tree.size().try_into().unwrap())
                    .await?;
                let actual = hash_subtree(0, &data, true);
                if actual == outboard.root() {
                    co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
                }
                return Ok(());
            }
            // canonicalize the ranges to the size of the tree
            let ranges = truncate_ranges(ranges, tree.size());
            let root_hash = outboard.root();
            let (shifted_root, shifted_filled_size) = tree.shifted();
            let mut validator = RecursiveDataValidator {
                tree,
                shifted_filled_size,
                outboard,
                data,
                co,
            };
            validator
                .validate_rec(&root_hash, shifted_root, true, ranges)
                .await
        }

        /// Hash the given byte range of the data and yield it as a valid
        /// chunk range if it matches `hash`. Mismatches are silently skipped.
        async fn yield_if_valid(
            &mut self,
            range: Range<u64>,
            hash: &blake3::Hash,
            is_root: bool,
        ) -> io::Result<()> {
            let len = (range.end - range.start).try_into().unwrap();
            let data = self.data.read_exact_at(range.start, len).await?;
            // recompute the subtree hash for this byte range; is_root is forwarded
            // from the caller (the single chunk group case is handled in `validate`)
            let actual = hash_subtree(ChunkNum::full_chunks(range.start).0, &data, is_root);
            if &actual == hash {
                // hash matches, so this range is valid
                self.co
                    .yield_(Ok(
                        ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                    ))
                    .await;
            }
            io::Result::Ok(())
        }

        /// Recursively validate the subtree rooted at the (shifted) node
        /// `shifted`, whose expected hash is `parent_hash`.
        ///
        /// Boxed because async recursion needs an indirected future type.
        fn validate_rec<'b>(
            &'b mut self,
            parent_hash: &'b blake3::Hash,
            shifted: TreeNode,
            is_root: bool,
            ranges: &'b ChunkRangesRef,
        ) -> LocalBoxFuture<'b, io::Result<()>> {
            async move {
                if ranges.is_empty() {
                    // this part of the tree is not of interest, so we can skip it
                    return Ok(());
                }
                let node = shifted.subtract_block_size(self.tree.block_size.0);
                let (l, m, r) = self.tree.leaf_byte_ranges3(node);
                if !self.tree.is_relevant_for_outboard(node) {
                    // no hash pair stored for this node, validate its bytes directly
                    self.yield_if_valid(l..r, parent_hash, is_root).await?;
                    return Ok(());
                }
                let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                    // outboard is incomplete, we can't validate
                    return Ok(());
                };
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                if &actual != parent_hash {
                    // hash mismatch, we can't validate
                    return Ok(());
                };
                let (l_ranges, r_ranges) = split(ranges, node);
                if shifted.is_leaf() {
                    // at a leaf of the shifted tree, validate both halves directly
                    if !l_ranges.is_empty() {
                        self.yield_if_valid(l..m, &l_hash, false).await?;
                    }
                    if !r_ranges.is_empty() {
                        self.yield_if_valid(m..r, &r_hash, false).await?;
                    }
                } else {
                    // recurse (we are in the domain of the shifted tree)
                    let left = shifted.left_child().unwrap();
                    self.validate_rec(&l_hash, left, false, l_ranges).await?;
                    let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                    self.validate_rec(&r_hash, right, false, r_ranges).await?;
                }
                Ok(())
            }
            .boxed_local()
        }
    }
897
898    /// Given just an outboard, compute all valid ranges.
899    ///
900    /// This is not cheap since it recomputes the hashes for all chunks.
901    pub fn valid_outboard_ranges<'a, O>(
902        outboard: O,
903        ranges: &'a ChunkRangesRef,
904    ) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
905    where
906        O: Outboard + 'a,
907    {
908        Gen::new(move |co| async move {
909            if let Err(cause) = RecursiveOutboardValidator::validate(outboard, ranges, &co).await {
910                co.yield_(Err(cause)).await;
911            }
912        })
913    }
914
    /// State for recursively checking the internal consistency of an outboard.
    struct RecursiveOutboardValidator<'a, O: Outboard> {
        // geometry of the tree, cached from `outboard.tree()`
        tree: BaoTree,
        // filled size of the shifted tree, needed to find right descendants
        shifted_filled_size: TreeNode,
        // the outboard whose stored parent hashes are being checked
        outboard: O,
        // generator handle used to yield validated chunk ranges to the stream
        co: &'a Co<io::Result<Range<ChunkNum>>>,
    }
921
    impl<O: Outboard> RecursiveOutboardValidator<'_, O> {
        /// Entry point: set up the validator state and recurse from the root.
        ///
        /// Yields validated chunk ranges (and at most one final error) to `co`.
        async fn validate(
            outboard: O,
            ranges: &ChunkRangesRef,
            co: &Co<io::Result<Range<ChunkNum>>>,
        ) -> io::Result<()> {
            let tree = outboard.tree();
            if tree.blocks() == 1 {
                // special case for a tree that fits in one block / chunk group:
                // the whole range is yielded without any check, since without
                // the data there is nothing here that can be verified
                co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
                return Ok(());
            }
            // clamp the requested ranges to the actual size of the tree
            let ranges = truncate_ranges(ranges, tree.size());
            let root_hash = outboard.root();
            let (shifted_root, shifted_filled_size) = tree.shifted();
            let mut validator = RecursiveOutboardValidator {
                tree,
                shifted_filled_size,
                outboard,
                co,
            };
            validator
                .validate_rec(&root_hash, shifted_root, true, ranges)
                .await
        }

        /// Recursively check the subtree rooted at `shifted` against
        /// `parent_hash`, yielding chunk ranges whose stored hashes check out.
        ///
        /// The future is boxed (`Box::pin`) because the function is recursive.
        fn validate_rec<'b>(
            &'b mut self,
            parent_hash: &'b blake3::Hash,
            shifted: TreeNode,
            is_root: bool,
            ranges: &'b ChunkRangesRef,
        ) -> LocalBoxFuture<'b, io::Result<()>> {
            Box::pin(async move {
                // helper to convert a byte range into a chunk range and yield it
                let yield_node_range = |range: Range<u64>| {
                    self.co.yield_(Ok(
                        ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                    ))
                };
                if ranges.is_empty() {
                    // this part of the tree is not of interest, so we can skip it
                    return Ok(());
                }
                // map from the shifted (per-block) tree back to the real tree node
                let node = shifted.subtract_block_size(self.tree.block_size.0);
                let (l, m, r) = self.tree.leaf_byte_ranges3(node);
                if !self.tree.is_relevant_for_outboard(node) {
                    // no hashes are stored for this node, so there is nothing to
                    // check without the data; report the whole range as valid
                    yield_node_range(l..r).await;
                    return Ok(());
                }
                let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                    // outboard is incomplete, we can't validate
                    return Ok(());
                };
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                if &actual != parent_hash {
                    // hash mismatch, we can't validate
                    return Ok(());
                };
                let (l_ranges, r_ranges) = split(ranges, node);
                if shifted.is_leaf() {
                    // the children's hashes checked out above; yield whichever
                    // halves were requested
                    if !l_ranges.is_empty() {
                        yield_node_range(l..m).await;
                    }
                    if !r_ranges.is_empty() {
                        yield_node_range(m..r).await;
                    }
                } else {
                    // recurse (we are in the domain of the shifted tree)
                    let left = shifted.left_child().unwrap();
                    self.validate_rec(&l_hash, left, false, l_ranges).await?;
                    let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                    self.validate_rec(&r_hash, right, false, r_ranges).await?;
                }
                Ok(())
            })
        }
    }
999}
1000#[cfg(feature = "validate")]
1001pub use validate::{valid_outboard_ranges, valid_ranges};