async_sink/ext/
send_all.rs

1use super::Sink;
2use core::{
3    fmt,
4    future::Future,
5    ops::DerefMut,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use tokio_stream_util::TryStream;
10
11/// Future for the [`send_all`](super::SinkExt::send_all) method.
12#[must_use = "futures do nothing unless you `.await` or poll them"]
13pub struct SendAll<'a, Si, St>
14where
15    Si: ?Sized,
16    St: TryStream + ?Sized,
17{
18    sink: &'a mut Si,
19    stream: &'a mut St,
20    buffered: Option<St::Ok>,
21    stream_done: bool,
22}
23
24impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
25where
26    Si: fmt::Debug + ?Sized,
27    St: TryStream + fmt::Debug + ?Sized,
28    St::Ok: fmt::Debug,
29{
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        f.debug_struct("SendAll")
32            .field("sink", &self.sink)
33            .field("stream", &self.stream)
34            .field("buffered", &self.buffered)
35            .field("stream_done", &self.stream_done)
36            .finish()
37    }
38}
39
40impl<'a, Si, St> SendAll<'a, Si, St>
41where
42    Si: Sink<St::Ok, Error = St::Error> + Unpin + ?Sized,
43    St: TryStream + Unpin + ?Sized,
44{
45    pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> Self {
46        Self {
47            sink,
48            stream,
49            buffered: None,
50            stream_done: false,
51        }
52    }
53
54    fn try_start_send(
55        self: Pin<&mut Self>,
56        cx: &mut Context<'_>,
57        item: St::Ok,
58    ) -> Poll<Result<(), St::Error>> {
59        let this = unsafe { Pin::get_unchecked_mut(self) };
60        debug_assert!(this.buffered.is_none());
61        match Pin::new(&mut *this.sink).poll_ready(cx) {
62            Poll::Ready(Ok(())) => Poll::Ready(Pin::new(&mut *this.sink).start_send(item)),
63            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
64            Poll::Pending => {
65                this.buffered = Some(item);
66                Poll::Pending
67            }
68        }
69    }
70}
71
72impl<'a, Si, St> Future for SendAll<'a, Si, St>
73where
74    Si: Sink<St::Ok, Error = St::Error> + Unpin + ?Sized,
75    St: TryStream + Unpin + ?Sized,
76{
77    type Output = Result<(), St::Error>;
78
79    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80        if let Some(item) = unsafe { self.as_mut().get_unchecked_mut() }.buffered.take() {
81            match self.as_mut().try_start_send(cx, item) {
82                Poll::Ready(Ok(())) => {}
83                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
84                Poll::Pending => return Poll::Pending,
85            }
86        }
87
88        loop {
89            let this = unsafe { self.as_mut().get_unchecked_mut() };
90            if this.stream_done {
91                return Pin::new(&mut *this.sink).poll_flush(cx);
92            }
93
94            match <St as TryStream>::try_poll_next(Pin::new(this.stream.deref_mut()), cx) {
95                Poll::Ready(Some(Ok(item))) => match self.as_mut().try_start_send(cx, item) {
96                    Poll::Ready(Ok(())) => continue,
97                    Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
98                    Poll::Pending => return Poll::Pending,
99                },
100                Poll::Ready(Some(Err(e))) => {
101                    unsafe { self.as_mut().get_unchecked_mut() }.stream_done = true;
102                    return Poll::Ready(Err(e));
103                }
104                Poll::Ready(None) => {
105                    unsafe { self.as_mut().get_unchecked_mut() }.stream_done = true;
106                }
107                Poll::Pending => {
108                    let this = unsafe { self.as_mut().get_unchecked_mut() };
109                    return match Pin::new(&mut *this.sink).poll_flush(cx) {
110                        Poll::Ready(Ok(())) => Poll::Pending,
111                        Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
112                        Poll::Pending => Poll::Pending,
113                    };
114                }
115            }
116        }
117    }
118}