futures_01_ext/stream_wrappers/
collect_to.rs

/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * This source code is licensed under both the MIT license found in the
 * LICENSE-MIT file in the root directory of this source tree and the Apache
 * License, Version 2.0 found in the LICENSE-APACHE file in the root directory
 * of this source tree.
 */
9
10use std::mem;
11
12use futures::Async;
13use futures::Future;
14use futures::Poll;
15use futures::Stream;
16
/// Future returned as a result of calling [crate::StreamExt::collect_to].
///
/// Polling it drains the wrapped stream `S`, feeding every yielded item into
/// a collection of type `C`, and resolves to that collection once the stream
/// signals that it has ended.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CollectTo<S, C> {
    /// Source stream that is polled for items.
    stream: S,
    /// Items gathered from `stream` so far.
    collection: C,
}
23
24impl<S: Stream, C> CollectTo<S, C>
25where
26    C: Default + Extend<S::Item>,
27{
28    fn finish(&mut self) -> C {
29        mem::take(&mut self.collection)
30    }
31
32    /// Create a new instance of [CollectTo] wrapping the provided stream
33    pub fn new(stream: S) -> CollectTo<S, C> {
34        CollectTo {
35            stream,
36            collection: Default::default(),
37        }
38    }
39}
40
41impl<S, C> Future for CollectTo<S, C>
42where
43    S: Stream,
44    C: Default + Extend<S::Item>,
45{
46    type Item = C;
47    type Error = S::Error;
48
49    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
50        loop {
51            match self.stream.poll() {
52                Ok(Async::Ready(Some(v))) => self.collection.extend(Some(v)),
53                Ok(Async::Ready(None)) => return Ok(Async::Ready(self.finish())),
54                Ok(Async::NotReady) => return Ok(Async::NotReady),
55                Err(e) => {
56                    self.finish();
57                    return Err(e);
58                }
59            }
60        }
61    }
62}