/// Provides the `.interruptible()` and `.interruptible_with()` methods for
/// `Stream`s, to stop producing values when an interrupt signal is received.
pub trait InterruptibleStreamExt {
    // Required methods

    /// Overrides this `Stream`'s poll value when an interrupt signal is
    /// received.
    ///
    /// # Parameters
    ///
    /// * `interrupt_rx`: Channel receiver of the interrupt signal.
    fn interruptible(
        self,
        interrupt_rx: OwnedOrMutRef<'_, Receiver<InterruptSignal>>
    ) -> InterruptibleStream<'_, 'static, Self>
       where Self: Sized;

    /// Wraps a stream to allow it to gracefully stop.
    ///
    /// The stream's items are wrapped with `PollOutcome`.
    ///
    /// # Parameters
    ///
    /// * `interruptibility_state`: Whether interruptibility is supported.
    fn interruptible_with<'rx, 'intx>(
        self,
        interruptibility_state: InterruptibilityState<'rx, 'intx>
    ) -> InterruptibleStream<'rx, 'intx, Self>
       where Self: Sized + 'rx;
}
Expand description

Provides the .interruptible() and .interruptible_with() methods for Streams, so that a stream stops producing values when an interrupt signal is received.

Required Methods§

source

fn interruptible( self, interrupt_rx: OwnedOrMutRef<'_, Receiver<InterruptSignal>> ) -> InterruptibleStream<'_, 'static, Self>
where Self: Sized,

Overrides this Stream’s poll value when an interrupt signal is received.

Parameters
  • interrupt_rx: Channel receiver of the interrupt signal.
source

fn interruptible_with<'rx, 'intx>( self, interruptibility_state: InterruptibilityState<'rx, 'intx> ) -> InterruptibleStream<'rx, 'intx, Self>
where Self: Sized + 'rx,

Wraps a stream to allow it to gracefully stop.

The stream’s items are wrapped with PollOutcome.

Parameters
  • interruptibility_state: Whether interruptibility is supported. This method has no separate interrupt_rx parameter; as in the example below, the channel receiver of the interrupt signal is supplied as part of this value.
Examples

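This example uses the PollNextN strategy with n = 2: after the interrupt signal is sent, the stream still yields two more items before it reports the interruption and ends.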

use futures::{stream, StreamExt};
use tokio::sync::mpsc;

use interruptible::{InterruptSignal, Interruptibility, InterruptibleStreamExt, PollOutcome};

// Channel over which the interrupt signal is delivered to the stream.
let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
// The underlying stream never ends on its own: it yields 0, 1, 2, ...
// It is made interruptible with the `poll_next_n` strategy and `n = 2`.
let mut interruptible_stream = stream::unfold(0u32, move |n| async move { Some((n, n + 1)) })
    .interruptible_with(Interruptibility::poll_next_n(interrupt_rx.into(), 2).into());

// Send the interrupt signal before the stream is polled for the first time.
interrupt_tx
    .send(InterruptSignal)
    .await
    .expect("Expected to send `InterruptSignal`.");

// The first two polls still produce items, reported as `NoInterrupt`.
assert_eq!(
    Some(PollOutcome::NoInterrupt(0)),
    interruptible_stream.next().await
);
assert_eq!(
    Some(PollOutcome::NoInterrupt(1)),
    interruptible_stream.next().await
);
// The third poll reports the interruption and carries no item.
assert_eq!(
    Some(PollOutcome::Interrupted(None)),
    interruptible_stream.next().await
);
// After the interruption has been reported, the stream is finished.
assert_eq!(None, interruptible_stream.next().await);

Implementors§

source§

impl<S> InterruptibleStreamExt for S
where S: Stream,