async_sink/ext/
send_all.rs

1use super::Sink;
2use core::{
3    fmt,
4    future::Future,
5    pin::Pin,
6    task::{Context, Poll},
7};
8use tokio_stream::Stream;
9
10/// Future for the [`send_all`](super::SinkExt::send_all) method.
11#[must_use = "futures do nothing unless you `.await` or poll them"]
12pub struct SendAll<'a, Si, Item, St>
13where
14    Si: ?Sized + Sink<Item>,
15    St: Stream<Item = Result<Item, Si::Error>> + ?Sized,
16{
17    sink: &'a mut Si,
18    stream: &'a mut St,
19    buffered: Option<Item>,
20    stream_done: bool,
21}
22
23impl<Si, Item, St> fmt::Debug for SendAll<'_, Si, Item, St>
24where
25    Si: fmt::Debug + ?Sized + Sink<Item>,
26    Item: fmt::Debug,
27    St: fmt::Debug + Stream<Item = Result<Item, Si::Error>> + ?Sized,
28{
29    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30        f.debug_struct("SendAll")
31            .field("sink", &self.sink)
32            .field("stream", &self.stream)
33            .field("buffered", &self.buffered)
34            .field("stream_done", &self.stream_done)
35            .finish()
36    }
37}
38
39impl<'a, Si, Item, St> SendAll<'a, Si, Item, St>
40where
41    Si: Sink<Item> + Unpin + ?Sized,
42    St: Stream<Item = Result<Item, Si::Error>> + Unpin + ?Sized,
43{
44    pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> Self {
45        Self {
46            sink,
47            stream,
48            buffered: None,
49            stream_done: false,
50        }
51    }
52
53    fn try_start_send(
54        self: Pin<&mut Self>,
55        cx: &mut Context<'_>,
56        item: Item,
57    ) -> Poll<Result<(), Si::Error>> {
58        let this = unsafe { Pin::get_unchecked_mut(self) };
59        debug_assert!(this.buffered.is_none());
60        match Pin::new(&mut *this.sink).poll_ready(cx) {
61            Poll::Ready(Ok(())) => Poll::Ready(Pin::new(&mut *this.sink).start_send(item)),
62            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
63            Poll::Pending => {
64                this.buffered = Some(item);
65                Poll::Pending
66            }
67        }
68    }
69}
70
71impl<'a, Si, Item, St> Future for SendAll<'a, Si, Item, St>
72where
73    Item: Unpin,
74    Si: Sink<Item> + Unpin + ?Sized,
75    St: Stream<Item = Result<Item, Si::Error>> + Unpin + ?Sized,
76{
77    type Output = Result<(), Si::Error>;
78
79    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80        if let Some(item) = self.as_mut().buffered.take() {
81            match self.as_mut().try_start_send(cx, item) {
82                Poll::Ready(Ok(())) => {}
83                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
84                Poll::Pending => return Poll::Pending,
85            }
86        }
87
88        loop {
89            let mut this = self.as_mut();
90            if this.stream_done {
91                return Pin::new(&mut this.sink).poll_flush(cx);
92            }
93
94            match <St as Stream>::poll_next(Pin::new(&mut this.stream), cx) {
95                Poll::Ready(Some(Ok(item))) => match this.try_start_send(cx, item) {
96                    Poll::Ready(Ok(())) => continue,
97                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
98                    Poll::Pending => return Poll::Pending,
99                },
100                Poll::Ready(Some(Err(e))) => {
101                    this.stream_done = true;
102                    return Poll::Ready(Err(e));
103                }
104                Poll::Ready(None) => {
105                    this.stream_done = true;
106                }
107                Poll::Pending => {
108                    return match Pin::new(&mut this.sink).poll_flush(cx) {
109                        Poll::Ready(Ok(())) => Poll::Pending,
110                        Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
111                        Poll::Pending => Poll::Pending,
112                    };
113                }
114            }
115        }
116    }
117}