swansong 0.3.4

Graceful Shutdown
Documentation
use super::Interrupt;
use futures_core::Stream;
use std::{
    future::Future,
    pin::Pin,
    task::{ready, Context, Poll},
};

impl<T: Stream> Stream for Interrupt<T> {
    type Item = T::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        loop {
            if this.inner.is_stopped_relaxed() {
                log::trace!("stopped, cancelling");
                return Poll::Ready(None);
            }

            let Some(listener) = this.stop_listener.listen(this.inner) else {
                return Poll::Ready(None);
            };

            match this.wrapped_type.as_mut().poll_next(cx) {
                Poll::Ready(item) => {
                    return Poll::Ready(item);
                }

                Poll::Pending => {
                    if this.inner.is_stopped_relaxed() {
                        log::trace!("stopped, cancelling");
                        return Poll::Ready(None);
                    }

                    ready!(Pin::new(listener).poll(cx));
                    **this.stop_listener = None;
                }
            }
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, self.wrapped_type.size_hint().1)
    }
}