Skip to main content

swansong/implementation/interrupt/
stream.rs

1use super::Interrupt;
2use futures_core::Stream;
3use std::{
4    future::Future,
5    pin::Pin,
6    task::{ready, Context, Poll},
7};
8
9impl<T: Stream> Stream for Interrupt<T> {
10    type Item = T::Item;
11
12    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
13        let mut this = self.project();
14        loop {
15            if this.inner.is_stopped_relaxed() {
16                log::trace!("stopped, cancelling");
17                return Poll::Ready(None);
18            }
19
20            let Some(listener) = this.stop_listener.listen(this.inner) else {
21                return Poll::Ready(None);
22            };
23
24            match this.wrapped_type.as_mut().poll_next(cx) {
25                Poll::Ready(item) => {
26                    return Poll::Ready(item);
27                }
28
29                Poll::Pending => {
30                    if this.inner.is_stopped_relaxed() {
31                        log::trace!("stopped, cancelling");
32                        return Poll::Ready(None);
33                    }
34
35                    ready!(Pin::new(listener).poll(cx));
36                    **this.stop_listener = None;
37                }
38            }
39        }
40    }
41
42    fn size_hint(&self) -> (usize, Option<usize>) {
43        (0, self.wrapped_type.size_hint().1)
44    }
45}