futures_util/stream/
chunks.rs

1use std::mem;
2use std::prelude::v1::*;
3
4use futures_core::{Async, Poll, Stream};
5use futures_core::task;
6use futures_sink::{Sink};
7
8use stream::Fuse;
9
10/// An adaptor that chunks up elements in a vector.
11///
12/// This adaptor will buffer up a list of items in the stream and pass on the
13/// vector used for buffering when a specified capacity has been reached. This
14/// is created by the `Stream::chunks` method.
15#[derive(Debug)]
16#[must_use = "streams do nothing unless polled"]
17pub struct Chunks<S>
18    where S: Stream
19{
20    items: Vec<S::Item>,
21    err: Option<S::Error>,
22    stream: Fuse<S>
23}
24
25pub fn new<S>(s: S, capacity: usize) -> Chunks<S>
26    where S: Stream
27{
28    assert!(capacity > 0);
29
30    Chunks {
31        items: Vec::with_capacity(capacity),
32        err: None,
33        stream: super::fuse::new(s),
34    }
35}
36
37// Forwarding impl of Sink from the underlying stream
38impl<S> Sink for Chunks<S>
39    where S: Sink + Stream
40{
41    type SinkItem = S::SinkItem;
42    type SinkError = S::SinkError;
43
44    delegate_sink!(stream);
45}
46
47
48impl<S> Chunks<S> where S: Stream {
49    fn take(&mut self) -> Vec<S::Item> {
50        let cap = self.items.capacity();
51        mem::replace(&mut self.items, Vec::with_capacity(cap))
52    }
53
54    /// Acquires a reference to the underlying stream that this combinator is
55    /// pulling from.
56    pub fn get_ref(&self) -> &S {
57        self.stream.get_ref()
58    }
59
60    /// Acquires a mutable reference to the underlying stream that this
61    /// combinator is pulling from.
62    ///
63    /// Note that care must be taken to avoid tampering with the state of the
64    /// stream which may otherwise confuse this combinator.
65    pub fn get_mut(&mut self) -> &mut S {
66        self.stream.get_mut()
67    }
68
69    /// Consumes this combinator, returning the underlying stream.
70    ///
71    /// Note that this may discard intermediate state of this combinator, so
72    /// care should be taken to avoid losing resources when this is called.
73    pub fn into_inner(self) -> S {
74        self.stream.into_inner()
75    }
76}
77
78impl<S> Stream for Chunks<S>
79    where S: Stream
80{
81    type Item = Vec<<S as Stream>::Item>;
82    type Error = <S as Stream>::Error;
83
84    fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
85        if let Some(err) = self.err.take() {
86            return Err(err)
87        }
88
89        let cap = self.items.capacity();
90        loop {
91            match self.stream.poll_next(cx) {
92                Ok(Async::Pending) => return Ok(Async::Pending),
93
94                // Push the item into the buffer and check whether it is full.
95                // If so, replace our buffer with a new and empty one and return
96                // the full one.
97                Ok(Async::Ready(Some(item))) => {
98                    self.items.push(item);
99                    if self.items.len() >= cap {
100                        return Ok(Some(self.take()).into())
101                    }
102                }
103
104                // Since the underlying stream ran out of values, return what we
105                // have buffered, if we have anything.
106                Ok(Async::Ready(None)) => {
107                    return if self.items.len() > 0 {
108                        let full_buf = mem::replace(&mut self.items, Vec::new());
109                        Ok(Some(full_buf).into())
110                    } else {
111                        Ok(Async::Ready(None))
112                    }
113                }
114
115                // If we've got buffered items be sure to return them first,
116                // we'll defer our error for later.
117                Err(e) => {
118                    if self.items.len() == 0 {
119                        return Err(e)
120                    } else {
121                        self.err = Some(e);
122                        return Ok(Some(self.take()).into())
123                    }
124                }
125            }
126        }
127    }
128}