iroh_blobs/
get.rs

1//! The low level client side API
2//!
3//! Note that while using this API directly is fine, a simpler way to get data
4//! to a store is to use the [`crate::api::remote`] API, in particular the
5//! [`crate::api::remote::Remote::fetch`] function to download data to your
6//! local store.
7//!
8//! To get data, create a connection using an [`iroh::Endpoint`].
9//!
10//! Create a [`crate::protocol::GetRequest`] describing the data you want to get.
11//!
12//! Then create a state machine using [fsm::start] and
13//! drive it to completion by calling next on each state.
14//!
15//! For some states you have to provide additional arguments when calling next,
16//! or you can choose to finish early.
17//!
18//! [iroh]: https://docs.rs/iroh
19use std::{
20    fmt::{self, Debug},
21    time::{Duration, Instant},
22};
23
24use anyhow::Result;
25use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
26use fsm::RequestCounters;
27use n0_snafu::SpanTrace;
28use nested_enum_utils::common_fields;
29use serde::{Deserialize, Serialize};
30use snafu::{Backtrace, IntoError, ResultExt, Snafu};
31use tracing::{debug, error};
32
33use crate::{
34    protocol::ChunkRangesSeq,
35    store::IROH_BLOCK_SIZE,
36    util::{RecvStream, SendStream},
37    Hash,
38};
39
40mod error;
41pub mod request;
42pub(crate) use error::get_error;
43pub use error::{GetError, GetResult};
44
/// Receive stream type used when no explicit reader type parameter is given.
type DefaultReader = iroh::endpoint::RecvStream;
/// Send stream type used when no explicit writer type parameter is given.
type DefaultWriter = iroh::endpoint::SendStream;
47
/// A receive stream and a send stream bundled with a connection id and a
/// creation timestamp.
///
/// The stream types default to the iroh endpoint stream types.
pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Numeric id of the connection the streams belong to, as supplied to
    /// [`StreamPair::new`].
    pub connection_id: u64,
    /// Set to `Instant::now()` when the pair is created by [`StreamPair::new`].
    pub t0: Instant,
    /// The receiving half.
    pub recv: R,
    /// The sending half.
    pub send: W,
}
54
55impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
56    pub fn new(connection_id: u64, recv: R, send: W) -> Self {
57        Self {
58            t0: Instant::now(),
59            recv,
60            send,
61            connection_id,
62        }
63    }
64}
65
/// Stats about the transfer.
///
/// Dereferences (mutably, too) to the inner [`RequestCounters`], so counter
/// fields such as `payload_bytes_read` can be accessed directly on a `Stats`.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Byte counters accumulated during the transfer; target of `Deref`.
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// The time it took to transfer the data
    pub elapsed: Duration,
}
86
87impl Stats {
88    /// Transfer rate in megabits per second
89    pub fn mbits(&self) -> f64 {
90        let data_len_bit = self.total_bytes_read() * 8;
91        data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
92    }
93
94    pub fn total_bytes_read(&self) -> u64 {
95        self.payload_bytes_read + self.other_bytes_read
96    }
97
98    pub fn combine(&mut self, that: &Stats) {
99        self.payload_bytes_written += that.payload_bytes_written;
100        self.other_bytes_written += that.other_bytes_written;
101        self.payload_bytes_read += that.payload_bytes_read;
102        self.other_bytes_read += that.other_bytes_read;
103        self.elapsed += that.elapsed;
104    }
105}
106
107/// Finite state machine for get responses.
108///
109/// This is the low level API for getting data from a peer.
110#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
111pub mod fsm {
112    use std::{io, result};
113
114    use bao_tree::{
115        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
116        BaoTree, ChunkRanges, TreeNode,
117    };
118    use derive_more::From;
119    use iroh::endpoint::Connection;
120    use iroh_io::AsyncSliceWriter;
121
122    use super::*;
123    use crate::{
124        get::get_error::BadRequestSnafu,
125        protocol::{
126            GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
127        },
128        util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
129    };
130
// Self-referential cell that owns a `ChunkRangesSeq` together with an iterator
// borrowing from it. This lets `RangesIter` be an owned value instead of
// carrying a lifetime through every state of the machine.
// With `#[not_covariant]`, the dependent is only reachable through the
// `with_dependent` / `with_dependent_mut` closures, which is how `RangesIter`
// accesses it (see the self_cell docs for the variance rules).
self_cell::self_cell! {
    struct RangesIterInner {
        owner: ChunkRangesSeq,
        #[not_covariant]
        dependent: NonEmptyRequestRangeSpecIter,
    }
}
138
139    /// The entry point of the get response machine
140    pub fn start(
141        connection: Connection,
142        request: GetRequest,
143        counters: RequestCounters,
144    ) -> AtInitial {
145        AtInitial::new(connection, request, counters)
146    }
147
148    /// Start with a get many request. Todo: turn this into distinct states.
149    pub async fn start_get_many(
150        connection: Connection,
151        request: GetManyRequest,
152        counters: RequestCounters,
153    ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
154        let start = Instant::now();
155        let (mut writer, reader) = connection
156            .open_bi()
157            .await
158            .map_err(|e| OpenSnafu.into_error(e.into()))?;
159        let request = Request::GetMany(request);
160        let request_bytes = postcard::to_stdvec(&request)
161            .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
162        writer
163            .send_bytes(request_bytes.into())
164            .await
165            .context(connected_next_error::WriteSnafu)?;
166        let Request::GetMany(request) = request else {
167            unreachable!();
168        };
169        let mut ranges_iter = RangesIter::new(request.ranges.clone());
170        let first_item = ranges_iter.next();
171        let misc = Box::new(Misc {
172            counters,
173            start,
174            ranges_iter,
175        });
176        Ok(match first_item {
177            Some((child_offset, child_ranges)) => Ok(AtStartChild {
178                ranges: child_ranges,
179                reader,
180                misc,
181                offset: child_offset,
182            }),
183            None => Err(AtClosing::new(misc, reader, true)),
184        })
185    }
186
/// Owned iterator for the ranges in a request
///
/// We need an owned iterator for a fsm style API, otherwise we would have
/// to drag a lifetime around every single state.
///
/// Wraps a self-referential cell holding both the [`ChunkRangesSeq`] and the
/// iterator that borrows from it.
struct RangesIter(RangesIterInner);
192
193    impl fmt::Debug for RangesIter {
194        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195            f.debug_struct("RangesIter").finish()
196        }
197    }
198
199    impl RangesIter {
200        pub fn new(owner: ChunkRangesSeq) -> Self {
201            Self(RangesIterInner::new(owner, |owner| {
202                owner.iter_non_empty_infinite()
203            }))
204        }
205
206        pub fn offset(&self) -> u64 {
207            self.0.with_dependent(|_owner, iter| iter.offset())
208        }
209    }
210
211    impl Iterator for RangesIter {
212        type Item = (u64, ChunkRanges);
213
214        fn next(&mut self) -> Option<Self::Item> {
215            self.0.with_dependent_mut(|_owner, iter| {
216                iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
217            })
218        }
219    }
220
/// Initial state of the get response machine
///
/// Created by [`start`] or [`AtInitial::new`]. No stream has been opened yet.
#[derive(Debug)]
pub struct AtInitial {
    /// Connection on which [`AtInitial::next`] opens the bidirectional stream.
    connection: Connection,
    /// The request to be sent once the stream is open.
    request: GetRequest,
    /// Counters handed on to the following states.
    counters: RequestCounters,
}
228
229    impl AtInitial {
230        /// Create a new get response
231        ///
232        /// `connection` is an existing connection
233        /// `request` is the request to be sent
234        pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
235            Self {
236                connection,
237                request,
238                counters,
239            }
240        }
241
242        /// Initiate a new bidi stream to use for the get response
243        pub async fn next(self) -> Result<AtConnected, InitialNextError> {
244            let start = Instant::now();
245            let (writer, reader) = self
246                .connection
247                .open_bi()
248                .await
249                .map_err(|e| OpenSnafu.into_error(e.into()))?;
250            Ok(AtConnected {
251                start,
252                reader,
253                writer,
254                request: self.request,
255                counters: self.counters,
256            })
257        }
258    }
259
/// Error that you can get from [`AtInitial::next`]
// There is no `#[snafu(module)]` here, so the `OpenSnafu` context selector is
// generated directly in this module and used unqualified.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum InitialNextError {
    // Opening the bidirectional stream failed. This is a plain comment on
    // purpose: snafu uses a variant's doc comment as its default `Display`
    // when no `#[snafu(display)]` is given.
    Open { source: io::Error },
}
272
/// State of the get response machine once a bidirectional stream is open.
///
/// The request has not been sent yet; [`AtConnected::next`] sends it.
#[derive(Debug)]
pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Starting point for the elapsed time reported in the stats.
    start: Instant,
    /// Receiving half of the stream.
    reader: R,
    /// Sending half of the stream, dropped after the request is written.
    writer: W,
    /// The request to send.
    request: GetRequest,
    /// Counters handed on to the following states.
    counters: RequestCounters,
}
282
/// Possible next states after the request has been sent by [`AtConnected::next`]
///
/// `From` is derived so each state can be turned into this enum with `.into()`.
#[derive(Debug, From)]
pub enum ConnectedNext<R: RecvStream = DefaultReader> {
    /// First requested data is the root blob (offset 0): a collection or a single blob
    StartRoot(AtStartRoot<R>),
    /// First requested data is a child at a non-zero offset
    StartChild(AtStartChild<R>),
    /// Request is empty, no data was requested
    Closing(AtClosing<R>),
}
293
/// Error that you can get from [`AtConnected::next`]
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[derive(Debug, Snafu)]
// Context selectors are generated inside a `connected_next_error` module,
// e.g. `connected_next_error::WriteSnafu`.
#[snafu(module)]
#[non_exhaustive]
pub enum ConnectedNextError {
    /// Error when serializing the request
    #[snafu(display("postcard ser: {source}"))]
    PostcardSer { source: postcard::Error },
    /// The serialized request is larger than [`MAX_MESSAGE_SIZE`]; nothing was sent
    #[snafu(display("request too big"))]
    RequestTooBig {},
    /// Error when writing the request to the [`SendStream`].
    #[snafu(display("write: {source}"))]
    Write { source: io::Error },
}
315
316    impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
317        pub fn new(
318            start: Instant,
319            reader: R,
320            writer: W,
321            request: GetRequest,
322            counters: RequestCounters,
323        ) -> Self {
324            Self {
325                start,
326                reader,
327                writer,
328                request,
329                counters,
330            }
331        }
332
333        /// Send the request and move to the next state
334        ///
335        /// The next state will be either `StartRoot` or `StartChild` depending on whether
336        /// the request requests part of the collection or not.
337        ///
338        /// If the request is empty, this can also move directly to `Finished`.
339        pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
340            let Self {
341                start,
342                reader,
343                mut writer,
344                mut request,
345                mut counters,
346            } = self;
347            // 1. Send Request
348            counters.other_bytes_written += {
349                debug!("sending request");
350                let wrapped = Request::Get(request);
351                let request_bytes = postcard::to_stdvec(&wrapped)
352                    .context(connected_next_error::PostcardSerSnafu)?;
353                let Request::Get(x) = wrapped else {
354                    unreachable!();
355                };
356                request = x;
357
358                if request_bytes.len() > MAX_MESSAGE_SIZE {
359                    return Err(connected_next_error::RequestTooBigSnafu.build());
360                }
361
362                // write the request itself
363                let len = request_bytes.len() as u64;
364                writer
365                    .send_bytes(request_bytes.into())
366                    .await
367                    .context(connected_next_error::WriteSnafu)?;
368                writer
369                    .sync()
370                    .await
371                    .context(connected_next_error::WriteSnafu)?;
372                len
373            };
374
375            // 2. Finish writing before expecting a response
376            drop(writer);
377
378            let hash = request.hash;
379            let ranges_iter = RangesIter::new(request.ranges);
380            // this is in a box so we don't have to memcpy it on every state transition
381            let mut misc = Box::new(Misc {
382                counters,
383                start,
384                ranges_iter,
385            });
386            Ok(match misc.ranges_iter.next() {
387                Some((offset, ranges)) => {
388                    if offset == 0 {
389                        AtStartRoot {
390                            reader,
391                            ranges,
392                            misc,
393                            hash,
394                        }
395                        .into()
396                    } else {
397                        AtStartChild {
398                            reader,
399                            ranges,
400                            misc,
401                            offset,
402                        }
403                        .into()
404                    }
405                }
406                None => AtClosing::new(misc, reader, true).into(),
407            })
408        }
409    }
410
/// State of the get response when we start reading the root blob
///
/// The root blob (offset 0) is either a collection or a single blob.
#[derive(Debug)]
pub struct AtStartRoot<R: RecvStream = DefaultReader> {
    /// Ranges requested for the root blob.
    ranges: ChunkRanges,
    /// Receiving half of the stream.
    reader: R,
    /// State shared across transitions (counters, start time, ranges iterator).
    misc: Box<Misc>,
    /// Hash of the root blob, taken from the request.
    hash: Hash,
}
419
/// State of the get response when we start reading a child
#[derive(Debug)]
pub struct AtStartChild<R: RecvStream = DefaultReader> {
    /// Ranges requested for this child.
    ranges: ChunkRanges,
    /// Receiving half of the stream.
    reader: R,
    /// State shared across transitions (counters, start time, ranges iterator).
    misc: Box<Misc>,
    /// Offset of this child within the request.
    offset: u64,
}
428
429    impl<R: RecvStream> AtStartChild<R> {
430        /// The offset of the child we are currently reading
431        ///
432        /// This must be used to determine the hash needed to call next.
433        /// If this is larger than the number of children in the collection,
434        /// you can call finish to stop reading the response.
435        pub fn offset(&self) -> u64 {
436            self.offset
437        }
438
439        /// The ranges we have requested for the child
440        pub fn ranges(&self) -> &ChunkRanges {
441            &self.ranges
442        }
443
444        /// Go into the next state, reading the header
445        ///
446        /// This requires passing in the hash of the child for validation
447        pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
448            AtBlobHeader {
449                reader: self.reader,
450                ranges: self.ranges,
451                misc: self.misc,
452                hash,
453            }
454        }
455
456        /// Finish the get response without reading further
457        ///
458        /// This is used if you know that there are no more children from having
459        /// read the collection, or when you want to stop reading the response
460        /// early.
461        pub fn finish(self) -> AtClosing<R> {
462            AtClosing::new(self.misc, self.reader, false)
463        }
464    }
465
466    impl<R: RecvStream> AtStartRoot<R> {
467        /// The ranges we have requested for the child
468        pub fn ranges(&self) -> &ChunkRanges {
469            &self.ranges
470        }
471
472        /// Hash of the root blob
473        pub fn hash(&self) -> Hash {
474            self.hash
475        }
476
477        /// Go into the next state, reading the header
478        ///
479        /// For the collection we already know the hash, since it was part of the request
480        pub fn next(self) -> AtBlobHeader<R> {
481            AtBlobHeader {
482                reader: self.reader,
483                ranges: self.ranges,
484                hash: self.hash,
485                misc: self.misc,
486            }
487        }
488
489        /// Finish the get response without reading further
490        pub fn finish(self) -> AtClosing<R> {
491            AtClosing::new(self.misc, self.reader, false)
492        }
493    }
494
/// State before reading a size header
#[derive(Debug)]
pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
    /// Ranges requested for this blob.
    ranges: ChunkRanges,
    /// Receiving half of the stream.
    reader: R,
    /// State shared across transitions (counters, start time, ranges iterator).
    misc: Box<Misc>,
    /// Hash the incoming data is verified against.
    hash: Hash,
}
503
/// Error that you can get from [`AtBlobHeader::next`]
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[non_exhaustive]
#[derive(Debug, Snafu)]
// Context selectors live in the generated `at_blob_header_next_error` module.
#[snafu(module)]
pub enum AtBlobHeaderNextError {
    /// Eof when reading the size header
    ///
    /// This indicates that the provider does not have the requested data.
    #[snafu(display("not found"))]
    NotFound {},
    /// Any other io error while reading the size header
    #[snafu(display("io: {source}"))]
    Read { source: io::Error },
}
523
524    impl From<AtBlobHeaderNextError> for io::Error {
525        fn from(cause: AtBlobHeaderNextError) -> Self {
526            match cause {
527                AtBlobHeaderNextError::NotFound { .. } => {
528                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
529                }
530                AtBlobHeaderNextError::Read { source, .. } => source,
531            }
532        }
533    }
534
535    impl<R: RecvStream> AtBlobHeader<R> {
536        /// Read the size header, returning it and going into the `Content` state.
537        pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
538            let mut size = [0; 8];
539            self.reader.recv_exact(&mut size).await.map_err(|cause| {
540                if cause.kind() == io::ErrorKind::UnexpectedEof {
541                    at_blob_header_next_error::NotFoundSnafu.build()
542                } else {
543                    at_blob_header_next_error::ReadSnafu.into_error(cause)
544                }
545            })?;
546            self.misc.other_bytes_read += 8;
547            let size = u64::from_le_bytes(size);
548            let stream = ResponseDecoder::new(
549                self.hash.into(),
550                self.ranges,
551                BaoTree::new(size, IROH_BLOCK_SIZE),
552                RecvStreamAsyncStreamReader::new(self.reader),
553            );
554            Ok((
555                AtBlobContent {
556                    stream,
557                    misc: self.misc,
558                },
559                size,
560            ))
561        }
562
563        /// Drain the response and throw away the result
564        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
565            let (content, _size) = self.next().await?;
566            content.drain().await
567        }
568
569        /// Concatenate the entire response into a vec
570        ///
571        /// For a request that does not request the complete blob, this will just
572        /// concatenate the ranges that were requested.
573        pub async fn concatenate_into_vec(
574            self,
575        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
576            let (content, _size) = self.next().await?;
577            content.concatenate_into_vec().await
578        }
579
580        /// Write the entire blob to a slice writer.
581        pub async fn write_all<D: AsyncSliceWriter>(
582            self,
583            data: D,
584        ) -> result::Result<AtEndBlob<R>, DecodeError> {
585            let (content, _size) = self.next().await?;
586            let res = content.write_all(data).await?;
587            Ok(res)
588        }
589
590        /// Write the entire blob to a slice writer and to an optional outboard.
591        ///
592        /// The outboard is only written to if the blob is larger than a single
593        /// chunk group.
594        pub async fn write_all_with_outboard<D, O>(
595            self,
596            outboard: Option<O>,
597            data: D,
598        ) -> result::Result<AtEndBlob<R>, DecodeError>
599        where
600            D: AsyncSliceWriter,
601            O: OutboardMut,
602        {
603            let (content, _size) = self.next().await?;
604            let res = content.write_all_with_outboard(outboard, data).await?;
605            Ok(res)
606        }
607
608        /// The hash of the blob we are reading.
609        pub fn hash(&self) -> Hash {
610            self.hash
611        }
612
613        /// The ranges we have requested for the current hash.
614        pub fn ranges(&self) -> &ChunkRanges {
615            &self.ranges
616        }
617
618        /// The current offset of the blob we are reading.
619        pub fn offset(&self) -> u64 {
620            self.misc.ranges_iter.offset()
621        }
622    }
623
/// State while we are reading content
#[derive(Debug)]
pub struct AtBlobContent<R: RecvStream = DefaultReader> {
    /// Decoder that checks incoming parents and leaves against the blob hash.
    stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
    /// State shared across transitions (counters, start time, ranges iterator).
    misc: Box<Misc>,
}
630
/// Decode error that you can get once you have sent the request and are
/// decoding the response, e.g. from [`AtBlobContent::next`].
///
/// This is similar to [`bao_tree::io::DecodeError`], but takes into account
/// that we are reading from a [`RecvStream`], so read errors will be
/// propagated as [`DecodeError::Read`], containing a [`io::Error`].
///
/// When the provider finds that it does not have a chunk that we requested,
/// or that the chunk is invalid, it will stop sending data without producing
/// an error. This is indicated by either the [`DecodeError::ParentNotFound`] or
/// [`DecodeError::LeafNotFound`] variant. These variants show that data is
/// missing even though the connection and the provider are otherwise healthy.
///
/// The [`DecodeError::ParentHashMismatch`] and [`DecodeError::LeafHashMismatch`]
/// variants indicate that the provider has sent us invalid data. A well-behaved
/// provider should never do this, so this is an indication that the provider is
/// not behaving correctly.
///
/// Failures when writing decoded data or outboard data locally are reported as
/// [`DecodeError::Write`].
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[non_exhaustive]
#[derive(Debug, Snafu)]
// Context selectors live in the generated `decode_error` module.
#[snafu(module)]
pub enum DecodeError {
    /// The requested data was not found, so the provider stopped sending data.
    /// For example, this is produced when the stream ends before the size
    /// header of a blob has been read.
    #[snafu(display("not found"))]
    ChunkNotFound {},
    /// A parent was not found or invalid, so the provider stopped sending data
    #[snafu(display("parent not found {node:?}"))]
    ParentNotFound { node: TreeNode },
    /// A leaf (chunk) was not found or invalid, so the provider stopped sending data
    #[snafu(display("chunk not found {num}"))]
    LeafNotFound { num: ChunkNum },
    /// The hash of a parent did not match the expected hash
    #[snafu(display("parent hash mismatch: {node:?}"))]
    ParentHashMismatch { node: TreeNode },
    /// The hash of a leaf did not match the expected hash
    #[snafu(display("leaf hash mismatch: {num}"))]
    LeafHashMismatch { num: ChunkNum },
    /// Error when reading from the stream
    #[snafu(display("read: {source}"))]
    Read { source: io::Error },
    /// Error when writing decoded data or outboard data
    #[snafu(display("io: {source}"))]
    Write { source: io::Error },
}
679
680    impl DecodeError {
681        pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
682            decode_error::LeafHashMismatchSnafu { num }.build()
683        }
684    }
685
686    impl From<AtBlobHeaderNextError> for DecodeError {
687        fn from(cause: AtBlobHeaderNextError) -> Self {
688            match cause {
689                AtBlobHeaderNextError::NotFound { .. } => decode_error::ChunkNotFoundSnafu.build(),
690                AtBlobHeaderNextError::Read { source, .. } => {
691                    decode_error::ReadSnafu.into_error(source)
692                }
693            }
694        }
695    }
696
697    impl From<DecodeError> for io::Error {
698        fn from(cause: DecodeError) -> Self {
699            match cause {
700                DecodeError::ParentNotFound { .. } => {
701                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
702                }
703                DecodeError::LeafNotFound { .. } => {
704                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
705                }
706                DecodeError::Read { source, .. } => source,
707                DecodeError::Write { source, .. } => source,
708                _ => io::Error::other(cause),
709            }
710        }
711    }
712
713    impl From<bao_tree::io::DecodeError> for DecodeError {
714        fn from(value: bao_tree::io::DecodeError) -> Self {
715            match value {
716                bao_tree::io::DecodeError::ParentNotFound(node) => {
717                    decode_error::ParentNotFoundSnafu { node }.build()
718                }
719                bao_tree::io::DecodeError::LeafNotFound(num) => {
720                    decode_error::LeafNotFoundSnafu { num }.build()
721                }
722                bao_tree::io::DecodeError::ParentHashMismatch(node) => {
723                    decode_error::ParentHashMismatchSnafu { node }.build()
724                }
725                bao_tree::io::DecodeError::LeafHashMismatch(num) => {
726                    decode_error::LeafHashMismatchSnafu { num }.build()
727                }
728                bao_tree::io::DecodeError::Io(cause) => decode_error::ReadSnafu.into_error(cause),
729            }
730        }
731    }
732
/// The next state after reading a content item
///
/// Returned by [`AtBlobContent::next`].
#[derive(Debug, From)]
pub enum BlobContentNext<R: RecvStream> {
    /// We expect more content. Carries the state to continue from and the item
    /// that was just read (or the error that occurred while reading it).
    More(
        (
            AtBlobContent<R>,
            result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// We are done with this blob
    Done(AtEndBlob<R>),
}
746
747    impl<R: RecvStream> AtBlobContent<R> {
748        /// Read the next item, either content, an error, or the end of the blob
749        pub async fn next(self) -> BlobContentNext<R> {
750            match self.stream.next().await {
751                ResponseDecoderNext::More((stream, res)) => {
752                    let mut next = Self { stream, ..self };
753                    let res = res.map_err(DecodeError::from);
754                    match &res {
755                        Ok(BaoContentItem::Parent(_)) => {
756                            next.misc.other_bytes_read += 64;
757                        }
758                        Ok(BaoContentItem::Leaf(leaf)) => {
759                            next.misc.payload_bytes_read += leaf.data.len() as u64;
760                        }
761                        _ => {}
762                    }
763                    BlobContentNext::More((next, res))
764                }
765                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
766                    stream: stream.into_inner(),
767                    misc: self.misc,
768                }),
769            }
770        }
771
772        /// The geometry of the tree we are currently reading.
773        pub fn tree(&self) -> bao_tree::BaoTree {
774            self.stream.tree()
775        }
776
777        /// The hash of the blob we are reading.
778        pub fn hash(&self) -> Hash {
779            (*self.stream.hash()).into()
780        }
781
782        /// The current offset of the blob we are reading.
783        pub fn offset(&self) -> u64 {
784            self.misc.ranges_iter.offset()
785        }
786
787        /// Current stats
788        pub fn stats(&self) -> Stats {
789            Stats {
790                counters: self.misc.counters,
791                elapsed: self.misc.start.elapsed(),
792            }
793        }
794
795        /// Drain the response and throw away the result
796        pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
797            let mut content = self;
798            loop {
799                match content.next().await {
800                    BlobContentNext::More((content1, res)) => {
801                        let _ = res?;
802                        content = content1;
803                    }
804                    BlobContentNext::Done(end) => {
805                        break Ok(end);
806                    }
807                }
808            }
809        }
810
811        /// Concatenate the entire response into a vec
812        pub async fn concatenate_into_vec(
813            self,
814        ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
815            let mut res = Vec::with_capacity(1024);
816            let mut curr = self;
817            let done = loop {
818                match curr.next().await {
819                    BlobContentNext::More((next, data)) => {
820                        if let BaoContentItem::Leaf(leaf) = data? {
821                            res.extend_from_slice(&leaf.data);
822                        }
823                        curr = next;
824                    }
825                    BlobContentNext::Done(done) => {
826                        // we are done with the root blob
827                        break done;
828                    }
829                }
830            };
831            Ok((done, res))
832        }
833
834        /// Write the entire blob to a slice writer and to an optional outboard.
835        ///
836        /// The outboard is only written to if the blob is larger than a single
837        /// chunk group.
838        pub async fn write_all_with_outboard<D, O>(
839            self,
840            mut outboard: Option<O>,
841            mut data: D,
842        ) -> result::Result<AtEndBlob<R>, DecodeError>
843        where
844            D: AsyncSliceWriter,
845            O: OutboardMut,
846        {
847            let mut content = self;
848            loop {
849                match content.next().await {
850                    BlobContentNext::More((content1, item)) => {
851                        content = content1;
852                        match item? {
853                            BaoContentItem::Parent(parent) => {
854                                if let Some(outboard) = outboard.as_mut() {
855                                    outboard
856                                        .save(parent.node, &parent.pair)
857                                        .await
858                                        .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
859                                }
860                            }
861                            BaoContentItem::Leaf(leaf) => {
862                                data.write_bytes_at(leaf.offset, leaf.data)
863                                    .await
864                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
865                            }
866                        }
867                    }
868                    BlobContentNext::Done(end) => {
869                        return Ok(end);
870                    }
871                }
872            }
873        }
874
875        /// Write the entire blob to a slice writer.
876        pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
877        where
878            D: AsyncSliceWriter,
879        {
880            let mut content = self;
881            loop {
882                match content.next().await {
883                    BlobContentNext::More((content1, item)) => {
884                        content = content1;
885                        match item? {
886                            BaoContentItem::Parent(_) => {}
887                            BaoContentItem::Leaf(leaf) => {
888                                data.write_bytes_at(leaf.offset, leaf.data)
889                                    .await
890                                    .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
891                            }
892                        }
893                    }
894                    BlobContentNext::Done(end) => {
895                        return Ok(end);
896                    }
897                }
898            }
899        }
900
901        /// Immediately finish the get response without reading further
902        pub fn finish(self) -> AtClosing<R> {
903            AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
904        }
905    }
906
    /// State after we have read all the content for a blob.
    ///
    /// Call [`AtEndBlob::next`] to move on to the next child, if the request has
    /// more ranges, or to the closing state otherwise.
    #[derive(Debug)]
    pub struct AtEndBlob<R: RecvStream = DefaultReader> {
        /// The underlying stream, positioned after this blob's content.
        stream: R,
        /// Bookkeeping carried through all states (start time, counters, remaining ranges).
        misc: Box<Misc>,
    }
913
    /// The next state after the end of a blob, as returned by [`AtEndBlob::next`].
    ///
    /// `From` is derived, so either inner state can be turned into this enum
    /// with `.into()`.
    #[derive(Debug, From)]
    pub enum EndBlobNext<R: RecvStream = DefaultReader> {
        /// Response is expected to have more children
        MoreChildren(AtStartChild<R>),
        /// No more children expected
        Closing(AtClosing<R>),
    }
922
923    impl<R: RecvStream> AtEndBlob<R> {
924        /// Read the next child, or finish
925        pub fn next(mut self) -> EndBlobNext<R> {
926            if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
927                AtStartChild {
928                    reader: self.stream,
929                    offset,
930                    ranges,
931                    misc: self.misc,
932                }
933                .into()
934            } else {
935                AtClosing::new(self.misc, self.stream, true).into()
936            }
937        }
938    }
939
    /// State when finishing the get response.
    ///
    /// Call [`AtClosing::next`] to finish and obtain the transfer [`Stats`].
    #[derive(Debug)]
    pub struct AtClosing<R: RecvStream = DefaultReader> {
        /// Bookkeeping whose start time and counters become the final stats.
        misc: Box<Misc>,
        /// The stream of the response being finished.
        reader: R,
        /// Whether to probe the stream for unexpected trailing bytes before
        /// finishing. `true` when the response was read to completion,
        /// `false` when the caller finished early.
        check_extra_data: bool,
    }
947
948    impl<R: RecvStream> AtClosing<R> {
949        fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
950            Self {
951                misc,
952                reader,
953                check_extra_data,
954            }
955        }
956
957        /// Finish the get response, returning statistics
958        pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
959            // Shut down the stream
960            let mut reader = self.reader;
961            if self.check_extra_data {
962                let rest = reader.recv_bytes(1).await?;
963                if !rest.is_empty() {
964                    error!("Unexpected extra data at the end of the stream");
965                }
966            }
967            Ok(Stats {
968                counters: self.misc.counters,
969                elapsed: self.misc.start.elapsed(),
970            })
971        }
972    }
973
974    /// Error that you can get from [`AtBlobHeader::next`]
975    #[common_fields({
976        backtrace: Option<Backtrace>,
977        #[snafu(implicit)]
978        span_trace: SpanTrace,
979    })]
980    #[non_exhaustive]
981    #[derive(Debug, Snafu)]
982    #[snafu(module)]
983    pub enum AtClosingNextError {
984        /// Generic io error
985        #[snafu(transparent)]
986        Read { source: io::Error },
987    }
988
    /// Byte counters accumulated while executing a get request.
    ///
    /// Traffic is split into payload bytes (blob content) and other bytes
    /// (request, hash pairs and sizes), for each direction.
    // Field order is kept as-is: it is part of the serde representation.
    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
    pub struct RequestCounters {
        /// payload bytes written
        pub payload_bytes_written: u64,
        /// request, hash pair and size bytes written
        pub other_bytes_written: u64,
        /// payload bytes read
        pub payload_bytes_read: u64,
        /// hash pair and size bytes read
        pub other_bytes_read: u64,
    }
1000
    /// Stuff we need to hold on to while going through the machine states.
    ///
    /// Derefs (mutably too) to its [`RequestCounters`], so counter fields can be
    /// read and updated directly on a `Misc`.
    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
    struct Misc {
        /// start time for statistics; `AtClosing::next` reports its elapsed time
        start: Instant,
        /// counters; the deref target of this struct
        #[deref]
        #[deref_mut]
        counters: RequestCounters,
        /// iterator over the ranges of the collection and the children;
        /// `AtEndBlob::next` pulls the next child's `(offset, ranges)` from it
        ranges_iter: RangesIter,
    }
1013}