//! Async io for bao trees: outboard traits, computing outboards, and
//! encoding, decoding and validating ranges.

use std::{
    future::Future,
    io::{self, Cursor},
    result,
};

use crate::{
    blake3, hash_subtree,
    iter::ResponseIter,
    rec::{encode_selected_rec, truncate_ranges, truncate_ranges_owned},
    ChunkRanges, ChunkRangesRef,
};
use blake3::guts::parent_cv;
use bytes::Bytes;
use iroh_io::{AsyncStreamReader, AsyncStreamWriter};
use smallvec::SmallVec;

pub use super::BaoContentItem;
use crate::{
    io::{
        error::EncodeError,
        outboard::{PostOrderOutboard, PreOrderOutboard},
        Leaf, Parent,
    },
    iter::BaoChunk,
    BaoTree, BlockSize, TreeNode,
};
pub use iroh_io::{AsyncSliceReader, AsyncSliceWriter};

use super::{combine_hash_pair, DecodeError};

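/// Read access to an outboard: the root hash, the tree geometry, and the
/// stored hash pair for any tree node.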
pub trait Outboard {
    /// The root hash of the tree.
    fn root(&self) -> blake3::Hash;
    /// The tree geometry: total size and block size.
    fn tree(&self) -> BaoTree;
    /// Load the hash pair for the given node, or `None` if it is not stored
    /// in this outboard.
    fn load(
        &mut self,
        node: TreeNode,
    ) -> impl Future<Output = io::Result<Option<(blake3::Hash, blake3::Hash)>>>;
}

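/// Write access to an outboard: saving hash pairs for tree nodes and syncing
/// them to the underlying storage.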
pub trait OutboardMut: Sized {
    /// Save the hash pair for the given node.
    fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync any pending writes to the underlying storage.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}

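/// Convenience trait to create an outboard for some data, either from an
/// [`AsyncSliceReader`] (reading the size from it) or from an
/// [`AsyncStreamReader`] together with an explicitly given size.
///
/// A minimal usage sketch; `data` and `block_size` are assumed to be given,
/// and `W` stands for some [`AsyncSliceWriter`] for which
/// `PreOrderOutboard<W>` implements `Default`. Marked `ignore` since it is
/// not a complete program:
///
/// ```ignore
/// let ob = PreOrderOutboard::<W>::create(data, block_size).await?;
/// println!("root: {}", ob.root);
/// ```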
pub trait CreateOutboard {
    /// Create an outboard from a seekable data source, reading the size from it.
    #[allow(async_fn_in_trait)]
    async fn create(mut data: impl AsyncSliceReader, block_size: BlockSize) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        let size = data.size().await?;
        Self::create_sized(Cursor::new(data), size, block_size).await
    }

    /// Create an outboard from a stream of data and a known size.
    fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> impl Future<Output = io::Result<Self>>
    where
        Self: Default + Sized;

    /// Initialize this outboard from a stream of data, overwriting the stored
    /// hashes and the root. The tree must already have the correct size and
    /// block size.
    fn init_from(&mut self, data: impl AsyncStreamReader) -> impl Future<Output = io::Result<()>>;
}

impl<'b, O: Outboard> Outboard for &'b mut O {
    fn root(&self) -> blake3::Hash {
        (**self).root()
    }

    fn tree(&self) -> BaoTree {
        (**self).tree()
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        (**self).load(node).await
    }
}

impl<R: AsyncSliceReader> Outboard for PreOrderOutboard<R> {
    fn root(&self) -> blake3::Hash {
        self.root
    }

    fn tree(&self) -> BaoTree {
        self.tree
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        let Some(offset) = self.tree.pre_order_offset(node) else {
            return Ok(None);
        };
        let offset = offset * 64;
        let content = self.data.read_at(offset, 64).await?;
        Ok(Some(if content.len() != 64 {
            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
        } else {
            parse_hash_pair(content)?
        }))
    }
}

impl<'b, O: OutboardMut> OutboardMut for &'b mut O {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        (**self).save(node, hash_pair).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}

impl<W: AsyncSliceWriter> OutboardMut for PreOrderOutboard<W> {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        let Some(offset) = self.tree.pre_order_offset(node) else {
            return Ok(());
        };
        let offset = offset * 64;
        let mut buf = [0u8; 64];
        buf[..32].copy_from_slice(hash_pair.0.as_bytes());
        buf[32..].copy_from_slice(hash_pair.1.as_bytes());
        self.data.write_at(offset, &buf).await?;
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.data.sync().await
    }
}

impl<W: AsyncSliceWriter> OutboardMut for PostOrderOutboard<W> {
    async fn save(
        &mut self,
        node: TreeNode,
        hash_pair: &(blake3::Hash, blake3::Hash),
    ) -> io::Result<()> {
        let Some(offset) = self.tree.post_order_offset(node) else {
            return Ok(());
        };
        let offset = offset.value() * 64;
        let mut buf = [0u8; 64];
        buf[..32].copy_from_slice(hash_pair.0.as_bytes());
        buf[32..].copy_from_slice(hash_pair.1.as_bytes());
        self.data.write_at(offset, &buf).await?;
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.data.sync().await
    }
}

impl<W: AsyncSliceWriter> CreateOutboard for PreOrderOutboard<W> {
    async fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        let mut res = Self {
            tree: BaoTree::new(size, block_size),
            ..Self::default()
        };
        res.init_from(data).await?;
        Ok(res)
    }

    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
        let mut this = self;
        let root = outboard(data, this.tree, &mut this).await?;
        this.root = root;
        this.sync().await?;
        Ok(())
    }
}

impl<W: AsyncSliceWriter> CreateOutboard for PostOrderOutboard<W> {
    async fn create_sized(
        data: impl AsyncStreamReader,
        size: u64,
        block_size: BlockSize,
    ) -> io::Result<Self>
    where
        Self: Default + Sized,
    {
        let mut res = Self {
            tree: BaoTree::new(size, block_size),
            ..Self::default()
        };
        res.init_from(data).await?;
        Ok(res)
    }

    async fn init_from(&mut self, data: impl AsyncStreamReader) -> io::Result<()> {
        let mut this = self;
        let root = outboard(data, this.tree, &mut this).await?;
        this.root = root;
        this.sync().await?;
        Ok(())
    }
}

impl<R: AsyncSliceReader> Outboard for PostOrderOutboard<R> {
    fn root(&self) -> blake3::Hash {
        self.root
    }

    fn tree(&self) -> BaoTree {
        self.tree
    }

    async fn load(&mut self, node: TreeNode) -> io::Result<Option<(blake3::Hash, blake3::Hash)>> {
        let Some(offset) = self.tree.post_order_offset(node) else {
            return Ok(None);
        };
        let offset = offset.value() * 64;
        let content = self.data.read_at(offset, 64).await?;
        Ok(Some(if content.len() != 64 {
            (blake3::Hash::from([0; 32]), blake3::Hash::from([0; 32]))
        } else {
            parse_hash_pair(content)?
        }))
    }
}

pub(crate) fn parse_hash_pair(buf: Bytes) -> io::Result<(blake3::Hash, blake3::Hash)> {
    if buf.len() != 64 {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "hash pair must be 64 bytes",
        ));
    }
    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..]).unwrap());
    Ok((l_hash, r_hash))
}

#[derive(Debug)]
struct ResponseDecoderInner<R> {
    iter: ResponseIter,
    stack: SmallVec<[blake3::Hash; 10]>,
    encoded: R,
}

impl<R> ResponseDecoderInner<R> {
    fn new(tree: BaoTree, hash: blake3::Hash, ranges: ChunkRanges, encoded: R) -> Self {
        let ranges = truncate_ranges_owned(ranges, tree.size());
        let mut res = Self {
            iter: ResponseIter::new(tree, ranges),
            stack: SmallVec::new(),
            encoded,
        };
        res.stack.push(hash);
        res
    }
}

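/// Response decoder, decoding a bao encoded response stream while verifying
/// parent hash pairs and leaf data against the root hash.
///
/// The decoder is driven by repeatedly calling [`ResponseDecoder::next`],
/// which consumes the decoder and returns it together with the next item.
///
/// A minimal sketch of a decode loop; `hash`, `ranges`, `tree` and `reader`
/// are assumed to be given, and the block is marked `ignore` since it is not
/// a complete program:
///
/// ```ignore
/// let mut decoder = ResponseDecoder::new(hash, ranges, tree, reader);
/// loop {
///     match decoder.next().await {
///         ResponseDecoderNext::More((rest, item)) => {
///             decoder = rest;
///             match item? {
///                 BaoContentItem::Parent(parent) => {
///                     // parent.pair was checked against its parent hash
///                 }
///                 BaoContentItem::Leaf(leaf) => {
///                     // leaf.data is verified data at byte offset leaf.offset
///                 }
///             }
///         }
///         ResponseDecoderNext::Done(_reader) => break,
///     }
/// }
/// ```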
#[derive(Debug)]
pub struct ResponseDecoder<R>(Box<ResponseDecoderInner<R>>);

/// The next item when driving a [`ResponseDecoder`].
#[derive(Debug)]
pub enum ResponseDecoderNext<R> {
    /// One more content item, together with the decoder to continue with.
    More(
        (
            ResponseDecoder<R>,
            std::result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// The response is exhausted; the underlying reader is returned.
    Done(R),
}

impl<R: AsyncStreamReader> ResponseDecoder<R> {
    /// Create a new response decoder for the given root hash, chunk ranges,
    /// tree geometry and encoded stream.
    pub fn new(hash: blake3::Hash, ranges: ChunkRanges, tree: BaoTree, encoded: R) -> Self {
        Self(Box::new(ResponseDecoderInner::new(
            tree, hash, ranges, encoded,
        )))
    }

    /// Decode the next item, consuming the decoder and returning it together
    /// with the item, or returning the underlying reader when done.
    pub async fn next(mut self) -> ResponseDecoderNext<R> {
        if let Some(chunk) = self.0.iter.next() {
            let item = self.next0(chunk).await;
            ResponseDecoderNext::More((self, item))
        } else {
            ResponseDecoderNext::Done(self.0.encoded)
        }
    }

    /// Return the underlying reader, abandoning any remaining items.
    pub fn finish(self) -> R {
        self.0.encoded
    }

    /// The tree geometry of the response being decoded.
    pub fn tree(&self) -> BaoTree {
        self.0.iter.tree()
    }

    /// The hash at the bottom of the hash stack (the root hash before any
    /// items have been decoded).
    pub fn hash(&self) -> &blake3::Hash {
        &self.0.stack[0]
    }

    async fn next0(&mut self, chunk: BaoChunk) -> std::result::Result<BaoContentItem, DecodeError> {
        Ok(match chunk {
            BaoChunk::Parent {
                is_root,
                right,
                left,
                node,
                ..
            } => {
                let this = &mut self.0;
                let buf = this
                    .encoded
                    .read::<64>()
                    .await
                    .map_err(|e| DecodeError::maybe_parent_not_found(e, node))?;
                let pair @ (l_hash, r_hash) = read_parent(&buf);
                let parent_hash = this.stack.pop().unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                // push the children in reverse order, so the left child is
                // popped first
                if right {
                    this.stack.push(r_hash);
                }
                if left {
                    this.stack.push(l_hash);
                }
                // compare the expected hash from the stack with the hash
                // computed from the received pair
                if parent_hash != actual {
                    return Err(DecodeError::ParentHashMismatch(node));
                }
                Parent { pair, node }.into()
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let this = &mut self.0;
                let data = this
                    .encoded
                    .read_bytes(size)
                    .await
                    .map_err(|e| DecodeError::maybe_leaf_not_found(e, start_chunk))?;
                let leaf_hash = this.stack.pop().unwrap();
                let actual = hash_subtree(start_chunk.0, &data, is_root);
                if leaf_hash != actual {
                    return Err(DecodeError::LeafHashMismatch(start_chunk));
                }
                Leaf {
                    offset: start_chunk.to_bytes(),
                    data,
                }
                .into()
            }
        })
    }
}

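/// Encode the given ranges of `data` into `encoded`, using the hashes stored
/// in `outboard`.
///
/// This trusts the data and the outboard to be consistent and does not
/// re-verify hashes while encoding; use [`encode_ranges_validated`] if the
/// data might have changed since the outboard was computed.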
pub async fn encode_ranges<D, O, W>(
    mut data: D,
    mut outboard: O,
    ranges: &ChunkRangesRef,
    encoded: W,
) -> result::Result<(), EncodeError>
where
    D: AsyncSliceReader,
    O: Outboard,
    W: AsyncStreamWriter,
{
    let mut encoded = encoded;
    let tree = outboard.tree();
    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
        match item {
            BaoChunk::Parent { node, .. } => {
                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
                let pair = combine_hash_pair(&l_hash, &r_hash);
                encoded
                    .write(&pair)
                    .await
                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
            }
            BaoChunk::Leaf {
                start_chunk, size, ..
            } => {
                let start = start_chunk.to_bytes();
                let bytes = data.read_at(start, size).await?;
                encoded
                    .write_bytes(bytes)
                    .await
                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
            }
        }
    }
    Ok(())
}

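/// Encode the given ranges of `data` into `encoded`, validating the data
/// against the hashes stored in `outboard` while encoding.
///
/// Every parent hash pair is checked against the hash it is supposed to
/// produce, and every leaf is hashed and compared before it is written, so
/// corrupted data is detected before it reaches the output.
///
/// A minimal usage sketch; `data`, `outboard` and `writer` are assumed to be
/// given, and `ChunkRanges::all()` selects the entire blob. Marked `ignore`
/// since it is not a complete program:
///
/// ```ignore
/// let ranges = ChunkRanges::all();
/// encode_ranges_validated(data, outboard, &ranges, writer).await?;
/// ```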
pub async fn encode_ranges_validated<D, O, W>(
    mut data: D,
    mut outboard: O,
    ranges: &ChunkRangesRef,
    encoded: W,
) -> result::Result<(), EncodeError>
where
    D: AsyncSliceReader,
    O: Outboard,
    W: AsyncStreamWriter,
{
    // buffer for the case where only part of a chunk group is selected
    let mut out_buf = Vec::new();
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    stack.push(outboard.root());
    let mut encoded = encoded;
    let tree = outboard.tree();
    let ranges = truncate_ranges(ranges, tree.size());
    for item in tree.ranges_pre_order_chunks_iter_ref(ranges, 0) {
        match item {
            BaoChunk::Parent {
                is_root,
                left,
                right,
                node,
                ..
            } => {
                let (l_hash, r_hash) = outboard.load(node).await?.unwrap();
                let actual = parent_cv(&l_hash, &r_hash, is_root);
                let expected = stack.pop().unwrap();
                if actual != expected {
                    return Err(EncodeError::ParentHashMismatch(node));
                }
                if right {
                    stack.push(r_hash);
                }
                if left {
                    stack.push(l_hash);
                }
                let pair = combine_hash_pair(&l_hash, &r_hash);
                encoded
                    .write(&pair)
                    .await
                    .map_err(|e| EncodeError::maybe_parent_write(e, node))?;
            }
            BaoChunk::Leaf {
                start_chunk,
                size,
                is_root,
                ranges,
                ..
            } => {
                let expected = stack.pop().unwrap();
                let start = start_chunk.to_bytes();
                let bytes = data.read_at(start, size).await?;
                let (actual, to_write) = if !ranges.is_all() {
                    // only part of the chunk group is selected, so encode the
                    // selected chunks into a temporary buffer
                    out_buf.clear();
                    let actual = encode_selected_rec(
                        start_chunk,
                        &bytes,
                        is_root,
                        ranges,
                        tree.block_size.to_u32(),
                        true,
                        &mut out_buf,
                    );
                    (actual, out_buf.clone().into())
                } else {
                    let actual = hash_subtree(start_chunk.0, &bytes, is_root);
                    (actual, bytes)
                };
                if actual != expected {
                    return Err(EncodeError::LeafHashMismatch(start_chunk));
                }
                encoded
                    .write_bytes(to_write)
                    .await
                    .map_err(|e| EncodeError::maybe_leaf_write(e, start_chunk))?;
            }
        }
    }
    Ok(())
}

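/// Decode a response stream, writing leaf data into `target` and hash pairs
/// into `outboard`.
///
/// The response is verified against the root hash and tree stored in
/// `outboard` as it is decoded, so only verified data and hashes are written.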
pub async fn decode_ranges<R, O, W>(
    encoded: R,
    ranges: ChunkRanges,
    mut target: W,
    mut outboard: O,
) -> std::result::Result<(), DecodeError>
where
    O: OutboardMut + Outboard,
    R: AsyncStreamReader,
    W: AsyncSliceWriter,
{
    let mut reading = ResponseDecoder::new(outboard.root(), ranges, outboard.tree(), encoded);
    loop {
        let item = match reading.next().await {
            ResponseDecoderNext::Done(_reader) => break,
            ResponseDecoderNext::More((next, item)) => {
                reading = next;
                item?
            }
        };
        match item {
            BaoContentItem::Parent(Parent { node, pair }) => {
                outboard.save(node, &pair).await?;
            }
            BaoContentItem::Leaf(Leaf { offset, data }) => {
                target.write_bytes_at(offset, data).await?;
            }
        }
    }
    Ok(())
}

fn read_parent(buf: &[u8]) -> (blake3::Hash, blake3::Hash) {
    let l_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[..32]).unwrap());
    let r_hash = blake3::Hash::from(<[u8; 32]>::try_from(&buf[32..64]).unwrap());
    (l_hash, r_hash)
}

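/// Compute the outboard for the given data, saving the hash pairs into the
/// given [`OutboardMut`] and returning the root hash.
///
/// The data is expected to match the size recorded in `tree`.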
pub async fn outboard(
    data: impl AsyncStreamReader,
    tree: BaoTree,
    mut outboard: impl OutboardMut,
) -> io::Result<blake3::Hash> {
    let mut buffer = vec![0u8; tree.chunk_group_bytes()];
    let hash = outboard_impl(tree, data, &mut outboard, &mut buffer).await?;
    Ok(hash)
}

async fn outboard_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl OutboardMut,
    buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    debug_assert!(buffer.len() == tree.chunk_group_bytes());
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, node, .. } => {
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                outboard.save(node, &(left_hash, right_hash)).await?;
                let parent = parent_cv(&left_hash, &right_hash, is_root);
                stack.push(parent);
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes(size).await?;
                let hash = hash_subtree(start_chunk.0, &buf, is_root);
                stack.push(hash);
            }
        }
    }
    debug_assert_eq!(stack.len(), 1);
    let hash = stack.pop().unwrap();
    Ok(hash)
}

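/// Compute the outboard for the given data in post order, writing the hash
/// pairs to an [`AsyncStreamWriter`] and returning the root hash.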
pub async fn outboard_post_order(
    data: impl AsyncStreamReader,
    tree: BaoTree,
    mut outboard: impl AsyncStreamWriter,
) -> io::Result<blake3::Hash> {
    let mut buffer = vec![0u8; tree.chunk_group_bytes()];
    let hash = outboard_post_order_impl(tree, data, &mut outboard, &mut buffer).await?;
    Ok(hash)
}

async fn outboard_post_order_impl(
    tree: BaoTree,
    mut data: impl AsyncStreamReader,
    mut outboard: impl AsyncStreamWriter,
    buffer: &mut [u8],
) -> io::Result<blake3::Hash> {
    let mut stack = SmallVec::<[blake3::Hash; 10]>::new();
    debug_assert!(buffer.len() == tree.chunk_group_bytes());
    for item in tree.post_order_chunks_iter() {
        match item {
            BaoChunk::Parent { is_root, .. } => {
                let right_hash = stack.pop().unwrap();
                let left_hash = stack.pop().unwrap();
                outboard.write(left_hash.as_bytes()).await?;
                outboard.write(right_hash.as_bytes()).await?;
                let parent = parent_cv(&left_hash, &right_hash, is_root);
                stack.push(parent);
            }
            BaoChunk::Leaf {
                size,
                is_root,
                start_chunk,
                ..
            } => {
                let buf = data.read_bytes(size).await?;
                let hash = hash_subtree(start_chunk.0, &buf, is_root);
                stack.push(hash);
            }
        }
    }
    debug_assert_eq!(stack.len(), 1);
    let hash = stack.pop().unwrap();
    Ok(hash)
}

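/// Copy an outboard to another outboard, node by node.
///
/// Since loading and saving are keyed by [`TreeNode`], this can be used to
/// persist an in-memory outboard or to convert between pre order and post
/// order layouts.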
pub async fn copy(mut from: impl Outboard, mut to: impl OutboardMut) -> io::Result<()> {
    let tree = from.tree();
    for node in tree.pre_order_nodes_iter() {
        if let Some(hash_pair) = from.load(node).await? {
            to.save(node, &hash_pair).await?;
        }
    }
    Ok(())
}

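/// Validation of data and outboards against the root hash, yielding the
/// chunk ranges that are valid.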
#[cfg(feature = "validate")]
mod validate {
    use std::{io, ops::Range};

    use futures_lite::{FutureExt, Stream};
    use genawaiter::sync::{Co, Gen};
    use iroh_io::AsyncSliceReader;

    use crate::{
        blake3, hash_subtree, io::LocalBoxFuture, rec::truncate_ranges, split, BaoTree, ChunkNum,
        ChunkRangesRef, TreeNode,
    };

    use super::Outboard;

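    /// Given an outboard and the corresponding data, compute all valid ranges.
    ///
    /// Yields the chunk ranges for which the data matches the hashes in the
    /// outboard; errors from the underlying readers are yielded as stream
    /// items.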
    pub fn valid_ranges<'a, O, D>(
        outboard: O,
        data: D,
        ranges: &'a ChunkRangesRef,
    ) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
    where
        O: Outboard + 'a,
        D: AsyncSliceReader + 'a,
    {
        Gen::new(move |co| async move {
            if let Err(cause) = RecursiveDataValidator::validate(outboard, data, ranges, &co).await
            {
                co.yield_(Err(cause)).await;
            }
        })
    }

    struct RecursiveDataValidator<'a, O: Outboard, D: AsyncSliceReader> {
        tree: BaoTree,
        shifted_filled_size: TreeNode,
        outboard: O,
        data: D,
        co: &'a Co<io::Result<Range<ChunkNum>>>,
    }

    impl<'a, O: Outboard, D: AsyncSliceReader> RecursiveDataValidator<'a, O, D> {
        async fn validate(
            outboard: O,
            data: D,
            ranges: &ChunkRangesRef,
            co: &Co<io::Result<Range<ChunkNum>>>,
        ) -> io::Result<()> {
            let tree = outboard.tree();
            if tree.blocks() == 1 {
                let mut data = data;
                let data = data.read_at(0, tree.size().try_into().unwrap()).await?;
                let actual = hash_subtree(0, &data, true);
                if actual == outboard.root() {
                    co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
                }
                return Ok(());
            }
            let ranges = truncate_ranges(ranges, tree.size());
            let root_hash = outboard.root();
            let (shifted_root, shifted_filled_size) = tree.shifted();
            let mut validator = RecursiveDataValidator {
                tree,
                shifted_filled_size,
                outboard,
                data,
                co,
            };
            validator
                .validate_rec(&root_hash, shifted_root, true, ranges)
                .await
        }

        async fn yield_if_valid(
            &mut self,
            range: Range<u64>,
            hash: &blake3::Hash,
            is_root: bool,
        ) -> io::Result<()> {
            let len = (range.end - range.start).try_into().unwrap();
            let data = self.data.read_at(range.start, len).await?;
            let actual = hash_subtree(ChunkNum::full_chunks(range.start).0, &data, is_root);
            if &actual == hash {
                self.co
                    .yield_(Ok(
                        ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                    ))
                    .await;
            }
            io::Result::Ok(())
        }

        fn validate_rec<'b>(
            &'b mut self,
            parent_hash: &'b blake3::Hash,
            shifted: TreeNode,
            is_root: bool,
            ranges: &'b ChunkRangesRef,
        ) -> LocalBoxFuture<'b, io::Result<()>> {
            async move {
                if ranges.is_empty() {
                    return Ok(());
                }
                let node = shifted.subtract_block_size(self.tree.block_size.0);
                let (l, m, r) = self.tree.leaf_byte_ranges3(node);
                if !self.tree.is_relevant_for_outboard(node) {
                    self.yield_if_valid(l..r, parent_hash, is_root).await?;
                    return Ok(());
                }
                let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                    return Ok(());
                };
                let actual = blake3::guts::parent_cv(&l_hash, &r_hash, is_root);
                if &actual != parent_hash {
                    return Ok(());
                };
                let (l_ranges, r_ranges) = split(ranges, node);
                if shifted.is_leaf() {
                    if !l_ranges.is_empty() {
                        self.yield_if_valid(l..m, &l_hash, false).await?;
                    }
                    if !r_ranges.is_empty() {
                        self.yield_if_valid(m..r, &r_hash, false).await?;
                    }
                } else {
                    let left = shifted.left_child().unwrap();
                    self.validate_rec(&l_hash, left, false, l_ranges).await?;
                    let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                    self.validate_rec(&r_hash, right, false, r_ranges).await?;
                }
                Ok(())
            }
            .boxed_local()
        }
    }

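    /// Given just an outboard, compute all ranges for which the stored hash
    /// pairs are consistent with the root hash.
    ///
    /// Errors from the underlying reader are yielded as stream items.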
    pub fn valid_outboard_ranges<'a, O>(
        outboard: O,
        ranges: &'a ChunkRangesRef,
    ) -> impl Stream<Item = io::Result<Range<ChunkNum>>> + 'a
    where
        O: Outboard + 'a,
    {
        Gen::new(move |co| async move {
            if let Err(cause) = RecursiveOutboardValidator::validate(outboard, ranges, &co).await {
                co.yield_(Err(cause)).await;
            }
        })
    }

    struct RecursiveOutboardValidator<'a, O: Outboard> {
        tree: BaoTree,
        shifted_filled_size: TreeNode,
        outboard: O,
        co: &'a Co<io::Result<Range<ChunkNum>>>,
    }

    impl<'a, O: Outboard> RecursiveOutboardValidator<'a, O> {
        async fn validate(
            outboard: O,
            ranges: &ChunkRangesRef,
            co: &Co<io::Result<Range<ChunkNum>>>,
        ) -> io::Result<()> {
            let tree = outboard.tree();
            if tree.blocks() == 1 {
                co.yield_(Ok(ChunkNum(0)..tree.chunks())).await;
                return Ok(());
            }
            let ranges = truncate_ranges(ranges, tree.size());
            let root_hash = outboard.root();
            let (shifted_root, shifted_filled_size) = tree.shifted();
            let mut validator = RecursiveOutboardValidator {
                tree,
                shifted_filled_size,
                outboard,
                co,
            };
            validator
                .validate_rec(&root_hash, shifted_root, true, ranges)
                .await
        }

        fn validate_rec<'b>(
            &'b mut self,
            parent_hash: &'b blake3::Hash,
            shifted: TreeNode,
            is_root: bool,
            ranges: &'b ChunkRangesRef,
        ) -> LocalBoxFuture<'b, io::Result<()>> {
            Box::pin(async move {
                let yield_node_range = |range: Range<u64>| {
                    self.co.yield_(Ok(
                        ChunkNum::full_chunks(range.start)..ChunkNum::chunks(range.end)
                    ))
                };
                if ranges.is_empty() {
                    return Ok(());
                }
                let node = shifted.subtract_block_size(self.tree.block_size.0);
                let (l, m, r) = self.tree.leaf_byte_ranges3(node);
                if !self.tree.is_relevant_for_outboard(node) {
                    yield_node_range(l..r).await;
                    return Ok(());
                }
                let Some((l_hash, r_hash)) = self.outboard.load(node).await? else {
                    return Ok(());
                };
                let actual = blake3::guts::parent_cv(&l_hash, &r_hash, is_root);
                if &actual != parent_hash {
                    return Ok(());
                };
                let (l_ranges, r_ranges) = split(ranges, node);
                if shifted.is_leaf() {
                    if !l_ranges.is_empty() {
                        yield_node_range(l..m).await;
                    }
                    if !r_ranges.is_empty() {
                        yield_node_range(m..r).await;
                    }
                } else {
                    let left = shifted.left_child().unwrap();
                    self.validate_rec(&l_hash, left, false, l_ranges).await?;
                    let right = shifted.right_descendant(self.shifted_filled_size).unwrap();
                    self.validate_rec(&r_hash, right, false, r_ranges).await?;
                }
                Ok(())
            })
        }
    }
}

#[cfg(feature = "validate")]
pub use validate::{valid_outboard_ranges, valid_ranges};