cancel_this/triggers/
timer.rs

1use crate::{CancelAtomic, CancellationTrigger, Cancelled};
2use log::{trace, warn};
3use std::sync::Arc;
4use std::sync::mpsc::Sender;
5use std::thread::JoinHandle;
6use std::time::Duration;
7
8/// Run the given `action`, cancelling it if the provided `duration` of time has elapsed,
9/// measured by the [`CancelTimer`].
10///
11/// ```rust
12/// # use std::time::Duration;
13/// # use cancel_this::{is_cancelled, Cancelled};
14/// # let _ = env_logger::builder().is_test(true).try_init();
15/// fn cancellable_counter(count: usize) -> Result<(), Cancelled> {
16///     for _ in 0..count {
17///         is_cancelled!()?;
18///         std::thread::sleep(Duration::from_millis(10));
19///     }
20///     Ok(())
21/// }
22///
23/// let result_fast = cancel_this::on_timeout(Duration::from_millis(100), || cancellable_counter(5));
24/// assert!(result_fast.is_ok());
25///
26/// let result_slow = cancel_this::on_timeout(Duration::from_millis(100), || cancellable_counter(50));
27/// assert!(result_slow.is_err());
28/// ```
29pub fn on_timeout<TResult, TError, TAction>(
30    duration: Duration,
31    action: TAction,
32) -> Result<TResult, TError>
33where
34    TAction: FnOnce() -> Result<TResult, TError>,
35    TError: From<Cancelled>,
36{
37    crate::on_trigger(CancelTimer::start(duration), action)
38}
39
/// Implementation of [`CancellationTrigger`] that is canceled once the specified [`Duration`]
/// has elapsed. The "timer" is started immediately upon creation (see [`CancelTimer::start`]).
///
/// The trigger is [`Clone`]; all clones share one `CancelTimerCore` through an [`Arc`], so the
/// core (and with it the timer thread) is released only when the last clone is dropped.
///
/// See also [`on_timeout`].
///
/// ## Logging
///  - `[trace]` Every time a timer is started or elapsed (i.e., upon cancellation).
///  - `[warn]` If the timer is dropped, but the timer thread cannot be safely destroyed.
#[derive(Debug, Clone)]
// Field `.0` is the cancellation flag queried by `is_cancelled`. Field `.1` is the trigger's
// "core data": it is never read through this struct and is only kept alive here so that it
// is dropped once all copies of the trigger are destroyed as well. Hence the lint override.
#[allow(dead_code)]
pub struct CancelTimer(CancelAtomic, Arc<CancelTimerCore>);
53
54impl CancellationTrigger for CancelTimer {
55    fn is_cancelled(&self) -> bool {
56        self.0.is_cancelled()
57    }
58
59    fn type_name(&self) -> &'static str {
60        "CancelTimer"
61    }
62}
63
64impl CancelTimer {
65    /// Create a new [`CancelTimer`] that will be canceled once the given `duration` elapsed.
66    pub fn start(duration: Duration) -> Self {
67        let trigger = CancelAtomic::default();
68        let core = CancelTimerCore::start(trigger.clone(), duration);
69        trace!(
70            "`CancelTimer[{:p}]` started; Waiting for {}ms.",
71            trigger.id_ref(),
72            duration.as_millis()
73        );
74        CancelTimer(trigger, Arc::new(core))
75    }
76}
77
/// An internal data structure that manages the timer required by [`CancelTimer`]. In particular,
/// it is responsible for safely shutting down the timer thread once the timer is no longer
/// needed (to avoid leaking a million timer threads in applications where the timeout is long
/// but is used very often).
///
/// The shutdown itself happens in the `Drop` implementation of this type.
#[derive(Debug)]
struct CancelTimerCore {
    // Copy of the trigger's flag; `Drop` uses it to identify the timer in its log message.
    trigger: CancelAtomic,
    // Handle of the timer thread. It is `Some` from construction until `Drop` takes it out
    // in order to join the thread.
    timer_thread: Option<JoinHandle<()>>,
    // Sending `()` through this channel wakes the timer thread before the timeout, so that it
    // exits without canceling the trigger.
    stop_trigger: Sender<()>,
}
88
89impl CancelTimerCore {
90    pub fn start(trigger: CancelAtomic, duration: Duration) -> Self {
91        let trigger_copy = trigger.clone();
92        let (sender, receiver) = std::sync::mpsc::channel();
93        let handle = std::thread::spawn(move || {
94            // If this is `Ok`, it means the timer got canceled.
95            // If it is `Err`, it means the duration elapsed.
96            // In practice, this distinction should be irrelevant, since the timer can only
97            // be canceled if the whole cancellation trigger is dropped, meaning it is no
98            // longer observed by anyone...
99            match receiver.recv_timeout(duration) {
100                Ok(()) => (),
101                Err(_) => {
102                    trace!(
103                        "`CancelTimer[{:p}]` elapsed. Canceling.",
104                        trigger_copy.id_ref()
105                    );
106                    trigger_copy.cancel()
107                }
108            }
109        });
110        CancelTimerCore {
111            trigger,
112            timer_thread: Some(handle),
113            stop_trigger: sender,
114        }
115    }
116}
117
118impl Drop for CancelTimerCore {
119    fn drop(&mut self) {
120        let thread = self
121            .timer_thread
122            .take()
123            .expect("Invariant violation: Timer thread removed before drop.");
124
125        let join = match self.stop_trigger.send(()) {
126            Ok(()) => thread.join(),
127            Err(_) => {
128                // The receiver has already been deallocated, meaning the timer most likely
129                // elapsed and the thread should be dead.
130                if !thread.is_finished() {
131                    warn!(
132                        "Timer of `CancelTimer[{:p}]` cannot be stopped. Possible thread leak.`",
133                        self.trigger.id_ref()
134                    );
135                    return;
136                } else {
137                    thread.join()
138                }
139            }
140        };
141        if join.is_err() {
142            // The thread panicked, meaning we probably want to propagate it.
143            panic!("Timer thread of `CancelTimer` trigger panicked.");
144        }
145    }
146}