futures_util/sink/
buffer.rs1use std::collections::VecDeque;
2
3use futures_core::{Poll, Async, Stream};
4use futures_core::task;
5use futures_sink::Sink;
6
7#[derive(Debug)]
10#[must_use = "sinks do nothing unless polled"]
11pub struct Buffer<S: Sink> {
12 sink: S,
13 buf: VecDeque<S::SinkItem>,
14
15 cap: usize,
17}
18
19pub fn new<S: Sink>(sink: S, amt: usize) -> Buffer<S> {
20 Buffer {
21 sink: sink,
22 buf: VecDeque::with_capacity(amt),
23 cap: amt,
24 }
25}
26
27impl<S: Sink> Buffer<S> {
28 pub fn get_ref(&self) -> &S {
30 &self.sink
31 }
32
33 pub fn get_mut(&mut self) -> &mut S {
35 &mut self.sink
36 }
37
38 pub fn into_inner(self) -> S {
43 self.sink
44 }
45
46 fn try_empty_buffer(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> {
47 try_ready!(self.sink.poll_ready(cx));
48 while let Some(item) = self.buf.pop_front() {
49 self.sink.start_send(item)?;
50 if self.buf.len() != 0 {
51 try_ready!(self.sink.poll_ready(cx));
52 }
53 }
54 Ok(Async::Ready(()))
55 }
56}
57
58impl<S> Stream for Buffer<S> where S: Sink + Stream {
60 type Item = S::Item;
61 type Error = S::Error;
62
63 fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<S::Item>, S::Error> {
64 self.sink.poll_next(cx)
65 }
66}
67
68impl<S: Sink> Sink for Buffer<S> {
69 type SinkItem = S::SinkItem;
70 type SinkError = S::SinkError;
71
72 fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
73 if self.cap == 0 {
74 return self.sink.poll_ready(cx);
75 }
76
77 self.try_empty_buffer(cx)?;
78
79 Ok(if self.buf.len() >= self.cap {
80 Async::Pending
81 } else {
82 Async::Ready(())
83 })
84 }
85
86 fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
87 if self.cap == 0 {
88 self.sink.start_send(item)
89 } else {
90 self.buf.push_back(item);
91 Ok(())
92 }
93 }
94
95 fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
96 try_ready!(self.try_empty_buffer(cx));
97 debug_assert!(self.buf.is_empty());
98 self.sink.poll_flush(cx)
99 }
100
101 fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
102 try_ready!(self.try_empty_buffer(cx));
103 debug_assert!(self.buf.is_empty());
104 self.sink.poll_close(cx)
105 }
106}