async_sink/ext/
send.rs

1use super::Feed;
2use super::Sink;
3use core::future::Future;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6
7/// Future for the [`send`](super::SinkExt::send) method.
8#[derive(Debug)]
9#[must_use = "futures do nothing unless you `.await` or poll them"]
10pub struct Send<'a, Si: ?Sized, Item> {
11    feed: Feed<'a, Si, Item>,
12}
13
14// `Send` is `Unpin` because it only contains a `Feed` which is `Unpin`.
15impl<Si: ?Sized, Item> Unpin for Send<'_, Si, Item> {}
16
17impl<'a, Si: Sink<Item> + Unpin + ?Sized, Item> Send<'a, Si, Item> {
18    pub(super) fn new(sink: &'a mut Si, item: Item) -> Self {
19        Self {
20            feed: Feed::new(sink, item),
21        }
22    }
23}
24
25impl<Si: Sink<Item> + Unpin + ?Sized, Item> Future for Send<'_, Si, Item> {
26    type Output = Result<(), Si::Error>;
27
28    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
29        let mut this = self.as_mut();
30
31        if this.feed.is_item_pending() {
32            match Pin::new(&mut this.feed).poll(cx) {
33                Poll::Ready(Ok(())) => {
34                    debug_assert!(!this.feed.is_item_pending());
35                    // Fall through to flushing.
36                }
37                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
38                Poll::Pending => return Poll::Pending,
39            }
40        }
41
42        // We're done sending the item, but want to block on flushing the sink.
43        this.feed.sink_pin_mut().poll_flush(cx)
44    }
45}