hreq_h2/tokio_codec/framed.rs
1use super::decoder::Decoder;
2use super::encoder::Encoder;
3use super::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4
5use futures_io::{AsyncRead, AsyncWrite};
6use futures_core::Stream;
7
8use bytes::BytesMut;
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::io;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pin_project! {
17 /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 ///
20 /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21 /// by using the `new` function seen below.
22 ///
23 /// [`Stream`]: tokio::stream::Stream
24 /// [`Sink`]: futures_sink::Sink
25 /// [`AsyncRead`]: tokio::io::AsyncRead
26 /// [`Decoder::framed`]: crate::codec::Decoder::framed()
27 pub struct Framed<T, U> {
28 #[pin]
29 inner: FramedImpl<T, U, RWFrames>
30 }
31}
32
33impl<T, U> Framed<T, U>
34where
35 T: AsyncRead + AsyncWrite,
36{
37 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
38 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
39 ///
40 /// Raw I/O objects work with byte sequences, but higher-level code usually
41 /// wants to batch these into meaningful chunks, called "frames". This
42 /// method layers framing on top of an I/O object, by using the codec
43 /// traits to handle encoding and decoding of messages frames. Note that
44 /// the incoming and outgoing frame types may be distinct.
45 ///
46 /// This function returns a *single* object that is both [`Stream`] and
47 /// [`Sink`]; grouping this into a single object is often useful for layering
48 /// things like gzip or TLS, which require both read and write access to the
49 /// underlying object.
50 ///
51 /// If you want to work more directly with the streams and sink, consider
52 /// calling [`split`] on the `Framed` returned by this method, which will
53 /// break them into separate objects, allowing them to interact more easily.
54 ///
55 /// [`Stream`]: tokio::stream::Stream
56 /// [`Sink`]: futures_sink::Sink
57 /// [`Decode`]: crate::codec::Decoder
58 /// [`Encoder`]: crate::codec::Encoder
59 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
60 pub fn new(inner: T, codec: U) -> Framed<T, U> {
61 Framed {
62 inner: FramedImpl {
63 inner,
64 codec,
65 state: Default::default(),
66 },
67 }
68 }
69
70 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
71 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
72 /// with a specific read buffer initial capacity.
73 ///
74 /// Raw I/O objects work with byte sequences, but higher-level code usually
75 /// wants to batch these into meaningful chunks, called "frames". This
76 /// method layers framing on top of an I/O object, by using the codec
77 /// traits to handle encoding and decoding of messages frames. Note that
78 /// the incoming and outgoing frame types may be distinct.
79 ///
80 /// This function returns a *single* object that is both [`Stream`] and
81 /// [`Sink`]; grouping this into a single object is often useful for layering
82 /// things like gzip or TLS, which require both read and write access to the
83 /// underlying object.
84 ///
85 /// If you want to work more directly with the streams and sink, consider
86 /// calling [`split`] on the `Framed` returned by this method, which will
87 /// break them into separate objects, allowing them to interact more easily.
88 ///
89 /// [`Stream`]: tokio::stream::Stream
90 /// [`Sink`]: futures_sink::Sink
91 /// [`Decode`]: crate::codec::Decoder
92 /// [`Encoder`]: crate::codec::Encoder
93 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
94 pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
95 Framed {
96 inner: FramedImpl {
97 inner,
98 codec,
99 state: RWFrames {
100 read: ReadFrame {
101 eof: false,
102 is_readable: false,
103 buffer: BytesMut::with_capacity(capacity),
104 },
105 write: WriteFrame::default(),
106 },
107 },
108 }
109 }
110}
111
112impl<T, U> Framed<T, U> {
113 /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
114 /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
115 ///
116 /// Raw I/O objects work with byte sequences, but higher-level code usually
117 /// wants to batch these into meaningful chunks, called "frames". This
118 /// method layers framing on top of an I/O object, by using the `Codec`
119 /// traits to handle encoding and decoding of messages frames. Note that
120 /// the incoming and outgoing frame types may be distinct.
121 ///
122 /// This function returns a *single* object that is both [`Stream`] and
123 /// [`Sink`]; grouping this into a single object is often useful for layering
124 /// things like gzip or TLS, which require both read and write access to the
125 /// underlying object.
126 ///
127 /// This objects takes a stream and a readbuffer and a writebuffer. These field
128 /// can be obtained from an existing `Framed` with the [`into_parts`] method.
129 ///
130 /// If you want to work more directly with the streams and sink, consider
131 /// calling [`split`] on the `Framed` returned by this method, which will
132 /// break them into separate objects, allowing them to interact more easily.
133 ///
134 /// [`Stream`]: tokio::stream::Stream
135 /// [`Sink`]: futures_sink::Sink
136 /// [`Decoder`]: crate::codec::Decoder
137 /// [`Encoder`]: crate::codec::Encoder
138 /// [`into_parts`]: crate::codec::Framed::into_parts()
139 /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
140 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
141 Framed {
142 inner: FramedImpl {
143 inner: parts.io,
144 codec: parts.codec,
145 state: RWFrames {
146 read: parts.read_buf.into(),
147 write: parts.write_buf.into(),
148 },
149 },
150 }
151 }
152
153 /// Returns a reference to the underlying I/O stream wrapped by
154 /// `Framed`.
155 ///
156 /// Note that care should be taken to not tamper with the underlying stream
157 /// of data coming in as it may corrupt the stream of frames otherwise
158 /// being worked with.
159 pub fn get_ref(&self) -> &T {
160 &self.inner.inner
161 }
162
163 /// Returns a mutable reference to the underlying I/O stream wrapped by
164 /// `Framed`.
165 ///
166 /// Note that care should be taken to not tamper with the underlying stream
167 /// of data coming in as it may corrupt the stream of frames otherwise
168 /// being worked with.
169 pub fn get_mut(&mut self) -> &mut T {
170 &mut self.inner.inner
171 }
172
173 /// Returns a reference to the underlying codec wrapped by
174 /// `Framed`.
175 ///
176 /// Note that care should be taken to not tamper with the underlying codec
177 /// as it may corrupt the stream of frames otherwise being worked with.
178 pub fn codec(&self) -> &U {
179 &self.inner.codec
180 }
181
182 /// Returns a mutable reference to the underlying codec wrapped by
183 /// `Framed`.
184 ///
185 /// Note that care should be taken to not tamper with the underlying codec
186 /// as it may corrupt the stream of frames otherwise being worked with.
187 pub fn codec_mut(&mut self) -> &mut U {
188 &mut self.inner.codec
189 }
190
191 /// Returns a reference to the read buffer.
192 pub fn read_buffer(&self) -> &BytesMut {
193 &self.inner.state.read.buffer
194 }
195
196 /// Returns a mutable reference to the read buffer.
197 pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
198 &mut self.inner.state.read.buffer
199 }
200
201 /// Consumes the `Framed`, returning its underlying I/O stream.
202 ///
203 /// Note that care should be taken to not tamper with the underlying stream
204 /// of data coming in as it may corrupt the stream of frames otherwise
205 /// being worked with.
206 pub fn into_inner(self) -> T {
207 self.inner.inner
208 }
209
210 /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
211 /// with unprocessed data, and the codec.
212 ///
213 /// Note that care should be taken to not tamper with the underlying stream
214 /// of data coming in as it may corrupt the stream of frames otherwise
215 /// being worked with.
216 pub fn into_parts(self) -> FramedParts<T, U> {
217 FramedParts {
218 io: self.inner.inner,
219 codec: self.inner.codec,
220 read_buf: self.inner.state.read.buffer,
221 write_buf: self.inner.state.write.buffer,
222 _priv: (),
223 }
224 }
225}
226
227// This impl just defers to the underlying FramedImpl
228impl<T, U> Stream for Framed<T, U>
229where
230 T: AsyncRead,
231 U: Decoder,
232{
233 type Item = Result<U::Item, U::Error>;
234
235 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
236 self.project().inner.poll_next(cx)
237 }
238}
239
240// This impl just defers to the underlying FramedImpl
241impl<T, I, U> Sink<I> for Framed<T, U>
242where
243 T: AsyncWrite,
244 U: Encoder<I>,
245 U::Error: From<io::Error>,
246{
247 type Error = U::Error;
248
249 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
250 self.project().inner.poll_ready(cx)
251 }
252
253 fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
254 self.project().inner.start_send(item)
255 }
256
257 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
258 self.project().inner.poll_flush(cx)
259 }
260
261 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
262 self.project().inner.poll_close(cx)
263 }
264}
265
266impl<T, U> fmt::Debug for Framed<T, U>
267where
268 T: fmt::Debug,
269 U: fmt::Debug,
270{
271 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272 f.debug_struct("Framed")
273 .field("io", self.get_ref())
274 .field("codec", self.codec())
275 .finish()
276 }
277}
278
279/// `FramedParts` contains an export of the data of a Framed transport.
280/// It can be used to construct a new [`Framed`] with a different codec.
281/// It contains all current buffers and the inner transport.
282///
283/// [`Framed`]: crate::codec::Framed
284#[derive(Debug)]
285#[allow(clippy::manual_non_exhaustive)]
286pub struct FramedParts<T, U> {
287 /// The inner transport used to read bytes to and write bytes to
288 pub io: T,
289
290 /// The codec
291 pub codec: U,
292
293 /// The buffer with read but unprocessed data.
294 pub read_buf: BytesMut,
295
296 /// A buffer with unprocessed data which are not written yet.
297 pub write_buf: BytesMut,
298
299 /// This private field allows us to add additional fields in the future in a
300 /// backwards compatible way.
301 _priv: (),
302}
303
304impl<T, U> FramedParts<T, U> {
305 /// Create a new, default, `FramedParts`
306 pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
307 where
308 U: Encoder<I>,
309 {
310 FramedParts {
311 io,
312 codec,
313 read_buf: BytesMut::new(),
314 write_buf: BytesMut::new(),
315 _priv: (),
316 }
317 }
318}