Skip to main content

wallclock_timer/
thread_timer.rs

1//! Timer implementation that runs on its own thread and uses wall-clock deadlines.
2//!
3//! This module emits errors via the `log` crate. Provide a logger implementation
4//! in your application to see these messages.
5
6use crate::{
7    ClosureState,
8    timers::{State, WallClockTimer},
9};
10use crossbeam_channel as channel;
11use rustc_hash::FxHashMap;
12use snafu::prelude::*;
13use std::{
14    cmp::Ordering,
15    collections::{BinaryHeap, hash_map},
16    fmt,
17    hash::Hash,
18    thread,
19    time::{Duration, SystemTime},
20};
21
/// Source of wall-clock time, abstracted so tests can substitute a controllable clock.
///
/// Implementations must be `Send + 'static` because the clock is moved onto the
/// timer thread when a timer is created.
pub trait Clock: Send + 'static {
    /// Returns what this clock currently considers the wall-clock time.
    fn now(&self) -> SystemTime;
}
27
28/// System clock backed by `SystemTime::now()`.
29#[derive(Debug, Clone, Copy)]
30pub struct RealClock;
31
32impl Clock for RealClock {
33    fn now(&self) -> SystemTime {
34        SystemTime::now()
35    }
36}
37
38/// Errors returned by the thread timer APIs.
39#[derive(Debug, Snafu)]
40pub enum ThreadTimerError {
41    /// Failed to spawn the timer thread.
42    #[snafu(display("Failed to spawn timer thread: {source}"))]
43    SpawnThread { source: std::io::Error },
44    /// Failed to send a message to the timer thread.
45    #[snafu(display("Failed to send message to timer thread"))]
46    SendMessage,
47    /// Timer thread panicked while running.
48    #[snafu(display("Timer thread panicked while waiting to join"))]
49    JoinThread,
50}
51
52impl PartialEq for ThreadTimerError {
53    fn eq(&self, other: &Self) -> bool {
54        match (self, other) {
55            (ThreadTimerError::SpawnThread { .. }, _)
56            | (_, ThreadTimerError::SpawnThread { .. }) => false,
57            (ThreadTimerError::SendMessage, ThreadTimerError::SendMessage) => true,
58            (ThreadTimerError::JoinThread, ThreadTimerError::JoinThread) => true,
59            _ => false,
60        }
61    }
62}
63
64#[derive(Debug)]
65enum TimerMsg<I, O>
66where
67    I: Hash + Clone + Eq + Ord,
68    O: State<Id = I>,
69{
70    Schedule(TimerEntry<I, O>),
71    Cancel(I),
72    Stop,
73}
74
75/// A shorthand for a reference to a [[TimerWithThread]] with closure actions.
76pub type ClosureTimerRef<I> = TimerRef<I, ClosureState<I>>;
77
/// Shorthand for a [`TimerRef`] into a [`TimerWithThread`] whose entries are
/// closure actions identified by UUIDs.
#[cfg(feature = "uuid")]
#[cfg_attr(docsrs, doc(cfg(feature = "uuid")))]
pub type UuidClosureTimerRef = TimerRef<uuid::Uuid, ClosureState<uuid::Uuid>>;
82
/// Shorthand for a [`TimerWithThread`] whose entries are closure actions identified by UUIDs.
#[cfg(feature = "uuid")]
#[cfg_attr(docsrs, doc(cfg(feature = "uuid")))]
pub type UuidClosureTimer = TimerWithThread<uuid::Uuid, ClosureState<uuid::Uuid>>;
87
88/// A reference to a thread timer.
89pub struct TimerRef<I, O>
90where
91    I: Hash + Clone + Eq + Ord,
92    O: State<Id = I>,
93{
94    work_queue: channel::Sender<TimerMsg<I, O>>,
95}
96
97impl<I, O> WallClockTimer for TimerRef<I, O>
98where
99    I: Hash + Clone + Eq + Ord,
100    O: State<Id = I>,
101{
102    type Id = I;
103    type State = O;
104    type Error = ThreadTimerError;
105
106    fn schedule_at(
107        &mut self,
108        deadline: SystemTime,
109        state: Self::State,
110    ) -> Result<(), ThreadTimerError> {
111        let entry = TimerEntry { deadline, state };
112        self.work_queue
113            .send(TimerMsg::Schedule(entry))
114            .map_err(|err| {
115                log::error!("Failed to send schedule message: {}", err);
116                ThreadTimerError::SendMessage
117            })
118    }
119
120    fn cancel(&mut self, id: Self::Id) -> Result<(), ThreadTimerError> {
121        self.work_queue.send(TimerMsg::Cancel(id)).map_err(|err| {
122            log::error!("Failed to send cancel message: {}", err);
123            ThreadTimerError::SendMessage
124        })
125    }
126}
127
128// Explicit Clone implementation, because O does not need to be Clone for the [[TimerRef]] to be Clone.
129impl<I, O> Clone for TimerRef<I, O>
130where
131    I: Hash + Clone + Eq + Ord,
132    O: State<Id = I>,
133{
134    fn clone(&self) -> Self {
135        Self {
136            work_queue: self.work_queue.clone(),
137        }
138    }
139}
140
/// Default for the `max_wait_time` argument of [`TimerWithThread::new`]: five seconds.
pub const DEFAULT_MAX_WAIT: Duration = Duration::from_secs(5);
143
144/// A timer implementation that uses its own thread.
145///
146/// This instance is essentially the owning handle.
147/// Non-owning references can be created with [[TimerWithThread::timer_ref]] and are always cloneable.
148pub struct TimerWithThread<I, O>
149where
150    I: Hash + Clone + Eq + Ord,
151    O: State<Id = I>,
152{
153    timer_thread: thread::JoinHandle<()>,
154    work_queue: channel::Sender<TimerMsg<I, O>>,
155}
156
157impl<I, O> TimerWithThread<I, O>
158where
159    I: Hash + Clone + Eq + Ord + fmt::Debug + Send + 'static,
160    O: State<Id = I> + fmt::Debug + Send + 'static,
161{
162    /// Create a new timer with its own thread.
163    ///
164    /// `max_wait_time` is the maximum time we wait until we check the clock again,
165    /// in case it jumped (e.g. after sleep or due to a timezone change).
166    pub fn new(max_wait_time: Duration) -> Result<TimerWithThread<I, O>, ThreadTimerError> {
167        Self::new_with_clock(RealClock, max_wait_time)
168    }
169
170    /// Create a new timer with its own thread using a custom clock.
171    ///
172    /// This is mostly meant for testing, but can also be used to supply other clock sources
173    /// than [[SystemTime]].
174    ///
175    /// `max_wait_time` is the maximum time we wait until we check the clock again,
176    /// in case it jumped (e.g. after sleep or due to a timezone change).
177    pub fn new_with_clock<C>(
178        clock: C,
179        max_wait_time: Duration,
180    ) -> Result<TimerWithThread<I, O>, ThreadTimerError>
181    where
182        C: Clock,
183    {
184        let (s, r) = channel::unbounded();
185        let handle = thread::Builder::new()
186            .name("wallclock-timer-thread".to_string())
187            .spawn(move || {
188                let timer = TimerThread::new(r, clock, max_wait_time);
189                timer.run();
190            })
191            .context(SpawnThreadSnafu)?;
192        Ok(TimerWithThread {
193            timer_thread: handle,
194            work_queue: s,
195        })
196    }
197
198    /// Returns a shareable reference to this timer.
199    pub fn timer_ref(&self) -> TimerRef<I, O> {
200        TimerRef {
201            work_queue: self.work_queue.clone(),
202        }
203    }
204
205    /// Shut this timer down and wait for the thread to join.
206    pub fn shutdown(self) -> Result<(), ThreadTimerError> {
207        if let Err(send_err) = self.work_queue.send(TimerMsg::Stop) {
208            log::error!("Failed to send stop message: {}", send_err);
209            // We can't be sure the time_thread will ever finish.
210            if self.timer_thread.is_finished() {
211                // But if it did, we can print the error message.
212                if self.timer_thread.join().is_err() {
213                    log::error!("The timer thread panicked. See stderr for more information.");
214                }
215            } // Otherwise we'll just leak it, rather than risking blocking this thread as well.
216            SendMessageSnafu.fail()
217        } else {
218            self.timer_thread.join().map_err(|_| {
219                log::error!("The timer thread panicked. See stderr for more information.");
220                JoinThreadSnafu.build()
221            })
222        }
223    }
224
225    /// Same as `shutdown`, but doesn't wait for the thread to join.
226    pub fn shutdown_async(&self) -> Result<(), ThreadTimerError> {
227        self.work_queue.send(TimerMsg::Stop).map_err(|err| {
228            log::error!("Failed to send stop message: {}", err);
229            SendMessageSnafu.build()
230        })
231    }
232}
233
#[cfg(feature = "uuid")]
impl TimerWithThread<uuid::Uuid, ClosureState<uuid::Uuid>> {
    /// Convenience constructor for a timer with UUID ids and closure states.
    ///
    /// Equivalent to calling [`TimerWithThread::new`] with these type parameters.
    pub fn for_uuid_closures(max_wait_time: Duration) -> Result<Self, ThreadTimerError> {
        TimerWithThread::new(max_wait_time)
    }
}
241
242impl<I, O> fmt::Debug for TimerWithThread<I, O>
243where
244    I: Hash + Clone + Eq + Ord,
245    O: State<Id = I>,
246{
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        write!(f, "<TimerWithThread>")
249    }
250}
251
252impl<I, O> Default for TimerWithThread<I, O>
253where
254    I: Hash + Clone + Eq + Ord + fmt::Debug + Send + 'static,
255    O: State<Id = I> + fmt::Debug + Send + 'static,
256{
257    fn default() -> Self {
258        Self::new(DEFAULT_MAX_WAIT).expect("Failed to create default timer")
259    }
260}
261
/// Key stored in the timer thread's [`BinaryHeap`]: a deadline plus the id it belongs to.
#[derive(Debug, PartialEq, Eq)]
struct HeapEntry<I> {
    deadline: SystemTime,
    id: I,
}

impl<I> PartialOrd for HeapEntry<I>
where
    I: Eq + Ord,
{
    /// Always `Some`, delegating to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<I> Ord for HeapEntry<I>
where
    I: Eq + Ord,
{
    /// Reversed ordering by deadline, then by id.
    ///
    /// `BinaryHeap` is a max-heap, so comparing `other` against `self` puts the
    /// entry with the earliest deadline (and, on ties, the smallest id) on top.
    fn cmp(&self, other: &Self) -> Ordering {
        (other.deadline, &other.id).cmp(&(self.deadline, &self.id))
    }
}
292
293/// A concrete entry for an outstanding timeout using a wall-clock deadline.
294#[derive(Debug)]
295struct TimerEntry<I, O>
296where
297    I: Hash + Clone + Eq,
298    O: State<Id = I>,
299{
300    /// The wall clock deadline at which this should trigger.
301    pub deadline: SystemTime,
302    /// The information to store along with the timer.
303    pub state: O,
304}
305
306impl<I, O> TimerEntry<I, O>
307where
308    I: Hash + Clone + Eq,
309    O: State<Id = I>,
310{
311    /// A reference to the id associated with this entry.
312    pub fn id(&self) -> &I {
313        self.state.id()
314    }
315}
316
317struct TimerThread<I, O, C>
318where
319    I: Hash + Clone + Eq + Ord + fmt::Debug,
320    O: State<Id = I> + fmt::Debug,
321    C: Clock + Send + 'static,
322{
323    entry_queue: BinaryHeap<HeapEntry<I>>,
324    entries: FxHashMap<I, TimerEntry<I, O>>,
325    work_queue: channel::Receiver<TimerMsg<I, O>>,
326    running: bool,
327    clock: C,
328    max_wait_time: Duration,
329}
330
331impl<I, O, C> TimerThread<I, O, C>
332where
333    I: Hash + Clone + Eq + Ord + fmt::Debug,
334    O: State<Id = I> + fmt::Debug,
335    C: Clock + Send + 'static,
336{
337    fn new(
338        work_queue: channel::Receiver<TimerMsg<I, O>>,
339        clock: C,
340        max_wait_time: Duration,
341    ) -> Self {
342        TimerThread {
343            entry_queue: BinaryHeap::new(),
344            entries: FxHashMap::default(),
345            work_queue,
346            running: true,
347            clock,
348            max_wait_time,
349        }
350    }
351
352    fn run(mut self) {
353        'run_loop: while self.running {
354            let now = self.clock.now();
355            // eprintln!(
356            //     "Checking run loop at {now:?}. State: running={}, entry_queue={:?}, entries={:?}",
357            //     self.running, self.entry_queue, self.entries
358            // );
359            self.process_due(now);
360            if !self.running {
361                break 'run_loop;
362            }
363
364            match self.next_deadline() {
365                None => match self.work_queue.recv() {
366                    Ok(msg) => self.handle_msg(msg),
367                    Err(channel::RecvError) => {
368                        log::error!("Channel died, stopping timer thread...");
369                        break 'run_loop;
370                    }
371                },
372                Some(deadline) => {
373                    if deadline <= now {
374                        continue 'run_loop;
375                    }
376                    // Take a new reading of the clock, since some time could have passed processing the due entries.
377                    let wait = deadline
378                        .duration_since(self.clock.now())
379                        .unwrap_or(Duration::ZERO)
380                        .min(self.max_wait_time);
381                    match self.work_queue.recv_timeout(wait) {
382                        Ok(msg) => self.handle_msg(msg),
383                        Err(channel::RecvTimeoutError::Timeout) => {
384                            continue 'run_loop;
385                        }
386                        Err(channel::RecvTimeoutError::Disconnected) => {
387                            log::error!("Channel died, stopping timer thread...");
388                            break 'run_loop;
389                        }
390                    }
391                }
392            }
393        }
394    }
395
396    fn handle_msg(&mut self, msg: TimerMsg<I, O>) {
397        match msg {
398            TimerMsg::Stop => {
399                log::info!("Timer thread received stop signal. Shutting down...");
400                self.running = false
401            }
402            TimerMsg::Schedule(entry) => self.schedule_entry(entry),
403            TimerMsg::Cancel(id) => match self.entries.remove(&id) {
404                Some(e) => {
405                    log::info!("Cancelled timer entry {e:?}");
406                }
407                None => {
408                    log::warn!(
409                        "Could not find timer entry with {id:?} to cancel. It might have expired already?"
410                    );
411                }
412            },
413        }
414    }
415
416    fn schedule_entry(&mut self, entry: TimerEntry<I, O>) {
417        let now = self.clock.now();
418        if entry.deadline <= now {
419            log::debug!(
420                "Triggering entry with id {:?} instead of scheduling, since it's already expired.",
421                entry.id()
422            );
423            entry.state.trigger();
424            return;
425        }
426        let id = entry.id().clone();
427        self.insert_entry(id, entry);
428    }
429
430    fn insert_entry(&mut self, id: I, entry: TimerEntry<I, O>) {
431        match self.entries.entry(id) {
432            hash_map::Entry::Occupied(e) => {
433                log::error!(
434                    "Attempted to re-insert a timer entry with an already existing id. Scheduled timer ids must be unique! Existing entry: {:?}, new entry: {:?}",
435                    e,
436                    entry
437                );
438            }
439            hash_map::Entry::Vacant(e) => {
440                let id = entry.id().clone();
441                let deadline = entry.deadline;
442                e.insert(entry);
443                self.entry_queue.push(HeapEntry { deadline, id });
444            }
445        }
446    }
447
448    fn process_due(&mut self, now: SystemTime) {
449        while let Some(scheduled) = self.pop_next_due(now) {
450            scheduled.state.trigger();
451        }
452    }
453
454    #[inline(always)]
455    fn next_deadline(&mut self) -> Option<SystemTime> {
456        self.entry_queue.peek().map(|entry| entry.deadline)
457    }
458
459    fn pop_next_due(&mut self, now: SystemTime) -> Option<TimerEntry<I, O>> {
460        if let Some(top) = self.entry_queue.peek() {
461            if top.deadline > now {
462                return None;
463            }
464            let entry = self.entry_queue.pop().expect("peeked entry");
465            let scheduled = self.entries.remove(&entry.id);
466            if scheduled.is_none() {
467                log::debug!("Skipping entry {entry:?}, because it always already cancelled.");
468            }
469            scheduled
470        } else {
471            None
472        }
473    }
474}
475
#[cfg(test)]
mod tests {
    use super::*;
    use crate::timers::ClosureTimer;
    use std::{
        sync::{
            Arc,
            Mutex,
            Once,
            atomic::{AtomicUsize, Ordering as AtomicOrdering},
        },
        time::Instant,
    };

    /// Maximum wait time given to timers that are driven by a [`MockClock`].
    const MOCK_MAX_WAIT: Duration = Duration::from_millis(5);
    /// Real-time pause that lets the timer thread wake up and look at the clock.
    const SETTLE: Duration = Duration::from_millis(20);
    /// How long a test waits for a trigger it expects to happen.
    const HIT_TIMEOUT: Duration = Duration::from_secs(2);

    /// Installs the logger once for the whole test binary.
    fn init_logger() {
        static INIT: Once = Once::new();
        INIT.call_once(|| {
            let _ = simple_logger::SimpleLogger::new().init();
            log::set_max_level(log::LevelFilter::Debug);
        });
    }

    /// A clock whose time only changes when a test says so.
    #[derive(Clone)]
    struct MockClock {
        now: Arc<Mutex<SystemTime>>,
    }

    impl MockClock {
        fn new(start: SystemTime) -> Self {
            let now = Arc::new(Mutex::new(start));
            Self { now }
        }

        /// Moves the clock forward by `delta`.
        fn advance(&self, delta: Duration) {
            let mut current = self.now.lock().expect("clock lock");
            *current = current.checked_add(delta).expect("advance");
        }

        /// Sets the clock to an absolute point in time.
        fn set(&self, time: SystemTime) {
            *self.now.lock().expect("clock lock") = time;
        }
    }

    impl Clock for MockClock {
        fn now(&self) -> SystemTime {
            let current = self.now.lock().expect("clock lock");
            *current
        }
    }

    /// A shareable counter for recording how often a timer fired.
    #[derive(Clone, Debug, Default)]
    struct AtomicCounter {
        inner: Arc<AtomicUsize>,
    }

    impl AtomicCounter {
        fn new() -> Self {
            let inner = Arc::new(AtomicUsize::new(0));
            Self { inner }
        }

        fn increment(&self) {
            self.inner.fetch_add(1, AtomicOrdering::SeqCst);
        }

        fn get(&self) -> usize {
            self.inner.load(AtomicOrdering::SeqCst)
        }
    }

    /// Timer state that bumps its counter when triggered.
    #[derive(Debug)]
    struct TestState {
        id: u64,
        hits: AtomicCounter,
    }

    impl State for TestState {
        type Id = u64;

        fn id(&self) -> &Self::Id {
            &self.id
        }

        fn trigger(self) {
            self.hits.increment();
        }
    }

    /// Starts a [`TestState`] timer that follows `clock`.
    fn mock_timer(clock: &MockClock) -> TimerWithThread<u64, TestState> {
        TimerWithThread::<u64, TestState>::new_with_clock(clock.clone(), MOCK_MAX_WAIT)
            .expect("timer")
    }

    /// Schedules a [`TestState`] with a fresh counter and hands that counter back.
    fn schedule_counted(
        tref: &mut TimerRef<u64, TestState>,
        id: u64,
        deadline: SystemTime,
    ) -> AtomicCounter {
        let hits = AtomicCounter::new();
        let state = TestState {
            id,
            hits: hits.clone(),
        };
        tref.schedule_at(deadline, state).expect("schedule");
        hits
    }

    #[test]
    fn mock_clock_triggers_on_deadline() {
        init_logger();
        let epoch = SystemTime::UNIX_EPOCH;
        let clock = MockClock::new(epoch);
        let timer = mock_timer(&clock);
        let mut tref = timer.timer_ref();

        let first = schedule_counted(&mut tref, 1, epoch + Duration::from_millis(5));
        let second = schedule_counted(&mut tref, 2, epoch + Duration::from_secs(20));

        // Wait a good bit longer than the first deadline in real time: nothing may
        // fire, because only the mock clock counts.
        thread::sleep(SETTLE);
        assert_eq!(first.get(), 0);
        assert_eq!(second.get(), 0);

        clock.advance(Duration::from_millis(6));
        wait_for_hits(&first, 1, HIT_TIMEOUT);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 0);

        clock.advance(Duration::from_secs(20));
        wait_for_hits(&second, 1, HIT_TIMEOUT);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 1);

        timer.shutdown().expect("shutdown");
    }

    #[test]
    fn wake_on_message_while_waiting_long_timeout() {
        init_logger();
        let timer = TimerWithThread::<u64, TestState>::default();
        let mut tref = timer.timer_ref();

        let far = schedule_counted(
            &mut tref,
            1,
            SystemTime::now() + Duration::from_hours(1000),
        );

        // Give the thread a moment to go to sleep on the channel.
        thread::sleep(Duration::from_millis(5));

        let near = schedule_counted(
            &mut tref,
            2,
            SystemTime::now() + Duration::from_millis(50),
        );

        wait_for_hits(&near, 1, HIT_TIMEOUT);
        assert_eq!(near.get(), 1);
        assert_eq!(far.get(), 0);

        timer.shutdown().expect("shutdown");
    }

    #[test]
    fn time_jump_forward_triggers_immediately() {
        init_logger();
        let epoch = SystemTime::UNIX_EPOCH;
        let clock = MockClock::new(epoch);
        let timer = mock_timer(&clock);
        let mut tref = timer.timer_ref();

        let first = schedule_counted(&mut tref, 1, epoch + Duration::from_secs(10));
        let second = schedule_counted(&mut tref, 2, epoch + Duration::from_secs(90));

        // Nothing may fire as long as the clock stands still.
        thread::sleep(SETTLE);
        assert_eq!(first.get(), 0);
        assert_eq!(second.get(), 0);

        clock.advance(Duration::from_secs(30));
        wait_for_hits(&first, 1, HIT_TIMEOUT);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 0);

        clock.advance(Duration::from_secs(100));
        wait_for_hits(&second, 1, HIT_TIMEOUT);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 1);

        timer.shutdown().expect("shutdown");
    }

    #[test]
    fn time_jump_backward_does_not_trigger_early() {
        init_logger();
        let start = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
        let clock = MockClock::new(start);
        let timer = mock_timer(&clock);
        let mut tref = timer.timer_ref();

        // Deadlines at (effectively) 110 and 140.
        let first = schedule_counted(&mut tref, 1, start + Duration::from_secs(10));
        let second = schedule_counted(&mut tref, 2, start + Duration::from_secs(40));

        // Nothing may fire as long as the clock stands still.
        thread::sleep(SETTLE);
        assert_eq!(first.get(), 0);
        assert_eq!(second.get(), 0);

        // Jump back to 70 and let the timer thread notice.
        clock.set(start - Duration::from_secs(30));
        thread::sleep(SETTLE);
        assert_eq!(first.get(), 0);
        assert_eq!(second.get(), 0);

        // Forward to 90: still before both deadlines.
        clock.advance(Duration::from_secs(20));
        thread::sleep(SETTLE);
        assert_eq!(first.get(), 0);
        assert_eq!(second.get(), 0);

        // Forward to 111: past the first deadline.
        clock.advance(Duration::from_secs(21));
        wait_for_hits(&first, 1, HIT_TIMEOUT);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 0);

        // Forward to 142: past the second deadline.
        clock.advance(Duration::from_secs(31));
        wait_for_hits(&second, 1, HIT_TIMEOUT);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 1);

        timer.shutdown().expect("shutdown");
    }

    #[test]
    fn closure_timer_schedules_actions() {
        init_logger();
        let epoch = SystemTime::UNIX_EPOCH;
        let clock = MockClock::new(epoch);
        let timer = TimerWithThread::<u64, crate::timers::ClosureState<u64>>::new_with_clock(
            clock.clone(),
            MOCK_MAX_WAIT,
        )
        .expect("timer");
        let mut tref = timer.timer_ref();

        let first = AtomicCounter::new();
        let second = AtomicCounter::new();

        let first_in_action = first.clone();
        tref.schedule_action_at(1, epoch + Duration::from_secs(5), move |_| {
            first_in_action.increment();
        })
        .expect("schedule");
        let second_in_action = second.clone();
        tref.schedule_action_at(2, epoch + Duration::from_secs(50), move |_| {
            second_in_action.increment();
        })
        .expect("schedule");

        thread::sleep(SETTLE);
        assert_eq!(first.get(), 0);
        assert_eq!(second.get(), 0);

        clock.advance(Duration::from_secs(10));
        wait_for_hits(&first, 1, HIT_TIMEOUT);
        assert_eq!(first.get(), 1);
        assert_eq!(second.get(), 0);

        clock.advance(Duration::from_secs(50));
        wait_for_hits(&second, 1, HIT_TIMEOUT);
        assert_eq!(second.get(), 1);
        timer.shutdown().expect("shutdown");
    }

    #[test]
    fn cancel_prevents_overdue_trigger_with_multiple_timers() {
        init_logger();
        let clock = MockClock::new(SystemTime::UNIX_EPOCH);
        let timer = mock_timer(&clock);
        let mut tref = timer.timer_ref();

        let deadline = SystemTime::UNIX_EPOCH + Duration::from_secs(5);
        let cancelled = schedule_counted(&mut tref, 1, deadline);
        let kept = schedule_counted(&mut tref, 2, deadline);

        // Give the timer thread time to register both entries.
        thread::sleep(SETTLE);

        tref.cancel(1).expect("cancel");
        thread::sleep(SETTLE);
        assert_eq!(cancelled.get(), 0);
        assert_eq!(kept.get(), 0);

        clock.advance(Duration::from_secs(6));
        wait_for_hits(&kept, 1, HIT_TIMEOUT);
        assert_eq!(cancelled.get(), 0);
        assert_eq!(kept.get(), 1);

        timer.shutdown().expect("shutdown");
    }

    #[test]
    fn join_thread_error_from_panicking_handler() {
        init_logger();
        let timer = TimerWithThread::<u64, crate::timers::ClosureState<u64>>::new(DEFAULT_MAX_WAIT)
            .expect("timer");
        let mut tref = timer.timer_ref();
        tref.schedule_action_at(1, SystemTime::now(), |_| panic!("boom"))
            .expect("schedule");
        thread::sleep(Duration::from_millis(10));
        let err = timer.shutdown().expect_err("expected shutdown error");
        // Depending on timing, the failure shows up either when sending the stop
        // message or when joining the thread.
        const POSSIBLE_ERRORS: [ThreadTimerError; 2] =
            [ThreadTimerError::JoinThread, ThreadTimerError::SendMessage];
        assert!(
            POSSIBLE_ERRORS.contains(&err),
            "Should have gotten a shutdown error but was: {err}"
        );
    }

    /// Spins until `hits` reaches `expected` or `timeout` has elapsed.
    ///
    /// Does not assert anything itself; callers check the counter afterwards.
    fn wait_for_hits(hits: &AtomicCounter, expected: usize, timeout: Duration) {
        let started = Instant::now();
        while hits.get() < expected && started.elapsed() < timeout {
            thread::yield_now();
        }
    }
}