futures_01_ext/bytes_stream/
bytes_stream_future.rs1use bytes_old::Bytes;
11use futures::try_ready;
12use futures::Async;
13use futures::Future;
14use futures::Poll;
15use futures::Stream;
16use tokio_io::codec::Decoder;
17
18use super::BytesStream;
19
20#[must_use = "futures do nothing unless you `.await` or poll them"]
23pub struct BytesStreamFuture<S, Dec> {
24 inner: Option<BytesStreamFutureInner<S, Dec>>,
25}
26
27impl<S, Dec> BytesStreamFuture<S, Dec>
28where
29 S: Stream<Item = Bytes>,
30 Dec: Decoder,
31 Dec::Error: From<S::Error>,
32{
33 pub(crate) fn new(bs: BytesStream<S>, decoder: Dec) -> Self {
34 let is_readable = !bs.bytes.is_empty() || bs.stream_done;
35
36 BytesStreamFuture {
37 inner: Some(BytesStreamFutureInner {
38 bs,
39 decoder,
40 is_readable,
41 }),
42 }
43 }
44}
45
46impl<S, Dec> Future for BytesStreamFuture<S, Dec>
47where
48 S: Stream<Item = Bytes>,
49 Dec: Decoder,
50 Dec::Error: From<S::Error>,
51{
52 type Item = (Option<Dec::Item>, BytesStream<S>);
53 type Error = (Dec::Error, BytesStream<S>);
54
55 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
56 let mut inner = self
57 .inner
58 .take()
59 .expect("calling poll after future completed");
60 match inner.poll() {
61 Ok(Async::NotReady) => {
62 self.inner = Some(inner);
63 Ok(Async::NotReady)
64 }
65 Ok(Async::Ready(frame)) => Ok(Async::Ready((frame, inner.bs))),
66 Err(frame) => Err((frame, inner.bs)),
67 }
68 }
69}
70
71struct BytesStreamFutureInner<S, Dec> {
72 bs: BytesStream<S>,
73 decoder: Dec,
74 is_readable: bool,
75}
76
77impl<S, Dec> BytesStreamFutureInner<S, Dec>
78where
79 S: Stream<Item = Bytes>,
80 Dec: Decoder,
81 Dec::Error: From<S::Error>,
82{
83 fn poll(&mut self) -> Poll<Option<Dec::Item>, Dec::Error> {
84 loop {
85 if self.is_readable {
86 if self.bs.stream_done {
87 return Ok(Async::Ready(self.decoder.decode_eof(&mut self.bs.bytes)?));
88 }
89
90 if let Some(frame) = self.decoder.decode(&mut self.bs.bytes)? {
91 return Ok(Async::Ready(Some(frame)));
92 }
93
94 self.is_readable = false;
95 }
96
97 try_ready!(self.bs.poll_buffer());
98 self.is_readable = true;
99 }
100 }
101}