iroh_blobs/
get.rs

1//! The low level client side API
2//!
3//! Note that while using this API directly is fine, a simpler way to get data
4//! to a store is to use the [`crate::api::remote`] API, in particular the
5//! [`crate::api::remote::Remote::fetch`] function to download data to your
6//! local store.
7//!
8//! To get data, create a connection using an [`iroh::Endpoint`].
9//!
10//! Create a [`crate::protocol::GetRequest`] describing the data you want to get.
11//!
12//! Then create a state machine using [fsm::start] and
13//! drive it to completion by calling next on each state.
14//!
15//! For some states you have to provide additional arguments when calling next,
16//! or you can choose to finish early.
17//!
18//! [iroh]: https://docs.rs/iroh
19use std::{
20    error::Error,
21    fmt::{self, Debug},
22    time::{Duration, Instant},
23};
24
25use anyhow::Result;
26use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
27use fsm::RequestCounters;
28use iroh::endpoint::{self, RecvStream, SendStream};
29use iroh_io::TokioStreamReader;
30use n0_snafu::SpanTrace;
31use nested_enum_utils::common_fields;
32use serde::{Deserialize, Serialize};
33use snafu::{Backtrace, IntoError, ResultExt, Snafu};
34use tracing::{debug, error};
35
36use crate::{protocol::ChunkRangesSeq, store::IROH_BLOCK_SIZE, Hash};
37
38mod error;
39pub mod request;
40pub(crate) use error::{BadRequestSnafu, LocalFailureSnafu};
41pub use error::{GetError, GetResult};
42
43type WrappedRecvStream = TokioStreamReader<RecvStream>;
44
/// Stats about the transfer.
///
/// Derefs to the inner [`RequestCounters`], so the individual byte counters
/// can be accessed directly on this type.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Counters for payload and non-payload bytes, read and written
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// The time it took to transfer the data
    pub elapsed: Duration,
}
65
66impl Stats {
67    /// Transfer rate in megabits per second
68    pub fn mbits(&self) -> f64 {
69        let data_len_bit = self.total_bytes_read() * 8;
70        data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
71    }
72
73    pub fn total_bytes_read(&self) -> u64 {
74        self.payload_bytes_read + self.other_bytes_read
75    }
76
77    pub fn combine(&mut self, that: &Stats) {
78        self.payload_bytes_written += that.payload_bytes_written;
79        self.other_bytes_written += that.other_bytes_written;
80        self.payload_bytes_read += that.payload_bytes_read;
81        self.other_bytes_read += that.other_bytes_read;
82        self.elapsed += that.elapsed;
83    }
84}
85
86/// Finite state machine for get responses.
87///
88/// This is the low level API for getting data from a peer.
89#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
90pub mod fsm {
91    use std::{io, result};
92
93    use bao_tree::{
94        io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
95        BaoTree, ChunkRanges, TreeNode,
96    };
97    use derive_more::From;
98    use iroh::endpoint::Connection;
99    use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader};
100
101    use super::*;
102    use crate::{
103        get::error::BadRequestSnafu,
104        protocol::{
105            GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
106        },
107    };
108
    // Self-referential struct: owns the `ChunkRangesSeq` while also holding
    // an iterator that borrows from it. `self_cell` generates the safe
    // plumbing; the iterator type is marked not covariant in its lifetime.
    self_cell::self_cell! {
        struct RangesIterInner {
            owner: ChunkRangesSeq,
            #[not_covariant]
            dependent: NonEmptyRequestRangeSpecIter,
        }
    }
116
117    /// The entry point of the get response machine
118    pub fn start(
119        connection: Connection,
120        request: GetRequest,
121        counters: RequestCounters,
122    ) -> AtInitial {
123        AtInitial::new(connection, request, counters)
124    }
125
126    /// Start with a get many request. Todo: turn this into distinct states.
127    pub async fn start_get_many(
128        connection: Connection,
129        request: GetManyRequest,
130        counters: RequestCounters,
131    ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
132        let start = Instant::now();
133        let (mut writer, reader) = connection.open_bi().await?;
134        let request = Request::GetMany(request);
135        let request_bytes = postcard::to_stdvec(&request)
136            .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
137        writer.write_all(&request_bytes).await?;
138        writer.finish()?;
139        let Request::GetMany(request) = request else {
140            unreachable!();
141        };
142        let reader = TokioStreamReader::new(reader);
143        let mut ranges_iter = RangesIter::new(request.ranges.clone());
144        let first_item = ranges_iter.next();
145        let misc = Box::new(Misc {
146            counters,
147            start,
148            ranges_iter,
149        });
150        Ok(match first_item {
151            Some((child_offset, child_ranges)) => Ok(AtStartChild {
152                ranges: child_ranges,
153                reader,
154                misc,
155                offset: child_offset,
156            }),
157            None => Err(AtClosing::new(misc, reader, true)),
158        })
159    }
160
    /// Owned iterator for the ranges in a request
    ///
    /// We need an owned iterator for a fsm style API, otherwise we would have
    /// to drag a lifetime around every single state.
    ///
    /// Newtype over the self-referential [`RangesIterInner`].
    struct RangesIter(RangesIterInner);
166
167    impl fmt::Debug for RangesIter {
168        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169            f.debug_struct("RangesIter").finish()
170        }
171    }
172
173    impl RangesIter {
174        pub fn new(owner: ChunkRangesSeq) -> Self {
175            Self(RangesIterInner::new(owner, |owner| {
176                owner.iter_non_empty_infinite()
177            }))
178        }
179
180        pub fn offset(&self) -> u64 {
181            self.0.with_dependent(|_owner, iter| iter.offset())
182        }
183    }
184
185    impl Iterator for RangesIter {
186        type Item = (u64, ChunkRanges);
187
188        fn next(&mut self) -> Option<Self::Item> {
189            self.0.with_dependent_mut(|_owner, iter| {
190                iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
191            })
192        }
193    }
194
    /// Initial state of the get response machine
    #[derive(Debug)]
    pub struct AtInitial {
        // Connection to the provider; used to open the bidi stream.
        connection: Connection,
        // The request to send once the stream is open.
        request: GetRequest,
        // Byte counters carried through every state of the machine.
        counters: RequestCounters,
    }
202
203    impl AtInitial {
204        /// Create a new get response
205        ///
206        /// `connection` is an existing connection
207        /// `request` is the request to be sent
208        pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
209            Self {
210                connection,
211                request,
212                counters,
213            }
214        }
215
216        /// Initiate a new bidi stream to use for the get response
217        pub async fn next(self) -> Result<AtConnected, endpoint::ConnectionError> {
218            let start = Instant::now();
219            let (writer, reader) = self.connection.open_bi().await?;
220            let reader = TokioStreamReader::new(reader);
221            Ok(AtConnected {
222                start,
223                reader,
224                writer,
225                request: self.request,
226                counters: self.counters,
227            })
228        }
229    }
230
    /// State of the get response machine after the handshake has been sent
    #[derive(Debug)]
    pub struct AtConnected {
        // When the request started; used for the elapsed time in `Stats`.
        start: Instant,
        // Receive side of the bidi stream.
        reader: WrappedRecvStream,
        // Send side of the bidi stream; the request is written here.
        writer: SendStream,
        // The request that will be sent in `next`.
        request: GetRequest,
        // Byte counters carried through every state of the machine.
        counters: RequestCounters,
    }
240
    /// Possible next states after the handshake has been sent
    ///
    /// Returned by [`AtConnected::next`]. Variants can be created via `From`
    /// thanks to `derive_more::From`.
    #[derive(Debug, From)]
    pub enum ConnectedNext {
        /// First response is either a collection or a single blob
        StartRoot(AtStartRoot),
        /// First response is a child
        StartChild(AtStartChild),
        /// Request is empty
        Closing(AtClosing),
    }
251
    /// Error that you can get from [`AtConnected::next`]
    // Every variant carries an optional backtrace and an implicit span trace
    // via the `common_fields` attribute.
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[allow(missing_docs)]
    #[derive(Debug, Snafu)]
    #[non_exhaustive]
    pub enum ConnectedNextError {
        /// Error when serializing the request
        #[snafu(display("postcard ser: {source}"))]
        PostcardSer { source: postcard::Error },
        /// The serialized request is too long to be sent
        ///
        /// See `MAX_MESSAGE_SIZE` for the limit.
        #[snafu(display("request too big"))]
        RequestTooBig {},
        /// Error when writing the request to the [`SendStream`].
        #[snafu(display("write: {source}"))]
        Write { source: quinn::WriteError },
        /// Quic connection is closed.
        #[snafu(display("closed"))]
        Closed { source: quinn::ClosedStream },
        /// A generic io error
        #[snafu(transparent)]
        Io { source: io::Error },
    }
278
    impl AtConnected {
        /// Send the request and move to the next state
        ///
        /// The next state will be either `StartRoot` or `StartChild` depending on whether
        /// the request requests part of the collection or not.
        ///
        /// If the request is empty, this can also move directly to `Finished`.
        pub async fn next(self) -> Result<ConnectedNext, ConnectedNextError> {
            let Self {
                start,
                reader,
                mut writer,
                mut request,
                mut counters,
            } = self;
            // 1. Send Request
            counters.other_bytes_written += {
                debug!("sending request");
                // Wrap in the top level `Request` enum for serialization, then
                // unwrap again afterwards; this avoids cloning the request.
                let wrapped = Request::Get(request);
                let request_bytes = postcard::to_stdvec(&wrapped).context(PostcardSerSnafu)?;
                let Request::Get(x) = wrapped else {
                    unreachable!();
                };
                request = x;

                // enforce the protocol's maximum message size before writing
                if request_bytes.len() > MAX_MESSAGE_SIZE {
                    return Err(RequestTooBigSnafu.build());
                }

                // write the request itself
                writer.write_all(&request_bytes).await.context(WriteSnafu)?;
                request_bytes.len() as u64
            };

            // 2. Finish writing before expecting a response
            writer.finish().context(ClosedSnafu)?;

            let hash = request.hash;
            let ranges_iter = RangesIter::new(request.ranges);
            // this is in a box so we don't have to memcpy it on every state transition
            let mut misc = Box::new(Misc {
                counters,
                start,
                ranges_iter,
            });
            // Offset 0 is the root blob; any other offset is a child.
            Ok(match misc.ranges_iter.next() {
                Some((offset, ranges)) => {
                    if offset == 0 {
                        AtStartRoot {
                            reader,
                            ranges,
                            misc,
                            hash,
                        }
                        .into()
                    } else {
                        AtStartChild {
                            reader,
                            ranges,
                            misc,
                            offset,
                        }
                        .into()
                    }
                }
                None => AtClosing::new(misc, reader, true).into(),
            })
        }
    }
348
349    /// State of the get response when we start reading a collection
350    #[derive(Debug)]
351    pub struct AtStartRoot {
352        ranges: ChunkRanges,
353        reader: TokioStreamReader<RecvStream>,
354        misc: Box<Misc>,
355        hash: Hash,
356    }
357
358    /// State of the get response when we start reading a child
359    #[derive(Debug)]
360    pub struct AtStartChild {
361        ranges: ChunkRanges,
362        reader: TokioStreamReader<RecvStream>,
363        misc: Box<Misc>,
364        offset: u64,
365    }
366
367    impl AtStartChild {
368        /// The offset of the child we are currently reading
369        ///
370        /// This must be used to determine the hash needed to call next.
371        /// If this is larger than the number of children in the collection,
372        /// you can call finish to stop reading the response.
373        pub fn offset(&self) -> u64 {
374            self.offset
375        }
376
377        /// The ranges we have requested for the child
378        pub fn ranges(&self) -> &ChunkRanges {
379            &self.ranges
380        }
381
382        /// Go into the next state, reading the header
383        ///
384        /// This requires passing in the hash of the child for validation
385        pub fn next(self, hash: Hash) -> AtBlobHeader {
386            AtBlobHeader {
387                reader: self.reader,
388                ranges: self.ranges,
389                misc: self.misc,
390                hash,
391            }
392        }
393
394        /// Finish the get response without reading further
395        ///
396        /// This is used if you know that there are no more children from having
397        /// read the collection, or when you want to stop reading the response
398        /// early.
399        pub fn finish(self) -> AtClosing {
400            AtClosing::new(self.misc, self.reader, false)
401        }
402    }
403
404    impl AtStartRoot {
405        /// The ranges we have requested for the child
406        pub fn ranges(&self) -> &ChunkRanges {
407            &self.ranges
408        }
409
410        /// Hash of the root blob
411        pub fn hash(&self) -> Hash {
412            self.hash
413        }
414
415        /// Go into the next state, reading the header
416        ///
417        /// For the collection we already know the hash, since it was part of the request
418        pub fn next(self) -> AtBlobHeader {
419            AtBlobHeader {
420                reader: self.reader,
421                ranges: self.ranges,
422                hash: self.hash,
423                misc: self.misc,
424            }
425        }
426
427        /// Finish the get response without reading further
428        pub fn finish(self) -> AtClosing {
429            AtClosing::new(self.misc, self.reader, false)
430        }
431    }
432
433    /// State before reading a size header
434    #[derive(Debug)]
435    pub struct AtBlobHeader {
436        ranges: ChunkRanges,
437        reader: TokioStreamReader<RecvStream>,
438        misc: Box<Misc>,
439        hash: Hash,
440    }
441
    /// Error that you can get from [`AtBlobHeader::next`]
    // Every variant carries an optional backtrace and an implicit span trace
    // via the `common_fields` attribute.
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    pub enum AtBlobHeaderNextError {
        /// Eof when reading the size header
        ///
        /// This indicates that the provider does not have the requested data.
        #[snafu(display("not found"))]
        NotFound {},
        /// Quinn read error when reading the size header
        #[snafu(display("read: {source}"))]
        EndpointRead { source: endpoint::ReadError },
        /// Generic io error
        #[snafu(display("io: {source}"))]
        Io { source: io::Error },
    }
463
464    impl From<AtBlobHeaderNextError> for io::Error {
465        fn from(cause: AtBlobHeaderNextError) -> Self {
466            match cause {
467                AtBlobHeaderNextError::NotFound { .. } => {
468                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
469                }
470                AtBlobHeaderNextError::EndpointRead { source, .. } => source.into(),
471                AtBlobHeaderNextError::Io { source, .. } => source,
472            }
473        }
474    }
475
    impl AtBlobHeader {
        /// Read the size header, returning it and going into the `Content` state.
        pub async fn next(mut self) -> Result<(AtBlobContent, u64), AtBlobHeaderNextError> {
            // The size header is a little-endian u64 sent before the encoded
            // content.
            let size = self.reader.read::<8>().await.map_err(|cause| {
                // Classify the failure: a clean eof means the provider does
                // not have the data; an embedded `endpoint::ReadError` is a
                // transport error; anything else is a generic io error.
                if cause.kind() == io::ErrorKind::UnexpectedEof {
                    NotFoundSnafu.build()
                } else if let Some(e) = cause
                    .get_ref()
                    .and_then(|x| x.downcast_ref::<endpoint::ReadError>())
                {
                    EndpointReadSnafu.into_error(e.clone())
                } else {
                    IoSnafu.into_error(cause)
                }
            })?;
            // the 8 byte size header counts as non-payload data
            self.misc.other_bytes_read += 8;
            let size = u64::from_le_bytes(size);
            let stream = ResponseDecoder::new(
                self.hash.into(),
                self.ranges,
                BaoTree::new(size, IROH_BLOCK_SIZE),
                self.reader,
            );
            Ok((
                AtBlobContent {
                    stream,
                    misc: self.misc,
                },
                size,
            ))
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob, DecodeError> {
            let (content, _size) = self.next().await?;
            content.drain().await
        }

        /// Concatenate the entire response into a vec
        ///
        /// For a request that does not request the complete blob, this will just
        /// concatenate the ranges that were requested.
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob, Vec<u8>), DecodeError> {
            let (content, _size) = self.next().await?;
            content.concatenate_into_vec().await
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D: AsyncSliceWriter>(
            self,
            data: D,
        ) -> result::Result<AtEndBlob, DecodeError> {
            let (content, _size) = self.next().await?;
            let res = content.write_all(data).await?;
            Ok(res)
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            outboard: Option<O>,
            data: D,
        ) -> result::Result<AtEndBlob, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let (content, _size) = self.next().await?;
            let res = content.write_all_with_outboard(outboard, data).await?;
            Ok(res)
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            self.hash
        }

        /// The ranges we have requested for the current hash.
        pub fn ranges(&self) -> &ChunkRanges {
            &self.ranges
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }
    }
568
    /// State while we are reading content
    #[derive(Debug)]
    pub struct AtBlobContent {
        // Decoder that validates the incoming data against the blob hash.
        stream: ResponseDecoder<WrappedRecvStream>,
        // Per-request state: counters, start time and the ranges iterator.
        misc: Box<Misc>,
    }
575
    /// Decode error that you can get once you have sent the request and are
    /// decoding the response, e.g. from [`AtBlobContent::next`].
    ///
    /// This is similar to [`bao_tree::io::DecodeError`], but takes into account
    /// that we are reading from a [`RecvStream`], so read errors will be
    /// propagated as [`DecodeError::Read`], containing a [`ReadError`].
    /// This carries more concrete information about the error than an [`io::Error`].
    ///
    /// When the provider finds that it does not have a chunk that we requested,
    /// or that the chunk is invalid, it will stop sending data without producing
    /// an error. This is indicated by either the [`DecodeError::ParentNotFound`] or
    /// [`DecodeError::LeafNotFound`] variant, which can be used to detect that data
    /// is missing but the connection as well that the provider is otherwise healthy.
    ///
    /// The [`DecodeError::ParentHashMismatch`] and [`DecodeError::LeafHashMismatch`]
    /// variants indicate that the provider has sent us invalid data. A well-behaved
    /// provider should never do this, so this is an indication that the provider is
    /// not behaving correctly.
    ///
    /// The [`DecodeError::DecodeIo`] variant is just a fallback for any other io error that
    /// is not actually a [`DecodeError::Read`].
    ///
    /// [`ReadError`]: endpoint::ReadError
    // Every variant carries an optional backtrace and an implicit span trace
    // via the `common_fields` attribute.
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    pub enum DecodeError {
        /// A chunk was not found or invalid, so the provider stopped sending data
        #[snafu(display("not found"))]
        ChunkNotFound {},
        /// A parent was not found or invalid, so the provider stopped sending data
        #[snafu(display("parent not found {node:?}"))]
        ParentNotFound { node: TreeNode },
        /// A leaf chunk was not found or invalid, so the provider stopped sending data
        #[snafu(display("chunk not found {num}"))]
        LeafNotFound { num: ChunkNum },
        /// The hash of a parent did not match the expected hash
        #[snafu(display("parent hash mismatch: {node:?}"))]
        ParentHashMismatch { node: TreeNode },
        /// The hash of a leaf did not match the expected hash
        #[snafu(display("leaf hash mismatch: {num}"))]
        LeafHashMismatch { num: ChunkNum },
        /// Error when reading from the stream
        #[snafu(display("read: {source}"))]
        Read { source: endpoint::ReadError },
        /// A generic io error
        #[snafu(display("io: {source}"))]
        DecodeIo { source: io::Error },
    }
629
    impl DecodeError {
        /// Construct a [`DecodeError::LeafHashMismatch`] for the given chunk number.
        pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
            LeafHashMismatchSnafu { num }.build()
        }
    }
635
636    impl From<AtBlobHeaderNextError> for DecodeError {
637        fn from(cause: AtBlobHeaderNextError) -> Self {
638            match cause {
639                AtBlobHeaderNextError::NotFound { .. } => ChunkNotFoundSnafu.build(),
640                AtBlobHeaderNextError::EndpointRead { source, .. } => ReadSnafu.into_error(source),
641                AtBlobHeaderNextError::Io { source, .. } => DecodeIoSnafu.into_error(source),
642            }
643        }
644    }
645
646    impl From<DecodeError> for io::Error {
647        fn from(cause: DecodeError) -> Self {
648            match cause {
649                DecodeError::ParentNotFound { .. } => {
650                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
651                }
652                DecodeError::LeafNotFound { .. } => {
653                    io::Error::new(io::ErrorKind::UnexpectedEof, cause)
654                }
655                DecodeError::Read { source, .. } => source.into(),
656                DecodeError::DecodeIo { source, .. } => source,
657                _ => io::Error::other(cause),
658            }
659        }
660    }
661
    impl From<io::Error> for DecodeError {
        // Fallback conversion: a plain io error becomes `DecodeIo`.
        fn from(value: io::Error) -> Self {
            DecodeIoSnafu.into_error(value)
        }
    }
667
668    impl From<bao_tree::io::DecodeError> for DecodeError {
669        fn from(value: bao_tree::io::DecodeError) -> Self {
670            match value {
671                bao_tree::io::DecodeError::ParentNotFound(x) => {
672                    ParentNotFoundSnafu { node: x }.build()
673                }
674                bao_tree::io::DecodeError::LeafNotFound(x) => LeafNotFoundSnafu { num: x }.build(),
675                bao_tree::io::DecodeError::ParentHashMismatch(node) => {
676                    ParentHashMismatchSnafu { node }.build()
677                }
678                bao_tree::io::DecodeError::LeafHashMismatch(chunk) => {
679                    LeafHashMismatchSnafu { num: chunk }.build()
680                }
681                bao_tree::io::DecodeError::Io(cause) => {
682                    if let Some(inner) = cause.get_ref() {
683                        if let Some(e) = inner.downcast_ref::<endpoint::ReadError>() {
684                            ReadSnafu.into_error(e.clone())
685                        } else {
686                            DecodeIoSnafu.into_error(cause)
687                        }
688                    } else {
689                        DecodeIoSnafu.into_error(cause)
690                    }
691                }
692            }
693        }
694    }
695
    /// The next state after reading a content item
    #[derive(Debug, From)]
    pub enum BlobContentNext {
        /// We expect more content
        ///
        /// Carries the next state together with the decode result for the
        /// item that was just read.
        More((AtBlobContent, result::Result<BaoContentItem, DecodeError>)),
        /// We are done with this blob
        Done(AtEndBlob),
    }
704
    impl AtBlobContent {
        /// Read the next item, either content, an error, or the end of the blob
        pub async fn next(self) -> BlobContentNext {
            match self.stream.next().await {
                ResponseDecoderNext::More((stream, res)) => {
                    let mut next = Self { stream, ..self };
                    let res = res.map_err(DecodeError::from);
                    // Account for what was just read: a parent contributes 64
                    // bytes of hash data (non-payload), a leaf contributes its
                    // data length to the payload counter.
                    match &res {
                        Ok(BaoContentItem::Parent(_)) => {
                            next.misc.other_bytes_read += 64;
                        }
                        Ok(BaoContentItem::Leaf(leaf)) => {
                            next.misc.payload_bytes_read += leaf.data.len() as u64;
                        }
                        _ => {}
                    }
                    BlobContentNext::More((next, res))
                }
                ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
                    stream,
                    misc: self.misc,
                }),
            }
        }

        /// The geometry of the tree we are currently reading.
        pub fn tree(&self) -> bao_tree::BaoTree {
            self.stream.tree()
        }

        /// The hash of the blob we are reading.
        pub fn hash(&self) -> Hash {
            (*self.stream.hash()).into()
        }

        /// The current offset of the blob we are reading.
        pub fn offset(&self) -> u64 {
            self.misc.ranges_iter.offset()
        }

        /// Current stats
        pub fn stats(&self) -> Stats {
            Stats {
                counters: self.misc.counters,
                elapsed: self.misc.start.elapsed(),
            }
        }

        /// Drain the response and throw away the result
        pub async fn drain(self) -> result::Result<AtEndBlob, DecodeError> {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, res)) => {
                        // still propagate decode errors, just drop the data
                        let _ = res?;
                        content = content1;
                    }
                    BlobContentNext::Done(end) => {
                        break Ok(end);
                    }
                }
            }
        }

        /// Concatenate the entire response into a vec
        pub async fn concatenate_into_vec(
            self,
        ) -> result::Result<(AtEndBlob, Vec<u8>), DecodeError> {
            let mut res = Vec::with_capacity(1024);
            let mut curr = self;
            let done = loop {
                match curr.next().await {
                    BlobContentNext::More((next, data)) => {
                        // only leaves carry payload; parents are validation data
                        if let BaoContentItem::Leaf(leaf) = data? {
                            res.extend_from_slice(&leaf.data);
                        }
                        curr = next;
                    }
                    BlobContentNext::Done(done) => {
                        // we are done with the root blob
                        break done;
                    }
                }
            };
            Ok((done, res))
        }

        /// Write the entire blob to a slice writer and to an optional outboard.
        ///
        /// The outboard is only written to if the blob is larger than a single
        /// chunk group.
        pub async fn write_all_with_outboard<D, O>(
            self,
            mut outboard: Option<O>,
            mut data: D,
        ) -> result::Result<AtEndBlob, DecodeError>
        where
            D: AsyncSliceWriter,
            O: OutboardMut,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            BaoContentItem::Parent(parent) => {
                                // hash pairs go to the outboard, if any
                                if let Some(outboard) = outboard.as_mut() {
                                    outboard.save(parent.node, &parent.pair).await?;
                                }
                            }
                            BaoContentItem::Leaf(leaf) => {
                                data.write_bytes_at(leaf.offset, leaf.data).await?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Write the entire blob to a slice writer.
        pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob, DecodeError>
        where
            D: AsyncSliceWriter,
        {
            let mut content = self;
            loop {
                match content.next().await {
                    BlobContentNext::More((content1, item)) => {
                        content = content1;
                        match item? {
                            // parents are only needed for validation here
                            BaoContentItem::Parent(_) => {}
                            BaoContentItem::Leaf(leaf) => {
                                data.write_bytes_at(leaf.offset, leaf.data).await?;
                            }
                        }
                    }
                    BlobContentNext::Done(end) => {
                        return Ok(end);
                    }
                }
            }
        }

        /// Immediately finish the get response without reading further
        pub fn finish(self) -> AtClosing {
            AtClosing::new(self.misc, self.stream.finish(), false)
        }
    }
857
    /// State after we have read all the content for a blob
    #[derive(Debug)]
    pub struct AtEndBlob {
        /// The raw receive stream, handed on to the next state.
        stream: WrappedRecvStream,
        /// Shared request bookkeeping (start time, counters, ranges iterator).
        misc: Box<Misc>,
    }
864
    /// The next state after the end of a blob
    ///
    /// Produced by [`AtEndBlob::next`]. `From` impls are derived, so both
    /// states convert with `.into()`.
    #[derive(Debug, From)]
    pub enum EndBlobNext {
        /// Response is expected to have more children
        MoreChildren(AtStartChild),
        /// No more children expected
        Closing(AtClosing),
    }
873
874    impl AtEndBlob {
875        /// Read the next child, or finish
876        pub fn next(mut self) -> EndBlobNext {
877            if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
878                AtStartChild {
879                    reader: self.stream,
880                    offset,
881                    ranges,
882                    misc: self.misc,
883                }
884                .into()
885            } else {
886                AtClosing::new(self.misc, self.stream, true).into()
887            }
888        }
889    }
890
    /// State when finishing the get response
    #[derive(Debug)]
    pub struct AtClosing {
        /// Shared request bookkeeping, used to produce the final [`Stats`].
        misc: Box<Misc>,
        /// The raw receive stream, to be drained or stopped.
        reader: WrappedRecvStream,
        /// Whether to check that the provider sent no data past the request.
        check_extra_data: bool,
    }
898
899    impl AtClosing {
900        fn new(misc: Box<Misc>, reader: WrappedRecvStream, check_extra_data: bool) -> Self {
901            Self {
902                misc,
903                reader,
904                check_extra_data,
905            }
906        }
907
908        /// Finish the get response, returning statistics
909        pub async fn next(self) -> result::Result<Stats, endpoint::ReadError> {
910            // Shut down the stream
911            let reader = self.reader;
912            let mut reader = reader.into_inner();
913            if self.check_extra_data {
914                if let Some(chunk) = reader.read_chunk(8, false).await? {
915                    reader.stop(0u8.into()).ok();
916                    error!("Received unexpected data from the provider: {chunk:?}");
917                }
918            } else {
919                reader.stop(0u8.into()).ok();
920            }
921            Ok(Stats {
922                counters: self.misc.counters,
923                elapsed: self.misc.start.elapsed(),
924            })
925        }
926    }
927
    /// Byte counters for a single request, split into payload bytes and
    /// framing overhead (request, hash pairs, sizes) for each direction.
    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
    pub struct RequestCounters {
        /// payload bytes written
        pub payload_bytes_written: u64,
        /// request, hash pair and size bytes written
        pub other_bytes_written: u64,
        /// payload bytes read
        pub payload_bytes_read: u64,
        /// hash pair and size bytes read
        pub other_bytes_read: u64,
    }
939
    /// Stuff we need to hold on to while going through the machine states
    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
    struct Misc {
        /// start time for statistics
        start: Instant,
        /// counters
        ///
        /// Deref/DerefMut forward here, so states can update counters directly.
        #[deref]
        #[deref_mut]
        counters: RequestCounters,
        /// iterator over the ranges of the collection and the children
        ranges_iter: RangesIter,
    }
952}
953
/// Error when processing a response
///
/// Transport failures are split into connection/read/write variants so callers
/// can tell them apart from content verification (`Decode`) failures; the
/// `From` impls below perform that classification.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum GetResponseError {
    /// Error when opening a stream
    #[snafu(display("connection: {source}"))]
    Connection { source: endpoint::ConnectionError },
    /// Error when writing the handshake or request to the stream
    #[snafu(display("write: {source}"))]
    Write { source: endpoint::WriteError },
    /// Error when reading from the stream
    #[snafu(display("read: {source}"))]
    Read { source: endpoint::ReadError },
    /// Error when decoding, e.g. hash mismatch
    #[snafu(display("decode: {source}"))]
    Decode { source: bao_tree::io::DecodeError },
    /// A generic error
    #[snafu(display("generic: {source}"))]
    Generic { source: anyhow::Error },
}
980
981impl From<postcard::Error> for GetResponseError {
982    fn from(cause: postcard::Error) -> Self {
983        GenericSnafu.into_error(cause.into())
984    }
985}
986
987impl From<bao_tree::io::DecodeError> for GetResponseError {
988    fn from(cause: bao_tree::io::DecodeError) -> Self {
989        match cause {
990            bao_tree::io::DecodeError::Io(cause) => {
991                // try to downcast to specific quinn errors
992                if let Some(source) = cause.source() {
993                    if let Some(error) = source.downcast_ref::<endpoint::ConnectionError>() {
994                        return ConnectionSnafu.into_error(error.clone());
995                    }
996                    if let Some(error) = source.downcast_ref::<endpoint::ReadError>() {
997                        return ReadSnafu.into_error(error.clone());
998                    }
999                    if let Some(error) = source.downcast_ref::<endpoint::WriteError>() {
1000                        return WriteSnafu.into_error(error.clone());
1001                    }
1002                }
1003                GenericSnafu.into_error(cause.into())
1004            }
1005            _ => DecodeSnafu.into_error(cause),
1006        }
1007    }
1008}
1009
impl From<anyhow::Error> for GetResponseError {
    /// Catch-all conversion: ad-hoc errors land in the `Generic` variant.
    fn from(cause: anyhow::Error) -> Self {
        GenericSnafu.into_error(cause)
    }
}
1015
1016impl From<GetResponseError> for std::io::Error {
1017    fn from(cause: GetResponseError) -> Self {
1018        Self::other(cause)
1019    }
1020}