use std::{
    future::Future,
    io::{self, Cursor},
    result,
};

use bytes::Bytes;
pub use iroh_io::{AsyncSliceReader, AsyncSliceWriter};
use iroh_io::{AsyncStreamReader, AsyncStreamWriter};
use smallvec::SmallVec;

pub use super::BaoContentItem;
use super::{combine_hash_pair, DecodeError};
use crate::{
    blake3, hash_subtree,
    io::{
        error::EncodeError,
        outboard::{PostOrderOutboard, PreOrderOutboard},
        Leaf, Parent,
    },
    iter::{BaoChunk, ResponseIter},
    parent_cv,
    rec::{encode_selected_rec, truncate_ranges, truncate_ranges_owned},
    BaoTree, BlockSize, ChunkRanges, ChunkRangesRef, TreeNode,
};

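/// An outboard, i.e. the hash tree for a blob of data, stored separately from the data.
///
/// Implementations provide the tree geometry, the root hash, and the hash pair
/// for each tree node that is relevant for the outboard.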
pub trait Outboard {
    /// The root hash.
    fn root(&self) -> blake3::Hash;
    /// The tree. This contains the size of the data and the block size.
    fn tree(&self) -> BaoTree;
    /// Load the hash pair for a node, or `None` if it is not present.
    fn load(
        &mut self,
        node: TreeNode,
    ) -> impl Future<Output = io::Result<Option<(blake3::Hash, blake3::Hash)>>>;
}

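/// A mutable outboard to which hash pairs can be saved.
///
/// This is needed when creating an outboard from data or when decoding a
/// response into an outboard.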
pub trait OutboardMut: Sized {
    /// Save a hash pair for a node.
    fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync any pending changes to the underlying storage.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}

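/// Convenience trait to create and initialize an outboard from data.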
pub trait CreateOutboard {
    /// Create an outboard from a seekable data source, taking the size from the source.
    #[allow(async_fn_in_trait)]
    async fn create(mut data: impl AsyncSliceReader, block_size: BlockSize) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        let size = data.size().await?;
        Self::create_sized(Cursor::new(data), size, block_size).await
    }

    /// Create an outboard from a stream of data with a known size.
    fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> impl Future<Output = io::Result<Self>>
    where
        Self: Default + Sized;

    /// Initialize the outboard from the given data.
    ///
    /// The tree of the outboard is left unchanged, so the data must match it.
    fn init_from(&mut self, data: impl AsyncStreamReader) -> impl Future<Output = io::Result<()>>;
}

impl<O: Outboard> Outboard for &mut O {
    fn root(&self) -> blake3::Hash {
        (**self).root()
    }

    fn tree(&self) -> BaoTree {
        (**self).tree()
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        (**self).load(node).await
    }
}

impl<R: AsyncSliceReader> Outboard for PreOrderOutboard<R> {
    fn root(&self) -> blake3::Hash {
        self.root
    }

    fn tree(&self) -> BaoTree {
        self.tree
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        let Some(offset) = self.tree.pre_order_offset(node) else {
            return Ok(None);
        };
        let offset = offset * 64;
        let content = self.data.read_at(offset, 64).await?;
        Ok(Some(if content.len() != 64 {
            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
        } else {
            parse_hash_pair(content)?
        }))
    }
}

impl<O: OutboardMut> OutboardMut for &mut O {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        (**self).save(node, hash_pair).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}

impl<W: AsyncSliceWriter> OutboardMut for PreOrderOutboard<W> {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        let Some(offset) = self.tree.pre_order_offset(node) else {
            return Ok(());
        };
        let offset = offset * 64;
        let mut buf = [0u8; 64];
        buf[..32].copy_from_slice(hash_pair.0.as_bytes());
        buf[32..].copy_from_slice(hash_pair.1.as_bytes());
        self.data.write_at(offset, &buf).await?;
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.data.sync().await
    }
}

impl<W: AsyncSliceWriter> OutboardMut for PostOrderOutboard<W> {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        let Some(offset) = self.tree.post_order_offset(node) else {
            return Ok(());
        };
        let offset = offset.value() * 64;
        let mut buf = [0u8; 64];
        buf[..32].copy_from_slice(hash_pair.0.as_bytes());
        buf[32..].copy_from_slice(hash_pair.1.as_bytes());
        self.data.write_at(offset, &buf).await?;
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.data.sync().await
    }
}

impl<W: AsyncSliceWriter> CreateOutboard for PreOrderOutboard<W> {
    async fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        let mut res = Self {
            tree: BaoTree::new(size, block_size),
            ..Self::default()
        };
        res.init_from(data).await?;
        Ok(res)
    }

    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
        let mut this = self;
        let root = outboard(data, this.tree, &mut this).await?;
        this.root = root;
        this.sync().await?;
        Ok(())
    }
}

impl<W: AsyncSliceWriter> CreateOutboard for PostOrderOutboard<W> {
    async fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        let mut res = Self {
            tree: BaoTree::new(size, block_size),
            ..Self::default()
        };
        res.init_from(data).await?;
        Ok(res)
    }

    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
        let mut this = self;
        let root = outboard(data, this.tree, &mut this).await?;
        this.root = root;
        this.sync().await?;
        Ok(())
    }
}

impl<R: AsyncSliceReader> Outboard for PostOrderOutboard<R> {
    fn root(&self) -> blake3::Hash {
        self.root
    }

    fn tree(&self) -> BaoTree {
        self.tree
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        let Some(offset) = self.tree.post_order_offset(node) else {
            return Ok(None);
        };
        let offset = offset.value() * 64;
        let content = self.data.read_at(offset, 64).await?;
        Ok(Some(if content.len() != 64 {
            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
        } else {
            parse_hash_pair(content)?
        }))
    }
}

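/// Parse a serialized hash pair: 64 bytes, left hash followed by right hash.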
pub(crate) fn parse_hash_pair(buf: Bytes) -> io::Result<(blake3::Hash, blake3::Hash)> {
    if buf.len() != 64 {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "hash pair must be 64 bytes",
        ));
    }
    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..]).unwrap());
    Ok((l_hash, r_hash))
}

#[derive(Debug)]
struct ResponseDecoderInner<R> {
    iter: ResponseIter,
    stack: SmallVec<[blake3::Hash; 10]>,
    encoded: R,
}

impl<R> ResponseDecoderInner<R> {
    fn new(tree: BaoTree, hash: blake3::Hash, ranges: ChunkRanges, encoded: R) -> Self {
        // restrict the ranges to the size of the tree
        let ranges = truncate_ranges_owned(ranges, tree.size());
        let mut res = Self {
            iter: ResponseIter::new(tree, ranges),
            stack: SmallVec::new(),
            encoded,
        };
        res.stack.push(hash);
        res
    }
}

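/// Decoder for a bao response stream.
///
/// Drive it by repeatedly calling [`ResponseDecoder::next`] until it returns
/// [`ResponseDecoderNext::Done`].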
#[derive(Debug)]
pub struct ResponseDecoder<R>(Box<ResponseDecoderInner<R>>);

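/// Result of a single decoding step of a [`ResponseDecoder`].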
#[derive(Debug)]
pub enum ResponseDecoderNext<R> {
    /// There is more to decode. Contains the decoder for the rest of the
    /// response and the result of decoding the current item.
    More(
        (
            ResponseDecoder<R>,
            std::result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// The response is fully decoded. Contains the underlying reader.
    Done(R),
}

impl<R: AsyncStreamReader> ResponseDecoder<R> {
    /// Create a new response decoder for the given root hash, ranges and tree,
    /// reading the encoded response from `encoded`.
    pub fn new(hash: blake3::Hash, ranges: ChunkRanges, tree: BaoTree, encoded: R) -> Self {
        Self(Box::new(ResponseDecoderInner::new(
            tree, hash, ranges, encoded,
        )))
    }

    /// Read and verify the next item from the stream, returning either the item
    /// together with the decoder for the rest, or the underlying reader when done.
    pub async fn next(mut self) -> ResponseDecoderNext<R> {
        if let Some(chunk) = self.0.iter.next() {
            let item = self.next0(chunk).await;
            ResponseDecoderNext::More((self, item))
        } else {
            ResponseDecoderNext::Done(self.0.encoded)
        }
    }

    /// Stop decoding and return the underlying reader.
    pub fn finish(self) -> R {
        self.0.encoded
    }

    /// The tree geometry of this response.
    pub fn tree(&self) -> BaoTree {
        self.0.iter.tree()
    }

    pub fn hash(&self) -> &blake3::Hash {
        &self.0.stack[0]
    }

    async fn next0(&mut self, chunk: BaoChunk) -> std::result::Result<BaoContentItem, DecodeError> {
        Ok(match chunk {
            BaoChunk::Parent {
                is_root,
                right,
                left,
                node,
                ..
            } => {
                let this = &mut self.0;
                let buf = this
                    .encoded
                    .read::<64>()
                    .await
                    .map_err(|e| DecodeError::maybe_parent_not_found(e, node))?;
                let pair @ (l_hash, r_hash) = read_parent(&buf);
                let parent_hash = this.stack.pop().unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                // push the hashes for the children that will be visited,
                // right first so that the left hash is popped first
                if right {
                    this.stack.push(r_hash);
                }
                if left {
                    this.stack.push(l_hash);
                }
                if parent_hash != actual {
                    return Err(DecodeError::ParentHashMismatch(node));
                }
                Parent { pair, node }.into()
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let this = &mut self.0;
                // read the leaf data and check it against the expected hash
                let data = this
                    .encoded
                    .read_bytes_exact(size)
                    .await
                    .map_err(|e| DecodeError::maybe_leaf_not_found(e, start_chunk))?;
                let leaf_hash = this.stack.pop().unwrap();
                let actual = hash_subtree(start_chunk.0, &data, is_root);
                if leaf_hash != actual {
                    return Err(DecodeError::LeafHashMismatch(start_chunk));
                }
                Leaf {
                    offset: start_chunk.to_bytes(),
                    data,
                }
                .into()
            }
        })
    }
}

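/// Encode the given `ranges` of `data` into `encoded`, taking hashes from `outboard`.
///
/// Neither the data nor the outboard is validated while encoding, so corruption
/// will only be detected by the receiver. Use [`encode_ranges_validated`] to
/// verify against the hashes while encoding.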
pub async fn encode_ranges<D, O, W>(
    mut data: D,
    mut outboard: O,
    ranges: &ChunkRangesRef,
    encoded: W,
) -> result::Result<(), EncodeError>
where
    D: AsyncSliceReader,
    O: Outboard,
    W: AsyncStreamWriter,
{
    let mut encoded = encoded;
    let tree = outboard.tree();
    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
        match item {
            BaoChunk::Parent { node, .. } => {
                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
                let pair = combine_hash_pair(&l_hash, &r_hash);
                encoded
                    .write(&pair)
                    .await
                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
            }
            BaoChunk::Leaf {
                start_chunk, size, ..
            } => {
                let start = start_chunk.to_bytes();
                let bytes = data.read_exact_at(start, size).await?;
                encoded
                    .write_bytes(bytes)
                    .await
                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
            }
        }
    }
    Ok(())
}

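/// Encode the given `ranges` of `data` into `encoded`, validating it against `outboard`.
///
/// Every hash pair and leaf is checked against the expected hash before it is
/// written, so corrupted data or outboards are detected on the sending side.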
pub async fn encode_ranges_validated<D, O, W>(
    mut data: D,
    mut outboard: O,
    ranges: &ChunkRangesRef,
    encoded: W,
) -> result::Result<(), EncodeError>
where
    D: AsyncSliceReader,
    O: Outboard,
    W: AsyncStreamWriter,
{
    // reusable buffer for re-encoding partially selected chunk groups
    let mut out_buf = Vec::new();
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    stack.push(outboard.root());
    let mut encoded = encoded;
    let tree = outboard.tree();
    let ranges = truncate_ranges(ranges, tree.size());
    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
        match item {
            BaoChunk::Parent {
                is_root,
                left,
                right,
                node,
                ..
            } => {
                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                let expected = stack.pop().unwrap();
                if actual != expected {
                    return Err(EncodeError::ParentHashMismatch(node));
                }
                if right {
                    stack.push(r_hash);
                }
                if left {
                    stack.push(l_hash);
                }
                let pair = combine_hash_pair(&l_hash, &r_hash);
                encoded
                    .write(&pair)
                    .await
                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
            }
            BaoChunk::Leaf {
                start_chunk,
                size,
                is_root,
                ranges,
                ..
            } => {
                let expected = stack.pop().unwrap();
                let start = start_chunk.to_bytes();
                let bytes = data.read_exact_at(start, size).await?;
                let (actual, to_write) = if !ranges.is_all() {
                    // only part of the chunk group is selected, so re-encode just
                    // the selected ranges while hashing the whole group
                    out_buf.clear();
                    let actual = encode_selected_rec(
                        start_chunk,
                        &bytes,
                        is_root,
                        ranges,
                        tree.block_size.to_u32(),
                        true,
                        &mut out_buf,
                    );
                    (actual, out_buf.clone().into())
                } else {
                    let actual = hash_subtree(start_chunk.0, &bytes, is_root);
                    (actual, bytes)
                };
                if actual != expected {
                    return Err(EncodeError::LeafHashMismatch(start_chunk));
                }
                encoded
                    .write_bytes(to_write)
                    .await
                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
            }
        }
    }
    Ok(())
}

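/// Decode a response stream, writing leaf data to `target` and hash pairs to `outboard`.
///
/// The tree and root hash are taken from the outboard. Decoding stops at the
/// first validation error.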
pub async fn decode_ranges<R, O, W>(
    encoded: R,
    ranges: ChunkRanges,
    mut target: W,
    mut outboard: O,
) -> std::result::Result<(), DecodeError>
where
    O: OutboardMut + Outboard,
    R: AsyncStreamReader,
    W: AsyncSliceWriter,
{
    let mut reading = ResponseDecoder::new(outboard.root(), ranges, outboard.tree(), encoded);
    loop {
        let item = match reading.next().await {
            ResponseDecoderNext::Done(_reader) => break,
            ResponseDecoderNext::More((next, item)) => {
                reading = next;
                item?
            }
        };
        match item {
            BaoContentItem::Parent(Parent { node, pair }) => {
                outboard.save(node, &pair).await?;
            }
            BaoContentItem::Leaf(Leaf { offset, data }) => {
                target.write_bytes_at(offset, data).await?;
            }
        }
    }
    Ok(())
}

fn read_parent(buf: &[u8]) -> (blake3::Hash, blake3::Hash) {
    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..64]).unwrap());
    (l_hash, r_hash)
}

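/// Compute the outboard for the given data and tree, saving hash pairs into `outboard`.
///
/// The data must match the size of the tree. Returns the root hash.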
pub async fn outboard(
    data: impl AsyncStreamReader,
    tree: BaoTree,
    mut outboard: impl OutboardMut,
) -> io::Result<blake3::Hash> {
    let mut buffer = vec![0u8; tree.chunk_group_bytes()];
    let hash = outboard_impl(tree, data, &mut outboard, &mut buffer).await?;
    Ok(hash)
}

async fn outboard_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl OutboardMut,
    buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    debug_assert!(buffer.len() == tree.chunk_group_bytes());
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, node, .. } => {
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                outboard.save(node, &(left_hash, right_hash)).await?;
                let parent = parent_cv(&left_hash, &right_hash, is_root);
                stack.push(parent);
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes_exact(size).await?;
                let hash = hash_subtree(start_chunk.0, &buf, is_root);
                stack.push(hash);
            }
        }
    }
    debug_assert_eq!(stack.len(), 1);
    let hash = stack.pop().unwrap();
    Ok(hash)
}

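/// Compute a post-order outboard for the given data and tree, writing the hash
/// pairs in post order to the given stream writer.
///
/// The data must match the size of the tree. Returns the root hash.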
pub async fn outboard_post_order(
    data: impl AsyncStreamReader,
    tree: BaoTree,
    mut outboard: impl AsyncStreamWriter,
) -> io::Result<blake3::Hash> {
    let mut buffer = vec![0u8; tree.chunk_group_bytes()];
    let hash = outboard_post_order_impl(tree, data, &mut outboard, &mut buffer).await?;
    Ok(hash)
}

async fn outboard_post_order_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl AsyncStreamWriter,
    buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    debug_assert!(buffer.len() == tree.chunk_group_bytes());
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, .. } => {
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                outboard.write(left_hash.as_bytes()).await?;
                outboard.write(right_hash.as_bytes()).await?;
                let parent = parent_cv(&left_hash, &right_hash, is_root);
                stack.push(parent);
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes_exact(size).await?;
                let hash = hash_subtree(start_chunk.0, &buf, is_root);
                stack.push(hash);
            }
        }
    }
    debug_assert_eq!(stack.len(), 1);
    let hash = stack.pop().unwrap();
    Ok(hash)
}

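/// Copy all hash pairs from one outboard to another.
///
/// Since both pre-order and post-order outboards implement [`Outboard`] and
/// [`OutboardMut`], this can also be used to convert between the two layouts.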
pub async fn copy(mut from: impl Outboard, mut to: impl OutboardMut) -> io::Result<()> {
    let tree = from.tree();
    for node in tree.pre_order_nodes_iter() {
        if let Some(hash_pair) = from.load(node).await? {
            to.save(node, &hash_pair).await?;
        }
    }
    Ok(())
}

#[cfg(feature = "validate")]
mod validate {
    use std::{io, ops::Range};

    use futures_lite::{FutureExt, Stream};
    use genawaiter::sync::{Co, Gen};
    use iroh_io::AsyncSliceReader;

    use super::Outboard;
    use crate::{
        blake3, hash_subtree, io::LocalBoxFuture, parent_cv, rec::truncate_ranges, split, BaoTree,
        ChunkNum, ChunkRangesRef, TreeNode,
    };

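    /// Given an outboard and the matching data, compute the ranges that are valid.
    ///
    /// This is not cheap: it reads and re-hashes all data covered by the given
    /// ranges and checks it against the outboard.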
    pub fn valid_ranges<'a, O, D>(
        outboard: O,
        data: D,
        ranges: &'a ChunkRangesRef,
    ) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
    where
        O: Outboard + 'a,
        D: AsyncSliceReader + 'a,
    {
        Gen::new(move |co| async move {
            if let Err(cause) = RecursiveDataValidator::validate(outboard, data, ranges, &co).await
            {
                co.yield_(Err(cause)).await;
            }
        })
    }

    struct RecursiveDataValidator<'a, O: Outboard, D: AsyncSliceReader> {
        tree: BaoTree,
        shifted_filled_size: TreeNode,
        outboard: O,
        data: D,
        co: &'a Co<io::Result<Range<ChunkNum>>>,
    }

    impl<O: Outboard, D: AsyncSliceReader> RecursiveDataValidator<'_, O, D> {
        async fn validate(
            outboard: O,
            data: D,
            ranges: &ChunkRangesRef,
            co: &Co<io::Result<Range<ChunkNum>>>,
        ) -> io::Result<()> {
            let tree = outboard.tree();
            if tree.blocks() == 1 {
                let mut data = data;
                let data = data
                    .read_exact_at(0, tree.size().try_into().unwrap())
                    .await?;
                let actual = hash_subtree(0, &data, true);
                if actual == outboard.root() {
                    co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
                }
                return Ok(());
            }
            let ranges = truncate_ranges(ranges, tree.size());
            let root_hash = outboard.root();
            let (shifted_root, shifted_filled_size) = tree.shifted();
            let mut validator = RecursiveDataValidator {
                tree,
                shifted_filled_size,
                outboard,
                data,
                co,
            };
            validator
                .validate_rec(&root_hash, shifted_root, true, ranges)
                .await
        }

        async fn yield_if_valid(
            &mut self,
            range: Range<u64>,
            hash: &blake3::Hash,
            is_root: bool,
        ) -> io::Result<()> {
            let len = (range.end - range.start).try_into().unwrap();
            let data = self.data.read_exact_at(range.start, len).await?;
            let actual = hash_subtree(ChunkNum::full_chunks(range.start).0, &data, is_root);
            if &actual == hash {
                self.co
                    .yield_(Ok(
                        ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                    ))
                    .await;
            }
            io::Result::Ok(())
        }

        fn validate_rec<'b>(
            &'b mut self,
            parent_hash: &'b blake3::Hash,
            shifted: TreeNode,
            is_root: bool,
            ranges: &'b ChunkRangesRef,
        ) -> LocalBoxFuture<'b, io::Result<()>> {
            async move {
                if ranges.is_empty() {
                    return Ok(());
                }
                let node = shifted.subtract_block_size(self.tree.block_size.0);
                let (l, m, r) = self.tree.leaf_byte_ranges3(node);
                if !self.tree.is_relevant_for_outboard(node) {
                    self.yield_if_valid(l..r, parent_hash, is_root).await?;
                    return Ok(());
                }
                let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                    return Ok(());
                };
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                if &actual != parent_hash {
                    return Ok(());
                };
                let (l_ranges, r_ranges) = split(ranges, node);
                if shifted.is_leaf() {
                    if !l_ranges.is_empty() {
                        self.yield_if_valid(l..m, &l_hash, false).await?;
                    }
                    if !r_ranges.is_empty() {
                        self.yield_if_valid(m..r, &r_hash, false).await?;
                    }
                } else {
                    let left = shifted.left_child().unwrap();
                    self.validate_rec(&l_hash, left, false, l_ranges).await?;
                    let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                    self.validate_rec(&r_hash, right, false, r_ranges).await?;
                }
                Ok(())
            }
            .boxed_local()
        }
    }

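    /// Given just an outboard, compute the ranges for which the outboard is
    /// internally consistent.
    ///
    /// This checks the hash pairs against each other and against the root hash,
    /// but does not look at the data.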
    pub fn valid_outboard_ranges<'a, O>(
        outboard: O,
        ranges: &'a ChunkRangesRef,
    ) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
    where
        O: Outboard + 'a,
    {
        Gen::new(move |co| async move {
            if let Err(cause) = RecursiveOutboardValidator::validate(outboard, ranges, &co).await {
                co.yield_(Err(cause)).await;
            }
        })
    }

    struct RecursiveOutboardValidator<'a, O: Outboard> {
        tree: BaoTree,
        shifted_filled_size: TreeNode,
        outboard: O,
        co: &'a Co<io::Result<Range<ChunkNum>>>,
    }

    impl<O: Outboard> RecursiveOutboardValidator<'_, O> {
        async fn validate(
            outboard: O,
            ranges: &ChunkRangesRef,
            co: &Co<io::Result<Range<ChunkNum>>>,
        ) -> io::Result<()> {
            let tree = outboard.tree();
            if tree.blocks() == 1 {
                co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
                return Ok(());
            }
            let ranges = truncate_ranges(ranges, tree.size());
            let root_hash = outboard.root();
            let (shifted_root, shifted_filled_size) = tree.shifted();
            let mut validator = RecursiveOutboardValidator {
                tree,
                shifted_filled_size,
                outboard,
                co,
            };
            validator
                .validate_rec(&root_hash, shifted_root, true, ranges)
                .await
        }

        fn validate_rec<'b>(
            &'b mut self,
            parent_hash: &'b blake3::Hash,
            shifted: TreeNode,
            is_root: bool,
            ranges: &'b ChunkRangesRef,
        ) -> LocalBoxFuture<'b, io::Result<()>> {
            Box::pin(async move {
                let yield_node_range = |range: Range<u64>| {
                    self.co.yield_(Ok(
                        ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                    ))
                };
                if ranges.is_empty() {
                    return Ok(());
                }
                let node = shifted.subtract_block_size(self.tree.block_size.0);
                let (l, m, r) = self.tree.leaf_byte_ranges3(node);
                if !self.tree.is_relevant_for_outboard(node) {
                    yield_node_range(l..r).await;
                    return Ok(());
                }
                let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                    return Ok(());
                };
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                if &actual != parent_hash {
                    return Ok(());
                };
                let (l_ranges, r_ranges) = split(ranges, node);
                if shifted.is_leaf() {
                    if !l_ranges.is_empty() {
                        yield_node_range(l..m).await;
                    }
                    if !r_ranges.is_empty() {
                        yield_node_range(m..r).await;
                    }
                } else {
                    let left = shifted.left_child().unwrap();
                    self.validate_rec(&l_hash, left, false, l_ranges).await?;
                    let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                    self.validate_rec(&r_hash, right, false, r_ranges).await?;
                }
                Ok(())
            })
        }
    }
}

#[cfg(feature = "validate")]
pub use validate::{valid_outboard_ranges, valid_ranges};