tk_bufstream/
frame.rs

1// This module contains some code from tokio-core/src/io/framed.rs
2use std::io;
3
4use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
5use tokio_io::{AsyncRead, AsyncWrite};
6
7use {IoBuf, WriteBuf, ReadBuf, Buf};
8
9
10/// Decoding of a frame from an internal buffer.
11///
12/// This trait is used when constructing an instance of `Framed`. It defines how
13/// to decode the incoming bytes on a stream to the specified type of frame for
14/// that framed I/O stream.
15///
16/// The primary method of this trait, `decode`, attempts to decode a
17/// frame from a buffer of bytes. It has the option of returning `NotReady`,
18/// indicating that more bytes need to be read before decoding can
19/// continue.
20pub trait Decode: Sized {
21    /// Decoded message
22    type Item;
23    /// Attempts to decode a frame from the provided buffer of bytes.
24    ///
25    /// This method is called by `Framed` whenever bytes are ready to be parsed.
26    /// The provided buffer of bytes is what's been read so far, and this
27    /// instance of `Decode` can determine whether an entire frame is in the
28    /// buffer and is ready to be returned.
29    ///
30    /// If an entire frame is available, then this instance will remove those
31    /// bytes from the buffer provided and return them as a decoded
32    /// frame. Note that removing bytes from the provided buffer doesn't always
33    /// necessarily copy the bytes, so this should be an efficient operation in
34    /// most circumstances.
35    ///
36    /// If the bytes look valid, but a frame isn't fully available yet, then
37    /// `Ok(None)` is returned. This indicates to the `Framed` instance that
38    /// it needs to read some more bytes before calling this method again.
39    ///
40    /// Finally, if the bytes in the buffer are malformed then an error is
41    /// returned indicating why. This informs `Framed` that the stream is now
42    /// corrupt and should be terminated.
43    fn decode(&mut self, buf: &mut Buf)
44        -> Result<Option<Self::Item>, io::Error>;
45
46    /// A default method available to be called when there are no more bytes
47    /// available to be read from the underlying I/O.
48    ///
49    /// This method defaults to calling `decode` and returns an error if
50    /// `Ok(None)` is returned. Typically this doesn't need to be implemented
51    /// unless the framing protocol differs near the end of the stream.
52    fn done(&mut self, buf: &mut Buf) -> io::Result<Self::Item> {
53        match self.decode(buf)? {
54            Some(frame) => Ok(frame),
55            None => Err(io::Error::new(io::ErrorKind::Other,
56                                       "bytes remaining on stream")),
57        }
58    }
59}
60
/// A trait for encoding frames into a byte buffer.
///
/// This trait is used as a building block of `Framed` and `WriteFramed` to
/// define how frames are encoded into bytes to get passed to the underlying
/// byte stream. Each frame written to the sink is encoded with this trait
/// into an internal output buffer. That buffer is then written out when
/// possible to the underlying I/O stream.
pub trait Encode {
    /// Value to encode
    type Item: Sized;
    /// Encodes a frame into the buffer provided.
    ///
    /// This method will encode `value` into the byte buffer provided by `buf`.
    /// The `buf` provided is the output buffer of the `Framed` or
    /// `WriteFramed` instance and will be written out when possible.
    ///
    /// The method returns nothing, so it has no way to report an encoding
    /// failure to the caller.
    fn encode(&mut self, value: Self::Item, buf: &mut Buf);
}
78
/// A unified `Stream` and `Sink` interface to an underlying `IoBuf` object,
/// using the `Encode` and `Decode` traits to encode and decode frames.
///
/// Field `0` is the buffered I/O object, field `1` is the codec.
pub struct Framed<T, C>(IoBuf<T>, C);
82
/// A `Stream` interface to a `ReadBuf` object
///
/// Field `0` is the buffered reader, field `1` is the codec, which must
/// implement `Decode` for the `Stream` implementation to be available.
pub struct ReadFramed<T, C>(ReadBuf<T>, C);
85
/// A `Sink` interface to a `WriteBuf` object
///
/// Field `0` is the buffered writer, field `1` is the codec, which must
/// implement `Encode` for the `Sink` implementation to be available.
pub struct WriteFramed<T, C>(WriteBuf<T>, C);
88
89impl<T: AsyncRead, C: Decode> Stream for Framed<T, C> {
90    type Item = C::Item;
91    type Error = io::Error;
92
93    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
94        loop {
95            if let Some(frame) = self.1.decode(&mut self.0.in_buf)? {
96                return Ok(Async::Ready(Some(frame)));
97            } else {
98                let nbytes = self.0.read()?;
99                if nbytes == 0 {
100                    if self.0.done() {
101                        return Ok(Async::Ready(None));
102                    } else {
103                        return Ok(Async::NotReady);
104                    }
105                }
106            }
107        }
108    }
109}
110
111impl<T: AsyncWrite, C: Encode> Sink for Framed<T, C> {
112    type SinkItem = C::Item;
113    type SinkError = io::Error;
114
115    fn start_send(&mut self, item: C::Item) -> StartSend<C::Item, io::Error> {
116        self.1.encode(item, &mut self.0.out_buf);
117        Ok(AsyncSink::Ready)
118    }
119
120    fn poll_complete(&mut self) -> Poll<(), io::Error> {
121        self.0.flush()?;
122        Ok(Async::Ready(()))
123    }
124}
125
/// Combines a buffered I/O object and a codec into a `Framed` adapter.
///
/// No I/O is performed here; `io` and `codec` are only stored.
pub fn framed<T, C>(io: IoBuf<T>, codec: C) -> Framed<T, C> {
    Framed(io, codec)
}
129
130impl<T, C> Framed<T, C> {
131    /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
132    pub fn get_ref(&self) -> &IoBuf<T> {
133        &self.0
134    }
135
136    /// Returns a mutable reference to the underlying I/O stream wrapped by
137    /// `Framed`.
138    ///
139    /// Note that care should be taken to not tamper with the underlying stream
140    /// of data coming in as it may corrupt the stream of frames otherwise being
141    /// worked with.
142    pub fn get_mut(&mut self) -> &mut IoBuf<T> {
143        &mut self.0
144    }
145
146    /// Consumes the `Framed`, returning its underlying I/O stream.
147    ///
148    /// Note that stream may contain both input and output data buffered.
149    pub fn into_inner(self) -> IoBuf<T> {
150        self.0
151    }
152}
153
154impl<T: AsyncRead, C: Decode> Stream for ReadFramed<T, C> {
155    type Item = C::Item;
156    type Error = io::Error;
157
158    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
159        loop {
160            if let Some(frame) = self.1.decode(&mut self.0.in_buf)? {
161                return Ok(Async::Ready(Some(frame)));
162            } else {
163                let nbytes = self.0.read()?;
164                if nbytes == 0 {
165                    if self.0.done() {
166                        return Ok(Async::Ready(None));
167                    } else {
168                        return Ok(Async::NotReady);
169                    }
170                }
171            }
172        }
173    }
174}
175
/// Combines a buffered reader and a codec into a `ReadFramed` adapter.
///
/// No I/O is performed here; `io` and `codec` are only stored.
pub fn read_framed<T, C>(io: ReadBuf<T>, codec: C)
    -> ReadFramed<T, C>
{
    ReadFramed(io, codec)
}
181
182impl<T, C> ReadFramed<T, C> {
183    /// Returns a reference to the underlying I/O stream wrapped by `ReadFramed`.
184    pub fn get_ref(&self) -> &ReadBuf<T> {
185        &self.0
186    }
187
188    /// Returns a mutable reference to the underlying I/O stream wrapped by
189    /// `ReadFramed`.
190    ///
191    /// Note that care should be taken to not tamper with the underlying stream
192    /// of data coming in as it may corrupt the stream of frames otherwise being
193    /// worked with.
194    pub fn get_mut(&mut self) -> &mut ReadBuf<T> {
195        &mut self.0
196    }
197
198    /// Consumes the `ReadFramed`, returning its underlying I/O stream.
199    ///
200    /// Note that stream may contain both input and output data buffered.
201    pub fn into_inner(self) -> ReadBuf<T> {
202        self.0
203    }
204}
205
206impl<T: AsyncWrite, C: Encode> Sink for WriteFramed<T, C> {
207    type SinkItem = C::Item;
208    type SinkError = io::Error;
209
210    fn start_send(&mut self, item: C::Item) -> StartSend<C::Item, io::Error> {
211        self.1.encode(item, &mut self.0.out_buf);
212        Ok(AsyncSink::Ready)
213    }
214
215    fn poll_complete(&mut self) -> Poll<(), io::Error> {
216        self.0.flush()?;
217        Ok(Async::Ready(()))
218    }
219}
220
/// Combines a buffered writer and a codec into a `WriteFramed` adapter.
///
/// No I/O is performed here; `io` and `codec` are only stored.
pub fn write_framed<T, C>(io: WriteBuf<T>, codec: C) -> WriteFramed<T, C> {
    WriteFramed(io, codec)
}
224
225impl<T, C> WriteFramed<T, C> {
226    /// Returns a reference to the underlying I/O stream wrapped by `WriteFramed`.
227    pub fn get_ref(&self) -> &WriteBuf<T> {
228        &self.0
229    }
230
231    /// Returns a mutable reference to the underlying I/O stream wrapped by
232    /// `WriteFramed`.
233    ///
234    /// Note that care should be taken to not tamper with the underlying stream
235    /// of data coming in as it may corrupt the stream of frames otherwise being
236    /// worked with.
237    pub fn get_mut(&mut self) -> &mut WriteBuf<T> {
238        &mut self.0
239    }
240
241    /// Consumes the `WriteFramed`, returning its underlying I/O stream.
242    ///
243    /// Note that stream may contain both input and output data buffered.
244    pub fn into_inner(self) -> WriteBuf<T> {
245        self.0
246    }
247}