futures_util/sink/
send_all.rs

1use futures_core::{Poll, Async, Future, Stream};
2use futures_core::task;
3use futures_sink::{Sink};
4
5use stream::{StreamExt, Fuse};
6
7/// Future for the `Sink::send_all` combinator, which sends a stream of values
8/// to a sink and then waits until the sink has fully flushed those values.
9#[derive(Debug)]
10#[must_use = "futures do nothing unless polled"]
11pub struct SendAll<T, U: Stream> {
12    sink: Option<T>,
13    stream: Option<Fuse<U>>,
14    buffered: Option<U::Item>,
15}
16
17pub fn new<T, U>(sink: T, stream: U) -> SendAll<T, U>
18    where T: Sink,
19          U: Stream<Item = T::SinkItem>,
20          T::SinkError: From<U::Error>,
21{
22    SendAll {
23        sink: Some(sink),
24        stream: Some(stream.fuse()),
25        buffered: None,
26    }
27}
28
29impl<T, U> SendAll<T, U>
30    where T: Sink,
31          U: Stream<Item = T::SinkItem>,
32          T::SinkError: From<U::Error>,
33{
34    fn sink_mut(&mut self) -> &mut T {
35        self.sink.as_mut().take().expect("Attempted to poll SendAll after completion")
36    }
37
38    fn stream_mut(&mut self) -> &mut Fuse<U> {
39        self.stream.as_mut().take()
40            .expect("Attempted to poll SendAll after completion")
41    }
42
43    fn take_result(&mut self) -> (T, U) {
44        let sink = self.sink.take()
45            .expect("Attempted to poll Forward after completion");
46        let fuse = self.stream.take()
47            .expect("Attempted to poll Forward after completion");
48        (sink, fuse.into_inner())
49    }
50
51    fn try_start_send(&mut self, cx: &mut task::Context, item: U::Item) -> Poll<(), T::SinkError> {
52        debug_assert!(self.buffered.is_none());
53        match self.sink_mut().poll_ready(cx)? {
54            Async::Ready(()) => {
55                self.sink_mut().start_send(item)?;
56                Ok(Async::Ready(()))
57            }
58            Async::Pending => {
59                self.buffered = Some(item);
60                Ok(Async::Pending)
61            }
62        }
63    }
64}
65
66impl<T, U> Future for SendAll<T, U>
67    where T: Sink,
68          U: Stream<Item = T::SinkItem>,
69          T::SinkError: From<U::Error>,
70{
71    type Item = (T, U);
72    type Error = T::SinkError;
73
74    fn poll(&mut self, cx: &mut task::Context) -> Poll<(T, U), T::SinkError> {
75        // If we've got an item buffered already, we need to write it to the
76        // sink before we can do anything else
77        if let Some(item) = self.buffered.take() {
78            try_ready!(self.try_start_send(cx, item))
79        }
80
81        loop {
82            match self.stream_mut().poll_next(cx)? {
83                Async::Ready(Some(item)) => try_ready!(self.try_start_send(cx, item)),
84                Async::Ready(None) => {
85                    try_ready!(self.sink_mut().poll_flush(cx));
86                    return Ok(Async::Ready(self.take_result()))
87                }
88                Async::Pending => {
89                    try_ready!(self.sink_mut().poll_flush(cx));
90                    return Ok(Async::Pending)
91                }
92            }
93        }
94    }
95}