#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
//! Stops a future producer or stream from producing values when interrupted.
//!
//! For a future that returns either `Result<T, ()>` or `ControlFlow<T, ()>`,
//! calling `fut.interruptible_*(rx)` causes the returned value to be `Err(())`
//! or `Break(T)` if an interruption signal is received *while* that future is
//! executing.
//!
//! This means the future is progressed to completion, but the return value
//! signals the producer to stop yielding futures.
//!
//! For a stream, when the interrupt signal is received, the current future is
//! run to completion, but the stream is not polled for the next item.
//!
//! # Usage
//!
//! Add one of the following to `Cargo.toml`, depending on which features you need:
//!
//! ```toml
//! interruptible = "0.0.4"
//!
//! # Enables `InterruptibleStreamExt`
//! interruptible = { version = "0.0.4", features = ["stream"] }
//!
//! # Enables:
//! #
//! # * `InterruptibleFutureExt::{interruptible_control_ctrl_c, interruptible_result_ctrl_c}`
//! # * `InterruptibleStreamExt::interruptible_ctrl_c` if the `"stream"` feature is also enabled.
//! interruptible = { version = "0.0.4", features = ["ctrl_c"] }
//! ```
//!
//! # Examples
//!
//! ## `Future<Output = ControlFlow<B, C>>`
//!
//! ```rust
//! use std::ops::ControlFlow;
//!
//! use futures::FutureExt;
//! use tokio::{
//! join,
//! sync::{mpsc, oneshot},
//! };
//!
//! use interruptible::{InterruptSignal, InterruptibleFutureExt};
//!
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() {
//! let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
//! let (ready_tx, ready_rx) = oneshot::channel::<()>();
//!
//! let interruptible_control = async {
//! let () = ready_rx.await.expect("Expected to be notified to start.");
//! ControlFlow::Continue(())
//! }
//! .boxed()
//! .interruptible_control(&mut interrupt_rx);
//!
//! let interrupter = async move {
//! interrupt_tx
//! .send(InterruptSignal)
//! .await
//! .expect("Expected to send `InterruptSignal`.");
//! ready_tx
//! .send(())
//! .expect("Expected to notify sleep to start.");
//! };
//!
//! let (control_flow, ()) = join!(interruptible_control, interrupter);
//!
//! assert_eq!(ControlFlow::Break(InterruptSignal), control_flow);
//! }
//! ```
//!
//! ## `InterruptibleStreamExt` with `features = ["stream"]`
//!
//! Stops a stream from producing values when an interrupt signal is received.
//!
//! See the [`interrupt_strategy`] module for different ways the stream
//! interruption can be handled.
//!
//! ```rust
//! # #[cfg(not(feature = "stream"))]
//! # fn main() {}
//! #
//! #[cfg(feature = "stream")]
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() {
//! #
//! # use futures::{stream, StreamExt};
//! # use tokio::sync::mpsc;
//! #
//! # use interruptible::{
//! # InterruptibleStreamExt, InterruptSignal, Interruptibility, PollOutcome,
//! # };
//! #
//! let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
//!
//! let mut interruptible_stream =
//! stream::unfold(0u32, move |n| async move { Some((n, n + 1)) })
//! .interruptible(interrupt_rx.into());
//!
//! interrupt_tx
//! .send(InterruptSignal)
//! .await
//! .expect("Expected to send `InterruptSignal`.");
//!
//! assert_eq!(
//! Some(PollOutcome::Interrupted(None)),
//! interruptible_stream.next().await
//! );
//! assert_eq!(None, interruptible_stream.next().await);
//! # }
//! ```
//!
//! [`interrupt_strategy`]: https://docs.rs/interruptible/latest/interruptible/enum.InterruptStrategy.html
// Implementation modules. Each is private to the crate; the items meant for
// consumers are re-exported individually from the crate root below.
mod interrupt_signal;
mod interruptible_future_control;
mod interruptible_future_ext;
mod interruptible_future_result;
mod owned_or_mut_ref;

// Re-exports that carry no `cfg` gate, so they are part of the crate root
// regardless of which cargo features are selected.
pub use crate::interrupt_signal::InterruptSignal;
pub use crate::interruptible_future_control::InterruptibleFutureControl;
pub use crate::interruptible_future_ext::InterruptibleFutureExt;
pub use crate::interruptible_future_result::InterruptibleFutureResult;
pub use crate::owned_or_mut_ref::OwnedOrMutRef;
// Stream support. Every module and re-export in this group is compiled only
// when the `stream` cargo feature is enabled.
#[cfg(feature = "stream")]
mod interrupt_strategy;
#[cfg(feature = "stream")]
mod interruptibility;
#[cfg(feature = "stream")]
mod interruptibility_state;
#[cfg(feature = "stream")]
mod interruptible_stream;
#[cfg(feature = "stream")]
mod interruptible_stream_ext;
#[cfg(feature = "stream")]
mod poll_outcome;

// Crate-root re-exports of the stream API, one per feature-gated module above.
#[cfg(feature = "stream")]
pub use crate::interrupt_strategy::InterruptStrategy;
#[cfg(feature = "stream")]
pub use crate::interruptibility::Interruptibility;
#[cfg(feature = "stream")]
pub use crate::interruptibility_state::InterruptibilityState;
#[cfg(feature = "stream")]
pub use crate::interruptible_stream::InterruptibleStream;
#[cfg(feature = "stream")]
pub use crate::interruptible_stream_ext::InterruptibleStreamExt;
#[cfg(feature = "stream")]
pub use crate::poll_outcome::PollOutcome;