async_sink/ext/
send_all.rs1use super::Sink;
2use core::{
3 fmt,
4 future::Future,
5 pin::Pin,
6 task::{Context, Poll},
7};
8use tokio_stream::Stream;
9
/// Future that forwards every item produced by a stream into a sink and then
/// flushes the sink.
///
/// The stream yields `Result<Item, Si::Error>`; an `Err` from the stream, or
/// any error reported by the sink, completes the future with that error.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct SendAll<'a, Si, Item, St>
where
    Si: ?Sized + Sink<Item>,
    St: Stream<Item = Result<Item, Si::Error>> + ?Sized,
{
    /// Sink that receives the forwarded items.
    sink: &'a mut Si,
    /// Stream the items are pulled from.
    stream: &'a mut St,
    /// Item already taken from the stream that the sink was not ready to
    /// accept; it is retried at the start of the next poll.
    buffered: Option<Item>,
    /// Set once the stream has yielded `None` or an error; from then on the
    /// stream is no longer polled and only the sink is flushed.
    stream_done: bool,
}
22
23impl<Si, Item, St> fmt::Debug for SendAll<'_, Si, Item, St>
24where
25 Si: fmt::Debug + ?Sized + Sink<Item>,
26 Item: fmt::Debug,
27 St: fmt::Debug + Stream<Item = Result<Item, Si::Error>> + ?Sized,
28{
29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30 f.debug_struct("SendAll")
31 .field("sink", &self.sink)
32 .field("stream", &self.stream)
33 .field("buffered", &self.buffered)
34 .field("stream_done", &self.stream_done)
35 .finish()
36 }
37}
38
39impl<'a, Si, Item, St> SendAll<'a, Si, Item, St>
40where
41 Si: Sink<Item> + Unpin + ?Sized,
42 St: Stream<Item = Result<Item, Si::Error>> + Unpin + ?Sized,
43{
44 pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> Self {
45 Self {
46 sink,
47 stream,
48 buffered: None,
49 stream_done: false,
50 }
51 }
52
53 fn try_start_send(
54 self: Pin<&mut Self>,
55 cx: &mut Context<'_>,
56 item: Item,
57 ) -> Poll<Result<(), Si::Error>> {
58 let this = unsafe { Pin::get_unchecked_mut(self) };
59 debug_assert!(this.buffered.is_none());
60 match Pin::new(&mut *this.sink).poll_ready(cx) {
61 Poll::Ready(Ok(())) => Poll::Ready(Pin::new(&mut *this.sink).start_send(item)),
62 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
63 Poll::Pending => {
64 this.buffered = Some(item);
65 Poll::Pending
66 }
67 }
68 }
69}
70
71impl<'a, Si, Item, St> Future for SendAll<'a, Si, Item, St>
72where
73 Item: Unpin,
74 Si: Sink<Item> + Unpin + ?Sized,
75 St: Stream<Item = Result<Item, Si::Error>> + Unpin + ?Sized,
76{
77 type Output = Result<(), Si::Error>;
78
79 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 if let Some(item) = self.as_mut().buffered.take() {
81 match self.as_mut().try_start_send(cx, item) {
82 Poll::Ready(Ok(())) => {}
83 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
84 Poll::Pending => return Poll::Pending,
85 }
86 }
87
88 loop {
89 let mut this = self.as_mut();
90 if this.stream_done {
91 return Pin::new(&mut this.sink).poll_flush(cx);
92 }
93
94 match <St as Stream>::poll_next(Pin::new(&mut this.stream), cx) {
95 Poll::Ready(Some(Ok(item))) => match this.try_start_send(cx, item) {
96 Poll::Ready(Ok(())) => continue,
97 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
98 Poll::Pending => return Poll::Pending,
99 },
100 Poll::Ready(Some(Err(e))) => {
101 this.stream_done = true;
102 return Poll::Ready(Err(e));
103 }
104 Poll::Ready(None) => {
105 this.stream_done = true;
106 }
107 Poll::Pending => {
108 return match Pin::new(&mut this.sink).poll_flush(cx) {
109 Poll::Ready(Ok(())) => Poll::Pending,
110 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
111 Poll::Pending => Poll::Pending,
112 };
113 }
114 }
115 }
116 }
117}