tokio_io/_tokio_codec/framed.rs
1#![allow(deprecated)]
2
3use std::fmt;
4use std::io::{self, Read, Write};
5
6use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2};
7use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2};
8use codec::{Decoder, Encoder};
9use {AsyncRead, AsyncWrite};
10
11use bytes::BytesMut;
12use futures::{Poll, Sink, StartSend, Stream};
13
14/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
15/// the `Encoder` and `Decoder` traits to encode and decode frames.
16///
17/// You can create a `Framed` instance by using the `AsyncRead::framed` adapter.
18pub struct Framed<T, U> {
19 inner: FramedRead2<FramedWrite2<Fuse<T, U>>>,
20}
21
22pub struct Fuse<T, U>(pub T, pub U);
23
24impl<T, U> Framed<T, U>
25where
26 T: AsyncRead + AsyncWrite,
27 U: Decoder + Encoder,
28{
29 /// Provides a `Stream` and `Sink` interface for reading and writing to this
30 /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
31 ///
32 /// Raw I/O objects work with byte sequences, but higher-level code usually
33 /// wants to batch these into meaningful chunks, called "frames". This
34 /// method layers framing on top of an I/O object, by using the `Codec`
35 /// traits to handle encoding and decoding of messages frames. Note that
36 /// the incoming and outgoing frame types may be distinct.
37 ///
38 /// This function returns a *single* object that is both `Stream` and
39 /// `Sink`; grouping this into a single object is often useful for layering
40 /// things like gzip or TLS, which require both read and write access to the
41 /// underlying object.
42 ///
43 /// If you want to work more directly with the streams and sink, consider
44 /// calling `split` on the `Framed` returned by this method, which will
45 /// break them into separate objects, allowing them to interact more easily.
46 pub fn new(inner: T, codec: U) -> Framed<T, U> {
47 Framed {
48 inner: framed_read2(framed_write2(Fuse(inner, codec))),
49 }
50 }
51}
52
53impl<T, U> Framed<T, U> {
54 /// Provides a `Stream` and `Sink` interface for reading and writing to this
55 /// `Io` object, using `Decode` and `Encode` to read and write the raw data.
56 ///
57 /// Raw I/O objects work with byte sequences, but higher-level code usually
58 /// wants to batch these into meaningful chunks, called "frames". This
59 /// method layers framing on top of an I/O object, by using the `Codec`
60 /// traits to handle encoding and decoding of messages frames. Note that
61 /// the incoming and outgoing frame types may be distinct.
62 ///
63 /// This function returns a *single* object that is both `Stream` and
64 /// `Sink`; grouping this into a single object is often useful for layering
65 /// things like gzip or TLS, which require both read and write access to the
66 /// underlying object.
67 ///
68 /// This objects takes a stream and a readbuffer and a writebuffer. These field
69 /// can be obtained from an existing `Framed` with the `into_parts` method.
70 ///
71 /// If you want to work more directly with the streams and sink, consider
72 /// calling `split` on the `Framed` returned by this method, which will
73 /// break them into separate objects, allowing them to interact more easily.
74 pub fn from_parts(parts: FramedParts<T, U>) -> Framed<T, U> {
75 Framed {
76 inner: framed_read2_with_buffer(
77 framed_write2_with_buffer(Fuse(parts.io, parts.codec), parts.write_buf),
78 parts.read_buf,
79 ),
80 }
81 }
82
83 /// Returns a reference to the underlying I/O stream wrapped by
84 /// `Frame`.
85 ///
86 /// Note that care should be taken to not tamper with the underlying stream
87 /// of data coming in as it may corrupt the stream of frames otherwise
88 /// being worked with.
89 pub fn get_ref(&self) -> &T {
90 &self.inner.get_ref().get_ref().0
91 }
92
93 /// Returns a mutable reference to the underlying I/O stream wrapped by
94 /// `Frame`.
95 ///
96 /// Note that care should be taken to not tamper with the underlying stream
97 /// of data coming in as it may corrupt the stream of frames otherwise
98 /// being worked with.
99 pub fn get_mut(&mut self) -> &mut T {
100 &mut self.inner.get_mut().get_mut().0
101 }
102
103 /// Returns a reference to the underlying codec wrapped by
104 /// `Frame`.
105 ///
106 /// Note that care should be taken to not tamper with the underlying codec
107 /// as it may corrupt the stream of frames otherwise being worked with.
108 pub fn codec(&self) -> &U {
109 &self.inner.get_ref().get_ref().1
110 }
111
112 /// Returns a mutable reference to the underlying codec wrapped by
113 /// `Frame`.
114 ///
115 /// Note that care should be taken to not tamper with the underlying codec
116 /// as it may corrupt the stream of frames otherwise being worked with.
117 pub fn codec_mut(&mut self) -> &mut U {
118 &mut self.inner.get_mut().get_mut().1
119 }
120
121 /// Consumes the `Frame`, returning its underlying I/O stream.
122 ///
123 /// Note that care should be taken to not tamper with the underlying stream
124 /// of data coming in as it may corrupt the stream of frames otherwise
125 /// being worked with.
126 pub fn into_inner(self) -> T {
127 self.inner.into_inner().into_inner().0
128 }
129
130 /// Consumes the `Frame`, returning its underlying I/O stream, the buffer
131 /// with unprocessed data, and the codec.
132 ///
133 /// Note that care should be taken to not tamper with the underlying stream
134 /// of data coming in as it may corrupt the stream of frames otherwise
135 /// being worked with.
136 pub fn into_parts(self) -> FramedParts<T, U> {
137 let (inner, read_buf) = self.inner.into_parts();
138 let (inner, write_buf) = inner.into_parts();
139
140 FramedParts {
141 io: inner.0,
142 codec: inner.1,
143 read_buf: read_buf,
144 write_buf: write_buf,
145 _priv: (),
146 }
147 }
148}
149
150impl<T, U> Stream for Framed<T, U>
151where
152 T: AsyncRead,
153 U: Decoder,
154{
155 type Item = U::Item;
156 type Error = U::Error;
157
158 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
159 self.inner.poll()
160 }
161}
162
163impl<T, U> Sink for Framed<T, U>
164where
165 T: AsyncWrite,
166 U: Encoder,
167 U::Error: From<io::Error>,
168{
169 type SinkItem = U::Item;
170 type SinkError = U::Error;
171
172 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
173 self.inner.get_mut().start_send(item)
174 }
175
176 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
177 self.inner.get_mut().poll_complete()
178 }
179
180 fn close(&mut self) -> Poll<(), Self::SinkError> {
181 self.inner.get_mut().close()
182 }
183}
184
185impl<T, U> fmt::Debug for Framed<T, U>
186where
187 T: fmt::Debug,
188 U: fmt::Debug,
189{
190 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
191 f.debug_struct("Framed")
192 .field("io", &self.inner.get_ref().get_ref().0)
193 .field("codec", &self.inner.get_ref().get_ref().1)
194 .finish()
195 }
196}
197
198// ===== impl Fuse =====
199
200impl<T: Read, U> Read for Fuse<T, U> {
201 fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
202 self.0.read(dst)
203 }
204}
205
206impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
207 unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
208 self.0.prepare_uninitialized_buffer(buf)
209 }
210}
211
212impl<T: Write, U> Write for Fuse<T, U> {
213 fn write(&mut self, src: &[u8]) -> io::Result<usize> {
214 self.0.write(src)
215 }
216
217 fn flush(&mut self) -> io::Result<()> {
218 self.0.flush()
219 }
220}
221
222impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
223 fn shutdown(&mut self) -> Poll<(), io::Error> {
224 self.0.shutdown()
225 }
226}
227
228impl<T, U: Decoder> Decoder for Fuse<T, U> {
229 type Item = U::Item;
230 type Error = U::Error;
231
232 fn decode(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
233 self.1.decode(buffer)
234 }
235
236 fn decode_eof(&mut self, buffer: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
237 self.1.decode_eof(buffer)
238 }
239}
240
241impl<T, U: Encoder> Encoder for Fuse<T, U> {
242 type Item = U::Item;
243 type Error = U::Error;
244
245 fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
246 self.1.encode(item, dst)
247 }
248}
249
250/// `FramedParts` contains an export of the data of a Framed transport.
251/// It can be used to construct a new `Framed` with a different codec.
252/// It contains all current buffers and the inner transport.
253#[derive(Debug)]
254pub struct FramedParts<T, U> {
255 /// The inner transport used to read bytes to and write bytes to
256 pub io: T,
257
258 /// The codec
259 pub codec: U,
260
261 /// The buffer with read but unprocessed data.
262 pub read_buf: BytesMut,
263
264 /// A buffer with unprocessed data which are not written yet.
265 pub write_buf: BytesMut,
266
267 /// This private field allows us to add additional fields in the future in a
268 /// backwards compatible way.
269 _priv: (),
270}
271
272impl<T, U> FramedParts<T, U> {
273 /// Create a new, default, `FramedParts`
274 pub fn new(io: T, codec: U) -> FramedParts<T, U> {
275 FramedParts {
276 io,
277 codec,
278 read_buf: BytesMut::new(),
279 write_buf: BytesMut::new(),
280 _priv: (),
281 }
282 }
283}