tower_web/util/buf_stream/
collect.rs

1use super::{BufStream, FromBufStream};
2
3use futures::{Future, Poll};
4
5/// Consumes a buf stream, collecting the data into a single byte container.
6///
7/// `Collect` values are produced by `BufStream::collect`.
8#[derive(Debug)]
9pub struct Collect<T, U: FromBufStream> {
10    stream: T,
11    builder: Option<U::Builder>,
12}
13
14impl<T, U> Collect<T, U>
15where
16    T: BufStream,
17    U: FromBufStream,
18{
19    pub(crate) fn new(stream: T) -> Collect<T, U> {
20        let builder = U::builder(&stream.size_hint());
21
22        Collect {
23            stream,
24            builder: Some(builder),
25        }
26    }
27}
28
29impl<T, U> Future for Collect<T, U>
30where
31    T: BufStream,
32    U: FromBufStream,
33{
34    type Item = U;
35    type Error = T::Error;
36
37    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
38        loop {
39            match try_ready!(self.stream.poll()) {
40                Some(mut buf) => {
41                    let builder = self.builder.as_mut().expect("cannot poll after done");
42
43                    U::extend(builder, &mut buf);
44                }
45                None => {
46                    let builder = self.builder.take().expect("cannot poll after done");
47                    let value = U::build(builder);
48                    return Ok(value.into());
49                }
50            }
51        }
52    }
53}