1use std::{
20 error::Error,
21 fmt::{self, Debug},
22 time::{Duration, Instant},
23};
24
25use anyhow::Result;
26use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
27use fsm::RequestCounters;
28use iroh::endpoint::{self, RecvStream, SendStream};
29use iroh_io::TokioStreamReader;
30use n0_snafu::SpanTrace;
31use nested_enum_utils::common_fields;
32use serde::{Deserialize, Serialize};
33use snafu::{Backtrace, IntoError, ResultExt, Snafu};
34use tracing::{debug, error};
35
36use crate::{protocol::ChunkRangesSeq, store::IROH_BLOCK_SIZE, Hash};
37
38mod error;
39pub mod request;
40pub(crate) use error::{BadRequestSnafu, LocalFailureSnafu};
41pub use error::{GetError, GetResult};
42
/// The receive half of a stream ([`RecvStream`]) wrapped in a
/// [`TokioStreamReader`]; this is the reader type used by the states in [`fsm`].
type WrappedRecvStream = TokioStreamReader<RecvStream>;
44
/// Byte counters of a request together with an elapsed duration.
///
/// `Stats` dereferences (immutably and mutably) to its [`RequestCounters`],
/// so the individual counters can be read and updated directly on a `Stats`
/// value.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Counters for bytes written and read.
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// The duration these stats cover.
    pub elapsed: Duration,
}
65
66impl Stats {
67 pub fn mbits(&self) -> f64 {
69 let data_len_bit = self.total_bytes_read() * 8;
70 data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
71 }
72
73 pub fn total_bytes_read(&self) -> u64 {
74 self.payload_bytes_read + self.other_bytes_read
75 }
76
77 pub fn combine(&mut self, that: &Stats) {
78 self.payload_bytes_written += that.payload_bytes_written;
79 self.other_bytes_written += that.other_bytes_written;
80 self.payload_bytes_read += that.payload_bytes_read;
81 self.other_bytes_read += that.other_bytes_read;
82 self.elapsed += that.elapsed;
83 }
84}
85
86#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
90pub mod fsm {
91 use std::{io, result};
92
93 use bao_tree::{
94 io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
95 BaoTree, ChunkRanges, TreeNode,
96 };
97 use derive_more::From;
98 use iroh::endpoint::Connection;
99 use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader};
100
101 use super::*;
102 use crate::{
103 get::error::BadRequestSnafu,
104 protocol::{
105 GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
106 },
107 };
108
    // Self-referential cell: it owns a `ChunkRangesSeq` together with a
    // `NonEmptyRequestRangeSpecIter` that is created from a reference to that
    // owner (see `RangesIter::new`).
    self_cell::self_cell! {
        struct RangesIterInner {
            owner: ChunkRangesSeq,
            #[not_covariant]
            dependent: NonEmptyRequestRangeSpecIter,
        }
    }
116
117 pub fn start(
119 connection: Connection,
120 request: GetRequest,
121 counters: RequestCounters,
122 ) -> AtInitial {
123 AtInitial::new(connection, request, counters)
124 }
125
126 pub async fn start_get_many(
128 connection: Connection,
129 request: GetManyRequest,
130 counters: RequestCounters,
131 ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
132 let start = Instant::now();
133 let (mut writer, reader) = connection.open_bi().await?;
134 let request = Request::GetMany(request);
135 let request_bytes = postcard::to_stdvec(&request)
136 .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
137 writer.write_all(&request_bytes).await?;
138 writer.finish()?;
139 let Request::GetMany(request) = request else {
140 unreachable!();
141 };
142 let reader = TokioStreamReader::new(reader);
143 let mut ranges_iter = RangesIter::new(request.ranges.clone());
144 let first_item = ranges_iter.next();
145 let misc = Box::new(Misc {
146 counters,
147 start,
148 ranges_iter,
149 });
150 Ok(match first_item {
151 Some((child_offset, child_ranges)) => Ok(AtStartChild {
152 ranges: child_ranges,
153 reader,
154 misc,
155 offset: child_offset,
156 }),
157 None => Err(AtClosing::new(misc, reader, true)),
158 })
159 }
160
    /// Owning iterator over the `(offset, ranges)` items of a
    /// [`ChunkRangesSeq`], backed by the self-referential `RangesIterInner`.
    struct RangesIter(RangesIterInner);
166
167 impl fmt::Debug for RangesIter {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 f.debug_struct("RangesIter").finish()
170 }
171 }
172
173 impl RangesIter {
174 pub fn new(owner: ChunkRangesSeq) -> Self {
175 Self(RangesIterInner::new(owner, |owner| {
176 owner.iter_non_empty_infinite()
177 }))
178 }
179
180 pub fn offset(&self) -> u64 {
181 self.0.with_dependent(|_owner, iter| iter.offset())
182 }
183 }
184
185 impl Iterator for RangesIter {
186 type Item = (u64, ChunkRanges);
187
188 fn next(&mut self) -> Option<Self::Item> {
189 self.0.with_dependent_mut(|_owner, iter| {
190 iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
191 })
192 }
193 }
194
    /// Initial state of the get state machine: nothing has been sent yet.
    #[derive(Debug)]
    pub struct AtInitial {
        /// Connection on which the request stream will be opened.
        connection: Connection,
        /// The request to send.
        request: GetRequest,
        /// Counters to start from.
        counters: RequestCounters,
    }
202
203 impl AtInitial {
204 pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
209 Self {
210 connection,
211 request,
212 counters,
213 }
214 }
215
216 pub async fn next(self) -> Result<AtConnected, endpoint::ConnectionError> {
218 let start = Instant::now();
219 let (writer, reader) = self.connection.open_bi().await?;
220 let reader = TokioStreamReader::new(reader);
221 Ok(AtConnected {
222 start,
223 reader,
224 writer,
225 request: self.request,
226 counters: self.counters,
227 })
228 }
229 }
230
    /// State after the bidirectional stream has been opened, before the
    /// request has been sent.
    #[derive(Debug)]
    pub struct AtConnected {
        /// Instant taken just before the stream was opened.
        start: Instant,
        /// Receive side of the stream.
        reader: WrappedRecvStream,
        /// Send side of the stream.
        writer: SendStream,
        /// The request that is still to be sent.
        request: GetRequest,
        /// Counters accumulated so far.
        counters: RequestCounters,
    }
240
    /// The possible states after the request has been sent, as returned by
    /// [`AtConnected::next`].
    #[derive(Debug, From)]
    pub enum ConnectedNext {
        /// The first requested item has offset 0.
        StartRoot(AtStartRoot),
        /// The first requested item has a non-zero offset.
        StartChild(AtStartChild),
        /// The request contains no ranges to read.
        Closing(AtClosing),
    }
251
    /// Errors returned by [`AtConnected::next`].
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[allow(missing_docs)]
    #[derive(Debug, Snafu)]
    #[non_exhaustive]
    pub enum ConnectedNextError {
        /// Serializing the request with postcard failed.
        #[snafu(display("postcard ser: {source}"))]
        PostcardSer { source: postcard::Error },
        /// The serialized request is larger than `MAX_MESSAGE_SIZE`.
        #[snafu(display("request too big"))]
        RequestTooBig {},
        /// Writing the request to the send stream failed.
        #[snafu(display("write: {source}"))]
        Write { source: quinn::WriteError },
        /// Finishing the send stream failed because it was already closed.
        #[snafu(display("closed"))]
        Closed { source: quinn::ClosedStream },
        /// An I/O error, displayed transparently.
        #[snafu(transparent)]
        Io { source: io::Error },
    }
278
279 impl AtConnected {
280 pub async fn next(self) -> Result<ConnectedNext, ConnectedNextError> {
287 let Self {
288 start,
289 reader,
290 mut writer,
291 mut request,
292 mut counters,
293 } = self;
294 counters.other_bytes_written += {
296 debug!("sending request");
297 let wrapped = Request::Get(request);
298 let request_bytes = postcard::to_stdvec(&wrapped).context(PostcardSerSnafu)?;
299 let Request::Get(x) = wrapped else {
300 unreachable!();
301 };
302 request = x;
303
304 if request_bytes.len() > MAX_MESSAGE_SIZE {
305 return Err(RequestTooBigSnafu.build());
306 }
307
308 writer.write_all(&request_bytes).await.context(WriteSnafu)?;
310 request_bytes.len() as u64
311 };
312
313 writer.finish().context(ClosedSnafu)?;
315
316 let hash = request.hash;
317 let ranges_iter = RangesIter::new(request.ranges);
318 let mut misc = Box::new(Misc {
320 counters,
321 start,
322 ranges_iter,
323 });
324 Ok(match misc.ranges_iter.next() {
325 Some((offset, ranges)) => {
326 if offset == 0 {
327 AtStartRoot {
328 reader,
329 ranges,
330 misc,
331 hash,
332 }
333 .into()
334 } else {
335 AtStartChild {
336 reader,
337 ranges,
338 misc,
339 offset,
340 }
341 .into()
342 }
343 }
344 None => AtClosing::new(misc, reader, true).into(),
345 })
346 }
347 }
348
    /// State before reading the item at offset 0 (the root).
    #[derive(Debug)]
    pub struct AtStartRoot {
        /// Ranges requested for the root.
        ranges: ChunkRanges,
        /// Receive side of the stream.
        reader: TokioStreamReader<RecvStream>,
        /// Counters, start time and the remaining ranges iterator.
        misc: Box<Misc>,
        /// Hash taken from the request.
        hash: Hash,
    }
357
    /// State before reading an item at a given offset (a child).
    ///
    /// Unlike [`AtStartRoot`], no hash is stored here; the caller supplies it
    /// to [`AtStartChild::next`].
    #[derive(Debug)]
    pub struct AtStartChild {
        /// Ranges requested for this child.
        ranges: ChunkRanges,
        /// Receive side of the stream.
        reader: TokioStreamReader<RecvStream>,
        /// Counters, start time and the remaining ranges iterator.
        misc: Box<Misc>,
        /// Offset of this child as yielded by the ranges iterator.
        offset: u64,
    }
366
367 impl AtStartChild {
368 pub fn offset(&self) -> u64 {
374 self.offset
375 }
376
377 pub fn ranges(&self) -> &ChunkRanges {
379 &self.ranges
380 }
381
382 pub fn next(self, hash: Hash) -> AtBlobHeader {
386 AtBlobHeader {
387 reader: self.reader,
388 ranges: self.ranges,
389 misc: self.misc,
390 hash,
391 }
392 }
393
394 pub fn finish(self) -> AtClosing {
400 AtClosing::new(self.misc, self.reader, false)
401 }
402 }
403
404 impl AtStartRoot {
405 pub fn ranges(&self) -> &ChunkRanges {
407 &self.ranges
408 }
409
410 pub fn hash(&self) -> Hash {
412 self.hash
413 }
414
415 pub fn next(self) -> AtBlobHeader {
419 AtBlobHeader {
420 reader: self.reader,
421 ranges: self.ranges,
422 hash: self.hash,
423 misc: self.misc,
424 }
425 }
426
427 pub fn finish(self) -> AtClosing {
429 AtClosing::new(self.misc, self.reader, false)
430 }
431 }
432
    /// State before reading the 8-byte size header of a blob.
    #[derive(Debug)]
    pub struct AtBlobHeader {
        /// Ranges requested for this blob.
        ranges: ChunkRanges,
        /// Receive side of the stream.
        reader: TokioStreamReader<RecvStream>,
        /// Counters, start time and the remaining ranges iterator.
        misc: Box<Misc>,
        /// Hash the content is decoded against.
        hash: Hash,
    }
441
    /// Errors returned by [`AtBlobHeader::next`].
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    pub enum AtBlobHeaderNextError {
        /// The stream ended before the size header could be read.
        #[snafu(display("not found"))]
        NotFound {},
        /// Reading from the stream failed with an endpoint read error.
        #[snafu(display("read: {source}"))]
        EndpointRead { source: endpoint::ReadError },
        /// Any other I/O error while reading the header.
        #[snafu(display("io: {source}"))]
        Io { source: io::Error },
    }
463
464 impl From<AtBlobHeaderNextError> for io::Error {
465 fn from(cause: AtBlobHeaderNextError) -> Self {
466 match cause {
467 AtBlobHeaderNextError::NotFound { .. } => {
468 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
469 }
470 AtBlobHeaderNextError::EndpointRead { source, .. } => source.into(),
471 AtBlobHeaderNextError::Io { source, .. } => source,
472 }
473 }
474 }
475
476 impl AtBlobHeader {
477 pub async fn next(mut self) -> Result<(AtBlobContent, u64), AtBlobHeaderNextError> {
479 let size = self.reader.read::<8>().await.map_err(|cause| {
480 if cause.kind() == io::ErrorKind::UnexpectedEof {
481 NotFoundSnafu.build()
482 } else if let Some(e) = cause
483 .get_ref()
484 .and_then(|x| x.downcast_ref::<endpoint::ReadError>())
485 {
486 EndpointReadSnafu.into_error(e.clone())
487 } else {
488 IoSnafu.into_error(cause)
489 }
490 })?;
491 self.misc.other_bytes_read += 8;
492 let size = u64::from_le_bytes(size);
493 let stream = ResponseDecoder::new(
494 self.hash.into(),
495 self.ranges,
496 BaoTree::new(size, IROH_BLOCK_SIZE),
497 self.reader,
498 );
499 Ok((
500 AtBlobContent {
501 stream,
502 misc: self.misc,
503 },
504 size,
505 ))
506 }
507
508 pub async fn drain(self) -> result::Result<AtEndBlob, DecodeError> {
510 let (content, _size) = self.next().await?;
511 content.drain().await
512 }
513
514 pub async fn concatenate_into_vec(
519 self,
520 ) -> result::Result<(AtEndBlob, Vec<u8>), DecodeError> {
521 let (content, _size) = self.next().await?;
522 content.concatenate_into_vec().await
523 }
524
525 pub async fn write_all<D: AsyncSliceWriter>(
527 self,
528 data: D,
529 ) -> result::Result<AtEndBlob, DecodeError> {
530 let (content, _size) = self.next().await?;
531 let res = content.write_all(data).await?;
532 Ok(res)
533 }
534
535 pub async fn write_all_with_outboard<D, O>(
540 self,
541 outboard: Option<O>,
542 data: D,
543 ) -> result::Result<AtEndBlob, DecodeError>
544 where
545 D: AsyncSliceWriter,
546 O: OutboardMut,
547 {
548 let (content, _size) = self.next().await?;
549 let res = content.write_all_with_outboard(outboard, data).await?;
550 Ok(res)
551 }
552
553 pub fn hash(&self) -> Hash {
555 self.hash
556 }
557
558 pub fn ranges(&self) -> &ChunkRanges {
560 &self.ranges
561 }
562
563 pub fn offset(&self) -> u64 {
565 self.misc.ranges_iter.offset()
566 }
567 }
568
    /// State while reading the content of a blob.
    #[derive(Debug)]
    pub struct AtBlobContent {
        /// Decoder that yields the content items of the blob.
        stream: ResponseDecoder<WrappedRecvStream>,
        /// Counters, start time and the remaining ranges iterator.
        misc: Box<Misc>,
    }
575
    /// Errors that can occur while reading and decoding a blob.
    ///
    /// Conversions into this type exist from [`AtBlobHeaderNextError`],
    /// [`io::Error`] and `bao_tree::io::DecodeError`.
    #[common_fields({
        backtrace: Option<Backtrace>,
        #[snafu(implicit)]
        span_trace: SpanTrace,
    })]
    #[non_exhaustive]
    #[derive(Debug, Snafu)]
    pub enum DecodeError {
        /// The blob header could not be read because the stream ended
        /// (converted from `AtBlobHeaderNextError::NotFound`).
        #[snafu(display("not found"))]
        ChunkNotFound {},
        /// A parent node was not found.
        #[snafu(display("parent not found {node:?}"))]
        ParentNotFound { node: TreeNode },
        /// A leaf chunk was not found.
        #[snafu(display("chunk not found {num}"))]
        LeafNotFound { num: ChunkNum },
        /// The hash of a parent node did not match.
        #[snafu(display("parent hash mismatch: {node:?}"))]
        ParentHashMismatch { node: TreeNode },
        /// The hash of a leaf chunk did not match.
        #[snafu(display("leaf hash mismatch: {num}"))]
        LeafHashMismatch { num: ChunkNum },
        /// Reading from the stream failed with an endpoint read error.
        #[snafu(display("read: {source}"))]
        Read { source: endpoint::ReadError },
        /// Any other I/O error.
        #[snafu(display("io: {source}"))]
        DecodeIo { source: io::Error },
    }
629
630 impl DecodeError {
631 pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
632 LeafHashMismatchSnafu { num }.build()
633 }
634 }
635
636 impl From<AtBlobHeaderNextError> for DecodeError {
637 fn from(cause: AtBlobHeaderNextError) -> Self {
638 match cause {
639 AtBlobHeaderNextError::NotFound { .. } => ChunkNotFoundSnafu.build(),
640 AtBlobHeaderNextError::EndpointRead { source, .. } => ReadSnafu.into_error(source),
641 AtBlobHeaderNextError::Io { source, .. } => DecodeIoSnafu.into_error(source),
642 }
643 }
644 }
645
646 impl From<DecodeError> for io::Error {
647 fn from(cause: DecodeError) -> Self {
648 match cause {
649 DecodeError::ParentNotFound { .. } => {
650 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
651 }
652 DecodeError::LeafNotFound { .. } => {
653 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
654 }
655 DecodeError::Read { source, .. } => source.into(),
656 DecodeError::DecodeIo { source, .. } => source,
657 _ => io::Error::other(cause),
658 }
659 }
660 }
661
662 impl From<io::Error> for DecodeError {
663 fn from(value: io::Error) -> Self {
664 DecodeIoSnafu.into_error(value)
665 }
666 }
667
668 impl From<bao_tree::io::DecodeError> for DecodeError {
669 fn from(value: bao_tree::io::DecodeError) -> Self {
670 match value {
671 bao_tree::io::DecodeError::ParentNotFound(x) => {
672 ParentNotFoundSnafu { node: x }.build()
673 }
674 bao_tree::io::DecodeError::LeafNotFound(x) => LeafNotFoundSnafu { num: x }.build(),
675 bao_tree::io::DecodeError::ParentHashMismatch(node) => {
676 ParentHashMismatchSnafu { node }.build()
677 }
678 bao_tree::io::DecodeError::LeafHashMismatch(chunk) => {
679 LeafHashMismatchSnafu { num: chunk }.build()
680 }
681 bao_tree::io::DecodeError::Io(cause) => {
682 if let Some(inner) = cause.get_ref() {
683 if let Some(e) = inner.downcast_ref::<endpoint::ReadError>() {
684 ReadSnafu.into_error(e.clone())
685 } else {
686 DecodeIoSnafu.into_error(cause)
687 }
688 } else {
689 DecodeIoSnafu.into_error(cause)
690 }
691 }
692 }
693 }
694 }
695
    /// The result of [`AtBlobContent::next`].
    #[derive(Debug, From)]
    pub enum BlobContentNext {
        /// The decoder produced another item (or an error); the state to
        /// continue from is returned alongside it.
        More((AtBlobContent, result::Result<BaoContentItem, DecodeError>)),
        /// The decoder is done with this blob.
        Done(AtEndBlob),
    }
704
705 impl AtBlobContent {
706 pub async fn next(self) -> BlobContentNext {
708 match self.stream.next().await {
709 ResponseDecoderNext::More((stream, res)) => {
710 let mut next = Self { stream, ..self };
711 let res = res.map_err(DecodeError::from);
712 match &res {
713 Ok(BaoContentItem::Parent(_)) => {
714 next.misc.other_bytes_read += 64;
715 }
716 Ok(BaoContentItem::Leaf(leaf)) => {
717 next.misc.payload_bytes_read += leaf.data.len() as u64;
718 }
719 _ => {}
720 }
721 BlobContentNext::More((next, res))
722 }
723 ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
724 stream,
725 misc: self.misc,
726 }),
727 }
728 }
729
730 pub fn tree(&self) -> bao_tree::BaoTree {
732 self.stream.tree()
733 }
734
735 pub fn hash(&self) -> Hash {
737 (*self.stream.hash()).into()
738 }
739
740 pub fn offset(&self) -> u64 {
742 self.misc.ranges_iter.offset()
743 }
744
745 pub fn stats(&self) -> Stats {
747 Stats {
748 counters: self.misc.counters,
749 elapsed: self.misc.start.elapsed(),
750 }
751 }
752
753 pub async fn drain(self) -> result::Result<AtEndBlob, DecodeError> {
755 let mut content = self;
756 loop {
757 match content.next().await {
758 BlobContentNext::More((content1, res)) => {
759 let _ = res?;
760 content = content1;
761 }
762 BlobContentNext::Done(end) => {
763 break Ok(end);
764 }
765 }
766 }
767 }
768
769 pub async fn concatenate_into_vec(
771 self,
772 ) -> result::Result<(AtEndBlob, Vec<u8>), DecodeError> {
773 let mut res = Vec::with_capacity(1024);
774 let mut curr = self;
775 let done = loop {
776 match curr.next().await {
777 BlobContentNext::More((next, data)) => {
778 if let BaoContentItem::Leaf(leaf) = data? {
779 res.extend_from_slice(&leaf.data);
780 }
781 curr = next;
782 }
783 BlobContentNext::Done(done) => {
784 break done;
786 }
787 }
788 };
789 Ok((done, res))
790 }
791
792 pub async fn write_all_with_outboard<D, O>(
797 self,
798 mut outboard: Option<O>,
799 mut data: D,
800 ) -> result::Result<AtEndBlob, DecodeError>
801 where
802 D: AsyncSliceWriter,
803 O: OutboardMut,
804 {
805 let mut content = self;
806 loop {
807 match content.next().await {
808 BlobContentNext::More((content1, item)) => {
809 content = content1;
810 match item? {
811 BaoContentItem::Parent(parent) => {
812 if let Some(outboard) = outboard.as_mut() {
813 outboard.save(parent.node, &parent.pair).await?;
814 }
815 }
816 BaoContentItem::Leaf(leaf) => {
817 data.write_bytes_at(leaf.offset, leaf.data).await?;
818 }
819 }
820 }
821 BlobContentNext::Done(end) => {
822 return Ok(end);
823 }
824 }
825 }
826 }
827
828 pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob, DecodeError>
830 where
831 D: AsyncSliceWriter,
832 {
833 let mut content = self;
834 loop {
835 match content.next().await {
836 BlobContentNext::More((content1, item)) => {
837 content = content1;
838 match item? {
839 BaoContentItem::Parent(_) => {}
840 BaoContentItem::Leaf(leaf) => {
841 data.write_bytes_at(leaf.offset, leaf.data).await?;
842 }
843 }
844 }
845 BlobContentNext::Done(end) => {
846 return Ok(end);
847 }
848 }
849 }
850 }
851
852 pub fn finish(self) -> AtClosing {
854 AtClosing::new(self.misc, self.stream.finish(), false)
855 }
856 }
857
    /// State after a blob has been read completely.
    #[derive(Debug)]
    pub struct AtEndBlob {
        /// Receive side of the stream, handed back by the decoder.
        stream: WrappedRecvStream,
        /// Counters, start time and the remaining ranges iterator.
        misc: Box<Misc>,
    }
864
    /// The possible states after the end of a blob, as returned by
    /// [`AtEndBlob::next`].
    #[derive(Debug, From)]
    pub enum EndBlobNext {
        /// The ranges iterator yielded another item to read.
        MoreChildren(AtStartChild),
        /// The ranges iterator is exhausted.
        Closing(AtClosing),
    }
873
874 impl AtEndBlob {
875 pub fn next(mut self) -> EndBlobNext {
877 if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
878 AtStartChild {
879 reader: self.stream,
880 offset,
881 ranges,
882 misc: self.misc,
883 }
884 .into()
885 } else {
886 AtClosing::new(self.misc, self.stream, true).into()
887 }
888 }
889 }
890
    /// Final state before the request is finished and the stats are returned.
    #[derive(Debug)]
    pub struct AtClosing {
        /// Counters, start time and the ranges iterator.
        misc: Box<Misc>,
        /// Receive side of the stream.
        reader: WrappedRecvStream,
        /// Whether [`AtClosing::next`] should try to read further data and log
        /// an error if any arrives.
        check_extra_data: bool,
    }
898
899 impl AtClosing {
900 fn new(misc: Box<Misc>, reader: WrappedRecvStream, check_extra_data: bool) -> Self {
901 Self {
902 misc,
903 reader,
904 check_extra_data,
905 }
906 }
907
908 pub async fn next(self) -> result::Result<Stats, endpoint::ReadError> {
910 let reader = self.reader;
912 let mut reader = reader.into_inner();
913 if self.check_extra_data {
914 if let Some(chunk) = reader.read_chunk(8, false).await? {
915 reader.stop(0u8.into()).ok();
916 error!("Received unexpected data from the provider: {chunk:?}");
917 }
918 } else {
919 reader.stop(0u8.into()).ok();
920 }
921 Ok(Stats {
922 counters: self.misc.counters,
923 elapsed: self.misc.start.elapsed(),
924 })
925 }
926 }
927
    /// Byte counters for a request.
    #[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
    pub struct RequestCounters {
        /// Payload bytes written.
        // NOTE(review): nothing in this module increments this counter;
        // presumably it is updated elsewhere — confirm.
        pub payload_bytes_written: u64,
        /// Non-payload bytes written; in this module, the serialized request.
        pub other_bytes_written: u64,
        /// Payload bytes read; in this module, the data of leaf items.
        pub payload_bytes_read: u64,
        /// Non-payload bytes read; in this module, the 8-byte size header and
        /// 64 bytes per parent item.
        pub other_bytes_read: u64,
    }
939
    /// State that is carried unchanged in shape through all states of the
    /// machine. Dereferences to the [`RequestCounters`].
    #[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
    struct Misc {
        /// Instant at which the request was started.
        start: Instant,
        /// Counters accumulated so far.
        #[deref]
        #[deref_mut]
        counters: RequestCounters,
        /// Iterator over the remaining `(offset, ranges)` items of the request.
        ranges_iter: RangesIter,
    }
952}
953
/// Error type for a failed get response.
///
/// Conversions into this type exist from `postcard::Error`,
/// `bao_tree::io::DecodeError` and `anyhow::Error`; it converts into
/// `std::io::Error`.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Debug, Snafu)]
pub enum GetResponseError {
    /// A connection error.
    #[snafu(display("connection: {source}"))]
    Connection { source: endpoint::ConnectionError },
    /// An error while writing to a stream.
    #[snafu(display("write: {source}"))]
    Write { source: endpoint::WriteError },
    /// An error while reading from a stream.
    #[snafu(display("read: {source}"))]
    Read { source: endpoint::ReadError },
    /// An error while decoding the response.
    #[snafu(display("decode: {source}"))]
    Decode { source: bao_tree::io::DecodeError },
    /// Any other error.
    #[snafu(display("generic: {source}"))]
    Generic { source: anyhow::Error },
}
980
981impl From<postcard::Error> for GetResponseError {
982 fn from(cause: postcard::Error) -> Self {
983 GenericSnafu.into_error(cause.into())
984 }
985}
986
987impl From<bao_tree::io::DecodeError> for GetResponseError {
988 fn from(cause: bao_tree::io::DecodeError) -> Self {
989 match cause {
990 bao_tree::io::DecodeError::Io(cause) => {
991 if let Some(source) = cause.source() {
993 if let Some(error) = source.downcast_ref::<endpoint::ConnectionError>() {
994 return ConnectionSnafu.into_error(error.clone());
995 }
996 if let Some(error) = source.downcast_ref::<endpoint::ReadError>() {
997 return ReadSnafu.into_error(error.clone());
998 }
999 if let Some(error) = source.downcast_ref::<endpoint::WriteError>() {
1000 return WriteSnafu.into_error(error.clone());
1001 }
1002 }
1003 GenericSnafu.into_error(cause.into())
1004 }
1005 _ => DecodeSnafu.into_error(cause),
1006 }
1007 }
1008}
1009
1010impl From<anyhow::Error> for GetResponseError {
1011 fn from(cause: anyhow::Error) -> Self {
1012 GenericSnafu.into_error(cause)
1013 }
1014}
1015
1016impl From<GetResponseError> for std::io::Error {
1017 fn from(cause: GetResponseError) -> Self {
1018 Self::other(cause)
1019 }
1020}