async_sink/ext/
send_all.rs1use super::Sink;
2use core::{
3 fmt,
4 future::Future,
5 ops::DerefMut,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use tokio_stream_util::TryStream;
10
11#[must_use = "futures do nothing unless you `.await` or poll them"]
13pub struct SendAll<'a, Si, St>
14where
15 Si: ?Sized,
16 St: TryStream + ?Sized,
17{
18 sink: &'a mut Si,
19 stream: &'a mut St,
20 buffered: Option<St::Ok>,
21 stream_done: bool,
22}
23
24impl<Si, St> fmt::Debug for SendAll<'_, Si, St>
25where
26 Si: fmt::Debug + ?Sized,
27 St: TryStream + fmt::Debug + ?Sized,
28 St::Ok: fmt::Debug,
29{
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 f.debug_struct("SendAll")
32 .field("sink", &self.sink)
33 .field("stream", &self.stream)
34 .field("buffered", &self.buffered)
35 .field("stream_done", &self.stream_done)
36 .finish()
37 }
38}
39
40impl<'a, Si, St> SendAll<'a, Si, St>
41where
42 Si: Sink<St::Ok, Error = St::Error> + Unpin + ?Sized,
43 St: TryStream + Unpin + ?Sized,
44{
45 pub(super) fn new(sink: &'a mut Si, stream: &'a mut St) -> Self {
46 Self {
47 sink,
48 stream,
49 buffered: None,
50 stream_done: false,
51 }
52 }
53
54 fn try_start_send(
55 self: Pin<&mut Self>,
56 cx: &mut Context<'_>,
57 item: St::Ok,
58 ) -> Poll<Result<(), St::Error>> {
59 let this = unsafe { Pin::get_unchecked_mut(self) };
60 debug_assert!(this.buffered.is_none());
61 match Pin::new(&mut *this.sink).poll_ready(cx) {
62 Poll::Ready(Ok(())) => Poll::Ready(Pin::new(&mut *this.sink).start_send(item)),
63 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
64 Poll::Pending => {
65 this.buffered = Some(item);
66 Poll::Pending
67 }
68 }
69 }
70}
71
72impl<'a, Si, St> Future for SendAll<'a, Si, St>
73where
74 Si: Sink<St::Ok, Error = St::Error> + Unpin + ?Sized,
75 St: TryStream + Unpin + ?Sized,
76{
77 type Output = Result<(), St::Error>;
78
79 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
80 if let Some(item) = unsafe { self.as_mut().get_unchecked_mut() }.buffered.take() {
81 match self.as_mut().try_start_send(cx, item) {
82 Poll::Ready(Ok(())) => {}
83 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
84 Poll::Pending => return Poll::Pending,
85 }
86 }
87
88 loop {
89 let this = unsafe { self.as_mut().get_unchecked_mut() };
90 if this.stream_done {
91 return Pin::new(&mut *this.sink).poll_flush(cx);
92 }
93
94 match <St as TryStream>::try_poll_next(Pin::new(this.stream.deref_mut()), cx) {
95 Poll::Ready(Some(Ok(item))) => match self.as_mut().try_start_send(cx, item) {
96 Poll::Ready(Ok(())) => continue,
97 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
98 Poll::Pending => return Poll::Pending,
99 },
100 Poll::Ready(Some(Err(e))) => {
101 unsafe { self.as_mut().get_unchecked_mut() }.stream_done = true;
102 return Poll::Ready(Err(e));
103 }
104 Poll::Ready(None) => {
105 unsafe { self.as_mut().get_unchecked_mut() }.stream_done = true;
106 }
107 Poll::Pending => {
108 let this = unsafe { self.as_mut().get_unchecked_mut() };
109 return match Pin::new(&mut *this.sink).poll_flush(cx) {
110 Poll::Ready(Ok(())) => Poll::Pending,
111 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
112 Poll::Pending => Poll::Pending,
113 };
114 }
115 }
116 }
117 }
118}