kutil_http/transcoding/
body.rs

1use super::super::body::*;
2
3use {
4    ::bytes::*,
5    async_compression::*,
6    http::*,
7    http_body::*,
8    kutil_std::error::*,
9    kutil_transcoding::{reader::*, *},
10    pin_project::*,
11    std::{collections::*, io, pin::*, result::Result, task::*},
12    tokio_util::io::*,
13};
14
15const BUFFER_INITIAL_CAPACITY: usize = 8 * 1_024; // 8 KiB
16
17//
18// TranscodingBody
19//
20
21/// [Body] wrapper that can encode, decode, or pass through.
22///
23/// Note that the resulting number (and of course sizes) of the data frames will not necessarily
24/// match those of the wrapped body.
25///
26/// Relies on [TranscodingReader].
27#[pin_project]
28pub struct TranscodingBody<InnerBodyT>
29where
30    InnerBodyT: Body,
31    InnerBodyT::Error: Into<CapturedError>,
32{
33    #[pin]
34    reader: TranscodingReader<BodyReader<InnerBodyT>>,
35    buffer: BytesMut,
36    trailers: Option<VecDeque<HeaderMap>>,
37}
38
39impl<InnerBodyT> TranscodingBody<InnerBodyT>
40where
41    InnerBodyT: Body,
42    InnerBodyT::Error: Into<CapturedError>,
43{
44    /// Constructor.
45    pub fn new(reader: TranscodingReader<BodyReader<InnerBodyT>>) -> Self {
46        Self { reader, buffer: BytesMut::with_capacity(0), trailers: None }
47    }
48
49    fn validate_buffer_capacity(&mut self) {
50        let capacity = self.buffer.capacity();
51        if capacity < BUFFER_INITIAL_CAPACITY {
52            self.buffer.reserve(BUFFER_INITIAL_CAPACITY - capacity);
53        }
54    }
55}
56
57impl<BodyT> From<Bytes> for TranscodingBody<BodyT>
58where
59    BodyT: Body + From<Bytes>,
60    BodyT::Error: Into<CapturedError>,
61{
62    fn from(bytes: Bytes) -> Self {
63        let body: BodyT = bytes.into();
64        body.into_transcoding_passthrough_with_first_bytes(None)
65    }
66}
67
68impl<InnerBodyT> Body for TranscodingBody<InnerBodyT>
69where
70    InnerBodyT: Body,
71    InnerBodyT::Data: From<Bytes>,
72    InnerBodyT::Error: Into<CapturedError>,
73{
74    type Data = InnerBodyT::Data;
75    type Error = io::Error;
76
77    fn poll_frame(
78        mut self: Pin<&mut Self>,
79        context: &mut Context<'_>,
80    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
81        // Return remaining buffer as data frame
82        if self.buffer.has_remaining() {
83            let bytes = self.buffer.split().freeze();
84            let frame = Frame::data(bytes.into());
85            return Poll::Ready(Some(Ok(frame)));
86        }
87
88        self.validate_buffer_capacity();
89
90        let projected_self = self.as_mut().project();
91
92        Poll::Ready({
93            let count = ready!(poll_read_buf(projected_self.reader, context, projected_self.buffer))?;
94
95            if count != 0 {
96                let bytes = projected_self.buffer.split_to(count).freeze();
97                let frame = Frame::data(bytes.into());
98                Some(Ok(frame))
99            } else {
100                // count = 0 means we are done with data
101
102                // Make sure we have the trailers
103                if self.trailers.is_none() {
104                    let trailers = &self.reader.inner().trailers;
105                    if !trailers.is_empty() {
106                        self.trailers = Some(trailers.clone().into());
107                    }
108                }
109
110                // Return the next trailer frame
111                self.trailers
112                    .as_mut()
113                    .and_then(|trailers| trailers.pop_front().map(|trailers| Ok(Frame::trailers(trailers))))
114            }
115        })
116    }
117}
118
119//
120// IntoTranscodingBody
121//
122
123/// Into a [TranscodingBody].
124pub trait IntoTranscodingBody<BodyT>: Sized
125where
126    BodyT: Body,
127    BodyT::Error: Into<CapturedError>,
128{
129    /// Into passthrough [TranscodingBody].
130    fn into_transcoding_passthrough(self) -> TranscodingBody<BodyT> {
131        self.into_transcoding_passthrough_with_first_bytes(None)
132    }
133
134    /// Into passthrough [TranscodingBody].
135    fn into_transcoding_passthrough_with_first_bytes(self, first_bytes: Option<Bytes>) -> TranscodingBody<BodyT>;
136
137    /// Into encoding [TranscodingBody].
138    fn into_encoding(self, encoding: &Encoding) -> TranscodingBody<BodyT> {
139        self.into_encoding_with_first_bytes(None, encoding)
140    }
141
142    /// Into encoding [TranscodingBody].
143    fn into_encoding_with_first_bytes(self, first_bytes: Option<Bytes>, encoding: &Encoding) -> TranscodingBody<BodyT>;
144
145    /// Into decoding [TranscodingBody].
146    fn into_decoding(self, encoding: &Encoding) -> TranscodingBody<BodyT> {
147        self.into_decoding_with_first_bytes(None, encoding)
148    }
149
150    /// Into decoding [TranscodingBody].
151    fn into_decoding_with_first_bytes(self, first_bytes: Option<Bytes>, encoding: &Encoding) -> TranscodingBody<BodyT>;
152}
153
154impl<BodyT> IntoTranscodingBody<BodyT> for BodyT
155where
156    BodyT: Body,
157    BodyT::Error: Into<CapturedError>,
158{
159    fn into_transcoding_passthrough_with_first_bytes(self, first_bytes: Option<Bytes>) -> TranscodingBody<BodyT> {
160        TranscodingBody::new(self.into_reader_with_first_bytes(first_bytes).into_passthrough_reader())
161    }
162
163    fn into_encoding_with_first_bytes(self, first_bytes: Option<Bytes>, encoding: &Encoding) -> TranscodingBody<BodyT> {
164        TranscodingBody::new(
165            self.into_reader_with_first_bytes(first_bytes).into_encoding_reader(encoding, Level::Fastest),
166        )
167    }
168
169    fn into_decoding_with_first_bytes(self, first_bytes: Option<Bytes>, encoding: &Encoding) -> TranscodingBody<BodyT> {
170        TranscodingBody::new(self.into_reader_with_first_bytes(first_bytes).into_decoding_reader(encoding))
171    }
172}