tower_web/util/buf_stream/
buf_stream.rs

1use super::{Chain, Collect, FromBufStream, SizeHint};
2
3use bytes::Buf;
4use futures::{Async, Poll};
5
6/// An asynchronous stream of bytes.
7///
8/// `BufStream` asynchronously yields values implementing `Buf`, i.e. byte
9/// buffers.
10pub trait BufStream {
11    /// Values yielded by the `BufStream`.
12    type Item: Buf;
13
14    /// The error type this `BufStream` might generate.
15    type Error;
16
17    /// Attempt to pull out the next buffer of this stream, registering the
18    /// current task for wakeup if the value is not yet available, and returning
19    /// `None` if the stream is exhausted.
20    ///
21    /// # Return value
22    ///
23    /// There are several possible return values, each indicating a distinct
24    /// stream state:
25    ///
26    /// - `Ok(Async::NotReady)` means that this stream's next value is not ready
27    /// yet. Implementations will ensure that the current task will be notified
28    /// when the next value may be ready.
29    ///
30    /// - `Ok(Async::Ready(Some(buf)))` means that the stream has successfully
31    /// produced a value, `buf`, and may produce further values on subsequent
32    /// `poll` calls.
33    ///
34    /// - `Ok(Async::Ready(None))` means that the stream has terminated, and
35    /// `poll` should not be invoked again.
36    ///
37    /// # Panics
38    ///
39    /// Once a stream is finished, i.e. `Ready(None)` has been returned, further
40    /// calls to `poll` may result in a panic or other "bad behavior".
41    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
42
43    /// Returns the bounds on the remaining length of the iterator.
44    ///
45    /// # Implementation notes
46    ///
47    /// It is not enforced that a `BufStreaam` yields the declared amount of
48    /// data. A buggy implementation may yield less than the lower bound or more
49    /// than the upper bound.
50    ///
51    /// `size_hint()` is primarily intended to be used for optimizations such as
52    /// reserving space for the data, but must not be trusted to e.g. omit
53    /// bounds checks in unsafe code. An incorrect implemeentation of
54    /// `size_hint()` should not lead to memory safety violations.
55    ///
56    /// That said, the implementation should provide a correct estimation,
57    /// because otherwise it would be a violation of the trait's protocol.
58    fn size_hint(&self) -> SizeHint {
59        SizeHint::default()
60    }
61
62    /// Takes two buf streams and creates a new buf stream over both in
63    /// sequence.
64    ///
65    /// `chain()` will return a new `BufStream` value which will first yield all
66    /// data from `self` then all data from `other`.
67    ///
68    /// In other words, it links two buf streams together, in a chain.
69    fn chain<T>(self, other: T) -> Chain<Self, T>
70    where
71        Self: Sized,
72        T: BufStream<Error = Self::Error>,
73    {
74        Chain::new(self, other)
75    }
76
77    /// Consumes all data from `self`, storing it in byte storage of type `T`.
78    fn collect<T>(self) -> Collect<Self, T>
79    where
80        Self: Sized,
81        T: FromBufStream,
82    {
83        Collect::new(self)
84    }
85}
86
87impl<B> BufStream for Option<B>
88where
89    B: BufStream,
90{
91    type Item = B::Item;
92    type Error = B::Error;
93
94    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
95        match self {
96            Some(b) => b.poll(),
97            None => Ok(Async::Ready(None)),
98        }
99    }
100
101    fn size_hint(&self) -> SizeHint {
102        match self {
103            Some(b) => b.size_hint(),
104            None => SizeHint::default(),
105        }
106    }
107}