kutil_http/body/
reader.rs

1use {
2    bytes::*,
3    http::*,
4    http_body::*,
5    kutil_std::error::*,
6    std::{cmp::*, io, pin::*, task::*},
7    tokio::io::*,
8};
9
10const REMAINDER_INITIAL_CAPACITY: usize = 8 * 1_024; // 8 KiB
11
12//
13// BodyReader
14//
15
16/// [AsyncRead] wrapper for [Body].
17pub struct BodyReader<BodyT> {
18    body: Pin<Box<BodyT>>,
19
20    /// Remainder.
21    pub remainder: BytesMut,
22
23    /// Trailers
24    pub trailers: Vec<HeaderMap>,
25}
26
27impl<BodyT> BodyReader<BodyT> {
28    /// Constructor.
29    pub fn new(body: BodyT) -> Self {
30        Self::new_with_first_bytes(body, None)
31    }
32
33    /// Constructor.
34    pub fn new_with_first_bytes(body: BodyT, first_bytes: Option<Bytes>) -> Self {
35        let remainder = match first_bytes {
36            Some(first_bytes) => first_bytes.into(),
37            None => BytesMut::with_capacity(0),
38        };
39
40        Self { body: Box::pin(body), remainder, trailers: Vec::new() }
41    }
42
43    /// Back to the inner [Body].
44    ///
45    /// Note that the body may have changed if we have read from this reader, in which case the
46    /// returned remainder will be non-empty and/or we may have trailers.
47    pub fn into_inner(self) -> (BodyT, BytesMut, Vec<HeaderMap>)
48    where
49        BodyT: Unpin,
50    {
51        (*Pin::into_inner(self.body), self.remainder, self.trailers)
52    }
53
54    fn validate_remainder_capacity(&mut self) {
55        let capacity = self.remainder.capacity();
56        if capacity < REMAINDER_INITIAL_CAPACITY {
57            self.remainder.reserve(REMAINDER_INITIAL_CAPACITY - capacity);
58        }
59    }
60}
61
62impl<BodyT> AsyncRead for BodyReader<BodyT>
63where
64    BodyT: Body,
65    BodyT::Error: Into<CapturedError>, // so it can be used with io::Error::other
66{
67    fn poll_read(
68        mut self: Pin<&mut Self>,
69        context: &mut Context<'_>,
70        buffer: &mut ReadBuf<'_>,
71    ) -> Poll<io::Result<()>> {
72        // Copy as much as we can from the remainder
73        if self.remainder.has_remaining() {
74            let size = min(buffer.remaining_mut(), self.remainder.remaining());
75
76            if size != 0 {
77                let bytes = self.remainder.copy_to_bytes(size);
78                buffer.put(bytes);
79
80                if !buffer.has_remaining_mut() {
81                    // Buffer is full
82                    return Poll::Ready(Ok(()));
83                }
84            }
85        }
86
87        Poll::Ready(match ready!(self.body.as_mut().poll_frame(context)) {
88            Some(result) => {
89                let frame = result.map_err(io::Error::other)?;
90                match frame.into_data() {
91                    Ok(mut data) => {
92                        // Copy as much as we can from the data
93                        let size = min(buffer.remaining_mut(), data.remaining());
94
95                        if size != 0 {
96                            let bytes = data.copy_to_bytes(size);
97                            buffer.put(bytes);
98                        }
99
100                        // Store leftover data in the remainder
101                        if data.has_remaining() {
102                            self.validate_remainder_capacity();
103                            self.remainder.put(data);
104                        }
105
106                        Ok(())
107                    }
108
109                    // Note that this is not actually an error
110                    Err(frame) => {
111                        match frame.into_trailers() {
112                            Ok(trailers) => {
113                                tracing::debug!("trailers frame");
114                                self.trailers.push(trailers);
115
116                                // Note: There really shouldn't be more than one trailers frame,
117                                // but Body::poll_frame doesn't explicitly disallow it so we
118                                // make sure to collect them all into a vector
119                            }
120
121                            Err(_frame) => {
122                                tracing::warn!("frame is not data and not trailers");
123                            }
124                        }
125
126                        Ok(())
127                    }
128                }
129            }
130
131            None => Ok(()),
132        })
133    }
134}
135
136//
137// IntoBodyReader
138//
139
140/// Into [BodyReader].
141pub trait IntoBodyReader<BodyT>: Sized {
142    /// Into [BodyReader].
143    fn into_reader(self) -> BodyReader<BodyT> {
144        self.into_reader_with_first_bytes(None)
145    }
146
147    /// Into [BodyReader].
148    fn into_reader_with_first_bytes(self, first_bytes: Option<Bytes>) -> BodyReader<BodyT>;
149}
150
151impl<BodyT> IntoBodyReader<BodyT> for BodyT
152where
153    BodyT: Body,
154    BodyT::Error: Into<CapturedError>,
155{
156    fn into_reader_with_first_bytes(self, first_bytes: Option<Bytes>) -> BodyReader<BodyT> {
157        BodyReader::new_with_first_bytes(self, first_bytes)
158    }
159}