futures_util/stream/
chunks.rs1use std::mem;
2use std::prelude::v1::*;
3
4use futures_core::{Async, Poll, Stream};
5use futures_core::task;
6use futures_sink::{Sink};
7
8use stream::Fuse;
9
10#[derive(Debug)]
16#[must_use = "streams do nothing unless polled"]
17pub struct Chunks<S>
18 where S: Stream
19{
20 items: Vec<S::Item>,
21 err: Option<S::Error>,
22 stream: Fuse<S>
23}
24
25pub fn new<S>(s: S, capacity: usize) -> Chunks<S>
26 where S: Stream
27{
28 assert!(capacity > 0);
29
30 Chunks {
31 items: Vec::with_capacity(capacity),
32 err: None,
33 stream: super::fuse::new(s),
34 }
35}
36
37impl<S> Sink for Chunks<S>
39 where S: Sink + Stream
40{
41 type SinkItem = S::SinkItem;
42 type SinkError = S::SinkError;
43
44 delegate_sink!(stream);
45}
46
47
48impl<S> Chunks<S> where S: Stream {
49 fn take(&mut self) -> Vec<S::Item> {
50 let cap = self.items.capacity();
51 mem::replace(&mut self.items, Vec::with_capacity(cap))
52 }
53
54 pub fn get_ref(&self) -> &S {
57 self.stream.get_ref()
58 }
59
60 pub fn get_mut(&mut self) -> &mut S {
66 self.stream.get_mut()
67 }
68
69 pub fn into_inner(self) -> S {
74 self.stream.into_inner()
75 }
76}
77
78impl<S> Stream for Chunks<S>
79 where S: Stream
80{
81 type Item = Vec<<S as Stream>::Item>;
82 type Error = <S as Stream>::Error;
83
84 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
85 if let Some(err) = self.err.take() {
86 return Err(err)
87 }
88
89 let cap = self.items.capacity();
90 loop {
91 match self.stream.poll_next(cx) {
92 Ok(Async::Pending) => return Ok(Async::Pending),
93
94 Ok(Async::Ready(Some(item))) => {
98 self.items.push(item);
99 if self.items.len() >= cap {
100 return Ok(Some(self.take()).into())
101 }
102 }
103
104 Ok(Async::Ready(None)) => {
107 return if self.items.len() > 0 {
108 let full_buf = mem::replace(&mut self.items, Vec::new());
109 Ok(Some(full_buf).into())
110 } else {
111 Ok(Async::Ready(None))
112 }
113 }
114
115 Err(e) => {
118 if self.items.len() == 0 {
119 return Err(e)
120 } else {
121 self.err = Some(e);
122 return Ok(Some(self.take()).into())
123 }
124 }
125 }
126 }
127 }
128}