1#![allow(deprecated)]
2
3use std::fmt;
4
5use codec::Decoder;
6use framed::Fuse;
7use AsyncRead;
8
9use bytes::BytesMut;
10use futures::{Async, Poll, Sink, StartSend, Stream};
11
12#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
14#[doc(hidden)]
15pub struct FramedRead<T, D> {
16 inner: FramedRead2<Fuse<T, D>>,
17}
18
19#[deprecated(since = "0.1.7", note = "Moved to tokio-codec")]
20#[doc(hidden)]
21pub struct FramedRead2<T> {
22 inner: T,
23 eof: bool,
24 is_readable: bool,
25 buffer: BytesMut,
26}
27
28const INITIAL_CAPACITY: usize = 8 * 1024;
29
30impl<T, D> FramedRead<T, D>
33where
34 T: AsyncRead,
35 D: Decoder,
36{
37 pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
39 FramedRead {
40 inner: framed_read2(Fuse(inner, decoder)),
41 }
42 }
43}
44
45impl<T, D> FramedRead<T, D> {
46 pub fn get_ref(&self) -> &T {
53 &self.inner.inner.0
54 }
55
56 pub fn get_mut(&mut self) -> &mut T {
63 &mut self.inner.inner.0
64 }
65
66 pub fn into_inner(self) -> T {
72 self.inner.inner.0
73 }
74
75 pub fn decoder(&self) -> &D {
77 &self.inner.inner.1
78 }
79
80 pub fn decoder_mut(&mut self) -> &mut D {
82 &mut self.inner.inner.1
83 }
84}
85
86impl<T, D> Stream for FramedRead<T, D>
87where
88 T: AsyncRead,
89 D: Decoder,
90{
91 type Item = D::Item;
92 type Error = D::Error;
93
94 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
95 self.inner.poll()
96 }
97}
98
99impl<T, D> Sink for FramedRead<T, D>
100where
101 T: Sink,
102{
103 type SinkItem = T::SinkItem;
104 type SinkError = T::SinkError;
105
106 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
107 self.inner.inner.0.start_send(item)
108 }
109
110 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
111 self.inner.inner.0.poll_complete()
112 }
113
114 fn close(&mut self) -> Poll<(), Self::SinkError> {
115 self.inner.inner.0.close()
116 }
117}
118
119impl<T, D> fmt::Debug for FramedRead<T, D>
120where
121 T: fmt::Debug,
122 D: fmt::Debug,
123{
124 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
125 f.debug_struct("FramedRead")
126 .field("inner", &self.inner.inner.0)
127 .field("decoder", &self.inner.inner.1)
128 .field("eof", &self.inner.eof)
129 .field("is_readable", &self.inner.is_readable)
130 .field("buffer", &self.inner.buffer)
131 .finish()
132 }
133}
134
135pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
138 FramedRead2 {
139 inner: inner,
140 eof: false,
141 is_readable: false,
142 buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
143 }
144}
145
146pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
147 if buf.capacity() < INITIAL_CAPACITY {
148 let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
149 buf.reserve(bytes_to_reserve);
150 }
151 FramedRead2 {
152 inner: inner,
153 eof: false,
154 is_readable: buf.len() > 0,
155 buffer: buf,
156 }
157}
158
159impl<T> FramedRead2<T> {
160 pub fn get_ref(&self) -> &T {
161 &self.inner
162 }
163
164 pub fn into_inner(self) -> T {
165 self.inner
166 }
167
168 pub fn into_parts(self) -> (T, BytesMut) {
169 (self.inner, self.buffer)
170 }
171
172 pub fn get_mut(&mut self) -> &mut T {
173 &mut self.inner
174 }
175}
176
177impl<T> Stream for FramedRead2<T>
178where
179 T: AsyncRead + Decoder,
180{
181 type Item = T::Item;
182 type Error = T::Error;
183
184 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
185 loop {
186 if self.is_readable {
192 if self.eof {
193 let frame = self.inner.decode_eof(&mut self.buffer)?;
194 return Ok(Async::Ready(frame));
195 }
196
197 trace!("attempting to decode a frame");
198
199 if let Some(frame) = self.inner.decode(&mut self.buffer)? {
200 trace!("frame decoded from buffer");
201 return Ok(Async::Ready(Some(frame)));
202 }
203
204 self.is_readable = false;
205 }
206
207 assert!(!self.eof);
208
209 self.buffer.reserve(1);
213 if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
214 self.eof = true;
215 }
216
217 self.is_readable = true;
218 }
219 }
220}