tokio_io/_tokio_codec/
framed_read.rs

1#![allow(deprecated)]
2
3use std::fmt;
4
5use super::framed::Fuse;
6use codec::Decoder;
7use AsyncRead;
8
9use bytes::BytesMut;
10use futures::{Async, Poll, Sink, StartSend, Stream};
11
12/// A `Stream` of messages decoded from an `AsyncRead`.
13pub struct FramedRead<T, D> {
14    inner: FramedRead2<Fuse<T, D>>,
15}
16
17pub struct FramedRead2<T> {
18    inner: T,
19    eof: bool,
20    is_readable: bool,
21    buffer: BytesMut,
22}
23
24const INITIAL_CAPACITY: usize = 8 * 1024;
25
26// ===== impl FramedRead =====
27
28impl<T, D> FramedRead<T, D>
29where
30    T: AsyncRead,
31    D: Decoder,
32{
33    /// Creates a new `FramedRead` with the given `decoder`.
34    pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
35        FramedRead {
36            inner: framed_read2(Fuse(inner, decoder)),
37        }
38    }
39}
40
41impl<T, D> FramedRead<T, D> {
42    /// Returns a reference to the underlying I/O stream wrapped by
43    /// `FramedRead`.
44    ///
45    /// Note that care should be taken to not tamper with the underlying stream
46    /// of data coming in as it may corrupt the stream of frames otherwise
47    /// being worked with.
48    pub fn get_ref(&self) -> &T {
49        &self.inner.inner.0
50    }
51
52    /// Returns a mutable reference to the underlying I/O stream wrapped by
53    /// `FramedRead`.
54    ///
55    /// Note that care should be taken to not tamper with the underlying stream
56    /// of data coming in as it may corrupt the stream of frames otherwise
57    /// being worked with.
58    pub fn get_mut(&mut self) -> &mut T {
59        &mut self.inner.inner.0
60    }
61
62    /// Consumes the `FramedRead`, returning its underlying I/O stream.
63    ///
64    /// Note that care should be taken to not tamper with the underlying stream
65    /// of data coming in as it may corrupt the stream of frames otherwise
66    /// being worked with.
67    pub fn into_inner(self) -> T {
68        self.inner.inner.0
69    }
70
71    /// Returns a reference to the underlying decoder.
72    pub fn decoder(&self) -> &D {
73        &self.inner.inner.1
74    }
75
76    /// Returns a mutable reference to the underlying decoder.
77    pub fn decoder_mut(&mut self) -> &mut D {
78        &mut self.inner.inner.1
79    }
80}
81
82impl<T, D> Stream for FramedRead<T, D>
83where
84    T: AsyncRead,
85    D: Decoder,
86{
87    type Item = D::Item;
88    type Error = D::Error;
89
90    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
91        self.inner.poll()
92    }
93}
94
95impl<T, D> Sink for FramedRead<T, D>
96where
97    T: Sink,
98{
99    type SinkItem = T::SinkItem;
100    type SinkError = T::SinkError;
101
102    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
103        self.inner.inner.0.start_send(item)
104    }
105
106    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
107        self.inner.inner.0.poll_complete()
108    }
109
110    fn close(&mut self) -> Poll<(), Self::SinkError> {
111        self.inner.inner.0.close()
112    }
113}
114
115impl<T, D> fmt::Debug for FramedRead<T, D>
116where
117    T: fmt::Debug,
118    D: fmt::Debug,
119{
120    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121        f.debug_struct("FramedRead")
122            .field("inner", &self.inner.inner.0)
123            .field("decoder", &self.inner.inner.1)
124            .field("eof", &self.inner.eof)
125            .field("is_readable", &self.inner.is_readable)
126            .field("buffer", &self.inner.buffer)
127            .finish()
128    }
129}
130
131// ===== impl FramedRead2 =====
132
133pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
134    FramedRead2 {
135        inner: inner,
136        eof: false,
137        is_readable: false,
138        buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
139    }
140}
141
142pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
143    if buf.capacity() < INITIAL_CAPACITY {
144        let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
145        buf.reserve(bytes_to_reserve);
146    }
147    FramedRead2 {
148        inner: inner,
149        eof: false,
150        is_readable: buf.len() > 0,
151        buffer: buf,
152    }
153}
154
155impl<T> FramedRead2<T> {
156    pub fn get_ref(&self) -> &T {
157        &self.inner
158    }
159
160    pub fn into_inner(self) -> T {
161        self.inner
162    }
163
164    pub fn into_parts(self) -> (T, BytesMut) {
165        (self.inner, self.buffer)
166    }
167
168    pub fn get_mut(&mut self) -> &mut T {
169        &mut self.inner
170    }
171}
172
173impl<T> Stream for FramedRead2<T>
174where
175    T: AsyncRead + Decoder,
176{
177    type Item = T::Item;
178    type Error = T::Error;
179
180    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
181        loop {
182            // Repeatedly call `decode` or `decode_eof` as long as it is
183            // "readable". Readable is defined as not having returned `None`. If
184            // the upstream has returned EOF, and the decoder is no longer
185            // readable, it can be assumed that the decoder will never become
186            // readable again, at which point the stream is terminated.
187            if self.is_readable {
188                if self.eof {
189                    let frame = self.inner.decode_eof(&mut self.buffer)?;
190                    return Ok(Async::Ready(frame));
191                }
192
193                trace!("attempting to decode a frame");
194
195                if let Some(frame) = self.inner.decode(&mut self.buffer)? {
196                    trace!("frame decoded from buffer");
197                    return Ok(Async::Ready(Some(frame)));
198                }
199
200                self.is_readable = false;
201            }
202
203            assert!(!self.eof);
204
205            // Otherwise, try to read more data and try again. Make sure we've
206            // got room for at least one byte to read to ensure that we don't
207            // get a spurious 0 that looks like EOF
208            self.buffer.reserve(1);
209            if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
210                self.eof = true;
211            }
212
213            self.is_readable = true;
214        }
215    }
216}