tokio_io/_tokio_codec/
framed_read.rs1#![allow(deprecated)]
2
3use std::fmt;
4
5use super::framed::Fuse;
6use codec::Decoder;
7use AsyncRead;
8
9use bytes::BytesMut;
10use futures::{Async, Poll, Sink, StartSend, Stream};
11
12pub struct FramedRead<T, D> {
14 inner: FramedRead2<Fuse<T, D>>,
15}
16
17pub struct FramedRead2<T> {
18 inner: T,
19 eof: bool,
20 is_readable: bool,
21 buffer: BytesMut,
22}
23
/// Capacity, in bytes, given to a freshly created read buffer (8 KiB).
const INITIAL_CAPACITY: usize = 8 * 1024;
25
26impl<T, D> FramedRead<T, D>
29where
30 T: AsyncRead,
31 D: Decoder,
32{
33 pub fn new(inner: T, decoder: D) -> FramedRead<T, D> {
35 FramedRead {
36 inner: framed_read2(Fuse(inner, decoder)),
37 }
38 }
39}
40
41impl<T, D> FramedRead<T, D> {
42 pub fn get_ref(&self) -> &T {
49 &self.inner.inner.0
50 }
51
52 pub fn get_mut(&mut self) -> &mut T {
59 &mut self.inner.inner.0
60 }
61
62 pub fn into_inner(self) -> T {
68 self.inner.inner.0
69 }
70
71 pub fn decoder(&self) -> &D {
73 &self.inner.inner.1
74 }
75
76 pub fn decoder_mut(&mut self) -> &mut D {
78 &mut self.inner.inner.1
79 }
80}
81
82impl<T, D> Stream for FramedRead<T, D>
83where
84 T: AsyncRead,
85 D: Decoder,
86{
87 type Item = D::Item;
88 type Error = D::Error;
89
90 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
91 self.inner.poll()
92 }
93}
94
95impl<T, D> Sink for FramedRead<T, D>
96where
97 T: Sink,
98{
99 type SinkItem = T::SinkItem;
100 type SinkError = T::SinkError;
101
102 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
103 self.inner.inner.0.start_send(item)
104 }
105
106 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
107 self.inner.inner.0.poll_complete()
108 }
109
110 fn close(&mut self) -> Poll<(), Self::SinkError> {
111 self.inner.inner.0.close()
112 }
113}
114
115impl<T, D> fmt::Debug for FramedRead<T, D>
116where
117 T: fmt::Debug,
118 D: fmt::Debug,
119{
120 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
121 f.debug_struct("FramedRead")
122 .field("inner", &self.inner.inner.0)
123 .field("decoder", &self.inner.inner.1)
124 .field("eof", &self.inner.eof)
125 .field("is_readable", &self.inner.is_readable)
126 .field("buffer", &self.inner.buffer)
127 .finish()
128 }
129}
130
131pub fn framed_read2<T>(inner: T) -> FramedRead2<T> {
134 FramedRead2 {
135 inner: inner,
136 eof: false,
137 is_readable: false,
138 buffer: BytesMut::with_capacity(INITIAL_CAPACITY),
139 }
140}
141
142pub fn framed_read2_with_buffer<T>(inner: T, mut buf: BytesMut) -> FramedRead2<T> {
143 if buf.capacity() < INITIAL_CAPACITY {
144 let bytes_to_reserve = INITIAL_CAPACITY - buf.capacity();
145 buf.reserve(bytes_to_reserve);
146 }
147 FramedRead2 {
148 inner: inner,
149 eof: false,
150 is_readable: buf.len() > 0,
151 buffer: buf,
152 }
153}
154
155impl<T> FramedRead2<T> {
156 pub fn get_ref(&self) -> &T {
157 &self.inner
158 }
159
160 pub fn into_inner(self) -> T {
161 self.inner
162 }
163
164 pub fn into_parts(self) -> (T, BytesMut) {
165 (self.inner, self.buffer)
166 }
167
168 pub fn get_mut(&mut self) -> &mut T {
169 &mut self.inner
170 }
171}
172
173impl<T> Stream for FramedRead2<T>
174where
175 T: AsyncRead + Decoder,
176{
177 type Item = T::Item;
178 type Error = T::Error;
179
180 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
181 loop {
182 if self.is_readable {
188 if self.eof {
189 let frame = self.inner.decode_eof(&mut self.buffer)?;
190 return Ok(Async::Ready(frame));
191 }
192
193 trace!("attempting to decode a frame");
194
195 if let Some(frame) = self.inner.decode(&mut self.buffer)? {
196 trace!("frame decoded from buffer");
197 return Ok(Async::Ready(Some(frame)));
198 }
199
200 self.is_readable = false;
201 }
202
203 assert!(!self.eof);
204
205 self.buffer.reserve(1);
209 if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) {
210 self.eof = true;
211 }
212
213 self.is_readable = true;
214 }
215 }
216}