1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

//! Stops a future producer or stream from producing values when interrupted.
//!
//! For a future that returns either `Result<T, ()>` or `ControlFlow<T, ()>`,
//! calling `fut.interruptible_*(rx)` causes the returned value to be `Err(())`
//! or `Break(T)` if an interruption signal is received *while* that future is
//! executing.
//!
//! This means the future is progressed to completion, but the return value
//! signals the producer to stop yielding futures.
//!
//! For a stream, when the interrupt signal is received, the current future is
//! run to completion, but the stream is not polled for the next item.
//!
//! # Usage
//!
//! Add one of the following to the `[dependencies]` section of `Cargo.toml`:
//!
//! ```toml
//! interruptible = "0.0.4"
//!
//! # Enables `InterruptibleStreamExt`
//! interruptible = { version = "0.0.4", features = ["stream"] }
//!
//! # Enables:
//! #
//! # * `InterruptibleFutureExt::{interruptible_control_ctrl_c, interruptible_result_ctrl_c}`
//! # * `InterruptibleStreamExt::interruptible_ctrl_c` if the `"stream"` feature is also enabled.
//! interruptible = { version = "0.0.4", features = ["ctrl_c"] }
//! ```
//!
//! # Examples
//!
//! ## `Future<Output = ControlFlow<B, C>>`
//!
//! ```rust
//! use std::ops::ControlFlow;
//!
//! use futures::FutureExt;
//! use tokio::{
//!     join,
//!     sync::{mpsc, oneshot},
//! };
//!
//! use interruptible::{InterruptSignal, InterruptibleFutureExt};
//!
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() {
//!     let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
//!     let (ready_tx, ready_rx) = oneshot::channel::<()>();
//!
//!     let interruptible_control = async {
//!         let () = ready_rx.await.expect("Expected to be notified to start.");
//!         ControlFlow::Continue(())
//!     }
//!     .boxed()
//!     .interruptible_control(&mut interrupt_rx);
//!
//!     let interrupter = async move {
//!         interrupt_tx
//!             .send(InterruptSignal)
//!             .await
//!             .expect("Expected to send `InterruptSignal`.");
//!         ready_tx
//!             .send(())
//!             .expect("Expected to notify sleep to start.");
//!     };
//!
//!     let (control_flow, ()) = join!(interruptible_control, interrupter);
//!
//!     assert_eq!(ControlFlow::Break(InterruptSignal), control_flow);
//! }
//! ```
//!
//! ## `InterruptibleStreamExt` with `features = ["stream"]`
//!
//! Stops a stream from producing values when an interrupt signal is received.
//!
//! See the [`interrupt_strategy`] module for different ways the stream
//! interruption can be handled.
//!
//! ```rust
//! # #[cfg(not(feature = "stream"))]
//! # fn main() {}
//! #
//! #[cfg(feature = "stream")]
//! #[tokio::main(flavor = "current_thread")]
//! async fn main() {
//! #
//! # use futures::{stream, StreamExt};
//! # use tokio::sync::mpsc;
//! #
//! # use interruptible::{
//! #     InterruptibleStreamExt, InterruptSignal, Interruptibility, PollOutcome,
//! # };
//! #
//!     let (interrupt_tx, mut interrupt_rx) = mpsc::channel::<InterruptSignal>(16);
//!
//!     let mut interruptible_stream =
//!         stream::unfold(0u32, move |n| async move { Some((n, n + 1)) })
//!             .interruptible(interrupt_rx.into());
//!
//!     interrupt_tx
//!         .send(InterruptSignal)
//!         .await
//!         .expect("Expected to send `InterruptSignal`.");
//!
//!     assert_eq!(
//!         Some(PollOutcome::Interrupted(None)),
//!         interruptible_stream.next().await
//!     );
//!     assert_eq!(None, interruptible_stream.next().await);
//! # }
//! ```
//!
//! [`interrupt_strategy`]: https://docs.rs/interruptible/latest/interruptible/interrupt_strategy/index.html

// Private modules backing the part of the API that is compiled regardless of
// which cargo features are enabled (none of these carry a `cfg` gate).
mod interrupt_signal;
mod interruptible_future_control;
mod interruptible_future_ext;
mod interruptible_future_result;
mod owned_or_mut_ref;

// The modules themselves stay private; the items they define are surfaced at
// the crate root instead.
pub use crate::{
    interrupt_signal::InterruptSignal, interruptible_future_control::InterruptibleFutureControl,
    interruptible_future_ext::InterruptibleFutureExt,
    interruptible_future_result::InterruptibleFutureResult, owned_or_mut_ref::OwnedOrMutRef,
};

// Everything in this group is compiled only when the `stream` cargo feature
// is enabled; each item carries its own `cfg` gate.
#[cfg(feature = "stream")]
mod interrupt_strategy;
#[cfg(feature = "stream")]
mod interruptibility;
#[cfg(feature = "stream")]
mod interruptibility_state;
#[cfg(feature = "stream")]
mod interruptible_stream;
#[cfg(feature = "stream")]
mod interruptible_stream_ext;
#[cfg(feature = "stream")]
mod poll_outcome;

// Crate-root re-exports of the stream-related items; the modules stay private.
#[cfg(feature = "stream")]
pub use crate::{
    interrupt_strategy::InterruptStrategy, interruptibility::Interruptibility,
    interruptibility_state::InterruptibilityState, interruptible_stream::InterruptibleStream,
    interruptible_stream_ext::InterruptibleStreamExt, poll_outcome::PollOutcome,
};