cancel_this/triggers/timer.rs
1use crate::{CancelAtomic, CancellationTrigger, Cancelled};
2use log::{trace, warn};
3use std::sync::Arc;
4use std::sync::mpsc::Sender;
5use std::thread::JoinHandle;
6use std::time::Duration;
7
8/// Run the given `action`, cancelling it if the provided `duration` of time has elapsed,
9/// measured by the [`CancelTimer`].
10///
11/// ```rust
12/// # use std::time::Duration;
13/// # use cancel_this::{is_cancelled, Cancelled};
14/// # let _ = env_logger::builder().is_test(true).try_init();
15/// fn cancellable_counter(count: usize) -> Result<(), Cancelled> {
16/// for _ in 0..count {
17/// is_cancelled!()?;
18/// std::thread::sleep(Duration::from_millis(10));
19/// }
20/// Ok(())
21/// }
22///
23/// let result_fast = cancel_this::on_timeout(Duration::from_millis(100), || cancellable_counter(5));
24/// assert!(result_fast.is_ok());
25///
26/// let result_slow = cancel_this::on_timeout(Duration::from_millis(100), || cancellable_counter(50));
27/// assert!(result_slow.is_err());
28/// ```
29pub fn on_timeout<TResult, TError, TAction>(
30 duration: Duration,
31 action: TAction,
32) -> Result<TResult, TError>
33where
34 TAction: FnOnce() -> Result<TResult, TError>,
35 TError: From<Cancelled>,
36{
37 crate::on_trigger(CancelTimer::start(duration), action)
38}
39
40/// Implementation of [`CancellationTrigger`] that is canceled once the specified [`Duration`]
41/// elapsed. The "timer" is started immediately upon creation.
42///
43/// See also [`on_timeout`].
44///
45/// ## Logging
46/// - `[trace]` Every time a timer is started or elapsed (i.e., upon cancellation).
47/// - `[warn]` If the timer is dropped, but the timer thread cannot be safely destroyed.
48#[derive(Debug, Clone)]
49// The trigger is storing its "core data", but it won't access them. It only needs to keep them
50// around so that they are dropped once all copies of the trigger are destroyed as well.
51#[allow(dead_code)]
52pub struct CancelTimer(CancelAtomic, Arc<CancelTimerCore>);
53
54impl CancellationTrigger for CancelTimer {
55 fn is_cancelled(&self) -> bool {
56 self.0.is_cancelled()
57 }
58
59 fn type_name(&self) -> &'static str {
60 "CancelTimer"
61 }
62}
63
64impl CancelTimer {
65 /// Create a new [`CancelTimer`] that will be canceled once the given `duration` elapsed.
66 pub fn start(duration: Duration) -> Self {
67 let trigger = CancelAtomic::default();
68 let core = CancelTimerCore::start(trigger.clone(), duration);
69 trace!(
70 "`CancelTimer[{:p}]` started; Waiting for {}ms.",
71 trigger.id_ref(),
72 duration.as_millis()
73 );
74 CancelTimer(trigger, Arc::new(core))
75 }
76}
77
78/// An internal data structure that manages the timer required by [`CancelTimer`]. In particular,
79/// it is responsible for safely shutting down the timer thread once the timer is no longer
80/// needed (to avoid leaking a million timer threads in applications where the timeout is long
81/// but is used very often).
82#[derive(Debug)]
83struct CancelTimerCore {
84 trigger: CancelAtomic,
85 timer_thread: Option<JoinHandle<()>>,
86 stop_trigger: Sender<()>,
87}
88
89impl CancelTimerCore {
90 pub fn start(trigger: CancelAtomic, duration: Duration) -> Self {
91 let trigger_copy = trigger.clone();
92 let (sender, receiver) = std::sync::mpsc::channel();
93 let handle = std::thread::spawn(move || {
94 // If this is `Ok`, it means the timer got canceled.
95 // If it is `Err`, it means the duration elapsed.
96 // In practice, this distinction should be irrelevant, since the timer can only
97 // be canceled if the whole cancellation trigger is dropped, meaning it is no
98 // longer observed by anyone...
99 match receiver.recv_timeout(duration) {
100 Ok(()) => (),
101 Err(_) => {
102 trace!(
103 "`CancelTimer[{:p}]` elapsed. Canceling.",
104 trigger_copy.id_ref()
105 );
106 trigger_copy.cancel()
107 }
108 }
109 });
110 CancelTimerCore {
111 trigger,
112 timer_thread: Some(handle),
113 stop_trigger: sender,
114 }
115 }
116}
117
118impl Drop for CancelTimerCore {
119 fn drop(&mut self) {
120 let thread = self
121 .timer_thread
122 .take()
123 .expect("Invariant violation: Timer thread removed before drop.");
124
125 let join = match self.stop_trigger.send(()) {
126 Ok(()) => thread.join(),
127 Err(_) => {
128 // The receiver has already been deallocated, meaning the timer most likely
129 // elapsed and the thread should be dead.
130 if !thread.is_finished() {
131 warn!(
132 "Timer of `CancelTimer[{:p}]` cannot be stopped. Possible thread leak.`",
133 self.trigger.id_ref()
134 );
135 return;
136 } else {
137 thread.join()
138 }
139 }
140 };
141 if join.is_err() {
142 // The thread panicked, meaning we probably want to propagate it.
143 panic!("Timer thread of `CancelTimer` trigger panicked.");
144 }
145 }
146}