/// Provides the `.interruptible()` method for `Stream`s to stop producing
/// values when an interrupt signal is received.
pub trait InterruptibleStreamExt {
    /// Overrides this `Stream`'s poll value when an interrupt signal is
    /// received.
    ///
    /// # Parameters
    ///
    /// * `interrupt_rx`: Channel receiver of the interrupt signal.
    fn interruptible(
        self,
        interrupt_rx: OwnedOrMutRef<'_, Receiver<InterruptSignal>>,
    ) -> InterruptibleStream<'_, 'static, Self>
    where
        Self: Sized;

    /// Wraps a stream to allow it to gracefully stop.
    ///
    /// The stream's items are wrapped with `PollOutcome`.
    ///
    /// # Parameters
    ///
    /// * `interruptibility_state`: Whether interruptibility is supported.
    fn interruptible_with<'rx, 'intx>(
        self,
        interruptibility_state: InterruptibilityState<'rx, 'intx>,
    ) -> InterruptibleStream<'rx, 'intx, Self>
    where
        Self: Sized + 'rx;
}
Expand description
Provides the `.interruptible()` method for `Stream`s to stop producing
values when an interrupt signal is received.
Required Methods§
sourcefn interruptible(
self,
interrupt_rx: OwnedOrMutRef<'_, Receiver<InterruptSignal>>
) -> InterruptibleStream<'_, 'static, Self>where
Self: Sized,
fn interruptible(
self,
interrupt_rx: OwnedOrMutRef<'_, Receiver<InterruptSignal>>
) -> InterruptibleStream<'_, 'static, Self>where
Self: Sized,
Overrides this `Stream`'s poll value when an interrupt signal is received.
Parameters
- `interrupt_rx`: Channel receiver of the interrupt signal.
sourcefn interruptible_with<'rx, 'intx>(
self,
interruptibility_state: InterruptibilityState<'rx, 'intx>
) -> InterruptibleStream<'rx, 'intx, Self>where
Self: Sized + 'rx,
fn interruptible_with<'rx, 'intx>(
self,
interruptibility_state: InterruptibilityState<'rx, 'intx>
) -> InterruptibleStream<'rx, 'intx, Self>where
Self: Sized + 'rx,
Wraps a stream to allow it to gracefully stop.
The stream's items are wrapped with `PollOutcome`.
Parameters
- `interrupt_rx`: Channel receiver of the interrupt signal.
- `interruptibility_state`: Whether interruptibility is supported.
Examples
This example uses the `PollNextN` strategy:
use futures::{stream, StreamExt};
use tokio::sync::mpsc;
use interruptible::{InterruptSignal, Interruptibility, InterruptibleStreamExt, PollOutcome};
// Channel that carries the interrupt signal (created with a capacity of 16).
let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
// An endless counter stream (0, 1, 2, ...) made interruptible via
// `Interruptibility::poll_next_n`, which is given the receiver and the count 2.
let mut interruptible_stream = stream::unfold(0u32, move |n| async move { Some((n, n + 1)) })
.interruptible_with(Interruptibility::poll_next_n(interrupt_rx.into(), 2).into());
// Send the interrupt signal before the stream is polled for the first time.
interrupt_tx
.send(InterruptSignal)
.await
.expect("Expected to send `InterruptSignal`.");
// The first poll still yields an item, marked as `NoInterrupt`.
assert_eq!(
Some(PollOutcome::NoInterrupt(0)),
interruptible_stream.next().await
);
// So does the second poll.
assert_eq!(
Some(PollOutcome::NoInterrupt(1)),
interruptible_stream.next().await
);
// The third poll reports the interruption and carries no item.
assert_eq!(
Some(PollOutcome::Interrupted(None)),
interruptible_stream.next().await
);
// Once the interruption has been reported, the stream ends.
assert_eq!(None, interruptible_stream.next().await);