1use crate::{
7 ClosureState,
8 timers::{State, WallClockTimer},
9};
10use crossbeam_channel as channel;
11use rustc_hash::FxHashMap;
12use snafu::prelude::*;
13use std::{
14 cmp::Ordering,
15 collections::{BinaryHeap, hash_map},
16 fmt,
17 hash::Hash,
18 thread,
19 time::{Duration, SystemTime},
20};
21
/// Source of wall-clock time for the timer.
///
/// Implementors must be `Send + 'static`. `RealClock` is the implementation
/// in this file that reads the system clock.
pub trait Clock: Send + 'static {
    /// Returns the current time as seen by this clock.
    fn now(&self) -> SystemTime;
}
27
/// [`Clock`] implementation that reads the operating system's wall clock.
#[derive(Debug, Clone, Copy)]
pub struct RealClock;

impl Clock for RealClock {
    /// Returns [`SystemTime::now`].
    fn now(&self) -> SystemTime {
        SystemTime::now()
    }
}
37
/// Errors returned by the thread-backed timer API in this file.
#[derive(Debug, Snafu)]
pub enum ThreadTimerError {
    /// Spawning the timer thread failed; `source` is the underlying I/O error.
    #[snafu(display("Failed to spawn timer thread: {source}"))]
    SpawnThread { source: std::io::Error },
    /// A message could not be sent on the channel to the timer thread.
    #[snafu(display("Failed to send message to timer thread"))]
    SendMessage,
    /// Joining the timer thread reported an error.
    #[snafu(display("Timer thread panicked while waiting to join"))]
    JoinThread,
}
51
52impl PartialEq for ThreadTimerError {
53 fn eq(&self, other: &Self) -> bool {
54 match (self, other) {
55 (ThreadTimerError::SpawnThread { .. }, _)
56 | (_, ThreadTimerError::SpawnThread { .. }) => false,
57 (ThreadTimerError::SendMessage, ThreadTimerError::SendMessage) => true,
58 (ThreadTimerError::JoinThread, ThreadTimerError::JoinThread) => true,
59 _ => false,
60 }
61 }
62}
63
/// Messages sent over the work queue to the timer thread.
#[derive(Debug)]
enum TimerMsg<I, O>
where
    I: Hash + Clone + Eq + Ord,
    O: State<Id = I>,
{
    /// Register a new timer entry.
    Schedule(TimerEntry<I, O>),
    /// Cancel the entry stored under the given id, if it is still pending.
    Cancel(I),
    /// Ask the timer thread to leave its run loop.
    Stop,
}
74
/// A [`TimerRef`] whose state type is [`ClosureState`] keyed by `I`.
pub type ClosureTimerRef<I> = TimerRef<I, ClosureState<I>>;
77
/// A [`TimerRef`] using `uuid::Uuid` ids and [`ClosureState`] entries.
///
/// Only available with the `uuid` feature.
#[cfg(feature = "uuid")]
#[cfg_attr(docsrs, doc(cfg(feature = "uuid")))]
pub type UuidClosureTimerRef = TimerRef<uuid::Uuid, ClosureState<uuid::Uuid>>;
82
/// A [`TimerWithThread`] using `uuid::Uuid` ids and [`ClosureState`] entries.
///
/// Only available with the `uuid` feature.
#[cfg(feature = "uuid")]
#[cfg_attr(docsrs, doc(cfg(feature = "uuid")))]
pub type UuidClosureTimer = TimerWithThread<uuid::Uuid, ClosureState<uuid::Uuid>>;
87
/// Handle for talking to the timer thread.
///
/// It holds only a sender for the thread's work queue, so requests made
/// through it are delivered as messages.
pub struct TimerRef<I, O>
where
    I: Hash + Clone + Eq + Ord,
    O: State<Id = I>,
{
    /// Sending side of the timer thread's message channel.
    work_queue: channel::Sender<TimerMsg<I, O>>,
}
96
97impl<I, O> WallClockTimer for TimerRef<I, O>
98where
99 I: Hash + Clone + Eq + Ord,
100 O: State<Id = I>,
101{
102 type Id = I;
103 type State = O;
104 type Error = ThreadTimerError;
105
106 fn schedule_at(
107 &mut self,
108 deadline: SystemTime,
109 state: Self::State,
110 ) -> Result<(), ThreadTimerError> {
111 let entry = TimerEntry { deadline, state };
112 self.work_queue
113 .send(TimerMsg::Schedule(entry))
114 .map_err(|err| {
115 log::error!("Failed to send schedule message: {}", err);
116 ThreadTimerError::SendMessage
117 })
118 }
119
120 fn cancel(&mut self, id: Self::Id) -> Result<(), ThreadTimerError> {
121 self.work_queue.send(TimerMsg::Cancel(id)).map_err(|err| {
122 log::error!("Failed to send cancel message: {}", err);
123 ThreadTimerError::SendMessage
124 })
125 }
126}
127
128impl<I, O> Clone for TimerRef<I, O>
130where
131 I: Hash + Clone + Eq + Ord,
132 O: State<Id = I>,
133{
134 fn clone(&self) -> Self {
135 Self {
136 work_queue: self.work_queue.clone(),
137 }
138 }
139}
140
/// Maximum wait time used by `TimerWithThread::default()`.
///
/// The timer thread caps each blocking wait for messages at its configured
/// maximum wait time before it re-reads the clock.
pub const DEFAULT_MAX_WAIT: Duration = Duration::from_secs(5);
143
/// Owner of a spawned timer thread.
///
/// Holds the thread's join handle together with the sending side of its work
/// queue.
pub struct TimerWithThread<I, O>
where
    I: Hash + Clone + Eq + Ord,
    O: State<Id = I>,
{
    /// Join handle of the spawned timer thread.
    timer_thread: thread::JoinHandle<()>,
    /// Sending side of the timer thread's message channel.
    work_queue: channel::Sender<TimerMsg<I, O>>,
}
156
157impl<I, O> TimerWithThread<I, O>
158where
159 I: Hash + Clone + Eq + Ord + fmt::Debug + Send + 'static,
160 O: State<Id = I> + fmt::Debug + Send + 'static,
161{
162 pub fn new(max_wait_time: Duration) -> Result<TimerWithThread<I, O>, ThreadTimerError> {
167 Self::new_with_clock(RealClock, max_wait_time)
168 }
169
170 pub fn new_with_clock<C>(
178 clock: C,
179 max_wait_time: Duration,
180 ) -> Result<TimerWithThread<I, O>, ThreadTimerError>
181 where
182 C: Clock,
183 {
184 let (s, r) = channel::unbounded();
185 let handle = thread::Builder::new()
186 .name("wallclock-timer-thread".to_string())
187 .spawn(move || {
188 let timer = TimerThread::new(r, clock, max_wait_time);
189 timer.run();
190 })
191 .context(SpawnThreadSnafu)?;
192 Ok(TimerWithThread {
193 timer_thread: handle,
194 work_queue: s,
195 })
196 }
197
198 pub fn timer_ref(&self) -> TimerRef<I, O> {
200 TimerRef {
201 work_queue: self.work_queue.clone(),
202 }
203 }
204
205 pub fn shutdown(self) -> Result<(), ThreadTimerError> {
207 if let Err(send_err) = self.work_queue.send(TimerMsg::Stop) {
208 log::error!("Failed to send stop message: {}", send_err);
209 if self.timer_thread.is_finished() {
211 if self.timer_thread.join().is_err() {
213 log::error!("The timer thread panicked. See stderr for more information.");
214 }
215 } SendMessageSnafu.fail()
217 } else {
218 self.timer_thread.join().map_err(|_| {
219 log::error!("The timer thread panicked. See stderr for more information.");
220 JoinThreadSnafu.build()
221 })
222 }
223 }
224
225 pub fn shutdown_async(&self) -> Result<(), ThreadTimerError> {
227 self.work_queue.send(TimerMsg::Stop).map_err(|err| {
228 log::error!("Failed to send stop message: {}", err);
229 SendMessageSnafu.build()
230 })
231 }
232}
233
#[cfg(feature = "uuid")]
impl TimerWithThread<uuid::Uuid, ClosureState<uuid::Uuid>> {
    /// Creates a timer with `uuid::Uuid` ids and [`ClosureState`] entries.
    ///
    /// Delegates to `Self::new(max_wait_time)` and returns its result
    /// unchanged.
    pub fn for_uuid_closures(max_wait_time: Duration) -> Result<Self, ThreadTimerError> {
        Self::new(max_wait_time)
    }
}
241
242impl<I, O> fmt::Debug for TimerWithThread<I, O>
243where
244 I: Hash + Clone + Eq + Ord,
245 O: State<Id = I>,
246{
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 write!(f, "<TimerWithThread>")
249 }
250}
251
impl<I, O> Default for TimerWithThread<I, O>
where
    I: Hash + Clone + Eq + Ord + fmt::Debug + Send + 'static,
    O: State<Id = I> + fmt::Debug + Send + 'static,
{
    /// Creates a timer via `Self::new(DEFAULT_MAX_WAIT)`.
    ///
    /// # Panics
    ///
    /// Panics with "Failed to create default timer" if `Self::new` returns an
    /// error.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_WAIT).expect("Failed to create default timer")
    }
}
261
/// Key stored in the deadline heap: a deadline and the id it belongs to.
#[derive(Debug, PartialEq, Eq)]
struct HeapEntry<I> {
    deadline: SystemTime,
    id: I,
}

impl<I> PartialOrd for HeapEntry<I>
where
    I: Eq + Ord,
{
    /// Always `Some`; defers to the total order defined by [`Ord::cmp`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<I> Ord for HeapEntry<I>
where
    I: Eq + Ord,
{
    /// Reversed `(deadline, id)` ordering.
    ///
    /// The entry with the earliest deadline (ties broken by smallest id)
    /// compares as the greatest, so a max-heap `BinaryHeap` yields it first.
    fn cmp(&self, other: &Self) -> Ordering {
        let natural = self
            .deadline
            .cmp(&other.deadline)
            .then_with(|| self.id.cmp(&other.id));
        natural.reverse()
    }
}
292
/// A scheduled timer: the deadline plus the state to trigger.
#[derive(Debug)]
struct TimerEntry<I, O>
where
    I: Hash + Clone + Eq,
    O: State<Id = I>,
{
    /// Wall-clock time at which the entry becomes due.
    pub deadline: SystemTime,
    /// State that is triggered once the entry is due; it also supplies the id.
    pub state: O,
}

impl<I, O> TimerEntry<I, O>
where
    I: Hash + Clone + Eq,
    O: State<Id = I>,
{
    /// Returns the id of this entry, as reported by its state.
    pub fn id(&self) -> &I {
        self.state.id()
    }
}
316
/// State owned by the timer thread while it runs its loop.
struct TimerThread<I, O, C>
where
    I: Hash + Clone + Eq + Ord + fmt::Debug,
    O: State<Id = I> + fmt::Debug,
    C: Clock + Send + 'static,
{
    /// Heap of `(deadline, id)` keys, ordered so the earliest deadline is on top.
    entry_queue: BinaryHeap<HeapEntry<I>>,
    /// Pending entries by id. Cancelling removes from this map only, so the
    /// heap may still hold keys for ids that are no longer present here.
    entries: FxHashMap<I, TimerEntry<I, O>>,
    /// Receiving side of the message channel.
    work_queue: channel::Receiver<TimerMsg<I, O>>,
    /// Cleared when a stop message is handled; the run loop exits once false.
    running: bool,
    /// Time source used for all deadline comparisons.
    clock: C,
    /// Upper bound for a single blocking wait on the work queue.
    max_wait_time: Duration,
}
330
331impl<I, O, C> TimerThread<I, O, C>
332where
333 I: Hash + Clone + Eq + Ord + fmt::Debug,
334 O: State<Id = I> + fmt::Debug,
335 C: Clock + Send + 'static,
336{
337 fn new(
338 work_queue: channel::Receiver<TimerMsg<I, O>>,
339 clock: C,
340 max_wait_time: Duration,
341 ) -> Self {
342 TimerThread {
343 entry_queue: BinaryHeap::new(),
344 entries: FxHashMap::default(),
345 work_queue,
346 running: true,
347 clock,
348 max_wait_time,
349 }
350 }
351
352 fn run(mut self) {
353 'run_loop: while self.running {
354 let now = self.clock.now();
355 self.process_due(now);
360 if !self.running {
361 break 'run_loop;
362 }
363
364 match self.next_deadline() {
365 None => match self.work_queue.recv() {
366 Ok(msg) => self.handle_msg(msg),
367 Err(channel::RecvError) => {
368 log::error!("Channel died, stopping timer thread...");
369 break 'run_loop;
370 }
371 },
372 Some(deadline) => {
373 if deadline <= now {
374 continue 'run_loop;
375 }
376 let wait = deadline
378 .duration_since(self.clock.now())
379 .unwrap_or(Duration::ZERO)
380 .min(self.max_wait_time);
381 match self.work_queue.recv_timeout(wait) {
382 Ok(msg) => self.handle_msg(msg),
383 Err(channel::RecvTimeoutError::Timeout) => {
384 continue 'run_loop;
385 }
386 Err(channel::RecvTimeoutError::Disconnected) => {
387 log::error!("Channel died, stopping timer thread...");
388 break 'run_loop;
389 }
390 }
391 }
392 }
393 }
394 }
395
396 fn handle_msg(&mut self, msg: TimerMsg<I, O>) {
397 match msg {
398 TimerMsg::Stop => {
399 log::info!("Timer thread received stop signal. Shutting down...");
400 self.running = false
401 }
402 TimerMsg::Schedule(entry) => self.schedule_entry(entry),
403 TimerMsg::Cancel(id) => match self.entries.remove(&id) {
404 Some(e) => {
405 log::info!("Cancelled timer entry {e:?}");
406 }
407 None => {
408 log::warn!(
409 "Could not find timer entry with {id:?} to cancel. It might have expired already?"
410 );
411 }
412 },
413 }
414 }
415
416 fn schedule_entry(&mut self, entry: TimerEntry<I, O>) {
417 let now = self.clock.now();
418 if entry.deadline <= now {
419 log::debug!(
420 "Triggering entry with id {:?} instead of scheduling, since it's already expired.",
421 entry.id()
422 );
423 entry.state.trigger();
424 return;
425 }
426 let id = entry.id().clone();
427 self.insert_entry(id, entry);
428 }
429
430 fn insert_entry(&mut self, id: I, entry: TimerEntry<I, O>) {
431 match self.entries.entry(id) {
432 hash_map::Entry::Occupied(e) => {
433 log::error!(
434 "Attempted to re-insert a timer entry with an already existing id. Scheduled timer ids must be unique! Existing entry: {:?}, new entry: {:?}",
435 e,
436 entry
437 );
438 }
439 hash_map::Entry::Vacant(e) => {
440 let id = entry.id().clone();
441 let deadline = entry.deadline;
442 e.insert(entry);
443 self.entry_queue.push(HeapEntry { deadline, id });
444 }
445 }
446 }
447
448 fn process_due(&mut self, now: SystemTime) {
449 while let Some(scheduled) = self.pop_next_due(now) {
450 scheduled.state.trigger();
451 }
452 }
453
454 #[inline(always)]
455 fn next_deadline(&mut self) -> Option<SystemTime> {
456 self.entry_queue.peek().map(|entry| entry.deadline)
457 }
458
459 fn pop_next_due(&mut self, now: SystemTime) -> Option<TimerEntry<I, O>> {
460 if let Some(top) = self.entry_queue.peek() {
461 if top.deadline > now {
462 return None;
463 }
464 let entry = self.entry_queue.pop().expect("peeked entry");
465 let scheduled = self.entries.remove(&entry.id);
466 if scheduled.is_none() {
467 log::debug!("Skipping entry {entry:?}, because it always already cancelled.");
468 }
469 scheduled
470 } else {
471 None
472 }
473 }
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479 use crate::timers::ClosureTimer;
480 use std::{
481 sync::{
482 Arc,
483 Mutex,
484 Once,
485 atomic::{AtomicUsize, Ordering as AtomicOrdering},
486 },
487 time::Instant,
488 };
489
    /// Initialises logging for the tests.
    ///
    /// Guarded by a `Once`, so the body runs a single time however many tests
    /// call it. The result of `init()` is ignored.
    fn init_logger() {
        static INIT: Once = Once::new();
        INIT.call_once(|| {
            let _ = simple_logger::SimpleLogger::new().init();
            log::set_max_level(log::LevelFilter::Debug);
        });
    }
497
498 #[derive(Clone)]
499 struct MockClock {
500 now: Arc<Mutex<SystemTime>>,
501 }
502
503 impl MockClock {
504 fn new(start: SystemTime) -> Self {
505 Self {
506 now: Arc::new(Mutex::new(start)),
507 }
508 }
509
510 fn advance(&self, delta: Duration) {
511 let mut guard = self.now.lock().expect("clock lock");
512 *guard = guard.checked_add(delta).expect("advance");
513 }
514
515 fn set(&self, time: SystemTime) {
516 let mut guard = self.now.lock().expect("clock lock");
517 *guard = time;
518 }
519 }
520
521 impl Clock for MockClock {
522 fn now(&self) -> SystemTime {
523 *self.now.lock().expect("clock lock")
524 }
525 }
526
527 #[derive(Clone, Debug, Default)]
528 struct AtomicCounter {
529 inner: Arc<AtomicUsize>,
530 }
531
532 impl AtomicCounter {
533 fn new() -> Self {
534 Self {
535 inner: Arc::new(AtomicUsize::new(0)),
536 }
537 }
538
539 fn increment(&self) {
540 self.inner.fetch_add(1, AtomicOrdering::SeqCst);
541 }
542
543 fn get(&self) -> usize {
544 self.inner.load(AtomicOrdering::SeqCst)
545 }
546 }
547
    /// Timer state used by the tests: triggering it bumps a shared counter.
    #[derive(Debug)]
    struct TestState {
        /// Id reported to the timer.
        id: u64,
        /// Counter incremented each time this state is triggered.
        hits: AtomicCounter,
    }

    impl State for TestState {
        type Id = u64;

        /// Returns a reference to the stored id.
        fn id(&self) -> &Self::Id {
            &self.id
        }

        /// Consumes the state and records one hit.
        fn trigger(self) {
            self.hits.increment();
        }
    }
565
    /// Entries fire only once the mock clock has reached their deadline,
    /// regardless of how much real time passes.
    #[test]
    fn mock_clock_triggers_on_deadline() {
        init_logger();
        let clock = MockClock::new(SystemTime::UNIX_EPOCH);
        let timer = TimerWithThread::<u64, TestState>::new_with_clock(
            clock.clone(),
            Duration::from_millis(5),
        )
        .expect("timer");
        let mut tref = timer.timer_ref();
        let hits = AtomicCounter::new();
        let hits2 = AtomicCounter::new();

        // Entry 1 is due 5ms after the epoch on the mock clock.
        let deadline = SystemTime::UNIX_EPOCH + Duration::from_millis(5);
        tref.schedule_at(
            deadline,
            TestState {
                id: 1,
                hits: hits.clone(),
            },
        )
        .expect("schedule");

        // Entry 2 is due 20s after the epoch on the mock clock.
        let later_deadline = SystemTime::UNIX_EPOCH + Duration::from_secs(20);
        tref.schedule_at(
            later_deadline,
            TestState {
                id: 2,
                hits: hits2.clone(),
            },
        )
        .expect("schedule");

        // Real time passes, but the mock clock has not moved: nothing fires.
        thread::sleep(Duration::from_millis(20));
        assert_eq!(hits.get(), 0);
        assert_eq!(hits2.get(), 0);

        // Mock clock passes the first deadline only.
        clock.advance(Duration::from_millis(6));
        wait_for_hits(&hits, 1, Duration::from_secs(2));
        assert_eq!(hits.get(), 1);
        assert_eq!(hits2.get(), 0);

        // Mock clock passes the second deadline; the first must not fire again.
        clock.advance(Duration::from_secs(20));
        wait_for_hits(&hits2, 1, Duration::from_secs(2));
        assert_eq!(hits.get(), 1);
        assert_eq!(hits2.get(), 1);

        timer.shutdown().expect("shutdown");
    }
617
    /// A newly scheduled near deadline is honoured even though the timer
    /// already holds an entry with a very distant deadline.
    ///
    /// Uses the default timer, i.e. the real clock and `DEFAULT_MAX_WAIT`.
    #[test]
    fn wake_on_message_while_waiting_long_timeout() {
        init_logger();
        let timer = TimerWithThread::<u64, TestState>::default();
        let mut tref = timer.timer_ref();

        let far_hits = AtomicCounter::new();
        let far_deadline = SystemTime::now() + Duration::from_hours(1000);
        tref.schedule_at(
            far_deadline,
            TestState {
                id: 1,
                hits: far_hits.clone(),
            },
        )
        .expect("schedule");

        // Give the timer thread a moment to pick up the first entry.
        thread::sleep(Duration::from_millis(5));

        let near_hits = AtomicCounter::new();
        let near_deadline = SystemTime::now() + Duration::from_millis(50);
        tref.schedule_at(
            near_deadline,
            TestState {
                id: 2,
                hits: near_hits.clone(),
            },
        )
        .expect("schedule");

        // Only the near entry may have fired.
        wait_for_hits(&near_hits, 1, Duration::from_secs(2));
        assert_eq!(near_hits.get(), 1);
        assert_eq!(far_hits.get(), 0);

        timer.shutdown().expect("shutdown");
    }
655
    /// When the mock clock jumps far past a deadline, the entry fires, and
    /// entries with later deadlines stay pending until the clock passes them.
    #[test]
    fn time_jump_forward_triggers_immediately() {
        init_logger();
        let clock = MockClock::new(SystemTime::UNIX_EPOCH);
        let timer = TimerWithThread::<u64, TestState>::new_with_clock(
            clock.clone(),
            Duration::from_millis(5),
        )
        .expect("timer");
        let mut tref = timer.timer_ref();
        let hits = AtomicCounter::new();
        let hits2 = AtomicCounter::new();

        // Deadlines at 10s and 90s after the epoch on the mock clock.
        let deadline = SystemTime::UNIX_EPOCH + Duration::from_secs(10);
        let far_deadline = SystemTime::UNIX_EPOCH + Duration::from_secs(90);
        tref.schedule_at(
            deadline,
            TestState {
                id: 1,
                hits: hits.clone(),
            },
        )
        .expect("schedule");
        tref.schedule_at(
            far_deadline,
            TestState {
                id: 2,
                hits: hits2.clone(),
            },
        )
        .expect("schedule");

        // The mock clock has not moved yet: nothing fires.
        thread::sleep(Duration::from_millis(20));
        assert_eq!(hits.get(), 0);
        assert_eq!(hits2.get(), 0);

        // Jump to 30s: past the first deadline, before the second.
        clock.advance(Duration::from_secs(30));
        wait_for_hits(&hits, 1, Duration::from_secs(2));
        assert_eq!(hits.get(), 1);
        assert_eq!(hits2.get(), 0);

        // Jump to 130s: past the second deadline as well.
        clock.advance(Duration::from_secs(100));
        wait_for_hits(&hits2, 1, Duration::from_secs(2));
        assert_eq!(hits.get(), 1);
        assert_eq!(hits2.get(), 1);

        timer.shutdown().expect("shutdown");
    }
705
    /// Moving the mock clock backwards must not fire anything; entries fire
    /// only once the clock has again passed their absolute deadline.
    #[test]
    fn time_jump_backward_does_not_trigger_early() {
        init_logger();
        let start = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
        let clock = MockClock::new(start);
        let timer = TimerWithThread::<u64, TestState>::new_with_clock(
            clock.clone(),
            Duration::from_millis(5),
        )
        .expect("timer");
        let mut tref = timer.timer_ref();
        let hits = AtomicCounter::new();
        let hits2 = AtomicCounter::new();

        // Absolute deadlines: epoch + 110s.
        let deadline = start + Duration::from_secs(10);
        // ... and epoch + 140s.
        let later_deadline = start + Duration::from_secs(40);
        tref.schedule_at(
            deadline,
            TestState {
                id: 1,
                hits: hits.clone(),
            },
        )
        .expect("schedule");
        tref.schedule_at(
            later_deadline,
            TestState {
                id: 2,
                hits: hits2.clone(),
            },
        )
        .expect("schedule");

        thread::sleep(Duration::from_millis(20));
        assert_eq!(hits.get(), 0);
        assert_eq!(hits2.get(), 0);

        // Jump back to epoch + 70s.
        clock.set(start - Duration::from_secs(30));
        thread::sleep(Duration::from_millis(20));
        assert_eq!(hits.get(), 0);
        assert_eq!(hits2.get(), 0);

        // Epoch + 90s: still before both deadlines.
        clock.advance(Duration::from_secs(20));
        thread::sleep(Duration::from_millis(20));
        assert_eq!(hits.get(), 0);
        assert_eq!(hits2.get(), 0);

        // Epoch + 111s: past the first deadline only.
        clock.advance(Duration::from_secs(21));
        wait_for_hits(&hits, 1, Duration::from_secs(2));
        assert_eq!(hits.get(), 1);
        assert_eq!(hits2.get(), 0);

        // Epoch + 142s: past the second deadline.
        clock.advance(Duration::from_secs(31));
        wait_for_hits(&hits2, 1, Duration::from_secs(2));
        assert_eq!(hits.get(), 1);
        assert_eq!(hits2.get(), 1);

        timer.shutdown().expect("shutdown");
    }
776
    /// Closures scheduled through `schedule_action_at` run once the mock
    /// clock passes their deadline.
    #[test]
    fn closure_timer_schedules_actions() {
        init_logger();
        let clock = MockClock::new(SystemTime::UNIX_EPOCH);
        let timer = TimerWithThread::<u64, crate::timers::ClosureState<u64>>::new_with_clock(
            clock.clone(),
            Duration::from_millis(5),
        )
        .expect("timer");
        let mut tref = timer.timer_ref();

        let hits = AtomicCounter::new();
        let hits2 = AtomicCounter::new();

        // Action 1 is due 5s after the epoch.
        let hits_clone = hits.clone();
        tref.schedule_action_at(
            1,
            SystemTime::UNIX_EPOCH + Duration::from_secs(5),
            move |_| {
                hits_clone.increment();
            },
        )
        .expect("schedule");
        // Action 2 is due 50s after the epoch.
        let hits2_clone = hits2.clone();
        tref.schedule_action_at(
            2,
            SystemTime::UNIX_EPOCH + Duration::from_secs(50),
            move |_| {
                hits2_clone.increment();
            },
        )
        .expect("schedule");

        // The mock clock has not moved yet: nothing runs.
        thread::sleep(Duration::from_millis(20));
        assert_eq!(hits.get(), 0);
        assert_eq!(hits2.get(), 0);

        // Mock clock at 10s: only the first action is due.
        clock.advance(Duration::from_secs(10));
        wait_for_hits(&hits, 1, Duration::from_secs(2));
        assert_eq!(hits.get(), 1);
        assert_eq!(hits2.get(), 0);

        // Mock clock at 60s: the second action is due as well.
        clock.advance(Duration::from_secs(50));
        wait_for_hits(&hits2, 1, Duration::from_secs(2));
        assert_eq!(hits2.get(), 1);
        timer.shutdown().expect("shutdown");
    }
824
    /// A cancelled entry does not fire when its deadline passes, while a
    /// second entry with the same deadline still does.
    #[test]
    fn cancel_prevents_overdue_trigger_with_multiple_timers() {
        init_logger();
        let clock = MockClock::new(SystemTime::UNIX_EPOCH);
        let timer = TimerWithThread::<u64, TestState>::new_with_clock(
            clock.clone(),
            Duration::from_millis(5),
        )
        .expect("timer");
        let mut tref = timer.timer_ref();
        let hits = AtomicCounter::new();
        let hits2 = AtomicCounter::new();

        // Both entries share the same deadline.
        let deadline = SystemTime::UNIX_EPOCH + Duration::from_secs(5);
        tref.schedule_at(
            deadline,
            TestState {
                id: 1,
                hits: hits.clone(),
            },
        )
        .expect("schedule");
        tref.schedule_at(
            deadline,
            TestState {
                id: 2,
                hits: hits2.clone(),
            },
        )
        .expect("schedule");

        // Let the timer thread process both schedule messages first.
        thread::sleep(Duration::from_millis(20));

        tref.cancel(1).expect("cancel");
        thread::sleep(Duration::from_millis(20));
        assert_eq!(hits.get(), 0);
        assert_eq!(hits2.get(), 0);

        // Pass the shared deadline: only the entry that was not cancelled fires.
        clock.advance(Duration::from_secs(6));
        wait_for_hits(&hits2, 1, Duration::from_secs(2));
        assert_eq!(hits.get(), 0);
        assert_eq!(hits2.get(), 1);

        timer.shutdown().expect("shutdown");
    }
871
    /// `shutdown` reports an error after a scheduled action has panicked.
    ///
    /// Either `JoinThread` or `SendMessage` is accepted as the outcome.
    #[test]
    fn join_thread_error_from_panicking_handler() {
        init_logger();
        let timer = TimerWithThread::<u64, crate::timers::ClosureState<u64>>::new(DEFAULT_MAX_WAIT)
            .expect("timer");
        let mut tref = timer.timer_ref();
        // The deadline is "now", so the action is due as soon as it is handled.
        tref.schedule_action_at(1, SystemTime::now(), |_| panic!("boom"))
            .expect("schedule");
        thread::sleep(Duration::from_millis(10));
        let err = timer.shutdown().expect_err("expected shutdown error");
        const POSSIBLE_ERRORS: [ThreadTimerError; 2] =
            [ThreadTimerError::JoinThread, ThreadTimerError::SendMessage];
        assert!(
            POSSIBLE_ERRORS.contains(&err),
            "Should have gotten a shutdown error but was: {err}"
        );
    }
890
891 fn wait_for_hits(hits: &AtomicCounter, expected: usize, timeout: Duration) {
892 let start = Instant::now();
893 'wait_loop: while start.elapsed() < timeout {
894 if hits.get() >= expected {
895 break 'wait_loop;
896 }
897 thread::yield_now();
898 }
899 }
900}