ax_core/ax_futures_util/stream/
drain.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{Future, Stream};
7use pin_project_lite::pin_project;
8
9pin_project! {
10    #[must_use = "streams do nothing unless polled"]
11    pub struct Drain<S> {
12        #[pin]
13        stream: S
14    }
15}
16
17impl<S: Stream> Drain<S> {
18    pub(crate) fn new(stream: S) -> Self {
19        Self { stream }
20    }
21}
22
23impl<S: Stream> Future for Drain<S> {
24    type Output = ();
25
26    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
27        loop {
28            match self.as_mut().project().stream.poll_next(cx) {
29                Poll::Ready(Some(_)) => {}
30                Poll::Ready(None) => return Poll::Ready(()),
31                Poll::Pending => return Poll::Pending,
32            }
33        }
34    }
35}