tokio_io/_tokio_codec/
framed.rs

1#![allow(deprecated)]
2
3use std::fmt;
4use std::io::{self, Read, Write};
5
6use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
7use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
8use codec::{Decoder, Encoder};
9use {AsyncRead, AsyncWrite};
10
11use bytes::BytesMut;
12use futures::{Poll, Sink, StartSend, Stream};
13
14/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
15/// the `Encoder` and `Decoder` traits to encode and decode frames.
16///
17/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
18pub struct Framed<T, U> {
19    inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
20}
21
/// Pairs an I/O object with a codec so that a single value can stand in for
/// both.
///
/// The I/O-related trait implementations in this file delegate to field `0`,
/// while the `Decoder`/`Encoder` implementations delegate to field `1`.
pub struct Fuse<T, U>(
    /// The wrapped I/O object.
    pub T,
    /// The codec used to decode and encode frames.
    pub U,
);
23
24impl<T, U> Framed<T, U>
25where
26    T: AsyncRead + AsyncWrite,
27    U: Decoder + Encoder,
28{
29    /// Provides a `Stream` and `Sink` interface for reading and writing to this
30    /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
31    ///
32    /// Raw I/O objects work with byte sequences, but higher-level code usually
33    /// wants to batch these into meaningful chunks, called "frames". This
34    /// method layers framing on top of an I/O object, by using the `Codec`
35    /// traits to handle encoding and decoding of messages frames. Note that
36    /// the incoming and outgoing frame types may be distinct.
37    ///
38    /// This function returns a *single* object that is both `Stream` and
39    /// `Sink`; grouping this into a single object is often useful for layering
40    /// things like gzip or TLS, which require both read and write access to the
41    /// underlying object.
42    ///
43    /// If you want to work more directly with the streams and sink, consider
44    /// calling `split` on the `Framed` returned by this method, which will
45    /// break them into separate objects, allowing them to interact more easily.
46    pub fn new(inner: T, codec: U) -> Framed<T, U> {
47        Framed {
48            inner: framed_read2(framed_write2(Fuse(inner, codec))),
49        }
50    }
51}
52
53impl<T, U> Framed<T, U> {
54    /// Provides a `Stream` and `Sink` interface for reading and writing to this
55    /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
56    ///
57    /// Raw I/O objects work with byte sequences, but higher-level code usually
58    /// wants to batch these into meaningful chunks, called "frames". This
59    /// method layers framing on top of an I/O object, by using the `Codec`
60    /// traits to handle encoding and decoding of messages frames. Note that
61    /// the incoming and outgoing frame types may be distinct.
62    ///
63    /// This function returns a *single* object that is both `Stream` and
64    /// `Sink`; grouping this into a single object is often useful for layering
65    /// things like gzip or TLS, which require both read and write access to the
66    /// underlying object.
67    ///
68    /// This objects takes a stream and a readbuffer and a writebuffer. These field
69    /// can be obtained from an existing `Framed` with the `into_parts` method.
70    ///
71    /// If you want to work more directly with the streams and sink, consider
72    /// calling `split` on the `Framed` returned by this method, which will
73    /// break them into separate objects, allowing them to interact more easily.
74    pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
75        Framed {
76            inner: framed_read2_with_buffer(
77                framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf),
78                parts.read_buf,
79            ),
80        }
81    }
82
83    /// Returns a reference to the underlying I/O stream wrapped by
84    /// `Frame`.
85    ///
86    /// Note that care should be taken to not tamper with the underlying stream
87    /// of data coming in as it may corrupt the stream of frames otherwise
88    /// being worked with.
89    pub fn get_ref(&self) -> &T {
90        &self.inner.get_ref().get_ref().0
91    }
92
93    /// Returns a mutable reference to the underlying I/O stream wrapped by
94    /// `Frame`.
95    ///
96    /// Note that care should be taken to not tamper with the underlying stream
97    /// of data coming in as it may corrupt the stream of frames otherwise
98    /// being worked with.
99    pub fn get_mut(&mut self) -> &mut T {
100        &mut self.inner.get_mut().get_mut().0
101    }
102
103    /// Returns a reference to the underlying codec wrapped by
104    /// `Frame`.
105    ///
106    /// Note that care should be taken to not tamper with the underlying codec
107    /// as it may corrupt the stream of frames otherwise being worked with.
108    pub fn codec(&self) -> &U {
109        &self.inner.get_ref().get_ref().1
110    }
111
112    /// Returns a mutable reference to the underlying codec wrapped by
113    /// `Frame`.
114    ///
115    /// Note that care should be taken to not tamper with the underlying codec
116    /// as it may corrupt the stream of frames otherwise being worked with.
117    pub fn codec_mut(&mut self) -> &mut U {
118        &mut self.inner.get_mut().get_mut().1
119    }
120
121    /// Consumes the `Frame`, returning its underlying I/O stream.
122    ///
123    /// Note that care should be taken to not tamper with the underlying stream
124    /// of data coming in as it may corrupt the stream of frames otherwise
125    /// being worked with.
126    pub fn into_inner(self) -> T {
127        self.inner.into_inner().into_inner().0
128    }
129
130    /// Consumes the `Frame`, returning its underlying I/O stream, the buffer
131    /// with unprocessed data, and the codec.
132    ///
133    /// Note that care should be taken to not tamper with the underlying stream
134    /// of data coming in as it may corrupt the stream of frames otherwise
135    /// being worked with.
136    pub fn into_parts(self) -> FramedParts<T, U> {
137        let (inner, read_buf) = self.inner.into_parts();
138        let (inner, write_buf) = inner.into_parts();
139
140        FramedParts {
141            io: inner.0,
142            codec: inner.1,
143            read_buf: read_buf,
144            write_buf: write_buf,
145            _priv: (),
146        }
147    }
148}
149
150impl<T, U> Stream for Framed<T, U>
151where
152    T: AsyncRead,
153    U: Decoder,
154{
155    type Item = U::Item;
156    type Error = U::Error;
157
158    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
159        self.inner.poll()
160    }
161}
162
163impl<T, U> Sink for Framed<T, U>
164where
165    T: AsyncWrite,
166    U: Encoder,
167    U::Error: From<io::Error>,
168{
169    type SinkItem = U::Item;
170    type SinkError = U::Error;
171
172    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
173        self.inner.get_mut().start_send(item)
174    }
175
176    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
177        self.inner.get_mut().poll_complete()
178    }
179
180    fn close(&mut self) -> Poll<(), Self::SinkError> {
181        self.inner.get_mut().close()
182    }
183}
184
185impl<T, U> fmt::Debug for Framed<T, U>
186where
187    T: fmt::Debug,
188    U: fmt::Debug,
189{
190    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
191        f.debug_struct("Framed")
192            .field("io", &self.inner.get_ref().get_ref().0)
193            .field("codec", &self.inner.get_ref().get_ref().1)
194            .finish()
195    }
196}
197
198// ===== impl Fuse =====
199
200impl<T: Read, U> Read for Fuse<T, U> {
201    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
202        self.0.read(dst)
203    }
204}
205
206impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
207    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
208        self.0.prepare_uninitialized_buffer(buf)
209    }
210}
211
212impl<T: Write, U> Write for Fuse<T, U> {
213    fn write(&mut self, src: &[u8]) -> io::Result<usize> {
214        self.0.write(src)
215    }
216
217    fn flush(&mut self) -> io::Result<()> {
218        self.0.flush()
219    }
220}
221
222impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
223    fn shutdown(&mut self) -> Poll<(), io::Error> {
224        self.0.shutdown()
225    }
226}
227
228impl<T, U: Decoder> Decoder for Fuse<T, U> {
229    type Item = U::Item;
230    type Error = U::Error;
231
232    fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
233        self.1.decode(buffer)
234    }
235
236    fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
237        self.1.decode_eof(buffer)
238    }
239}
240
241impl<T, U: Encoder> Encoder for Fuse<T, U> {
242    type Item = U::Item;
243    type Error = U::Error;
244
245    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
246        self.1.encode(item, dst)
247    }
248}
249
250/// `FramedParts` contains an export of the data of a Framed transport.
251/// It can be used to construct a new `Framed` with a different codec.
252/// It contains all current buffers and the inner transport.
253#[derive(Debug)]
254pub struct FramedParts<T, U> {
255    /// The inner transport used to read bytes to and write bytes to
256    pub io: T,
257
258    /// The codec
259    pub codec: U,
260
261    /// The buffer with read but unprocessed data.
262    pub read_buf: BytesMut,
263
264    /// A buffer with unprocessed data which are not written yet.
265    pub write_buf: BytesMut,
266
267    /// This private field allows us to add additional fields in the future in a
268    /// backwards compatible way.
269    _priv: (),
270}
271
272impl<T, U> FramedParts<T, U> {
273    /// Create a new, default, `FramedParts`
274    pub fn new(io: T, codec: U) -> FramedParts<T, U> {
275        FramedParts {
276            io,
277            codec,
278            read_buf: BytesMut::new(),
279            write_buf: BytesMut::new(),
280            _priv: (),
281        }
282    }
283}