futures_01_ext/bytes_stream/
bytes_stream_future.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under both the MIT license found in the
5 * LICENSE-MIT file in the root directory of this source tree and the Apache
6 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
7 * of this source tree.
8 */
9
10use bytes_old::Bytes;
11use futures::try_ready;
12use futures::Async;
13use futures::Future;
14use futures::Poll;
15use futures::Stream;
16use tokio_io::codec::Decoder;
17
18use super::BytesStream;
19
/// A future that yields a single decoded item from the Bytes of the provided
/// BytesStream (if any) and the remaining BytesStream.
///
/// Both the success value and the error value carry the `BytesStream` back
/// to the caller, so the stream is handed back whether or not decoding
/// succeeded.
///
/// # Panics
///
/// Polling again after the future has resolved (with either an item or an
/// error) panics.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct BytesStreamFuture<S, Dec> {
    // `Some` while the future is pending. `poll` takes the state out and only
    // puts it back when the result is `NotReady`.
    inner: Option<BytesStreamFutureInner<S, Dec>>,
}
26
27impl<S, Dec> BytesStreamFuture<S, Dec>
28where
29    S: Stream<Item = Bytes>,
30    Dec: Decoder,
31    Dec::Error: From<S::Error>,
32{
33    pub(crate) fn new(bs: BytesStream<S>, decoder: Dec) -> Self {
34        let is_readable = !bs.bytes.is_empty() || bs.stream_done;
35
36        BytesStreamFuture {
37            inner: Some(BytesStreamFutureInner {
38                bs,
39                decoder,
40                is_readable,
41            }),
42        }
43    }
44}
45
46impl<S, Dec> Future for BytesStreamFuture<S, Dec>
47where
48    S: Stream<Item = Bytes>,
49    Dec: Decoder,
50    Dec::Error: From<S::Error>,
51{
52    type Item = (Option<Dec::Item>, BytesStream<S>);
53    type Error = (Dec::Error, BytesStream<S>);
54
55    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
56        let mut inner = self
57            .inner
58            .take()
59            .expect("calling poll after future completed");
60        match inner.poll() {
61            Ok(Async::NotReady) => {
62                self.inner = Some(inner);
63                Ok(Async::NotReady)
64            }
65            Ok(Async::Ready(frame)) => Ok(Async::Ready((frame, inner.bs))),
66            Err(frame) => Err((frame, inner.bs)),
67        }
68    }
69}
70
/// The in-progress state of a `BytesStreamFuture`: the stream being drained,
/// the decoder applied to its buffered bytes, and the read-loop flag.
struct BytesStreamFutureInner<S, Dec> {
    // Source of bytes; its `bytes` buffer and `stream_done` flag drive decoding.
    bs: BytesStream<S>,
    // Decoder that turns the buffered bytes into items.
    decoder: Dec,
    // True when a decode attempt should be made before asking `bs` for more
    // data: initially when the buffer is non-empty or the stream is done, and
    // again each time `poll_buffer` reports ready. Cleared when a decode
    // attempt produces no item.
    is_readable: bool,
}
76
77impl<S, Dec> BytesStreamFutureInner<S, Dec>
78where
79    S: Stream<Item = Bytes>,
80    Dec: Decoder,
81    Dec::Error: From<S::Error>,
82{
83    fn poll(&mut self) -> Poll<Option<Dec::Item>, Dec::Error> {
84        loop {
85            if self.is_readable {
86                if self.bs.stream_done {
87                    return Ok(Async::Ready(self.decoder.decode_eof(&mut self.bs.bytes)?));
88                }
89
90                if let Some(frame) = self.decoder.decode(&mut self.bs.bytes)? {
91                    return Ok(Async::Ready(Some(frame)));
92                }
93
94                self.is_readable = false;
95            }
96
97            try_ready!(self.bs.poll_buffer());
98            self.is_readable = true;
99        }
100    }
101}