tower_web/util/buf_stream/
collect.rs1use super::{BufStream, FromBufStream};
2
3use futures::{Future, Poll};
4
5#[derive(Debug)]
9pub struct Collect<T, U: FromBufStream> {
10 stream: T,
11 builder: Option<U::Builder>,
12}
13
14impl<T, U> Collect<T, U>
15where
16 T: BufStream,
17 U: FromBufStream,
18{
19 pub(crate) fn new(stream: T) -> Collect<T, U> {
20 let builder = U::builder(&stream.size_hint());
21
22 Collect {
23 stream,
24 builder: Some(builder),
25 }
26 }
27}
28
29impl<T, U> Future for Collect<T, U>
30where
31 T: BufStream,
32 U: FromBufStream,
33{
34 type Item = U;
35 type Error = T::Error;
36
37 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
38 loop {
39 match try_ready!(self.stream.poll()) {
40 Some(mut buf) => {
41 let builder = self.builder.as_mut().expect("cannot poll after done");
42
43 U::extend(builder, &mut buf);
44 }
45 None => {
46 let builder = self.builder.take().expect("cannot poll after done");
47 let value = U::build(builder);
48 return Ok(value.into());
49 }
50 }
51 }
52 }
53}