1use std::{
20 fmt::{self, Debug},
21 time::{Duration, Instant},
22};
23
24use anyhow::Result;
25use bao_tree::{io::fsm::BaoContentItem, ChunkNum};
26use fsm::RequestCounters;
27use n0_snafu::SpanTrace;
28use nested_enum_utils::common_fields;
29use serde::{Deserialize, Serialize};
30use snafu::{Backtrace, IntoError, ResultExt, Snafu};
31use tracing::{debug, error};
32
33use crate::{
34 protocol::ChunkRangesSeq,
35 store::IROH_BLOCK_SIZE,
36 util::{RecvStream, SendStream},
37 Hash,
38};
39
40mod error;
41pub mod request;
42pub(crate) use error::get_error;
43pub use error::{GetError, GetResult};
44
45type DefaultReader = iroh::endpoint::RecvStream;
46type DefaultWriter = iroh::endpoint::SendStream;
47
/// A receive stream and a send stream bundled with a connection id and a timestamp.
///
/// `R` and `W` default to the iroh endpoint stream types.
pub struct StreamPair<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Identifier passed in by whoever created this pair.
    pub connection_id: u64,
    /// Timestamp; [`StreamPair::new`] sets it to the moment of construction.
    pub t0: Instant,
    /// The receiving half.
    pub recv: R,
    /// The sending half.
    pub send: W,
}
54
55impl<R: RecvStream, W: SendStream> StreamPair<R, W> {
56 pub fn new(connection_id: u64, recv: R, send: W) -> Self {
57 Self {
58 t0: Instant::now(),
59 recv,
60 send,
61 connection_id,
62 }
63 }
64}
65
/// Statistics for a request: byte counters plus elapsed time.
///
/// Dereferences (also mutably) to the wrapped [`RequestCounters`], so the counter fields can
/// be read and updated directly on a `Stats` value.
#[derive(
    Debug,
    Default,
    Clone,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    derive_more::Deref,
    derive_more::DerefMut,
)]
pub struct Stats {
    /// Byte counters; this is the deref target.
    #[deref]
    #[deref_mut]
    pub counters: RequestCounters,
    /// Elapsed time covered by these statistics.
    pub elapsed: Duration,
}
86
87impl Stats {
88 pub fn mbits(&self) -> f64 {
90 let data_len_bit = self.total_bytes_read() * 8;
91 data_len_bit as f64 / (1000. * 1000.) / self.elapsed.as_secs_f64()
92 }
93
94 pub fn total_bytes_read(&self) -> u64 {
95 self.payload_bytes_read + self.other_bytes_read
96 }
97
98 pub fn combine(&mut self, that: &Stats) {
99 self.payload_bytes_written += that.payload_bytes_written;
100 self.other_bytes_written += that.other_bytes_written;
101 self.payload_bytes_read += that.payload_bytes_read;
102 self.other_bytes_read += that.other_bytes_read;
103 self.elapsed += that.elapsed;
104 }
105}
106
107#[doc = include_str!("../docs/img/get_machine.drawio.svg")]
111pub mod fsm {
112 use std::{io, result};
113
114 use bao_tree::{
115 io::fsm::{OutboardMut, ResponseDecoder, ResponseDecoderNext},
116 BaoTree, ChunkRanges, TreeNode,
117 };
118 use derive_more::From;
119 use iroh::endpoint::Connection;
120 use iroh_io::AsyncSliceWriter;
121
122 use super::*;
123 use crate::{
124 get::get_error::BadRequestSnafu,
125 protocol::{
126 GetManyRequest, GetRequest, NonEmptyRequestRangeSpecIter, Request, MAX_MESSAGE_SIZE,
127 },
128 util::{RecvStream, RecvStreamAsyncStreamReader, SendStream},
129 };
130
// Self-referential cell generated by `self_cell`: it stores a `ChunkRangesSeq` (the owner)
// together with a `NonEmptyRequestRangeSpecIter` (the dependent) that is built from a
// reference to that owner, so both can live in a single value.
self_cell::self_cell! {
    struct RangesIterInner {
        owner: ChunkRangesSeq,
        #[not_covariant]
        dependent: NonEmptyRequestRangeSpecIter,
    }
}
138
139 pub fn start(
141 connection: Connection,
142 request: GetRequest,
143 counters: RequestCounters,
144 ) -> AtInitial {
145 AtInitial::new(connection, request, counters)
146 }
147
148 pub async fn start_get_many(
150 connection: Connection,
151 request: GetManyRequest,
152 counters: RequestCounters,
153 ) -> std::result::Result<Result<AtStartChild, AtClosing>, GetError> {
154 let start = Instant::now();
155 let (mut writer, reader) = connection
156 .open_bi()
157 .await
158 .map_err(|e| OpenSnafu.into_error(e.into()))?;
159 let request = Request::GetMany(request);
160 let request_bytes = postcard::to_stdvec(&request)
161 .map_err(|source| BadRequestSnafu.into_error(source.into()))?;
162 writer
163 .send_bytes(request_bytes.into())
164 .await
165 .context(connected_next_error::WriteSnafu)?;
166 let Request::GetMany(request) = request else {
167 unreachable!();
168 };
169 let mut ranges_iter = RangesIter::new(request.ranges.clone());
170 let first_item = ranges_iter.next();
171 let misc = Box::new(Misc {
172 counters,
173 start,
174 ranges_iter,
175 });
176 Ok(match first_item {
177 Some((child_offset, child_ranges)) => Ok(AtStartChild {
178 ranges: child_ranges,
179 reader,
180 misc,
181 offset: child_offset,
182 }),
183 None => Err(AtClosing::new(misc, reader, true)),
184 })
185 }
186
/// Iterator over the `(offset, ranges)` items of a `ChunkRangesSeq`, owning the sequence it
/// iterates over (see `RangesIterInner`).
struct RangesIter(RangesIterInner);
192
193 impl fmt::Debug for RangesIter {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 f.debug_struct("RangesIter").finish()
196 }
197 }
198
199 impl RangesIter {
200 pub fn new(owner: ChunkRangesSeq) -> Self {
201 Self(RangesIterInner::new(owner, |owner| {
202 owner.iter_non_empty_infinite()
203 }))
204 }
205
206 pub fn offset(&self) -> u64 {
207 self.0.with_dependent(|_owner, iter| iter.offset())
208 }
209 }
210
211 impl Iterator for RangesIter {
212 type Item = (u64, ChunkRanges);
213
214 fn next(&mut self) -> Option<Self::Item> {
215 self.0.with_dependent_mut(|_owner, iter| {
216 iter.next().map(|(offset, ranges)| (offset, ranges.clone()))
217 })
218 }
219 }
220
/// Initial state of the state machine: holds everything needed to open a stream, but nothing
/// has been sent yet.
#[derive(Debug)]
pub struct AtInitial {
    /// Connection on which the bidirectional stream will be opened.
    connection: Connection,
    /// The request to send.
    request: GetRequest,
    /// Starting value of the byte counters for this request.
    counters: RequestCounters,
}
228
229 impl AtInitial {
230 pub fn new(connection: Connection, request: GetRequest, counters: RequestCounters) -> Self {
235 Self {
236 connection,
237 request,
238 counters,
239 }
240 }
241
242 pub async fn next(self) -> Result<AtConnected, InitialNextError> {
244 let start = Instant::now();
245 let (writer, reader) = self
246 .connection
247 .open_bi()
248 .await
249 .map_err(|e| OpenSnafu.into_error(e.into()))?;
250 Ok(AtConnected {
251 start,
252 reader,
253 writer,
254 request: self.request,
255 counters: self.counters,
256 })
257 }
258 }
259
/// Error returned by [`AtInitial::next`].
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum InitialNextError {
    /// Opening the bidirectional stream failed.
    Open { source: io::Error },
}
272
/// State after the stream pair has been opened, before the request has been sent.
#[derive(Debug)]
pub struct AtConnected<R: RecvStream = DefaultReader, W: SendStream = DefaultWriter> {
    /// Start timestamp, used later to compute the elapsed time in [`Stats`].
    start: Instant,
    /// Stream the response is read from.
    reader: R,
    /// Stream the request is written to.
    writer: W,
    /// The request to send.
    request: GetRequest,
    /// Byte counters for this request.
    counters: RequestCounters,
}
282
/// Possible next states after [`AtConnected::next`].
#[derive(Debug, From)]
pub enum ConnectedNext<R: RecvStream = DefaultReader> {
    /// The first requested item is at offset 0.
    StartRoot(AtStartRoot<R>),
    /// The first requested item is at a non-zero offset.
    StartChild(AtStartChild<R>),
    /// The request contains no non-empty ranges; go straight to closing.
    Closing(AtClosing<R>),
}
293
/// Error returned by [`AtConnected::next`].
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[allow(missing_docs)]
#[derive(Debug, Snafu)]
#[snafu(module)]
#[non_exhaustive]
pub enum ConnectedNextError {
    /// Serializing the request with postcard failed.
    #[snafu(display("postcard ser: {source}"))]
    PostcardSer { source: postcard::Error },
    /// The serialized request is larger than `MAX_MESSAGE_SIZE`.
    #[snafu(display("request too big"))]
    RequestTooBig {},
    /// Writing the request to the send stream failed.
    #[snafu(display("write: {source}"))]
    Write { source: io::Error },
}
315
316 impl<R: RecvStream, W: SendStream> AtConnected<R, W> {
317 pub fn new(
318 start: Instant,
319 reader: R,
320 writer: W,
321 request: GetRequest,
322 counters: RequestCounters,
323 ) -> Self {
324 Self {
325 start,
326 reader,
327 writer,
328 request,
329 counters,
330 }
331 }
332
333 pub async fn next(self) -> Result<ConnectedNext<R>, ConnectedNextError> {
340 let Self {
341 start,
342 reader,
343 mut writer,
344 mut request,
345 mut counters,
346 } = self;
347 counters.other_bytes_written += {
349 debug!("sending request");
350 let wrapped = Request::Get(request);
351 let request_bytes = postcard::to_stdvec(&wrapped)
352 .context(connected_next_error::PostcardSerSnafu)?;
353 let Request::Get(x) = wrapped else {
354 unreachable!();
355 };
356 request = x;
357
358 if request_bytes.len() > MAX_MESSAGE_SIZE {
359 return Err(connected_next_error::RequestTooBigSnafu.build());
360 }
361
362 let len = request_bytes.len() as u64;
364 writer
365 .send_bytes(request_bytes.into())
366 .await
367 .context(connected_next_error::WriteSnafu)?;
368 writer
369 .sync()
370 .await
371 .context(connected_next_error::WriteSnafu)?;
372 len
373 };
374
375 drop(writer);
377
378 let hash = request.hash;
379 let ranges_iter = RangesIter::new(request.ranges);
380 let mut misc = Box::new(Misc {
382 counters,
383 start,
384 ranges_iter,
385 });
386 Ok(match misc.ranges_iter.next() {
387 Some((offset, ranges)) => {
388 if offset == 0 {
389 AtStartRoot {
390 reader,
391 ranges,
392 misc,
393 hash,
394 }
395 .into()
396 } else {
397 AtStartChild {
398 reader,
399 ranges,
400 misc,
401 offset,
402 }
403 .into()
404 }
405 }
406 None => AtClosing::new(misc, reader, true).into(),
407 })
408 }
409 }
410
/// State at the start of the item at offset 0, whose hash is taken from the request.
#[derive(Debug)]
pub struct AtStartRoot<R: RecvStream = DefaultReader> {
    /// Chunk ranges requested for this item.
    ranges: ChunkRanges,
    /// Stream the response is read from.
    reader: R,
    /// Shared per-request state (counters, start time, remaining ranges).
    misc: Box<Misc>,
    /// Hash of this item.
    hash: Hash,
}
419
/// State at the start of an item whose hash must be supplied by the caller
/// (see [`AtStartChild::next`]).
#[derive(Debug)]
pub struct AtStartChild<R: RecvStream = DefaultReader> {
    /// Chunk ranges requested for this item.
    ranges: ChunkRanges,
    /// Stream the response is read from.
    reader: R,
    /// Shared per-request state (counters, start time, remaining ranges).
    misc: Box<Misc>,
    /// Offset of this item as reported by the ranges iterator.
    offset: u64,
}
428
429 impl<R: RecvStream> AtStartChild<R> {
430 pub fn offset(&self) -> u64 {
436 self.offset
437 }
438
439 pub fn ranges(&self) -> &ChunkRanges {
441 &self.ranges
442 }
443
444 pub fn next(self, hash: Hash) -> AtBlobHeader<R> {
448 AtBlobHeader {
449 reader: self.reader,
450 ranges: self.ranges,
451 misc: self.misc,
452 hash,
453 }
454 }
455
456 pub fn finish(self) -> AtClosing<R> {
462 AtClosing::new(self.misc, self.reader, false)
463 }
464 }
465
466 impl<R: RecvStream> AtStartRoot<R> {
467 pub fn ranges(&self) -> &ChunkRanges {
469 &self.ranges
470 }
471
472 pub fn hash(&self) -> Hash {
474 self.hash
475 }
476
477 pub fn next(self) -> AtBlobHeader<R> {
481 AtBlobHeader {
482 reader: self.reader,
483 ranges: self.ranges,
484 hash: self.hash,
485 misc: self.misc,
486 }
487 }
488
489 pub fn finish(self) -> AtClosing<R> {
491 AtClosing::new(self.misc, self.reader, false)
492 }
493 }
494
/// State before the 8-byte size header of an item has been read.
#[derive(Debug)]
pub struct AtBlobHeader<R: RecvStream = DefaultReader> {
    /// Chunk ranges requested for this item.
    ranges: ChunkRanges,
    /// Stream the response is read from.
    reader: R,
    /// Shared per-request state (counters, start time, remaining ranges).
    misc: Box<Misc>,
    /// Hash the item's content is decoded against.
    hash: Hash,
}
503
/// Error returned by [`AtBlobHeader::next`].
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[non_exhaustive]
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum AtBlobHeaderNextError {
    /// The stream ended (`UnexpectedEof`) while the size header was being read.
    #[snafu(display("not found"))]
    NotFound {},
    /// Any other I/O error while reading the size header.
    #[snafu(display("io: {source}"))]
    Read { source: io::Error },
}
523
524 impl From<AtBlobHeaderNextError> for io::Error {
525 fn from(cause: AtBlobHeaderNextError) -> Self {
526 match cause {
527 AtBlobHeaderNextError::NotFound { .. } => {
528 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
529 }
530 AtBlobHeaderNextError::Read { source, .. } => source,
531 }
532 }
533 }
534
535 impl<R: RecvStream> AtBlobHeader<R> {
536 pub async fn next(mut self) -> Result<(AtBlobContent<R>, u64), AtBlobHeaderNextError> {
538 let mut size = [0; 8];
539 self.reader.recv_exact(&mut size).await.map_err(|cause| {
540 if cause.kind() == io::ErrorKind::UnexpectedEof {
541 at_blob_header_next_error::NotFoundSnafu.build()
542 } else {
543 at_blob_header_next_error::ReadSnafu.into_error(cause)
544 }
545 })?;
546 self.misc.other_bytes_read += 8;
547 let size = u64::from_le_bytes(size);
548 let stream = ResponseDecoder::new(
549 self.hash.into(),
550 self.ranges,
551 BaoTree::new(size, IROH_BLOCK_SIZE),
552 RecvStreamAsyncStreamReader::new(self.reader),
553 );
554 Ok((
555 AtBlobContent {
556 stream,
557 misc: self.misc,
558 },
559 size,
560 ))
561 }
562
563 pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
565 let (content, _size) = self.next().await?;
566 content.drain().await
567 }
568
569 pub async fn concatenate_into_vec(
574 self,
575 ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
576 let (content, _size) = self.next().await?;
577 content.concatenate_into_vec().await
578 }
579
580 pub async fn write_all<D: AsyncSliceWriter>(
582 self,
583 data: D,
584 ) -> result::Result<AtEndBlob<R>, DecodeError> {
585 let (content, _size) = self.next().await?;
586 let res = content.write_all(data).await?;
587 Ok(res)
588 }
589
590 pub async fn write_all_with_outboard<D, O>(
595 self,
596 outboard: Option<O>,
597 data: D,
598 ) -> result::Result<AtEndBlob<R>, DecodeError>
599 where
600 D: AsyncSliceWriter,
601 O: OutboardMut,
602 {
603 let (content, _size) = self.next().await?;
604 let res = content.write_all_with_outboard(outboard, data).await?;
605 Ok(res)
606 }
607
608 pub fn hash(&self) -> Hash {
610 self.hash
611 }
612
613 pub fn ranges(&self) -> &ChunkRanges {
615 &self.ranges
616 }
617
618 pub fn offset(&self) -> u64 {
620 self.misc.ranges_iter.offset()
621 }
622 }
623
/// State while the content of an item is being decoded.
#[derive(Debug)]
pub struct AtBlobContent<R: RecvStream = DefaultReader> {
    /// Decoder that yields the content items read from the underlying stream.
    stream: ResponseDecoder<RecvStreamAsyncStreamReader<R>>,
    /// Shared per-request state (counters, start time, remaining ranges).
    misc: Box<Misc>,
}
630
/// Error that can occur while decoding the content of an item.
///
/// Most variants mirror `bao_tree::io::DecodeError`. `ChunkNotFound` is produced from
/// [`AtBlobHeaderNextError::NotFound`], and `Write` from failures while writing decoded data.
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[non_exhaustive]
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum DecodeError {
    /// The stream ended while the size header was being read.
    #[snafu(display("not found"))]
    ChunkNotFound {},
    /// The decoder reported a missing parent node.
    #[snafu(display("parent not found {node:?}"))]
    ParentNotFound { node: TreeNode },
    /// The decoder reported a missing leaf chunk.
    #[snafu(display("chunk not found {num}"))]
    LeafNotFound { num: ChunkNum },
    /// The decoder reported a hash mismatch for a parent node.
    #[snafu(display("parent hash mismatch: {node:?}"))]
    ParentHashMismatch { node: TreeNode },
    /// The decoder reported a hash mismatch for a leaf chunk.
    #[snafu(display("leaf hash mismatch: {num}"))]
    LeafHashMismatch { num: ChunkNum },
    /// I/O error while reading from the stream.
    #[snafu(display("read: {source}"))]
    Read { source: io::Error },
    /// I/O error while writing decoded data or outboard entries.
    #[snafu(display("io: {source}"))]
    Write { source: io::Error },
}
679
680 impl DecodeError {
681 pub(crate) fn leaf_hash_mismatch(num: ChunkNum) -> Self {
682 decode_error::LeafHashMismatchSnafu { num }.build()
683 }
684 }
685
686 impl From<AtBlobHeaderNextError> for DecodeError {
687 fn from(cause: AtBlobHeaderNextError) -> Self {
688 match cause {
689 AtBlobHeaderNextError::NotFound { .. } => decode_error::ChunkNotFoundSnafu.build(),
690 AtBlobHeaderNextError::Read { source, .. } => {
691 decode_error::ReadSnafu.into_error(source)
692 }
693 }
694 }
695 }
696
697 impl From<DecodeError> for io::Error {
698 fn from(cause: DecodeError) -> Self {
699 match cause {
700 DecodeError::ParentNotFound { .. } => {
701 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
702 }
703 DecodeError::LeafNotFound { .. } => {
704 io::Error::new(io::ErrorKind::UnexpectedEof, cause)
705 }
706 DecodeError::Read { source, .. } => source,
707 DecodeError::Write { source, .. } => source,
708 _ => io::Error::other(cause),
709 }
710 }
711 }
712
713 impl From<bao_tree::io::DecodeError> for DecodeError {
714 fn from(value: bao_tree::io::DecodeError) -> Self {
715 match value {
716 bao_tree::io::DecodeError::ParentNotFound(node) => {
717 decode_error::ParentNotFoundSnafu { node }.build()
718 }
719 bao_tree::io::DecodeError::LeafNotFound(num) => {
720 decode_error::LeafNotFoundSnafu { num }.build()
721 }
722 bao_tree::io::DecodeError::ParentHashMismatch(node) => {
723 decode_error::ParentHashMismatchSnafu { node }.build()
724 }
725 bao_tree::io::DecodeError::LeafHashMismatch(num) => {
726 decode_error::LeafHashMismatchSnafu { num }.build()
727 }
728 bao_tree::io::DecodeError::Io(cause) => decode_error::ReadSnafu.into_error(cause),
729 }
730 }
731 }
732
/// Result of [`AtBlobContent::next`].
#[derive(Debug, From)]
pub enum BlobContentNext<R: RecvStream> {
    /// The decoder produced another item (or an error); the state to continue from is
    /// returned alongside it.
    More(
        (
            AtBlobContent<R>,
            result::Result<BaoContentItem, DecodeError>,
        ),
    ),
    /// The decoder is finished with this item.
    Done(AtEndBlob<R>),
}
746
747 impl<R: RecvStream> AtBlobContent<R> {
748 pub async fn next(self) -> BlobContentNext<R> {
750 match self.stream.next().await {
751 ResponseDecoderNext::More((stream, res)) => {
752 let mut next = Self { stream, ..self };
753 let res = res.map_err(DecodeError::from);
754 match &res {
755 Ok(BaoContentItem::Parent(_)) => {
756 next.misc.other_bytes_read += 64;
757 }
758 Ok(BaoContentItem::Leaf(leaf)) => {
759 next.misc.payload_bytes_read += leaf.data.len() as u64;
760 }
761 _ => {}
762 }
763 BlobContentNext::More((next, res))
764 }
765 ResponseDecoderNext::Done(stream) => BlobContentNext::Done(AtEndBlob {
766 stream: stream.into_inner(),
767 misc: self.misc,
768 }),
769 }
770 }
771
772 pub fn tree(&self) -> bao_tree::BaoTree {
774 self.stream.tree()
775 }
776
777 pub fn hash(&self) -> Hash {
779 (*self.stream.hash()).into()
780 }
781
782 pub fn offset(&self) -> u64 {
784 self.misc.ranges_iter.offset()
785 }
786
787 pub fn stats(&self) -> Stats {
789 Stats {
790 counters: self.misc.counters,
791 elapsed: self.misc.start.elapsed(),
792 }
793 }
794
795 pub async fn drain(self) -> result::Result<AtEndBlob<R>, DecodeError> {
797 let mut content = self;
798 loop {
799 match content.next().await {
800 BlobContentNext::More((content1, res)) => {
801 let _ = res?;
802 content = content1;
803 }
804 BlobContentNext::Done(end) => {
805 break Ok(end);
806 }
807 }
808 }
809 }
810
811 pub async fn concatenate_into_vec(
813 self,
814 ) -> result::Result<(AtEndBlob<R>, Vec<u8>), DecodeError> {
815 let mut res = Vec::with_capacity(1024);
816 let mut curr = self;
817 let done = loop {
818 match curr.next().await {
819 BlobContentNext::More((next, data)) => {
820 if let BaoContentItem::Leaf(leaf) = data? {
821 res.extend_from_slice(&leaf.data);
822 }
823 curr = next;
824 }
825 BlobContentNext::Done(done) => {
826 break done;
828 }
829 }
830 };
831 Ok((done, res))
832 }
833
834 pub async fn write_all_with_outboard<D, O>(
839 self,
840 mut outboard: Option<O>,
841 mut data: D,
842 ) -> result::Result<AtEndBlob<R>, DecodeError>
843 where
844 D: AsyncSliceWriter,
845 O: OutboardMut,
846 {
847 let mut content = self;
848 loop {
849 match content.next().await {
850 BlobContentNext::More((content1, item)) => {
851 content = content1;
852 match item? {
853 BaoContentItem::Parent(parent) => {
854 if let Some(outboard) = outboard.as_mut() {
855 outboard
856 .save(parent.node, &parent.pair)
857 .await
858 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
859 }
860 }
861 BaoContentItem::Leaf(leaf) => {
862 data.write_bytes_at(leaf.offset, leaf.data)
863 .await
864 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
865 }
866 }
867 }
868 BlobContentNext::Done(end) => {
869 return Ok(end);
870 }
871 }
872 }
873 }
874
875 pub async fn write_all<D>(self, mut data: D) -> result::Result<AtEndBlob<R>, DecodeError>
877 where
878 D: AsyncSliceWriter,
879 {
880 let mut content = self;
881 loop {
882 match content.next().await {
883 BlobContentNext::More((content1, item)) => {
884 content = content1;
885 match item? {
886 BaoContentItem::Parent(_) => {}
887 BaoContentItem::Leaf(leaf) => {
888 data.write_bytes_at(leaf.offset, leaf.data)
889 .await
890 .map_err(|e| decode_error::WriteSnafu.into_error(e))?;
891 }
892 }
893 }
894 BlobContentNext::Done(end) => {
895 return Ok(end);
896 }
897 }
898 }
899 }
900
901 pub fn finish(self) -> AtClosing<R> {
903 AtClosing::new(self.misc, self.stream.finish().into_inner(), false)
904 }
905 }
906
/// State after all content of one item has been decoded.
#[derive(Debug)]
pub struct AtEndBlob<R: RecvStream = DefaultReader> {
    /// The receive stream, recovered from the decoder.
    stream: R,
    /// Shared per-request state (counters, start time, remaining ranges).
    misc: Box<Misc>,
}
913
/// Possible next states after [`AtEndBlob::next`].
#[derive(Debug, From)]
pub enum EndBlobNext<R: RecvStream = DefaultReader> {
    /// The ranges iterator produced another item to read.
    MoreChildren(AtStartChild<R>),
    /// The ranges iterator is exhausted; move to closing.
    Closing(AtClosing<R>),
}
922
923 impl<R: RecvStream> AtEndBlob<R> {
924 pub fn next(mut self) -> EndBlobNext<R> {
926 if let Some((offset, ranges)) = self.misc.ranges_iter.next() {
927 AtStartChild {
928 reader: self.stream,
929 offset,
930 ranges,
931 misc: self.misc,
932 }
933 .into()
934 } else {
935 AtClosing::new(self.misc, self.stream, true).into()
936 }
937 }
938 }
939
/// Final state of the state machine, before the statistics are produced.
#[derive(Debug)]
pub struct AtClosing<R: RecvStream = DefaultReader> {
    /// Shared per-request state (counters, start time, remaining ranges).
    misc: Box<Misc>,
    /// Stream the response was read from.
    reader: R,
    /// Whether [`AtClosing::next`] should try to read one more byte and log an error if the
    /// stream still has data.
    check_extra_data: bool,
}
947
948 impl<R: RecvStream> AtClosing<R> {
949 fn new(misc: Box<Misc>, reader: R, check_extra_data: bool) -> Self {
950 Self {
951 misc,
952 reader,
953 check_extra_data,
954 }
955 }
956
957 pub async fn next(self) -> result::Result<Stats, AtClosingNextError> {
959 let mut reader = self.reader;
961 if self.check_extra_data {
962 let rest = reader.recv_bytes(1).await?;
963 if !rest.is_empty() {
964 error!("Unexpected extra data at the end of the stream");
965 }
966 }
967 Ok(Stats {
968 counters: self.misc.counters,
969 elapsed: self.misc.start.elapsed(),
970 })
971 }
972 }
973
/// Error returned by [`AtClosing::next`].
#[common_fields({
    backtrace: Option<Backtrace>,
    #[snafu(implicit)]
    span_trace: SpanTrace,
})]
#[non_exhaustive]
#[derive(Debug, Snafu)]
#[snafu(module)]
pub enum AtClosingNextError {
    /// I/O error while checking the stream for extra data.
    #[snafu(transparent)]
    Read { source: io::Error },
}
988
/// Byte counters for a single request.
#[derive(Debug, Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq)]
pub struct RequestCounters {
    /// Payload bytes written.
    // NOTE(review): nothing in this module increments this counter — confirm where it is set.
    pub payload_bytes_written: u64,
    /// Non-payload bytes written; the state machine adds the serialized request size here.
    pub other_bytes_written: u64,
    /// Payload bytes read; the state machine adds the data length of every decoded leaf.
    pub payload_bytes_read: u64,
    /// Non-payload bytes read; the state machine adds 8 per size header and 64 per decoded
    /// parent.
    pub other_bytes_read: u64,
}
1000
/// Per-request state that is handed from state to state.
///
/// Dereferences (also mutably) to the counters, so states can update them directly.
#[derive(Debug, derive_more::Deref, derive_more::DerefMut)]
struct Misc {
    /// Start timestamp, used to compute the elapsed time in [`Stats`].
    start: Instant,
    /// Byte counters; this is the deref target.
    #[deref]
    #[deref_mut]
    counters: RequestCounters,
    /// Iterator over the remaining requested ranges.
    ranges_iter: RangesIter,
}
1013}