hreq_h2/tokio_codec/
framed.rs

1use super::decoder::Decoder;
2use super::encoder::Encoder;
3use super::framed_impl::{FramedImpl, RWFrames, ReadFrame, WriteFrame};
4
5use futures_io::{AsyncRead, AsyncWrite};
6use futures_core::Stream;
7
8use bytes::BytesMut;
9use futures_sink::Sink;
10use pin_project_lite::pin_project;
11use std::fmt;
12use std::io;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15
16pin_project! {
17    /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18    /// the `Encoder` and `Decoder` traits to encode and decode frames.
19    ///
20    /// You can create a `Framed` instance by using the [`Decoder::framed`] adapter, or
21    /// by using the `new` function seen below.
22    ///
23    /// [`Stream`]: tokio::stream::Stream
24    /// [`Sink`]: futures_sink::Sink
25    /// [`AsyncRead`]: tokio::io::AsyncRead
26    /// [`Decoder::framed`]: crate::codec::Decoder::framed()
27    pub struct Framed<T, U> {
28        #[pin]
29        inner: FramedImpl<T, U, RWFrames>
30    }
31}
32
33impl<T, U> Framed<T, U>
34where
35    T: AsyncRead + AsyncWrite,
36{
37    /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
38    /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
39    ///
40    /// Raw I/O objects work with byte sequences, but higher-level code usually
41    /// wants to batch these into meaningful chunks, called "frames". This
42    /// method layers framing on top of an I/O object, by using the codec
43    /// traits to handle encoding and decoding of messages frames. Note that
44    /// the incoming and outgoing frame types may be distinct.
45    ///
46    /// This function returns a *single* object that is both [`Stream`] and
47    /// [`Sink`]; grouping this into a single object is often useful for layering
48    /// things like gzip or TLS, which require both read and write access to the
49    /// underlying object.
50    ///
51    /// If you want to work more directly with the streams and sink, consider
52    /// calling [`split`] on the `Framed` returned by this method, which will
53    /// break them into separate objects, allowing them to interact more easily.
54    ///
55    /// [`Stream`]: tokio::stream::Stream
56    /// [`Sink`]: futures_sink::Sink
57    /// [`Decode`]: crate::codec::Decoder
58    /// [`Encoder`]: crate::codec::Encoder
59    /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
60    pub fn new(inner: T, codec: U) -> Framed<T, U> {
61        Framed {
62            inner: FramedImpl {
63                inner,
64                codec,
65                state: Default::default(),
66            },
67        }
68    }
69
70    /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
71    /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data,
72    /// with a specific read buffer initial capacity.
73    ///
74    /// Raw I/O objects work with byte sequences, but higher-level code usually
75    /// wants to batch these into meaningful chunks, called "frames". This
76    /// method layers framing on top of an I/O object, by using the codec
77    /// traits to handle encoding and decoding of messages frames. Note that
78    /// the incoming and outgoing frame types may be distinct.
79    ///
80    /// This function returns a *single* object that is both [`Stream`] and
81    /// [`Sink`]; grouping this into a single object is often useful for layering
82    /// things like gzip or TLS, which require both read and write access to the
83    /// underlying object.
84    ///
85    /// If you want to work more directly with the streams and sink, consider
86    /// calling [`split`] on the `Framed` returned by this method, which will
87    /// break them into separate objects, allowing them to interact more easily.
88    ///
89    /// [`Stream`]: tokio::stream::Stream
90    /// [`Sink`]: futures_sink::Sink
91    /// [`Decode`]: crate::codec::Decoder
92    /// [`Encoder`]: crate::codec::Encoder
93    /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
94    pub fn with_capacity(inner: T, codec: U, capacity: usize) -> Framed<T, U> {
95        Framed {
96            inner: FramedImpl {
97                inner,
98                codec,
99                state: RWFrames {
100                    read: ReadFrame {
101                        eof: false,
102                        is_readable: false,
103                        buffer: BytesMut::with_capacity(capacity),
104                    },
105                    write: WriteFrame::default(),
106                },
107            },
108        }
109    }
110}
111
112impl<T, U> Framed<T, U> {
113    /// Provides a [`Stream`] and [`Sink`] interface for reading and writing to this
114    /// I/O object, using [`Decoder`] and [`Encoder`] to read and write the raw data.
115    ///
116    /// Raw I/O objects work with byte sequences, but higher-level code usually
117    /// wants to batch these into meaningful chunks, called "frames". This
118    /// method layers framing on top of an I/O object, by using the `Codec`
119    /// traits to handle encoding and decoding of messages frames. Note that
120    /// the incoming and outgoing frame types may be distinct.
121    ///
122    /// This function returns a *single* object that is both [`Stream`] and
123    /// [`Sink`]; grouping this into a single object is often useful for layering
124    /// things like gzip or TLS, which require both read and write access to the
125    /// underlying object.
126    ///
127    /// This objects takes a stream and a readbuffer and a writebuffer. These field
128    /// can be obtained from an existing `Framed` with the [`into_parts`] method.
129    ///
130    /// If you want to work more directly with the streams and sink, consider
131    /// calling [`split`] on the `Framed` returned by this method, which will
132    /// break them into separate objects, allowing them to interact more easily.
133    ///
134    /// [`Stream`]: tokio::stream::Stream
135    /// [`Sink`]: futures_sink::Sink
136    /// [`Decoder`]: crate::codec::Decoder
137    /// [`Encoder`]: crate::codec::Encoder
138    /// [`into_parts`]: crate::codec::Framed::into_parts()
139    /// [`split`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.split
140    pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
141        Framed {
142            inner: FramedImpl {
143                inner: parts.io,
144                codec: parts.codec,
145                state: RWFrames {
146                    read: parts.read_buf.into(),
147                    write: parts.write_buf.into(),
148                },
149            },
150        }
151    }
152
153    /// Returns a reference to the underlying I/O stream wrapped by
154    /// `Framed`.
155    ///
156    /// Note that care should be taken to not tamper with the underlying stream
157    /// of data coming in as it may corrupt the stream of frames otherwise
158    /// being worked with.
159    pub fn get_ref(&self) -> &T {
160        &self.inner.inner
161    }
162
163    /// Returns a mutable reference to the underlying I/O stream wrapped by
164    /// `Framed`.
165    ///
166    /// Note that care should be taken to not tamper with the underlying stream
167    /// of data coming in as it may corrupt the stream of frames otherwise
168    /// being worked with.
169    pub fn get_mut(&mut self) -> &mut T {
170        &mut self.inner.inner
171    }
172
173    /// Returns a reference to the underlying codec wrapped by
174    /// `Framed`.
175    ///
176    /// Note that care should be taken to not tamper with the underlying codec
177    /// as it may corrupt the stream of frames otherwise being worked with.
178    pub fn codec(&self) -> &U {
179        &self.inner.codec
180    }
181
182    /// Returns a mutable reference to the underlying codec wrapped by
183    /// `Framed`.
184    ///
185    /// Note that care should be taken to not tamper with the underlying codec
186    /// as it may corrupt the stream of frames otherwise being worked with.
187    pub fn codec_mut(&mut self) -> &mut U {
188        &mut self.inner.codec
189    }
190
191    /// Returns a reference to the read buffer.
192    pub fn read_buffer(&self) -> &BytesMut {
193        &self.inner.state.read.buffer
194    }
195
196    /// Returns a mutable reference to the read buffer.
197    pub fn read_buffer_mut(&mut self) -> &mut BytesMut {
198        &mut self.inner.state.read.buffer
199    }
200
201    /// Consumes the `Framed`, returning its underlying I/O stream.
202    ///
203    /// Note that care should be taken to not tamper with the underlying stream
204    /// of data coming in as it may corrupt the stream of frames otherwise
205    /// being worked with.
206    pub fn into_inner(self) -> T {
207        self.inner.inner
208    }
209
210    /// Consumes the `Framed`, returning its underlying I/O stream, the buffer
211    /// with unprocessed data, and the codec.
212    ///
213    /// Note that care should be taken to not tamper with the underlying stream
214    /// of data coming in as it may corrupt the stream of frames otherwise
215    /// being worked with.
216    pub fn into_parts(self) -> FramedParts<T, U> {
217        FramedParts {
218            io: self.inner.inner,
219            codec: self.inner.codec,
220            read_buf: self.inner.state.read.buffer,
221            write_buf: self.inner.state.write.buffer,
222            _priv: (),
223        }
224    }
225}
226
227// This impl just defers to the underlying FramedImpl
228impl<T, U> Stream for Framed<T, U>
229where
230    T: AsyncRead,
231    U: Decoder,
232{
233    type Item = Result<U::Item, U::Error>;
234
235    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
236        self.project().inner.poll_next(cx)
237    }
238}
239
240// This impl just defers to the underlying FramedImpl
241impl<T, I, U> Sink<I> for Framed<T, U>
242where
243    T: AsyncWrite,
244    U: Encoder<I>,
245    U::Error: From<io::Error>,
246{
247    type Error = U::Error;
248
249    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
250        self.project().inner.poll_ready(cx)
251    }
252
253    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
254        self.project().inner.start_send(item)
255    }
256
257    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
258        self.project().inner.poll_flush(cx)
259    }
260
261    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
262        self.project().inner.poll_close(cx)
263    }
264}
265
266impl<T, U> fmt::Debug for Framed<T, U>
267where
268    T: fmt::Debug,
269    U: fmt::Debug,
270{
271    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272        f.debug_struct("Framed")
273            .field("io", self.get_ref())
274            .field("codec", self.codec())
275            .finish()
276    }
277}
278
279/// `FramedParts` contains an export of the data of a Framed transport.
280/// It can be used to construct a new [`Framed`] with a different codec.
281/// It contains all current buffers and the inner transport.
282///
283/// [`Framed`]: crate::codec::Framed
284#[derive(Debug)]
285#[allow(clippy::manual_non_exhaustive)]
286pub struct FramedParts<T, U> {
287    /// The inner transport used to read bytes to and write bytes to
288    pub io: T,
289
290    /// The codec
291    pub codec: U,
292
293    /// The buffer with read but unprocessed data.
294    pub read_buf: BytesMut,
295
296    /// A buffer with unprocessed data which are not written yet.
297    pub write_buf: BytesMut,
298
299    /// This private field allows us to add additional fields in the future in a
300    /// backwards compatible way.
301    _priv: (),
302}
303
304impl<T, U> FramedParts<T, U> {
305    /// Create a new, default, `FramedParts`
306    pub fn new<I>(io: T, codec: U) -> FramedParts<T, U>
307    where
308        U: Encoder<I>,
309    {
310        FramedParts {
311            io,
312            codec,
313            read_buf: BytesMut::new(),
314            write_buf: BytesMut::new(),
315            _priv: (),
316        }
317    }
318}