tk_bufstream/frame.rs
1// This module contains some code from tokio-core/src/io/framed.rs
2use std::io;
3
4use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
5use tokio_io::{AsyncRead, AsyncWrite};
6
7use {IoBuf, WriteBuf, ReadBuf, Buf};
8
9
10/// Decoding of a frame from an internal buffer.
11///
12/// This trait is used when constructing an instance of `Framed`. It defines how
13/// to decode the incoming bytes on a stream to the specified type of frame for
14/// that framed I/O stream.
15///
16/// The primary method of this trait, `decode`, attempts to decode a
17/// frame from a buffer of bytes. It has the option of returning `NotReady`,
18/// indicating that more bytes need to be read before decoding can
19/// continue.
20pub trait Decode: Sized {
21 /// Decoded message
22 type Item;
23 /// Attempts to decode a frame from the provided buffer of bytes.
24 ///
25 /// This method is called by `Framed` whenever bytes are ready to be parsed.
26 /// The provided buffer of bytes is what's been read so far, and this
27 /// instance of `Decode` can determine whether an entire frame is in the
28 /// buffer and is ready to be returned.
29 ///
30 /// If an entire frame is available, then this instance will remove those
31 /// bytes from the buffer provided and return them as a decoded
32 /// frame. Note that removing bytes from the provided buffer doesn't always
33 /// necessarily copy the bytes, so this should be an efficient operation in
34 /// most circumstances.
35 ///
36 /// If the bytes look valid, but a frame isn't fully available yet, then
37 /// `Ok(None)` is returned. This indicates to the `Framed` instance that
38 /// it needs to read some more bytes before calling this method again.
39 ///
40 /// Finally, if the bytes in the buffer are malformed then an error is
41 /// returned indicating why. This informs `Framed` that the stream is now
42 /// corrupt and should be terminated.
43 fn decode(&mut self, buf: &mut Buf)
44 -> Result<Option<Self::Item>, io::Error>;
45
46 /// A default method available to be called when there are no more bytes
47 /// available to be read from the underlying I/O.
48 ///
49 /// This method defaults to calling `decode` and returns an error if
50 /// `Ok(None)` is returned. Typically this doesn't need to be implemented
51 /// unless the framing protocol differs near the end of the stream.
52 fn done(&mut self, buf: &mut Buf) -> io::Result<Self::Item> {
53 match self.decode(buf)? {
54 Some(frame) => Ok(frame),
55 None => Err(io::Error::new(io::ErrorKind::Other,
56 "bytes remaining on stream")),
57 }
58 }
59}
60
61/// A trait for encoding frames into a byte buffer.
62///
63/// This trait is used as a building block of `Framed` to define how frames are
64/// encoded into bytes to get passed to the underlying byte stream. each
65/// frame written to `Framed` will be encoded with this trait to an internal
66/// buffer. That buffer is then written out when possible to the underlying I/O
67/// stream.
68pub trait Encode {
69 /// Value to encode
70 type Item: Sized;
71 /// Encodes a frame into the buffer provided.
72 ///
73 /// This method will encode `msg` into the byte buffer provided by `buf`.
74 /// The `buf` provided is an internal buffer of the `Framed` instance and
75 /// will be written out when possible.
76 fn encode(&mut self, value: Self::Item, buf: &mut Buf);
77}
78
79/// A unified `Stream` and `Sink` interface to an underlying `Io` object, using
80/// the `Encode` and `Decode` traits to encode and decode frames.
81pub struct Framed<T, C>(IoBuf<T>, C);
82
83/// A `Stream` interface to `ReadBuf` object
84pub struct ReadFramed<T, C>(ReadBuf<T>, C);
85
86/// A `Sink` interface to `WriteBuf` object
87pub struct WriteFramed<T, C>(WriteBuf<T>, C);
88
89impl<T: AsyncRead, C: Decode> Stream for Framed<T, C> {
90 type Item = C::Item;
91 type Error = io::Error;
92
93 fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
94 loop {
95 if let Some(frame) = self.1.decode(&mut self.0.in_buf)? {
96 return Ok(Async::Ready(Some(frame)));
97 } else {
98 let nbytes = self.0.read()?;
99 if nbytes == 0 {
100 if self.0.done() {
101 return Ok(Async::Ready(None));
102 } else {
103 return Ok(Async::NotReady);
104 }
105 }
106 }
107 }
108 }
109}
110
111impl<T: AsyncWrite, C: Encode> Sink for Framed<T, C> {
112 type SinkItem = C::Item;
113 type SinkError = io::Error;
114
115 fn start_send(&mut self, item: C::Item) -> StartSend<C::Item, io::Error> {
116 self.1.encode(item, &mut self.0.out_buf);
117 Ok(AsyncSink::Ready)
118 }
119
120 fn poll_complete(&mut self) -> Poll<(), io::Error> {
121 self.0.flush()?;
122 Ok(Async::Ready(()))
123 }
124}
125
126pub fn framed<T, C>(io: IoBuf<T>, codec: C) -> Framed<T, C> {
127 Framed(io, codec)
128}
129
130impl<T, C> Framed<T, C> {
131 /// Returns a reference to the underlying I/O stream wrapped by `Framed`.
132 pub fn get_ref(&self) -> &IoBuf<T> {
133 &self.0
134 }
135
136 /// Returns a mutable reference to the underlying I/O stream wrapped by
137 /// `Framed`.
138 ///
139 /// Note that care should be taken to not tamper with the underlying stream
140 /// of data coming in as it may corrupt the stream of frames otherwise being
141 /// worked with.
142 pub fn get_mut(&mut self) -> &mut IoBuf<T> {
143 &mut self.0
144 }
145
146 /// Consumes the `Framed`, returning its underlying I/O stream.
147 ///
148 /// Note that stream may contain both input and output data buffered.
149 pub fn into_inner(self) -> IoBuf<T> {
150 self.0
151 }
152}
153
154impl<T: AsyncRead, C: Decode> Stream for ReadFramed<T, C> {
155 type Item = C::Item;
156 type Error = io::Error;
157
158 fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
159 loop {
160 if let Some(frame) = self.1.decode(&mut self.0.in_buf)? {
161 return Ok(Async::Ready(Some(frame)));
162 } else {
163 let nbytes = self.0.read()?;
164 if nbytes == 0 {
165 if self.0.done() {
166 return Ok(Async::Ready(None));
167 } else {
168 return Ok(Async::NotReady);
169 }
170 }
171 }
172 }
173 }
174}
175
176pub fn read_framed<T, C>(io: ReadBuf<T>, codec: C)
177 -> ReadFramed<T, C>
178{
179 ReadFramed(io, codec)
180}
181
182impl<T, C> ReadFramed<T, C> {
183 /// Returns a reference to the underlying I/O stream wrapped by `ReadFramed`.
184 pub fn get_ref(&self) -> &ReadBuf<T> {
185 &self.0
186 }
187
188 /// Returns a mutable reference to the underlying I/O stream wrapped by
189 /// `ReadFramed`.
190 ///
191 /// Note that care should be taken to not tamper with the underlying stream
192 /// of data coming in as it may corrupt the stream of frames otherwise being
193 /// worked with.
194 pub fn get_mut(&mut self) -> &mut ReadBuf<T> {
195 &mut self.0
196 }
197
198 /// Consumes the `ReadFramed`, returning its underlying I/O stream.
199 ///
200 /// Note that stream may contain both input and output data buffered.
201 pub fn into_inner(self) -> ReadBuf<T> {
202 self.0
203 }
204}
205
206impl<T: AsyncWrite, C: Encode> Sink for WriteFramed<T, C> {
207 type SinkItem = C::Item;
208 type SinkError = io::Error;
209
210 fn start_send(&mut self, item: C::Item) -> StartSend<C::Item, io::Error> {
211 self.1.encode(item, &mut self.0.out_buf);
212 Ok(AsyncSink::Ready)
213 }
214
215 fn poll_complete(&mut self) -> Poll<(), io::Error> {
216 self.0.flush()?;
217 Ok(Async::Ready(()))
218 }
219}
220
221pub fn write_framed<T, C>(io: WriteBuf<T>, codec: C) -> WriteFramed<T, C> {
222 WriteFramed(io, codec)
223}
224
225impl<T, C> WriteFramed<T, C> {
226 /// Returns a reference to the underlying I/O stream wrapped by `WriteFramed`.
227 pub fn get_ref(&self) -> &WriteBuf<T> {
228 &self.0
229 }
230
231 /// Returns a mutable reference to the underlying I/O stream wrapped by
232 /// `WriteFramed`.
233 ///
234 /// Note that care should be taken to not tamper with the underlying stream
235 /// of data coming in as it may corrupt the stream of frames otherwise being
236 /// worked with.
237 pub fn get_mut(&mut self) -> &mut WriteBuf<T> {
238 &mut self.0
239 }
240
241 /// Consumes the `WriteFramed`, returning its underlying I/O stream.
242 ///
243 /// Note that stream may contain both input and output data buffered.
244 pub fn into_inner(self) -> WriteBuf<T> {
245 self.0
246 }
247}