tower_web/util/buf_stream/buf_stream.rs
1use super::{Chain, Collect, FromBufStream, SizeHint};
2
3use bytes::Buf;
4use futures::{Async, Poll};
5
6/// An asynchronous stream of bytes.
7///
8/// `BufStream` asynchronously yields values implementing `Buf`, i.e. byte
9/// buffers.
10pub trait BufStream {
11 /// Values yielded by the `BufStream`.
12 type Item: Buf;
13
14 /// The error type this `BufStream` might generate.
15 type Error;
16
17 /// Attempt to pull out the next buffer of this stream, registering the
18 /// current task for wakeup if the value is not yet available, and returning
19 /// `None` if the stream is exhausted.
20 ///
21 /// # Return value
22 ///
23 /// There are several possible return values, each indicating a distinct
24 /// stream state:
25 ///
26 /// - `Ok(Async::NotReady)` means that this stream's next value is not ready
27 /// yet. Implementations will ensure that the current task will be notified
28 /// when the next value may be ready.
29 ///
30 /// - `Ok(Async::Ready(Some(buf)))` means that the stream has successfully
31 /// produced a value, `buf`, and may produce further values on subsequent
32 /// `poll` calls.
33 ///
34 /// - `Ok(Async::Ready(None))` means that the stream has terminated, and
35 /// `poll` should not be invoked again.
36 ///
37 /// # Panics
38 ///
39 /// Once a stream is finished, i.e. `Ready(None)` has been returned, further
40 /// calls to `poll` may result in a panic or other "bad behavior".
41 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
42
43 /// Returns the bounds on the remaining length of the iterator.
44 ///
45 /// # Implementation notes
46 ///
47 /// It is not enforced that a `BufStreaam` yields the declared amount of
48 /// data. A buggy implementation may yield less than the lower bound or more
49 /// than the upper bound.
50 ///
51 /// `size_hint()` is primarily intended to be used for optimizations such as
52 /// reserving space for the data, but must not be trusted to e.g. omit
53 /// bounds checks in unsafe code. An incorrect implemeentation of
54 /// `size_hint()` should not lead to memory safety violations.
55 ///
56 /// That said, the implementation should provide a correct estimation,
57 /// because otherwise it would be a violation of the trait's protocol.
58 fn size_hint(&self) -> SizeHint {
59 SizeHint::default()
60 }
61
62 /// Takes two buf streams and creates a new buf stream over both in
63 /// sequence.
64 ///
65 /// `chain()` will return a new `BufStream` value which will first yield all
66 /// data from `self` then all data from `other`.
67 ///
68 /// In other words, it links two buf streams together, in a chain.
69 fn chain<T>(self, other: T) -> Chain<Self, T>
70 where
71 Self: Sized,
72 T: BufStream<Error = Self::Error>,
73 {
74 Chain::new(self, other)
75 }
76
77 /// Consumes all data from `self`, storing it in byte storage of type `T`.
78 fn collect<T>(self) -> Collect<Self, T>
79 where
80 Self: Sized,
81 T: FromBufStream,
82 {
83 Collect::new(self)
84 }
85}
86
87impl<B> BufStream for Option<B>
88where
89 B: BufStream,
90{
91 type Item = B::Item;
92 type Error = B::Error;
93
94 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
95 match self {
96 Some(b) => b.poll(),
97 None => Ok(Async::Ready(None)),
98 }
99 }
100
101 fn size_hint(&self) -> SizeHint {
102 match self {
103 Some(b) => b.size_hint(),
104 None => SizeHint::default(),
105 }
106 }
107}