kutil_http/body/
reader.rs1use {
2 bytes::*,
3 http::*,
4 http_body::*,
5 kutil_std::error::*,
6 std::{cmp::*, io, pin::*, task::*},
7 tokio::io::*,
8};
9
10const REMAINDER_INITIAL_CAPACITY: usize = 8 * 1_024; pub struct BodyReader<BodyT> {
18 body: Pin<Box<BodyT>>,
19
20 pub remainder: BytesMut,
22
23 pub trailers: Vec<HeaderMap>,
25}
26
27impl<BodyT> BodyReader<BodyT> {
28 pub fn new(body: BodyT) -> Self {
30 Self::new_with_first_bytes(body, None)
31 }
32
33 pub fn new_with_first_bytes(body: BodyT, first_bytes: Option<Bytes>) -> Self {
35 let remainder = match first_bytes {
36 Some(first_bytes) => first_bytes.into(),
37 None => BytesMut::with_capacity(0),
38 };
39
40 Self { body: Box::pin(body), remainder, trailers: Vec::new() }
41 }
42
43 pub fn into_inner(self) -> (BodyT, BytesMut, Vec<HeaderMap>)
48 where
49 BodyT: Unpin,
50 {
51 (*Pin::into_inner(self.body), self.remainder, self.trailers)
52 }
53
54 fn validate_remainder_capacity(&mut self) {
55 let capacity = self.remainder.capacity();
56 if capacity < REMAINDER_INITIAL_CAPACITY {
57 self.remainder.reserve(REMAINDER_INITIAL_CAPACITY - capacity);
58 }
59 }
60}
61
62impl<BodyT> AsyncRead for BodyReader<BodyT>
63where
64 BodyT: Body,
65 BodyT::Error: Into<CapturedError>, {
67 fn poll_read(
68 mut self: Pin<&mut Self>,
69 context: &mut Context<'_>,
70 buffer: &mut ReadBuf<'_>,
71 ) -> Poll<io::Result<()>> {
72 if self.remainder.has_remaining() {
74 let size = min(buffer.remaining_mut(), self.remainder.remaining());
75
76 if size != 0 {
77 let bytes = self.remainder.copy_to_bytes(size);
78 buffer.put(bytes);
79
80 if !buffer.has_remaining_mut() {
81 return Poll::Ready(Ok(()));
83 }
84 }
85 }
86
87 Poll::Ready(match ready!(self.body.as_mut().poll_frame(context)) {
88 Some(result) => {
89 let frame = result.map_err(io::Error::other)?;
90 match frame.into_data() {
91 Ok(mut data) => {
92 let size = min(buffer.remaining_mut(), data.remaining());
94
95 if size != 0 {
96 let bytes = data.copy_to_bytes(size);
97 buffer.put(bytes);
98 }
99
100 if data.has_remaining() {
102 self.validate_remainder_capacity();
103 self.remainder.put(data);
104 }
105
106 Ok(())
107 }
108
109 Err(frame) => {
111 match frame.into_trailers() {
112 Ok(trailers) => {
113 tracing::debug!("trailers frame");
114 self.trailers.push(trailers);
115
116 }
120
121 Err(_frame) => {
122 tracing::warn!("frame is not data and not trailers");
123 }
124 }
125
126 Ok(())
127 }
128 }
129 }
130
131 None => Ok(()),
132 })
133 }
134}
135
136pub trait IntoBodyReader<BodyT>: Sized {
142 fn into_reader(self) -> BodyReader<BodyT> {
144 self.into_reader_with_first_bytes(None)
145 }
146
147 fn into_reader_with_first_bytes(self, first_bytes: Option<Bytes>) -> BodyReader<BodyT>;
149}
150
151impl<BodyT> IntoBodyReader<BodyT> for BodyT
152where
153 BodyT: Body,
154 BodyT::Error: Into<CapturedError>,
155{
156 fn into_reader_with_first_bytes(self, first_bytes: Option<Bytes>) -> BodyReader<BodyT> {
157 BodyReader::new_with_first_bytes(self, first_bytes)
158 }
159}