bao_tree/io/
fsm.rs

1//! Async io, written in fsm style
2//!
3//! IO ops are written as async state machines that thread the state through the
4//! futures to avoid being encumbered by lifetimes.
5//!
6//! This makes them occasionally a bit verbose to use, but allows being generic
7//! without having to box the futures.
8//!
9//! The traits to perform async io are re-exported from
10//! [iroh-io](https://crates.io/crates/iroh-io).
11use std::{
12    future::Future,
13    io::{self, Cursor},
14    result,
15};
16
17use crate::{
18    blake3, hash_subtree,
19    iter::ResponseIter,
20    rec::{encode_selected_rec, truncate_ranges, truncate_ranges_owned},
21    ChunkRanges, ChunkRangesRef,
22};
23use blake3::guts::parent_cv;
24use bytes::Bytes;
25use iroh_io::{AsyncStreamReader, AsyncStreamWriter};
26use smallvec::SmallVec;
27
28pub use super::BaoContentItem;
29use crate::{
30    io::{
31        error::EncodeError,
32        outboard::{PostOrderOutboard, PreOrderOutboard},
33        Leaf, Parent,
34    },
35    iter::BaoChunk,
36    BaoTree, BlockSize, TreeNode,
37};
38pub use iroh_io::{AsyncSliceReader, AsyncSliceWriter};
39
40use super::{combine_hash_pair, DecodeError};
41
/// A binary merkle tree for blake3 hashes of a blob.
///
/// This trait contains information about the geometry of the tree, the root hash,
/// and a method to load the hashes at a given node.
///
/// It is up to the implementor to decide how to store the hashes.
///
/// In the original bao crate, the hashes are stored in a file in pre order.
/// This is implemented for a generic io object in [super::outboard::PreOrderOutboard]
/// and for a memory region in [super::outboard::PreOrderMemOutboard].
///
/// For files that grow over time, it is more efficient to store the hashes in post order.
/// This is implemented for a generic io object in [super::outboard::PostOrderOutboard]
/// and for a memory region in [super::outboard::PostOrderMemOutboard].
///
/// If you use a different storage engine, you can implement this trait for it. E.g.
/// you could store the hashes in a database and use the node number as the key.
///
/// The async version takes a mutable reference to load, not because it mutates
/// the outboard (it doesn't), but to ensure that there is at most one outstanding
/// load at a time.
///
/// Dropping the load future without polling it to completion is safe, but will
/// possibly render the outboard unusable.
pub trait Outboard {
    /// The root hash
    fn root(&self) -> blake3::Hash;
    /// The tree. This contains the information about the size of the file and the block size.
    fn tree(&self) -> BaoTree;
    /// Load the hash pair for a node.
    ///
    /// Returns `Ok(None)` when the outboard has no entry for this node, e.g.
    /// because the node is not stored in its layout.
    ///
    /// This takes a &mut self not because it mutates the outboard (it doesn't),
    /// but to ensure that there is only one outstanding load at a time.
    fn load(
        &mut self,
        node: TreeNode,
    ) -> impl Future<Output = io::Result<Option<(blake3::Hash, blake3::Hash)>>>;
}
80
/// A mutable outboard.
///
/// This trait provides a way to save a hash pair for a node and to set the
/// length of the data file.
///
/// This trait can be used to incrementally save an outboard when receiving data.
/// If you want to just ignore outboard data, there is a special placeholder outboard
/// implementation [super::outboard::EmptyOutboard].
pub trait OutboardMut: Sized {
    /// Save a hash pair for a node.
    ///
    /// Implementations may silently ignore nodes that have no place in their
    /// layout (the provided file based impls do).
    fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync the outboard to its backing storage.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}
100
/// Convenience trait to initialize an outboard from a data source.
///
/// In complex real applications, you might want to do this manually.
pub trait CreateOutboard {
    /// Create an outboard from a seekable data source, measuring the size first.
    ///
    /// This requires the outboard to have a default implementation, which is
    /// the case for the memory implementations.
    #[allow(async_fn_in_trait)]
    async fn create(mut data: impl AsyncSliceReader, block_size: BlockSize) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        // measure the size, then delegate to the sized variant
        let size = data.size().await?;
        Self::create_sized(Cursor::new(data), size, block_size).await
    }

    /// create an outboard from a data source. This requires the outboard to
    /// have a default implementation, which is the case for the memory
    /// implementations.
    fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> impl Future<Output = io::Result<Self>>
    where
        Self: Default + Sized;

    /// Init the outboard from a data source. This will use the existing
    /// tree and only init the data and set the root hash.
    ///
    /// So this can be used to initialize an outboard that does not have a default,
    /// such as a file based one.
    ///
    /// It will only include data up to the current tree size.
    fn init_from(&mut self, data: impl AsyncStreamReader) -> impl Future<Output = io::Result<()>>;
}
138
/// Forwarding impl, so a `&mut O` can be used wherever an `impl Outboard` is
/// expected without giving up ownership of the outboard.
impl<'b, O: Outboard> Outboard for &'b mut O {
    fn root(&self) -> blake3::Hash {
        (**self).root()
    }

    fn tree(&self) -> BaoTree {
        (**self).tree()
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        (**self).load(node).await
    }
}
152
153impl<R: AsyncSliceReader> Outboard for PreOrderOutboard<R> {
154    fn root(&self) -> blake3::Hash {
155        self.root
156    }
157
158    fn tree(&self) -> BaoTree {
159        self.tree
160    }
161
162    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
163        let Some(offset) = self.tree.pre_order_offset(node) else {
164            return Ok(None);
165        };
166        let offset = offset * 64;
167        let content = self.data.read_at(offset, 64).await?;
168        Ok(Some(if content.len() != 64 {
169            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
170        } else {
171            parse_hash_pair(content)?
172        }))
173    }
174}
175
/// Forwarding impl, so a `&mut O` can be used wherever an `impl OutboardMut`
/// is expected without giving up ownership of the outboard.
impl<'b, O: OutboardMut> OutboardMut for &'b mut O {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        (**self).save(node, hash_pair).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}
189
190impl<W: AsyncSliceWriter> OutboardMut for PreOrderOutboard<W> {
191    async fn save(
192        &mut self,
193        node: TreeNode,
194        hash_pair: &(blake3::Hash, blake3::Hash),
195    ) -> io::Result<()> {
196        let Some(offset) = self.tree.pre_order_offset(node) else {
197            return Ok(());
198        };
199        let offset = offset * 64;
200        let mut buf = [0u8; 64];
201        buf[..32].copy_from_slice(hash_pair.0.as_bytes());
202        buf[32..].copy_from_slice(hash_pair.1.as_bytes());
203        self.data.write_at(offset, &buf).await?;
204        Ok(())
205    }
206
207    async fn sync(&mut self) -> io::Result<()> {
208        self.data.sync().await
209    }
210}
211
212impl<W: AsyncSliceWriter> OutboardMut for PostOrderOutboard<W> {
213    async fn save(
214        &mut self,
215        node: TreeNode,
216        hash_pair: &(blake3::Hash, blake3::Hash),
217    ) -> io::Result<()> {
218        let Some(offset) = self.tree.post_order_offset(node) else {
219            return Ok(());
220        };
221        let offset = offset.value() * 64;
222        let mut buf = [0u8; 64];
223        buf[..32].copy_from_slice(hash_pair.0.as_bytes());
224        buf[32..].copy_from_slice(hash_pair.1.as_bytes());
225        self.data.write_at(offset, &buf).await?;
226        Ok(())
227    }
228
229    async fn sync(&mut self) -> io::Result<()> {
230        self.data.sync().await
231    }
232}
233
impl<W: AsyncSliceWriter> CreateOutboard for PreOrderOutboard<W> {
    async fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        // start from the default outboard, but with a tree matching the known size
        let mut res = Self {
            tree: BaoTree::new(size, block_size),
            ..Self::default()
        };
        res.init_from(data).await?;
        Ok(res)
    }

    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
        // rebind so we can pass `&mut this` (which implements OutboardMut via
        // the blanket impl for `&mut O`) while still using `this` afterwards
        let mut this = self;
        let root = outboard(data, this.tree, &mut this).await?;
        this.root = root;
        this.sync().await?;
        Ok(())
    }
}
259
impl<W: AsyncSliceWriter> CreateOutboard for PostOrderOutboard<W> {
    async fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        // start from the default outboard, but with a tree matching the known size
        let mut res = Self {
            tree: BaoTree::new(size, block_size),
            ..Self::default()
        };
        res.init_from(data).await?;
        Ok(res)
    }

    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
        // rebind so we can pass `&mut this` (which implements OutboardMut via
        // the blanket impl for `&mut O`) while still using `this` afterwards
        let mut this = self;
        let root = outboard(data, this.tree, &mut this).await?;
        this.root = root;
        this.sync().await?;
        Ok(())
    }
}
285
286impl<R: AsyncSliceReader> Outboard for PostOrderOutboard<R> {
287    fn root(&self) -> blake3::Hash {
288        self.root
289    }
290
291    fn tree(&self) -> BaoTree {
292        self.tree
293    }
294
295    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
296        let Some(offset) = self.tree.post_order_offset(node) else {
297            return Ok(None);
298        };
299        let offset = offset.value() * 64;
300        let content = self.data.read_at(offset, 64).await?;
301        Ok(Some(if content.len() != 64 {
302            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
303        } else {
304            parse_hash_pair(content)?
305        }))
306    }
307}
308
309pub(crate) fn parse_hash_pair(buf: Bytes) -> io::Result<(blake3::Hash, blake3::Hash)> {
310    if buf.len() != 64 {
311        return Err(io::Error::new(
312            io::ErrorKind::UnexpectedEof,
313            "hash pair must be 64 bytes",
314        ));
315    }
316    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
317    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..]).unwrap());
318    Ok((l_hash, r_hash))
319}
320
/// Shared state of the [ResponseDecoder] state machine.
///
/// Boxed inside [ResponseDecoder] so the state machine stays cheap to move.
#[derive(Debug)]
struct ResponseDecoderInner<R> {
    // iterator over the chunks of the response
    iter: ResponseIter,
    // stack of expected hashes, seeded with the root hash
    stack: SmallVec<[blake3::Hash; 10]>,
    // reader for the encoded stream
    encoded: R,
}
327
328impl<R> ResponseDecoderInner<R> {
329    fn new(tree: BaoTree, hash: blake3::Hash, ranges: ChunkRanges, encoded: R) -> Self {
330        // now that we know the size, we can canonicalize the ranges
331        let ranges = truncate_ranges_owned(ranges, tree.size());
332        let mut res = Self {
333            iter: ResponseIter::new(tree, ranges),
334            stack: SmallVec::new(),
335            encoded,
336        };
337        res.stack.push(hash);
338        res
339    }
340}
341
/// Response decoder
///
/// A state machine that verifies and yields the content items of an encoded
/// response. The inner state is boxed so moving the machine is cheap.
#[derive(Debug)]
pub struct ResponseDecoder<R>(Box<ResponseDecoderInner<R>>);
345
/// Next type for ResponseDecoder.
///
/// Returned by [ResponseDecoder::next]; either one more item plus the machine
/// in its next state, or the underlying reader once the stream is exhausted.
#[derive(Debug)]
pub enum ResponseDecoderNext<R> {
    /// One more item, and you get back the state machine in the next state
    More(
        (
            ResponseDecoder<R>,
            std::result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// The stream is done, you get back the underlying reader
    Done(R),
}
359
impl<R: AsyncStreamReader> ResponseDecoder<R> {
    /// Create a new response decoder state machine, when you have already read the size.
    ///
    /// The size as well as the chunk size is given in the `tree` parameter.
    pub fn new(hash: blake3::Hash, ranges: ChunkRanges, tree: BaoTree, encoded: R) -> Self {
        Self(Box::new(ResponseDecoderInner::new(
            tree, hash, ranges, encoded,
        )))
    }

    /// Proceed to the next state by reading the next chunk from the stream.
    pub async fn next(mut self) -> ResponseDecoderNext<R> {
        if let Some(chunk) = self.0.iter.next() {
            let item = self.next0(chunk).await;
            ResponseDecoderNext::More((self, item))
        } else {
            // iterator exhausted: hand back the underlying reader
            ResponseDecoderNext::Done(self.0.encoded)
        }
    }

    /// Immediately return the underlying reader
    pub fn finish(self) -> R {
        self.0.encoded
    }

    /// The tree geometry
    pub fn tree(&self) -> BaoTree {
        self.0.iter.tree()
    }

    /// Hash of the blob we are currently getting
    ///
    /// NOTE(review): this reads the bottom of the hash stack, which is seeded
    /// with the blob hash but is popped and re-pushed as decoding proceeds.
    /// Confirm it is only relied upon as the blob hash before decoding starts.
    pub fn hash(&self) -> &blake3::Hash {
        &self.0.stack[0]
    }

    /// Decode and verify a single chunk (parent hash pair or leaf data)
    /// against the expected hash popped from the stack.
    async fn next0(&mut self, chunk: BaoChunk) -> std::result::Result<BaoContentItem, DecodeError> {
        Ok(match chunk {
            BaoChunk::Parent {
                is_root,
                right,
                left,
                node,
                ..
            } => {
                let this = &mut self.0;
                // a parent is always a fixed 64 byte hash pair
                let buf = this
                    .encoded
                    .read::<64>()
                    .await
                    .map_err(|e| DecodeError::maybe_parent_not_found(e, node))?;
                let pair @ (l_hash, r_hash) = read_parent(&buf);
                let parent_hash = this.stack.pop().unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                // Push the children in reverse order so they are popped in the correct order
                // only push right if the range intersects with the right child
                if right {
                    this.stack.push(r_hash);
                }
                // only push left if the range intersects with the left child
                if left {
                    this.stack.push(l_hash);
                }
                // Validate after pushing the children so that we could in principle continue
                if parent_hash != actual {
                    return Err(DecodeError::ParentHashMismatch(node));
                }
                Parent { pair, node }.into()
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                // this will resize always to chunk group size, except for the last chunk
                let this = &mut self.0;
                let data = this
                    .encoded
                    .read_bytes(size)
                    .await
                    .map_err(|e| DecodeError::maybe_leaf_not_found(e, start_chunk))?;
                let leaf_hash = this.stack.pop().unwrap();
                // recompute the subtree hash and compare against the expected one
                let actual = hash_subtree(start_chunk.0, &data, is_root);
                if leaf_hash != actual {
                    return Err(DecodeError::LeafHashMismatch(start_chunk));
                }
                Leaf {
                    offset: start_chunk.to_bytes(),
                    data,
                }
                .into()
            }
        })
    }
}
455
456/// Encode ranges relevant to a query from a reader and outboard to a writer
457///
458/// This will not validate on writing, so data corruption will be detected on reading
459///
460/// It is possible to encode ranges from a partial file and outboard.
461/// This will either succeed if the requested ranges are all present, or fail
462/// as soon as a range is missing.
463pub async fn encode_ranges<D, O, W>(
464    mut data: D,
465    mut outboard: O,
466    ranges: &ChunkRangesRef,
467    encoded: W,
468) -> result::Result<(), EncodeError>
469where
470    D: AsyncSliceReader,
471    O: Outboard,
472    W: AsyncStreamWriter,
473{
474    let mut encoded = encoded;
475    let tree = outboard.tree();
476    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
477        match item {
478            BaoChunk::Parent { node, .. } => {
479                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
480                let pair = combine_hash_pair(&l_hash, &r_hash);
481                encoded
482                    .write(&pair)
483                    .await
484                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
485            }
486            BaoChunk::Leaf {
487                start_chunk, size, ..
488            } => {
489                let start = start_chunk.to_bytes();
490                let bytes = data.read_at(start, size).await?;
491                encoded
492                    .write_bytes(bytes)
493                    .await
494                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
495            }
496        }
497    }
498    Ok(())
499}
500
/// Encode ranges relevant to a query from a reader and outboard to a writer
///
/// This function validates the data before writing
///
/// It is possible to encode ranges from a partial file and outboard.
/// This will either succeed if the requested ranges are all present, or fail
/// as soon as a range is missing.
pub async fn encode_ranges_validated<D, O, W>(
    mut data: D,
    mut outboard: O,
    ranges: &ChunkRangesRef,
    encoded: W,
) -> result::Result<(), EncodeError>
where
    D: AsyncSliceReader,
    O: Outboard,
    W: AsyncStreamWriter,
{
    // buffer for writing incomplete subtrees.
    // for queries that don't have incomplete subtrees, this will never be used.
    let mut out_buf = Vec::new();
    // stack of expected hashes, seeded with the root hash
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    stack.push(outboard.root());
    let mut encoded = encoded;
    let tree = outboard.tree();
    // canonicalize the ranges to the tree size
    let ranges = truncate_ranges(ranges, tree.size());
    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
        match item {
            BaoChunk::Parent {
                is_root,
                left,
                right,
                node,
                ..
            } => {
                // NOTE(review): unwrap panics if the outboard has no pair for a
                // selected node — confirm callers guarantee presence
                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
                // validate the pair against the expected hash before writing
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                let expected = stack.pop().unwrap();
                if actual != expected {
                    return Err(EncodeError::ParentHashMismatch(node));
                }
                // push right first so the left child is popped first (pre order)
                if right {
                    stack.push(r_hash);
                }
                if left {
                    stack.push(l_hash);
                }
                let pair = combine_hash_pair(&l_hash, &r_hash);
                encoded
                    .write(&pair)
                    .await
                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
            }
            BaoChunk::Leaf {
                start_chunk,
                size,
                is_root,
                ranges,
                ..
            } => {
                let expected = stack.pop().unwrap();
                let start = start_chunk.to_bytes();
                let bytes = data.read_at(start, size).await?;
                let (actual, to_write) = if !ranges.is_all() {
                    // we need to encode just a part of the data
                    //
                    // write into an out buffer to ensure we detect mismatches
                    // before writing to the output.
                    out_buf.clear();
                    let actual = encode_selected_rec(
                        start_chunk,
                        &bytes,
                        is_root,
                        ranges,
                        tree.block_size.to_u32(),
                        true,
                        &mut out_buf,
                    );
                    (actual, out_buf.clone().into())
                } else {
                    // full chunk group: hash and write the data as is
                    let actual = hash_subtree(start_chunk.0, &bytes, is_root);
                    (actual, bytes)
                };
                if actual != expected {
                    return Err(EncodeError::LeafHashMismatch(start_chunk));
                }
                encoded
                    .write_bytes(to_write)
                    .await
                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
            }
        }
    }
    Ok(())
}
596
597/// Decode a response into a file while updating an outboard.
598///
599/// If you do not want to update an outboard, use [super::outboard::EmptyOutboard] as
600/// the outboard.
601pub async fn decode_ranges<R, O, W>(
602    encoded: R,
603    ranges: ChunkRanges,
604    mut target: W,
605    mut outboard: O,
606) -> std::result::Result<(), DecodeError>
607where
608    O: OutboardMut + Outboard,
609    R: AsyncStreamReader,
610    W: AsyncSliceWriter,
611{
612    let mut reading = ResponseDecoder::new(outboard.root(), ranges, outboard.tree(), encoded);
613    loop {
614        let item = match reading.next().await {
615            ResponseDecoderNext::Done(_reader) => break,
616            ResponseDecoderNext::More((next, item)) => {
617                reading = next;
618                item?
619            }
620        };
621        match item {
622            BaoContentItem::Parent(Parent { node, pair }) => {
623                outboard.save(node, &pair).await?;
624            }
625            BaoContentItem::Leaf(Leaf { offset, data }) => {
626                target.write_bytes_at(offset, data).await?;
627            }
628        }
629    }
630    Ok(())
631}
632fn read_parent(buf: &[u8]) -> (blake3::Hash, blake3::Hash) {
633    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
634    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..64]).unwrap());
635    (l_hash, r_hash)
636}
637
638/// Compute the outboard for the given data.
639///
640/// Unlike [outboard_post_order], this will work with any outboard
641/// implementation, but it is not guaranteed that writes are sequential.
642pub async fn outboard(
643    data: impl AsyncStreamReader,
644    tree: BaoTree,
645    mut outboard: impl OutboardMut,
646) -> io::Result<blake3::Hash> {
647    let mut buffer = vec![0u8; tree.chunk_group_bytes()];
648    let hash = outboard_impl(tree, data, &mut outboard, &mut buffer).await?;
649    Ok(hash)
650}
651
/// Internal helper for [outboard]. This takes a buffer of the chunk group size.
///
/// Walks the tree in post order, hashing leaves as they are read and saving
/// the hash pair of every parent node into the outboard.
async fn outboard_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl OutboardMut,
    buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
    // do not allocate for small trees
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    // NOTE(review): `buffer` is only used in this assertion; reads below
    // allocate via `read_bytes` — confirm whether the parameter is still needed
    debug_assert!(buffer.len() == tree.chunk_group_bytes());
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, node, .. } => {
                // children were pushed left-then-right, so pop right first
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                outboard.save(node, &(left_hash, right_hash)).await?;
                let parent = parent_cv(&left_hash, &right_hash, is_root);
                stack.push(parent);
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes(size).await?;
                let hash = hash_subtree(start_chunk.0, &buf, is_root);
                stack.push(hash);
            }
        }
    }
    // after a full post order traversal only the root hash remains
    debug_assert_eq!(stack.len(), 1);
    let hash = stack.pop().unwrap();
    Ok(hash)
}
687
688/// Compute the post order outboard for the given data, writing into a io::Write
689///
690/// For the post order outboard, writes to the target are sequential.
691///
692/// This will not add the size to the output. You need to store it somewhere else
693/// or append it yourself.
694pub async fn outboard_post_order(
695    data: impl AsyncStreamReader,
696    tree: BaoTree,
697    mut outboard: impl AsyncStreamWriter,
698) -> io::Result<blake3::Hash> {
699    let mut buffer = vec![0u8; tree.chunk_group_bytes()];
700    let hash = outboard_post_order_impl(tree, data, &mut outboard, &mut buffer).await?;
701    Ok(hash)
702}
703
/// Internal helper for [outboard_post_order]. This takes a buffer of the chunk group size.
///
/// Walks the tree in post order, hashing leaves as they are read and writing
/// the hash pair of every parent node sequentially to the writer.
async fn outboard_post_order_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl AsyncStreamWriter,
    buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
    // do not allocate for small trees
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    // NOTE(review): `buffer` is only used in this assertion; reads below
    // allocate via `read_bytes` — confirm whether the parameter is still needed
    debug_assert!(buffer.len() == tree.chunk_group_bytes());
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, .. } => {
                // children were pushed left-then-right, so pop right first
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                outboard.write(left_hash.as_bytes()).await?;
                outboard.write(right_hash.as_bytes()).await?;
                let parent = parent_cv(&left_hash, &right_hash, is_root);
                stack.push(parent);
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes(size).await?;
                let hash = hash_subtree(start_chunk.0, &buf, is_root);
                stack.push(hash);
            }
        }
    }
    // after a full post order traversal only the root hash remains
    debug_assert_eq!(stack.len(), 1);
    let hash = stack.pop().unwrap();
    Ok(hash)
}
740
741/// Copy an outboard to another outboard.
742///
743/// This can be used to persist an in memory outboard or to change from
744/// pre-order to post-order.
745pub async fn copy(mut from: impl Outboard, mut to: impl OutboardMut) -> io::Result<()> {
746    let tree = from.tree();
747    for node in tree.pre_order_nodes_iter() {
748        if let Some(hash_pair) = from.load(node).await? {
749            to.save(node, &hash_pair).await?;
750        }
751    }
752    Ok(())
753}
754
755#[cfg(feature = "validate")]
756mod validate {
757    use std::{io, ops::Range};
758
759    use futures_lite::{FutureExt, Stream};
760    use genawaiter::sync::{Co, Gen};
761    use iroh_io::AsyncSliceReader;
762
763    use crate::{
764        blake3, hash_subtree, io::LocalBoxFuture, rec::truncate_ranges, split, BaoTree, ChunkNum,
765        ChunkRangesRef, TreeNode,
766    };
767
768    use super::Outboard;
769
    /// Given a data file and an outboard, compute all valid ranges.
    ///
    /// This is not cheap since it recomputes the hashes for all chunks.
    ///
    /// To reduce the amount of work, you can specify a range you are interested in.
    pub fn valid_ranges<'a, O, D>(
        outboard: O,
        data: D,
        ranges: &'a ChunkRangesRef,
    ) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
    where
        O: Outboard + 'a,
        D: AsyncSliceReader + 'a,
    {
        // drive the recursive validator inside a generator; an io error is
        // yielded as the final item of the stream
        Gen::new(move |co| async move {
            if let Err(cause) = RecursiveDataValidator::validate(outboard, data, ranges, &co).await
            {
                co.yield_(Err(cause)).await;
            }
        })
    }
791
    /// Recursive helper that walks the tree and yields validated chunk ranges.
    struct RecursiveDataValidator<'a, O: Outboard, D: AsyncSliceReader> {
        tree: BaoTree,
        // filled size of the shifted tree, needed to find right descendants
        shifted_filled_size: TreeNode,
        outboard: O,
        data: D,
        // generator channel used to yield validated ranges
        co: &'a Co<io::Result<Range<ChunkNum>>>,
    }
799
    impl<'a, O: Outboard, D: AsyncSliceReader> RecursiveDataValidator<'a, O, D> {
        /// Entry point: validate the requested ranges and yield the valid ones.
        async fn validate(
            outboard: O,
            data: D,
            ranges: &ChunkRangesRef,
            co: &Co<io::Result<Range<ChunkNum>>>,
        ) -> io::Result<()> {
            let tree = outboard.tree();
            if tree.blocks() == 1 {
                // special case for a tree that fits in one block / chunk group
                let mut data = data;
                let data = data.read_at(0, tree.size().try_into().unwrap()).await?;
                let actual = hash_subtree(0, &data, true);
                if actual == outboard.root() {
                    co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
                }
                return Ok(());
            }
            // canonicalize the ranges to the tree size
            let ranges = truncate_ranges(ranges, tree.size());
            let root_hash = outboard.root();
            let (shifted_root, shifted_filled_size) = tree.shifted();
            let mut validator = RecursiveDataValidator {
                tree,
                shifted_filled_size,
                outboard,
                data,
                co,
            };
            validator
                .validate_rec(&root_hash, shifted_root, true, ranges)
                .await
        }

        /// Recompute the hash for a byte range of the data and yield the
        /// corresponding chunk range if it matches the expected hash.
        async fn yield_if_valid(
            &mut self,
            range: Range<u64>,
            hash: &blake3::Hash,
            is_root: bool,
        ) -> io::Result<()> {
            let len = (range.end - range.start).try_into().unwrap();
            let data = self.data.read_at(range.start, len).await?;
            // is_root is always false because the case of a single chunk group is handled before calling this function
            let actual = hash_subtree(ChunkNum::full_chunks(range.start).0, &data, is_root);
            if &actual == hash {
                // yield the left range
                self.co
                    .yield_(Ok(
                        ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                    ))
                    .await;
            }
            io::Result::Ok(())
        }

        /// Recursive step over the shifted tree; boxed future because async
        /// recursion needs an indirection.
        fn validate_rec<'b>(
            &'b mut self,
            parent_hash: &'b blake3::Hash,
            shifted: TreeNode,
            is_root: bool,
            ranges: &'b ChunkRangesRef,
        ) -> LocalBoxFuture<'b, io::Result<()>> {
            async move {
                if ranges.is_empty() {
                    // this part of the tree is not of interest, so we can skip it
                    return Ok(());
                }
                let node = shifted.subtract_block_size(self.tree.block_size.0);
                let (l, m, r) = self.tree.leaf_byte_ranges3(node);
                if !self.tree.is_relevant_for_outboard(node) {
                    // no hash pair stored for this node: hash the whole span directly
                    self.yield_if_valid(l..r, parent_hash, is_root).await?;
                    return Ok(());
                }
                let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                    // outboard is incomplete, we can't validate
                    return Ok(());
                };
                let actual = blake3::guts::parent_cv(&l_hash, &r_hash, is_root);
                if &actual != parent_hash {
                    // hash mismatch, we can't validate
                    return Ok(());
                };
                let (l_ranges, r_ranges) = split(ranges, node);
                if shifted.is_leaf() {
                    // validate both halves of the leaf, if requested
                    if !l_ranges.is_empty() {
                        self.yield_if_valid(l..m, &l_hash, false).await?;
                    }
                    if !r_ranges.is_empty() {
                        self.yield_if_valid(m..r, &r_hash, false).await?;
                    }
                } else {
                    // recurse (we are in the domain of the shifted tree)
                    let left = shifted.left_child().unwrap();
                    self.validate_rec(&l_hash, left, false, l_ranges).await?;
                    let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                    self.validate_rec(&r_hash, right, false, r_ranges).await?;
                }
                Ok(())
            }
            .boxed_local()
        }
    }
901
902    /// Given just an outboard, compute all valid ranges.
903    ///
904    /// This is not cheap since it recomputes the hashes for all chunks.
905    pub fn valid_outboard_ranges<'a, O>(
906        outboard: O,
907        ranges: &'a ChunkRangesRef,
908    ) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
909    where
910        O: Outboard + 'a,
911    {
912        Gen::new(move |co| async move {
913            if let Err(cause) = RecursiveOutboardValidator::validate(outboard, ranges, &co).await {
914                co.yield_(Err(cause)).await;
915            }
916        })
917    }
918
    /// State for the recursive outboard-only validation driven by
    /// [valid_outboard_ranges].
    struct RecursiveOutboardValidator<'a, O: Outboard> {
        // geometry of the tree being validated
        tree: BaoTree,
        // filled size of the shifted (chunk-group level) tree, used to find
        // right descendants during recursion
        shifted_filled_size: TreeNode,
        // the outboard containing the stored hashes
        outboard: O,
        // coroutine handle used to yield validated chunk ranges to the stream
        co: &'a Co<io::Result<Range<ChunkNum>>>,
    }
925
926    impl<'a, O: Outboard> RecursiveOutboardValidator<'a, O> {
927        async fn validate(
928            outboard: O,
929            ranges: &ChunkRangesRef,
930            co: &Co<io::Result<Range<ChunkNum>>>,
931        ) -> io::Result<()> {
932            let tree = outboard.tree();
933            if tree.blocks() == 1 {
934                // special case for a tree that fits in one block / chunk group
935                co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
936                return Ok(());
937            }
938            let ranges = truncate_ranges(ranges, tree.size());
939            let root_hash = outboard.root();
940            let (shifted_root, shifted_filled_size) = tree.shifted();
941            let mut validator = RecursiveOutboardValidator {
942                tree,
943                shifted_filled_size,
944                outboard,
945                co,
946            };
947            validator
948                .validate_rec(&root_hash, shifted_root, true, ranges)
949                .await
950        }
951
952        fn validate_rec<'b>(
953            &'b mut self,
954            parent_hash: &'b blake3::Hash,
955            shifted: TreeNode,
956            is_root: bool,
957            ranges: &'b ChunkRangesRef,
958        ) -> LocalBoxFuture<'b, io::Result<()>> {
959            Box::pin(async move {
960                let yield_node_range = |range: Range<u64>| {
961                    self.co.yield_(Ok(
962                        ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
963                    ))
964                };
965                if ranges.is_empty() {
966                    // this part of the tree is not of interest, so we can skip it
967                    return Ok(());
968                }
969                let node = shifted.subtract_block_size(self.tree.block_size.0);
970                let (l, m, r) = self.tree.leaf_byte_ranges3(node);
971                if !self.tree.is_relevant_for_outboard(node) {
972                    yield_node_range(l..r).await;
973                    return Ok(());
974                }
975                let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
976                    // outboard is incomplete, we can't validate
977                    return Ok(());
978                };
979                let actual = blake3::guts::parent_cv(&l_hash, &r_hash, is_root);
980                if &actual != parent_hash {
981                    // hash mismatch, we can't validate
982                    return Ok(());
983                };
984                let (l_ranges, r_ranges) = split(ranges, node);
985                if shifted.is_leaf() {
986                    if !l_ranges.is_empty() {
987                        yield_node_range(l..m).await;
988                    }
989                    if !r_ranges.is_empty() {
990                        yield_node_range(m..r).await;
991                    }
992                } else {
993                    // recurse (we are in the domain of the shifted tree)
994                    let left = shifted.left_child().unwrap();
995                    self.validate_rec(&l_hash, left, false, l_ranges).await?;
996                    let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
997                    self.validate_rec(&r_hash, right, false, r_ranges).await?;
998                }
999                Ok(())
1000            })
1001        }
1002    }
1003}
1004#[cfg(feature = "validate")]
1005pub use validate::{valid_outboard_ranges, valid_ranges};