swansong/implementation/interrupt/
stream.rs1use super::Interrupt;
2use futures_core::Stream;
3use std::{
4 future::Future,
5 pin::Pin,
6 task::{ready, Context, Poll},
7};
8
9impl<T: Stream> Stream for Interrupt<T> {
10 type Item = T::Item;
11
12 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
13 let mut this = self.project();
14 loop {
15 if this.inner.is_stopped_relaxed() {
16 log::trace!("stopped, cancelling");
17 return Poll::Ready(None);
18 }
19
20 let Some(listener) = this.stop_listener.listen(this.inner) else {
21 return Poll::Ready(None);
22 };
23
24 match this.wrapped_type.as_mut().poll_next(cx) {
25 Poll::Ready(item) => {
26 return Poll::Ready(item);
27 }
28
29 Poll::Pending => {
30 if this.inner.is_stopped_relaxed() {
31 log::trace!("stopped, cancelling");
32 return Poll::Ready(None);
33 }
34
35 ready!(Pin::new(listener).poll(cx));
36 **this.stop_listener = None;
37 }
38 }
39 }
40 }
41
42 fn size_hint(&self) -> (usize, Option<usize>) {
43 (0, self.wrapped_type.size_hint().1)
44 }
45}