Crate interruptible

Stops a future producer or stream from producing values when interrupted.

For a future that returns either Result<T, ()> or ControlFlow<T, ()>, calling fut.interruptible_*(rx) causes the returned value to be Err(()) or Break(T), respectively, if an interrupt signal is received while that future is executing.

The future is not cancelled: it still runs to completion, but its return value tells the producer to stop yielding further futures.
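The "finish the current work, then signal stop" behaviour can be modelled without an async runtime. The following is a minimal synchronous sketch of that idea, not the crate's implementation: `run_interruptible` and `run_steps` are hypothetical helpers, `InterruptSignal` is a local stand-in for the crate's type, and a `std::sync::mpsc` channel replaces the tokio channel used in the examples.

```rust
use std::ops::ControlFlow;
use std::sync::mpsc::Receiver;

/// Hypothetical stand-in for the crate's `InterruptSignal` type.
#[derive(Debug, PartialEq)]
struct InterruptSignal;

/// Runs `work` to completion, then reports `Break` if a signal has arrived.
fn run_interruptible<F>(
    work: F,
    interrupt_rx: &Receiver<InterruptSignal>,
) -> ControlFlow<InterruptSignal, ()>
where
    F: FnOnce() -> ControlFlow<InterruptSignal, ()>,
{
    // The work is never cancelled part-way through.
    let outcome = work();

    match interrupt_rx.try_recv() {
        // A signal is pending: tell the caller to stop.
        Ok(signal) => ControlFlow::Break(signal),
        // No signal (or no sender left): pass the outcome through unchanged.
        Err(_) => outcome,
    }
}

/// Hypothetical producer: runs up to `max_steps` units of work and returns
/// how many of them completed before an interruption was reported.
fn run_steps(max_steps: u32, interrupt_rx: &Receiver<InterruptSignal>) -> u32 {
    let mut completed = 0;
    for _ in 0..max_steps {
        let flow = run_interruptible(
            || {
                completed += 1;
                ControlFlow::Continue(())
            },
            interrupt_rx,
        );
        if let ControlFlow::Break(_) = flow {
            break;
        }
    }
    completed
}
```

With no signal pending, `run_steps(3, &rx)` completes all three steps. With one signal already queued, the first step still completes and the loop stops before starting the second.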

For a stream, when the interrupt signal is received, the current future is run to completion, but the stream is not polled for the next item.
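The stream behaviour can be pictured with an ordinary iterator. This is only an illustrative model under simplifying assumptions: `InterruptibleIter` is a hypothetical type, `InterruptSignal` is a local stand-in, the channel is `std::sync::mpsc` rather than tokio's, and the `PollOutcome` wrapper that the crate's stream yields is left out.

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical stand-in for the crate's `InterruptSignal` type.
#[derive(Debug, PartialEq)]
struct InterruptSignal;

/// Hypothetical wrapper that stops pulling items from `inner` once an
/// interrupt signal has been observed.
struct InterruptibleIter<'rx, I> {
    inner: I,
    interrupt_rx: &'rx Receiver<InterruptSignal>,
    interrupted: bool,
}

impl<'rx, I> InterruptibleIter<'rx, I> {
    fn new(inner: I, interrupt_rx: &'rx Receiver<InterruptSignal>) -> Self {
        Self {
            inner,
            interrupt_rx,
            interrupted: false,
        }
    }
}

impl<'rx, I: Iterator> Iterator for InterruptibleIter<'rx, I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        // Check for a pending signal before asking the producer for more.
        if !self.interrupted && self.interrupt_rx.try_recv().is_ok() {
            self.interrupted = true;
        }
        if self.interrupted {
            // Once interrupted, the underlying iterator is not advanced again.
            return None;
        }
        self.inner.next()
    }
}
```

Items already returned are unaffected. The interruption only decides whether the next item is requested, which mirrors "the stream is not polled for the next item".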

§Usage

Add one of the following to Cargo.toml, depending on the features you need:

interruptible = "0.2.4"

# Enables `InterruptibleStreamExt`
interruptible = { version = "0.2.4", features = ["stream"] }

# Enables:
#
# * `InterruptibleFutureExt::{interruptible_control_ctrl_c, interruptible_result_ctrl_c}`
# * `InterruptibleStreamExt::interruptible_ctrl_c` if the `"stream"` feature is also enabled.
interruptible = { version = "0.2.4", features = ["ctrl_c"] }

§Examples

§Future<Output = ControlFlow<B, C>>

use std::ops::ControlFlow;

use futures::FutureExt;
use tokio::{
    join,
    sync::{mpsc, oneshot},
};

use interruptible::{InterruptSignal, InterruptibleFutureExt};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
    let (ready_tx, ready_rx) = oneshot::channel::<()>();

    let interruptible_control = async {
        let () = ready_rx.await.expect("Expected to be notified to start.");
        ControlFlow::Continue(())
    }
    .boxed()
    .interruptible_control(&mut interrupt_rx);

    let interrupter = async move {
        interrupt_tx
            .send(InterruptSignal)
            .await
            .expect("Expected to send `InterruptSignal`.");
        ready_tx
            .send(())
            .expect("Expected to notify the future to start.");
    };

    let (control_flow, ()) = join!(interruptible_control, interrupter);

    assert_eq!(ControlFlow::Break(InterruptSignal), control_flow);
}

§InterruptibleStreamExt with features = ["stream"]

Stops a stream from producing values when an interrupt signal is received.

See the interrupt_strategy module for different ways the stream interruption can be handled.

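#[cfg(feature = "stream")]
#[tokio::main(flavor = "current_thread")]
async fn main() {
    use futures::{stream, StreamExt};
    use tokio::sync::mpsc;

    use interruptible::{InterruptSignal, InterruptibleStreamExt, PollOutcome};

    let (interrupt_tx, interrupt_rx) = mpsc::channel::<InterruptSignal>(16);

    // An endless stream of 0, 1, 2, ... wrapped so that it can be interrupted.
    let mut interruptible_stream =
        stream::unfold(0u32, move |n| async move { Some((n, n + 1)) })
            .interruptible(interrupt_rx.into());

    // Send the interrupt before the stream is polled for the first time.
    interrupt_tx
        .send(InterruptSignal)
        .await
        .expect("Expected to send `InterruptSignal`.");

    // The interruption is reported once, then the stream ends.
    assert_eq!(
        Some(PollOutcome::Interrupted(None)),
        interruptible_stream.next().await
    );
    assert_eq!(None, interruptible_stream.next().await);
}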

Structs§

InterruptSignal
Signal signifying an interruption.
InterruptibilityState
Whether interruptibility is supported, and number of times interrupt signals have been received.
InterruptibleFutureControl
InterruptibleFutureResult
InterruptibleStream
Wrapper around a Stream that adds interruptible behaviour.

Enums§

InterruptStrategy
How to poll an underlying stream when an interruption is received.
Interruptibility
Specifies interruptibility support of the application.
OwnedOrMutRef
Holds an owned T or a mutable reference.
OwnedOrRef
Holds an owned T or an immutable reference.
PollOutcome
InterruptibleStream outcome that indicates whether an interruption happened.

Traits§

InterruptibleFutureExt
Provides the .interruptible_control() and .interruptible_result() methods for Futures to return ControlFlow::Break or Result::Err when an interrupt signal is received.
InterruptibleStreamExt
Provides the .interruptible() method for Streams to stop producing values when an interrupt signal is received.