tokio_io/
framed_read.rs

1#![allow(deprecated)]
2
3use std::fmt;
4
5use codec::Decoder;
6use framed::Fuse;
7use AsyncRead;
8
9use bytes::BytesMut;
10use futures::{Async, Poll, Sink, StartSend, Stream};
11
12/// A `Stream` of messages decoded from an `AsyncRead`.
13#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
14#[doc(hidden)]
15pub struct FramedRead<T, D> {
16    inner: FramedRead2<Fuse<T, D>>,
17}
18
19#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
20#[doc(hidden)]
21pub struct FramedRead2<T> {
22    inner: T,
23    eof: bool,
24    is_readable: bool,
25    buffer: BytesMut,
26}
27
/// Capacity, in bytes, that a freshly created read buffer starts with (8 KiB).
const INITIAL_CAPACITY: usize = 8 * 1024;
29
30// ===== impl FramedRead =====
31
32impl<T, D> FramedRead<T, D>
33where
34    T: AsyncRead,
35    D: Decoder,
36{
37    /// Creates a new `FramedRead` with the given `decoder`.
38    pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
39        FramedRead {
40            inner: framed_read2(Fuse(inner, decoder)),
41        }
42    }
43}
44
45impl<T, D> FramedRead<T, D> {
46    /// Returns a reference to the underlying I/O stream wrapped by
47    /// `FramedRead`.
48    ///
49    /// Note that care should be taken to not tamper with the underlying stream
50    /// of data coming in as it may corrupt the stream of frames otherwise
51    /// being worked with.
52    pub fn get_ref(&self) -> &T {
53        &self.inner.inner.0
54    }
55
56    /// Returns a mutable reference to the underlying I/O stream wrapped by
57    /// `FramedRead`.
58    ///
59    /// Note that care should be taken to not tamper with the underlying stream
60    /// of data coming in as it may corrupt the stream of frames otherwise
61    /// being worked with.
62    pub fn get_mut(&mut self) -> &mut T {
63        &mut self.inner.inner.0
64    }
65
66    /// Consumes the `FramedRead`, returning its underlying I/O stream.
67    ///
68    /// Note that care should be taken to not tamper with the underlying stream
69    /// of data coming in as it may corrupt the stream of frames otherwise
70    /// being worked with.
71    pub fn into_inner(self) -> T {
72        self.inner.inner.0
73    }
74
75    /// Returns a reference to the underlying decoder.
76    pub fn decoder(&self) -> &D {
77        &self.inner.inner.1
78    }
79
80    /// Returns a mutable reference to the underlying decoder.
81    pub fn decoder_mut(&mut self) -> &mut D {
82        &mut self.inner.inner.1
83    }
84}
85
86impl<T, D> Stream for FramedRead<T, D>
87where
88    T: AsyncRead,
89    D: Decoder,
90{
91    type Item = D::Item;
92    type Error = D::Error;
93
94    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
95        self.inner.poll()
96    }
97}
98
99impl<T, D> Sink for FramedRead<T, D>
100where
101    T: Sink,
102{
103    type SinkItem = T::SinkItem;
104    type SinkError = T::SinkError;
105
106    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
107        self.inner.inner.0.start_send(item)
108    }
109
110    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
111        self.inner.inner.0.poll_complete()
112    }
113
114    fn close(&mut self) -> Poll<(), Self::SinkError> {
115        self.inner.inner.0.close()
116    }
117}
118
119impl<T, D> fmt::Debug for FramedRead<T, D>
120where
121    T: fmt::Debug,
122    D: fmt::Debug,
123{
124    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
125        f.debug_struct("FramedRead")
126            .field("inner", &self.inner.inner.0)
127            .field("decoder", &self.inner.inner.1)
128            .field("eof", &self.inner.eof)
129            .field("is_readable", &self.inner.is_readable)
130            .field("buffer", &self.inner.buffer)
131            .finish()
132    }
133}
134
135// ===== impl FramedRead2 =====
136
137pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
138    FramedRead2 {
139        inner: inner,
140        eof: false,
141        is_readable: false,
142        buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
143    }
144}
145
146pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
147    if buf.capacity() < INITIAL_CAPACITY {
148        let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
149        buf.reserve(bytes_to_reserve);
150    }
151    FramedRead2 {
152        inner: inner,
153        eof: false,
154        is_readable: buf.len() > 0,
155        buffer: buf,
156    }
157}
158
159impl<T> FramedRead2<T> {
160    pub fn get_ref(&self) -> &T {
161        &self.inner
162    }
163
164    pub fn into_inner(self) -> T {
165        self.inner
166    }
167
168    pub fn into_parts(self) -> (T, BytesMut) {
169        (self.inner, self.buffer)
170    }
171
172    pub fn get_mut(&mut self) -> &mut T {
173        &mut self.inner
174    }
175}
176
177impl<T> Stream for FramedRead2<T>
178where
179    T: AsyncRead + Decoder,
180{
181    type Item = T::Item;
182    type Error = T::Error;
183
184    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
185        loop {
186            // Repeatedly call `decode` or `decode_eof` as long as it is
187            // "readable". Readable is defined as not having returned `None`. If
188            // the upstream has returned EOF, and the decoder is no longer
189            // readable, it can be assumed that the decoder will never become
190            // readable again, at which point the stream is terminated.
191            if self.is_readable {
192                if self.eof {
193                    let frame = self.inner.decode_eof(&mut self.buffer)?;
194                    return Ok(Async::Ready(frame));
195                }
196
197                trace!("attempting to decode a frame");
198
199                if let Some(frame) = self.inner.decode(&mut self.buffer)? {
200                    trace!("frame decoded from buffer");
201                    return Ok(Async::Ready(Some(frame)));
202                }
203
204                self.is_readable = false;
205            }
206
207            assert!(!self.eof);
208
209            // Otherwise, try to read more data and try again. Make sure we've
210            // got room for at least one byte to read to ensure that we don't
211            // get a spurious 0 that looks like EOF
212            self.buffer.reserve(1);
213            if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
214                self.eof = true;
215            }
216
217            self.is_readable = true;
218        }
219    }
220}