futures_01_ext/stream_wrappers/
collect_to.rs1use std::mem;
11
12use futures::Async;
13use futures::Future;
14use futures::Poll;
15use futures::Stream;
16
17#[must_use = "futures do nothing unless you `.await` or poll them"]
19pub struct CollectTo<S, C> {
20 stream: S,
21 collection: C,
22}
23
24impl<S: Stream, C> CollectTo<S, C>
25where
26 C: Default + Extend<S::Item>,
27{
28 fn finish(&mut self) -> C {
29 mem::take(&mut self.collection)
30 }
31
32 pub fn new(stream: S) -> CollectTo<S, C> {
34 CollectTo {
35 stream,
36 collection: Default::default(),
37 }
38 }
39}
40
41impl<S, C> Future for CollectTo<S, C>
42where
43 S: Stream,
44 C: Default + Extend<S::Item>,
45{
46 type Item = C;
47 type Error = S::Error;
48
49 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
50 loop {
51 match self.stream.poll() {
52 Ok(Async::Ready(Some(v))) => self.collection.extend(Some(v)),
53 Ok(Async::Ready(None)) => return Ok(Async::Ready(self.finish())),
54 Ok(Async::NotReady) => return Ok(Async::NotReady),
55 Err(e) => {
56 self.finish();
57 return Err(e);
58 }
59 }
60 }
61 }
62}