cancel_this/lib.rs
//! This crate provides a user-friendly way to implement cooperative
//! cancellation in Rust based on a wide range of criteria, including
//! *triggers*, *timers*, *OS signals* (Ctrl+C), or the *Python
//! interpreter linked using PyO3*. It also provides liveness monitoring
//! of "cancellation aware" code.
//!
//! **Why not use `async` instead of cooperative cancellation?** `async` was
//! designed to solve a different problem: executing IO-bound tasks in
//! a non-blocking fashion. It is not *really* designed for CPU-bound tasks.
//! Consequently, using `async` adds a lot of unnecessary overhead to your project
//! that `cancel_this` does not have (see also the *Performance* section in the project README).
//!
//! **Why not use [`stop-token`](https://crates.io/crates/stop-token),
//! [`CancellationToken`](https://docs.rs/tokio-util/latest/tokio_util/sync/struct.CancellationToken.html)
//! or other cooperative cancellation crates?** So far, all crates I have seen require you
//! to pass the cancellation token around and generally do not make it easy to
//! combine the effects of multiple tokens. In `cancel_this`, the goal is to
//! make cancellation dead simple: you register however many cancellation triggers
//! you want, each trigger is valid within a specific scope (and thread), and all of them
//! can be checked by a macro anywhere in your code.
//!
//! ## Current features
//!
//! - Scoped cancellation using thread-local "cancellation triggers".
//! - Out-of-the-box support for triggers based on atomics and timers.
//! - With feature `ctrlc` enabled, support for cancellation using `SIGINT` signals.
//! - With feature `pyo3` enabled, support for cancellation using `Python::check_signals`.
//! - With feature `liveness` enabled, you can register a per-thread handler invoked
//!   once the thread becomes unresponsive (i.e. cancellation is not checked periodically
//!   within the desired interval).
//! - Practically no overhead in cancellable code when cancellation is not enabled.
//! - Very small overhead for "atomic-based" cancellation triggers, acceptable overhead for PyO3 cancellation.
//! - All triggers and guards generate [`log`](https://crates.io/crates/log) messages (`trace` for normal operation,
//!   `warn` for issues where panic can be avoided).
//!
//! ## Simple example
//!
//! A simple counter that is eventually cancelled by a one-second timeout:
//!
//! ```rust
//! use std::time::Duration;
//! use cancel_this::{Cancellable, is_cancelled};
//!
//! fn cancellable_counter(count: usize) -> Cancellable<()> {
//!     for _ in 0..count {
//!         is_cancelled!()?;
//!         std::thread::sleep(Duration::from_millis(10));
//!     }
//!     Ok(())
//! }
//!
//! let result: Cancellable<()> = cancel_this::on_timeout(Duration::from_secs(1), || {
//!     cancellable_counter(5)?;
//!     cancellable_counter(10)?;
//!     cancellable_counter(100)?;
//!     Ok(())
//! });
//!
//! assert!(result.is_err());
//! ```
//!
//! ## Complex example
//!
//! This example uses most of the features, including error conversion, never-cancel blocks,
//! and liveness monitoring.
//!
//! ```rust
//! use std::time::Duration;
//! use cancel_this::{Cancelled, is_cancelled, LivenessGuard};
//!
//! enum ComputeError {
//!     Zero,
//!     Cancelled
//! }
//!
//! impl From<Cancelled> for ComputeError {
//!     fn from(value: Cancelled) -> Self {
//!         ComputeError::Cancelled
//!     }
//! }
//!
//!
//! fn compute(input: u32) -> Result<String, ComputeError> {
//!     if input == 0 {
//!         Err(ComputeError::Zero)
//!     } else {
//!         let mut total: u32 = 0;
//!         for _ in 0..input {
//!             total += input;
//!             is_cancelled!()?;
//!             std::thread::sleep(Duration::from_millis(10));
//!         }
//!         Ok(total.to_string())
//!     }
//! }
//!
//! let guard = LivenessGuard::new(Duration::from_secs(2), |is_alive| {
//!     eprintln!("Thread has not responded in the last two seconds.");
//! });
//!
//! let result: Result<String, ComputeError> = cancel_this::on_timeout(Duration::from_millis(200), || {
//!     let r1 = cancel_this::on_sigint(|| {
//!         // This operation can be canceled using Ctrl+C, but the timeout still applies.
//!         compute(5)
//!     })?;
//!
//!     assert_eq!(r1.as_str(), "25");
//!     // This will be cancelled. Instead of using `?`, we check
//!     // that the operation actually got cancelled.
//!     let r2 = compute(20);
//!     assert!(matches!(r2, Err(ComputeError::Cancelled)));
//!     // Even though the execution is now cancelled, we can still execute code in
//!     // the "cancel-never" blocks.
//!     let r3 = cancel_this::never(|| compute(10))?;
//!     assert_eq!(r3.as_str(), "100");
//!     compute(10) // This should get immediately canceled.
//! });
//!
//! // The liveness monitoring is active while `guard` is in scope. Once `guard` is dropped here,
//! // the liveness monitoring is turned off as well.
//! ```
//!
//! ## Multi-threaded example
//!
//! Virtually all triggers and guards provided by `cancel_this` only apply to the current
//! thread. However, since triggers can be safely shared across threads, it is possible to
//! transfer them from one thread to another. Note that the transferred triggers also inherently
//! update the liveness guard of the original thread.
//!
//! ```rust
//! # use std::thread::JoinHandle;
//! # use std::time::Duration;
//! # use cancel_this::{is_cancelled, Cancellable, LivenessGuard};
//!
//! let guard = LivenessGuard::new(Duration::from_millis(10), |is_alive| {
//!     // In this test, the liveness guard should never trigger, even though the original
//!     // thread goes to sleep for a long time, waiting to join with the spawned thread.
//!     assert!(is_alive);
//! });
//!
//! let result: Cancellable<u32> = cancel_this::on_timeout(Duration::from_millis(100), || {
//!     let active = cancel_this::active_triggers();
//!     let t1: JoinHandle<Cancellable<u32>> = std::thread::spawn(|| {
//!         cancel_this::on_trigger(active, || {
//!             let mut result = 0u32;
//!             // This cycle is eventually going to get cancelled
//!             // by the timer which is "transferred" from the spawning thread.
//!             for i in 0..50 {
//!                 result += 1;
//!                 is_cancelled!()?;
//!                 std::thread::sleep(Duration::from_millis(5));
//!             }
//!             Ok(result)
//!         })
//!     });
//!     // Put the spawning thread to sleep until `t1` finishes.
//!     t1.join().unwrap()
//! });
//!
//! assert!(result.is_err());
//! ```
//!
//! Doing the same without transferring cancellation triggers will cause the spawning
//! thread to be registered as unresponsive and the compute thread to never actually get
//! cancelled:
//!
//! ```rust
//! # use std::thread::JoinHandle;
//! # use std::time::Duration;
//! # use cancel_this::{is_cancelled, Cancellable, LivenessGuard};
//!
//! let guard = LivenessGuard::new(Duration::from_millis(10), |is_alive| {
//!     // In this test, the spawning thread is expected to be reported as unresponsive:
//!     // it sleeps while waiting to join with the spawned thread, and nothing checks
//!     // its triggers in the meantime.
//!     assert!(!is_alive);
//! });
//!
//! let result: Cancellable<u32> = cancel_this::on_timeout(Duration::from_millis(100), || {
//!     let t1: JoinHandle<Cancellable<u32>> = std::thread::spawn(|| {
//!         let mut result = 0u32;
//!         // This cycle is never going to get cancelled, because we didn't transfer
//!         // the timeout trigger from the original thread.
//!         for i in 0..50 {
//!             result += 1;
//!             is_cancelled!()?;
//!             std::thread::sleep(Duration::from_millis(5));
//!         }
//!         Ok(result)
//!     });
//!     // Put the spawning thread to sleep until `t1` finishes.
//!     t1.join().unwrap()
//! });
//!
//! assert!(result.is_ok());
//! ```
//!
//! ## Cached triggers example
//!
//! If you need the absolute lowest overhead, you might want to sacrifice some of the
//! ergonomics provided by `cancel_this`. To reduce overhead, you can create a local copy
//! of the thread-local triggers the same way as in the multi-threaded example, and pass it
//! directly to `is_cancelled!`. Each check then skips the thread-local lookup, which
//! significantly reduces the overhead of each cancellation check.
//!
//! ```rust
//! # use std::time::Duration;
//! # use cancel_this::{is_cancelled, Cancellable};
//! let result: Cancellable<u32> = cancel_this::on_timeout(Duration::from_millis(100), || {
//!     let cache = cancel_this::active_triggers();
//!     let mut result = 0u32;
//!     while true {
//!         // The overhead of this `is_cancelled` call is reduced, because triggers
//!         // are cached in a local variable. However, the cache is only valid within
//!         // the scope where it was obtained.
//!         is_cancelled!(cache)?;
//!         result += 1;
//!     }
//!     Ok(result)
//! });
//! assert!(result.is_err());
//! ```
//!
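/*!
## Sketch: how scoped triggers behave

To make the "triggers are valid within a specific scope (and thread)" idea concrete, the
following is a minimal, std-only model of the mechanism. It is a simplified sketch, not
the implementation used by this crate: `Cancelled`, `TRIGGERS`, `check`, `with_trigger`,
`count_to` and `run_sketch` are hypothetical stand-ins defined only for this illustration,
and the real trigger chain supports more than plain atomic flags.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the crate's cancellation error.
#[derive(Debug, PartialEq)]
struct Cancelled;

thread_local! {
    // Hypothetical stand-in for the thread-local trigger chain.
    static TRIGGERS: RefCell<Vec<Arc<AtomicBool>>> = RefCell::new(Vec::new());
}

/// Succeeds unless any trigger registered on the current thread has fired.
fn check() -> Result<(), Cancelled> {
    let fired = TRIGGERS.with_borrow(|chain| chain.iter().any(|t| t.load(Ordering::SeqCst)));
    if fired { Err(Cancelled) } else { Ok(()) }
}

/// Registers `trigger` while `action` runs and removes it again afterwards.
fn with_trigger<T>(
    trigger: Arc<AtomicBool>,
    action: impl FnOnce() -> Result<T, Cancelled>,
) -> Result<T, Cancelled> {
    TRIGGERS.with_borrow_mut(|chain| chain.push(trigger));
    let result = action();
    TRIGGERS.with_borrow_mut(|chain| {
        chain.pop();
    });
    result
}

/// Counts towards `limit`; fires `flag` at step 3 to simulate an external cancellation.
fn count_to(limit: u32, flag: &AtomicBool) -> Result<u32, Cancelled> {
    let mut steps = 0;
    for i in 0..limit {
        if i == 3 {
            flag.store(true, Ordering::SeqCst);
        }
        check()?;
        steps += 1;
    }
    Ok(steps)
}

/// Returns the result of the scoped computation and of a check made after the scope ended.
fn run_sketch() -> (Result<u32, Cancelled>, Result<(), Cancelled>) {
    let flag = Arc::new(AtomicBool::new(false));
    let inner = with_trigger(flag.clone(), || count_to(10, &flag));
    // The trigger was removed when `with_trigger` returned, so this check succeeds.
    (inner, check())
}
```

In this model, `run_sketch` yields a cancelled inner computation, while the check performed
after the scope ends succeeds again, because the fired trigger is no longer registered.
*/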

/// Cancellation error type.
mod error;

/// Various types of triggers, including corresponding `on_*` helper functions.
mod triggers;

/// Implements a "liveness guard", which monitors the frequency with which cancellations are
/// checked and reports when the thread becomes unresponsive.
#[cfg(feature = "liveness")]
mod liveness;
#[cfg(feature = "liveness")]
pub use liveness::*;

#[cfg(not(feature = "liveness"))]
mod liveness {
    #[derive(Clone, Default)]
    pub(crate) struct LivenessInterceptor<R: crate::CancellationTrigger + Clone>(R);

    impl<R: crate::CancellationTrigger + Clone> LivenessInterceptor<R> {
        pub fn as_inner_mut(&mut self) -> &mut R {
            &mut self.0
        }

        pub fn as_inner(&self) -> &R {
            &self.0
        }
    }

    impl LivenessInterceptor<crate::CancelChain> {
        pub fn clone_and_flatten(&self) -> crate::triggers::DynamicCancellationTrigger {
            // If liveness monitoring is off, we can just use normal flattening.
            self.as_inner().clone_and_flatten()
        }
    }

    impl<R: crate::CancellationTrigger + Clone> crate::CancellationTrigger for LivenessInterceptor<R> {
        fn is_cancelled(&self) -> bool {
            // If liveness monitoring is off, we simply delegate to the inner trigger.
            self.0.is_cancelled()
        }

        fn type_name(&self) -> &'static str {
            self.0.type_name()
        }
    }
}

pub use error::*;
use liveness::LivenessInterceptor;
use std::cell::RefCell;
pub use triggers::*;

/// The "default" [`Cancelled`] cause, reported when the trigger type is unknown.
pub const UNKNOWN_CAUSE: &str = "UnknownCancellationTrigger";

thread_local! {
    /// The correctness of this value relies on the fact that references to the `CancelChain`
    /// never leak out of this crate (downstream users only ever receive copies).
    /// Within the crate, the triggers are either read when checking cancellation status, or
    /// written when entering/leaving a scope. These two actions are never performed
    /// simultaneously.
    static TRIGGER: RefCell<LivenessInterceptor<CancelChain>> = RefCell::new(LivenessInterceptor::default());
}

/// Call this macro every time your code wants to check for cancellation. It returns
/// `Result<(), Cancelled>`, which can typically be propagated using the `?` operator.
///
/// Without arguments, the macro checks the thread-local triggers. Given the identifier of
/// a local trigger (e.g. a snapshot obtained from [`active_triggers`]), it checks
/// that trigger instead.
#[macro_export]
macro_rules! is_cancelled {
    () => {
        $crate::check_local_cancellation()
    };
    ($handler:ident) => {
        $crate::check_cancellation(&$handler)
    };
}

/// Returns [`Cancelled`] if [`CancellationTrigger::is_cancelled`] of the given
/// `trigger` is true. In typical situations, you don't use this method directly,
/// but instead use the [`is_cancelled`] macro.
pub fn check_cancellation<TCancel: CancellationTrigger>(
    trigger: &TCancel,
) -> Result<(), Cancelled> {
    if trigger.is_cancelled() {
        Err(Cancelled::new(trigger.type_name()))
    } else {
        Ok(())
    }
}

/// Check if the current thread-local cancellation trigger is cancelled. In typical situations,
/// you don't use this method directly, but instead use the [`is_cancelled`] macro.
///
/// To avoid a repeated borrow of the thread-local value in performance-sensitive applications,
/// you can use [`active_triggers`] to cache the value in a local variable.
pub fn check_local_cancellation() -> Result<(), Cancelled> {
    TRIGGER.with_borrow(check_cancellation)
}

/// Get a snapshot of the current thread-local cancellation trigger.
///
/// This value can be either used to initialize triggers in a new thread using [`on_trigger`],
/// or used directly as argument to the [`is_cancelled`] macro to speed up cancellation checks.
pub fn active_triggers() -> DynamicCancellationTrigger {
    TRIGGER.with_borrow(|trigger| trigger.clone_and_flatten())
}

/// Run the `action` in a context where a cancellation can be signaled using the given `trigger`.
///
/// Once the action is completed, the trigger is de-registered and does not apply
/// to further code execution.
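/**
The `TError: From<Cancelled>` bound is what lets the action return an application-specific
error while still propagating cancellation with `?`. The following std-only sketch
illustrates that conversion pattern in isolation. It does not use this crate: the local
`Cancelled`, `JobError`, `check` and `sum` items are hypothetical stand-ins, and the
`cancelled` flag merely plays the role of a trigger's state.

```rust
/// Hypothetical stand-in for the crate's cancellation error.
#[derive(Debug)]
struct Cancelled;

/// An application error with a dedicated variant for cancellation.
#[derive(Debug, PartialEq)]
enum JobError {
    Empty,
    Cancelled,
}

impl From<Cancelled> for JobError {
    fn from(_: Cancelled) -> Self {
        JobError::Cancelled
    }
}

/// Stand-in for a cancellation check.
fn check(cancelled: bool) -> Result<(), Cancelled> {
    if cancelled { Err(Cancelled) } else { Ok(()) }
}

/// Sums `items`, checking for cancellation before each element.
fn sum(items: &[u32], cancelled: bool) -> Result<u32, JobError> {
    if items.is_empty() {
        return Err(JobError::Empty);
    }
    let mut total = 0;
    for item in items {
        // `?` converts `Cancelled` into `JobError` through the `From` impl above.
        check(cancelled)?;
        total += item;
    }
    Ok(total)
}
```

With this shape, callers can tell a genuine failure (`JobError::Empty`) apart from an
interrupted computation (`JobError::Cancelled`) by matching on a single error type.
*/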
pub fn on_trigger<TResult, TError, TCancel, TAction>(
    trigger: TCancel,
    action: TAction,
) -> Result<TResult, TError>
where
    TCancel: CancellationTrigger + 'static,
    TAction: FnOnce() -> Result<TResult, TError>,
    TError: From<Cancelled>,
{
    TRIGGER.with_borrow_mut(|thread_trigger| thread_trigger.as_inner_mut().push(trigger));
    let result = action();
    TRIGGER.with_borrow_mut(|thread_trigger| thread_trigger.as_inner_mut().pop());
    result
}
348}