interruptible/
lib.rs

1#![cfg_attr(coverage_nightly, feature(coverage_attribute))]
2
3//! Stops a future producer or stream from producing values when interrupted.
4//!
5//! For a future that returns either `Result<T, ()>` or `ControlFlow<T, ()>`,
6//! calling `fut.interruptible_*(rx)` causes the returned value to be `Err(())`
7//! or `Break(T)` if an interruption signal is received *while* that future is
8//! executing.
9//!
10//! This means the future is progressed to completion, but the return value
11//! signals the producer to stop yielding futures.
12//!
13//! For a stream, when the interrupt signal is received, the current future is
14//! run to completion, but the stream is not polled for the next item.
15//!
16//! # Usage
17//!
18//! Add the following to `Cargo.toml`:
19//!
20//! ```toml
21//! interruptible = "0.2.4"
22//!
23//! # Enables `InterruptibleStreamExt`
24//! interruptible = { version = "0.2.4", features = ["stream"] }
25//!
26//! # Enables:
27//! #
28//! # * `InterruptibleFutureExt::{interruptible_control_ctrl_c, interruptible_result_ctrl_c}`
29//! # * `InterruptibleStreamExt::interruptible_ctrl_c` if the `"stream"` feature is also enabled.
30//! interruptible = { version = "0.2.4", features = ["ctrl_c"] }
31//! ```
32//!
33//! # Examples
34//!
35//! ## `Future<Output = ControlFlow<B, C>>`
36//!
37//! ```rust
38//! use std::ops::ControlFlow;
39//!
40//! use futures::FutureExt;
41//! use tokio::{
42//!     join,
43//!     sync::{mpsc, oneshot},
44//! };
45//!
46//! use interruptible::{InterruptSignal, InterruptibleFutureExt};
47//!
48//! #[tokio::main(flavor = "current_thread")]
49//! async fn main() {
50//!     let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
51//!     let (ready_tx, ready_rx) = oneshot::channel::<()>();
52//!
53//!     let interruptible_control = async {
54//!         let () = ready_rx.await.expect("Expected to be notified to start.");
55//!         ControlFlow::Continue(())
56//!     }
57//!     .boxed()
58//!     .interruptible_control(&mut interrupt_rx);
59//!
60//!     let interrupter = async move {
61//!         interrupt_tx
62//!             .send(InterruptSignal)
63//!             .await
64//!             .expect("Expected to send `InterruptSignal`.");
65//!         ready_tx
66//!             .send(())
67//!             .expect("Expected to notify sleep to start.");
68//!     };
69//!
70//!     let (control_flow, ()) = join!(interruptible_control, interrupter);
71//!
72//!     assert_eq!(ControlFlow::Break(InterruptSignal), control_flow);
73//! }
74//! ```
75//!
76//! ## `InterruptibleStreamExt` with `features = ["stream"]`
77//!
78//! Stops a stream from producing values when an interrupt signal is received.
79//!
80//! See the [`interrupt_strategy`] module for different ways the stream
81//! interruption can be handled.
82//!
83//! ```rust
84//! # #[cfg(not(feature = "stream"))]
85//! # fn main() {}
86//! #
87//! #[cfg(feature = "stream")]
88//! #[tokio::main(flavor = "current_thread")]
89//! async fn main() {
90//! #
91//! # use futures::{stream, StreamExt};
92//! # use tokio::sync::mpsc;
93//! #
94//! # use interruptible::{
95//! #     InterruptibleStreamExt, InterruptSignal, Interruptibility, PollOutcome,
96//! # };
97//! #
98//!     let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
99//!
100//!     let mut interruptible_stream =
101//!         stream::unfold(0u32, move |n| async move { Some((n, n + 1)) })
102//!             .interruptible(interrupt_rx.into());
103//!
104//!     interrupt_tx
105//!         .send(InterruptSignal)
106//!         .await
107//!         .expect("Expected to send `InterruptSignal`.");
108//!
109//!     assert_eq!(
110//!         Some(PollOutcome::Interrupted(None)),
111//!         interruptible_stream.next().await
112//!     );
113//!     assert_eq!(None, interruptible_stream.next().await);
114//! # }
115//! ```
116//!
117//! [`interrupt_strategy`]: https://docs.rs/interruptible/latest/interruptible/interrupt_strategy/index.html
118
119// Re-exports
120pub use own::{OwnedOrMutRef, OwnedOrRef};
121
122pub use crate::{
123    interrupt_signal::InterruptSignal, interruptible_future_control::InterruptibleFutureControl,
124    interruptible_future_ext::InterruptibleFutureExt,
125    interruptible_future_result::InterruptibleFutureResult,
126};
127
128mod interrupt_signal;
129mod interruptible_future_control;
130mod interruptible_future_ext;
131mod interruptible_future_result;
132
133#[cfg(feature = "stream")]
134pub use crate::{
135    interrupt_strategy::InterruptStrategy, interruptibility::Interruptibility,
136    interruptibility_state::InterruptibilityState, interruptible_stream::InterruptibleStream,
137    interruptible_stream_ext::InterruptibleStreamExt, poll_outcome::PollOutcome,
138};
139
140#[cfg(feature = "stream")]
141mod interrupt_strategy;
142#[cfg(feature = "stream")]
143mod interruptibility;
144#[cfg(feature = "stream")]
145mod interruptibility_state;
146#[cfg(feature = "stream")]
147mod interruptible_stream;
148#[cfg(feature = "stream")]
149mod interruptible_stream_ext;
150#[cfg(feature = "stream")]
151mod poll_outcome;