monotonic_timer/lib.rs
//! A simple timer, used to enqueue operations meant to be executed at
//! a given time or after a given delay.
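//!
//! A minimal usage sketch (assuming the crate is built under the name
//! `monotonic_timer`, as in the examples below):
//!
//! ```
//! use std::sync::mpsc::channel;
//! use std::time::Duration;
//!
//! let timer = monotonic_timer::Timer::new();
//! let (tx, rx) = channel();
//!
//! // Schedule a callback for 50 ms from now; the guard keeps it alive.
//! let _guard = timer.schedule_with_delay(Duration::from_millis(50), move || {
//!     let _ = tx.send(());
//! });
//!
//! // Block until the callback has fired.
//! rx.recv().unwrap();
//! ```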

use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

/// An item scheduled for delayed execution.
struct Schedule<T> {
    /// The instant at which to execute.
    date: Instant,

    /// The schedule data.
    data: T,

    /// A mechanism to cancel execution of an item.
    guard: Guard,

    /// If `Some(d)`, the item must be repeated every interval of
    /// length `d`, until cancelled.
    repeat: Option<Duration>,
}
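// `BinaryHeap` is a max-heap, so the ordering on `Schedule` is reversed:
// the schedule with the *earliest* date compares greatest and is popped first.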
impl<T> Ord for Schedule<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.date.cmp(&other.date).reverse()
    }
}
impl<T> PartialOrd for Schedule<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl<T> Eq for Schedule<T> {}
impl<T> PartialEq for Schedule<T> {
    fn eq(&self, other: &Self) -> bool {
        self.date.eq(&other.date)
    }
}

/// An operation to be sent across threads.
enum Op<T> {
    /// Schedule a new item for execution.
    Schedule(Schedule<T>),

    /// Stop the thread.
    Stop,
}

/// A mutex-based kind-of-channel used to communicate between the
/// Communication thread and the Scheduler thread.
struct WaiterChannel<T> {
    /// Pending messages.
    messages: Mutex<Vec<Op<T>>>,
    /// A condition variable used for waiting.
    condvar: Condvar,
}
impl<T> WaiterChannel<T> {
    fn with_capacity(cap: usize) -> Self {
        WaiterChannel {
            messages: Mutex::new(Vec::with_capacity(cap)),
            condvar: Condvar::new(),
        }
    }
}

/// A trait that allows configurable execution of scheduled items
/// on the scheduler thread.
trait Executor<T> {
    // Due to differences in use between Box<FnMut()> and most other data
    // types, this trait requires implementors to provide two implementations
    // of execute. While both of these functions execute the data item,
    // they differ on whether they make an equivalent data item available
    // to the Scheduler to store in recurring schedules.
    //
    // execute() is called whenever a non-recurring data item needs
    // to be executed, and consumes the data item in the process.
    //
    // execute_clone() is called whenever a recurring data item needs
    // to be executed, and produces a new equivalent data item. This
    // function should be more or less equivalent to:
    //
    // fn execute_clone(&mut self, data: T) -> T {
    //     self.execute(data.clone());
    //     data
    // }

    fn execute(&mut self, data: T);

    fn execute_clone(&mut self, data: T) -> T;
}

/// An executor implementation for executing callbacks on the scheduler
/// thread.
struct CallbackExecutor;

impl Executor<Box<dyn FnMut() + Send>> for CallbackExecutor {
    fn execute(&mut self, mut data: Box<dyn FnMut() + Send>) {
        data();
    }

    fn execute_clone(&mut self, mut data: Box<dyn FnMut() + Send>) -> Box<dyn FnMut() + Send> {
        data();
        data
    }
}

/// An executor implementation for delivering messages to a channel.
struct DeliveryExecutor<T>
where
    T: 'static + Send,
{
    /// The channel to deliver messages to.
    tx: Sender<T>,
}

impl<T> Executor<T> for DeliveryExecutor<T>
where
    T: 'static + Send + Clone,
{
    fn execute(&mut self, data: T) {
        let _ = self.tx.send(data);
    }

    fn execute_clone(&mut self, data: T) -> T {
        let _ = self.tx.send(data.clone());
        data
    }
}

struct Scheduler<T, E>
where
    E: Executor<T>,
{
    waiter: Arc<WaiterChannel<T>>,
    heap: BinaryHeap<Schedule<T>>,
    executor: E,
}

impl<T, E> Scheduler<T, E>
where
    E: Executor<T>,
{
    fn with_capacity(waiter: Arc<WaiterChannel<T>>, executor: E, capacity: usize) -> Self {
        Scheduler {
            waiter,
            executor,
            heap: BinaryHeap::with_capacity(capacity),
        }
    }

    fn run(&mut self) {
        enum Sleep {
            NotAtAll,
            UntilAwakened,
            AtMost(Duration),
        }

        let waiter = &(*self.waiter);
        loop {
            let mut sleep = if let Some(sched) = self.heap.peek() {
                let now = Instant::now();
                if sched.date > now {
                    // The first item is not ready yet, so we need to
                    // wait until it is, or until something happens.
                    Sleep::AtMost(sched.date.duration_since(now))
                } else {
                    // At this stage, we have an item that has reached
                    // its execution time. The `unwrap()` is guaranteed
                    // to succeed.
                    let sched = self.heap.pop().unwrap();

                    // The item we just popped might have been cancelled.
                    // Let's check that before executing.
                    if sched.guard.should_execute() {
                        // We have something to do.
                        if let Some(delta) = sched.repeat {
                            let data = self.executor.execute_clone(sched.data);

                            // This is a repeating timer, so we need to
                            // enqueue the next call.
                            self.heap.push(Schedule {
                                date: sched.date + delta,
                                data,
                                guard: sched.guard,
                                repeat: Some(delta),
                            });
                        } else {
                            self.executor.execute(sched.data);
                        }
                    }

                    // We have just popped an item, but it might be too early
                    // to go back to sleep: the next item may need to be
                    // executed immediately. We do not `continue`, so that
                    // `waiter.messages` is checked before the next item is
                    // executed.
                    Sleep::NotAtAll
                }
            } else {
                // Nothing to do.
                Sleep::UntilAwakened
            };

            let mut lock = waiter.messages.lock().unwrap();
            // Pop all pending messages.
            for msg in lock.drain(..) {
                match msg {
                    Op::Stop => {
                        // Stop immediately, even if there are pending timer actions.
                        return;
                    }
                    Op::Schedule(sched) => {
                        self.heap.push(sched);
                        // A new item was added to the heap; it might need to
                        // execute before the current deadline, so do not sleep.
                        sleep = Sleep::NotAtAll;
                    }
                }
            }

            match sleep {
                Sleep::UntilAwakened => {
                    let _ = waiter.condvar.wait(lock);
                }
                Sleep::AtMost(delay) => {
                    let _ = waiter.condvar.wait_timeout(lock, delay);
                }
                Sleep::NotAtAll => {}
            }
        }
    }
}

/// Shared coordination logic for timer threads.
pub struct TimerBase<T>
where
    T: 'static + Send,
{
    /// Sender used to communicate with the _Communication_ thread. In
    /// turn, that thread forwards the operations to the _Scheduler_
    /// thread.
    tx: Sender<Op<T>>,
}

impl<T> Drop for TimerBase<T>
where
    T: 'static + Send,
{
    /// Stop the timer threads.
    fn drop(&mut self) {
        self.tx.send(Op::Stop).unwrap();
    }
}

impl<T> TimerBase<T>
where
    T: 'static + Send,
{
    /// Create a timer base.
    ///
    /// This immediately launches two threads, which will remain
    /// launched until the timer is dropped. As expected, the threads
    /// spend most of their life waiting for instructions.
    fn new<E>(executor: E) -> Self
    where
        E: 'static + Executor<T> + Send,
    {
        Self::with_capacity(executor, 32)
    }

    /// As `new()`, but with a manually specified initial capacity.
    fn with_capacity<E>(executor: E, capacity: usize) -> Self
    where
        E: 'static + Executor<T> + Send,
    {
        let waiter_send = Arc::new(WaiterChannel::with_capacity(capacity));
        let waiter_recv = waiter_send.clone();

        // Spawn a first thread, whose sole role is to dispatch
        // messages to the second thread, so that the caller never
        // has to wait on the (possibly long-held) mutex.
        let (tx, rx) = channel();
        thread::spawn(move || {
            use Op::*;
            let waiter = &(*waiter_send);
            for msg in rx.iter() {
                let mut vec = waiter.messages.lock().unwrap();
                match msg {
                    Schedule(sched) => {
                        vec.push(Schedule(sched));
                        waiter.condvar.notify_one();
                    }
                    Stop => {
                        vec.clear();
                        vec.push(Stop);
                        waiter.condvar.notify_one();
                        return;
                    }
                }
            }
        });

        // Spawn a second thread, in charge of scheduling.
        thread::Builder::new()
            .name("Timer thread".to_owned())
            .spawn(move || {
                let mut scheduler = Scheduler::with_capacity(waiter_recv, executor, capacity);
                scheduler.run()
            })
            .unwrap();
        TimerBase { tx }
    }

    pub fn schedule_with_delay(&self, delay: Duration, data: T) -> Guard {
        self.schedule(Instant::now() + delay, None, data)
    }

    pub fn schedule_repeating(&self, repeat: Duration, data: T) -> Guard {
        self.schedule(Instant::now() + repeat, Some(repeat), data)
    }

    pub fn schedule(&self, date: Instant, repeat: Option<Duration>, data: T) -> Guard {
        let guard = Guard::new();
        self.tx
            .send(Op::Schedule(Schedule {
                date,
                data,
                guard: guard.clone(),
                repeat,
            }))
            .unwrap();
        guard
    }
}

/// A monotonic timer, used to schedule execution of callbacks at a later date.
///
/// In the current implementation, each timer is executed as two
/// threads. The _Scheduler_ thread is in charge of maintaining the
/// queue of callbacks to execute and of actually executing them. The
/// _Communication_ thread is in charge of communicating with the
/// _Scheduler_ thread (which requires acquiring a possibly-long-held
/// Mutex) without blocking the caller thread.
pub struct Timer {
    base: TimerBase<Box<dyn FnMut() + Send>>,
}

impl Timer {
    /// Create a timer.
    ///
    /// This immediately launches two threads, which will remain
    /// launched until the timer is dropped. As expected, the threads
    /// spend most of their life waiting for instructions.
    pub fn new() -> Self {
        Timer {
            base: TimerBase::new(CallbackExecutor),
        }
    }

    /// As `new()`, but with a manually specified initial capacity.
    pub fn with_capacity(capacity: usize) -> Self {
        Timer {
            base: TimerBase::with_capacity(CallbackExecutor, capacity),
        }
    }

    /// Schedule a callback for execution after a delay.
    ///
    /// Callbacks are guaranteed to never be called before the
    /// delay. However, it is possible that they will be called a
    /// little after the delay.
    ///
    /// If the delay is zero, the callback is executed as soon as
    /// possible.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, execution is cancelled.
    ///
    /// # Performance
    ///
    /// The callback is executed on the Scheduler thread. It should
    /// therefore terminate very quickly, or it risks delaying other
    /// callbacks.
    ///
    /// # Failures
    ///
    /// Any failure in `cb` will bring down the scheduler thread and
    /// progressively contaminate the Timer and the calling thread
    /// itself. You have been warned.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate monotonic_timer;
    /// use std::sync::mpsc::channel;
    /// use std::time::Duration;
    /// let timer = monotonic_timer::Timer::new();
    /// let (tx, rx) = channel();
    ///
    /// let _guard = timer.schedule_with_delay(Duration::from_secs(3), move || {
    ///     // This closure is executed on the scheduler thread,
    ///     // so we want to move it away asap.
    ///
    ///     let _ignored = tx.send(()); // Avoid unwrapping here.
    /// });
    ///
    /// rx.recv().unwrap();
    /// println!("This code has been executed after 3 seconds");
    /// ```
    pub fn schedule_with_delay<F>(&self, delay: Duration, cb: F) -> Guard
    where
        F: 'static + FnMut() + Send,
    {
        self.base.schedule_with_delay(delay, Box::new(cb))
    }

    /// Schedule a callback for execution once per interval.
    ///
    /// Callbacks are guaranteed to never be called before their
    /// date. However, it is possible that they will be called a
    /// little after it.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, the repetition stops.
    ///
    /// # Performance
    ///
    /// The callback is executed on the Scheduler thread. It should
    /// therefore terminate very quickly, or it risks delaying other
    /// callbacks.
    ///
    /// # Failures
    ///
    /// Any failure in `cb` will bring down the scheduler thread and
    /// progressively contaminate the Timer and the calling thread
    /// itself. You have been warned.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate monotonic_timer;
    /// use std::thread;
    /// use std::sync::{Arc, Mutex};
    /// use std::time::Duration;
    /// let timer = monotonic_timer::Timer::new();
    /// // Number of times the callback has been called.
    /// let count = Arc::new(Mutex::new(0));
    ///
    /// // Start repeating. Each callback increases `count`.
    /// let guard = {
    ///     let count = count.clone();
    ///     timer.schedule_repeating(Duration::from_millis(5), move || {
    ///         *count.lock().unwrap() += 1;
    ///     })
    /// };
    ///
    /// // Sleep one second. The callback should be called ~200 times.
    /// thread::sleep(Duration::from_secs(1));
    /// let count_result = *count.lock().unwrap();
    /// assert!(190 <= count_result && count_result <= 210,
    ///     "The timer was called {} times", count_result);
    ///
    /// // Now drop the guard. This should stop the repetition. Give any
    /// // in-flight callback a moment to finish.
    /// drop(guard);
    /// thread::sleep(Duration::from_millis(100));
    ///
    /// // Let's check that the count stops increasing.
    /// let count_start = *count.lock().unwrap();
    /// thread::sleep(Duration::from_secs(1));
    /// let count_stop = *count.lock().unwrap();
    /// assert_eq!(count_start, count_stop);
    /// ```
    pub fn schedule_repeating<F>(&self, repeat: Duration, cb: F) -> Guard
    where
        F: 'static + FnMut() + Send,
    {
        self.base.schedule_repeating(repeat, Box::new(cb))
    }

    /// Schedule a callback for execution at a given time, then once
    /// per interval. A typical use case is to execute code once per
    /// day at 12am.
    ///
    /// Callbacks are guaranteed to never be called before their
    /// date. However, it is possible that they will be called a
    /// little after it.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, the repetition stops.
    ///
    /// # Performance
    ///
    /// The callback is executed on the Scheduler thread. It should
    /// therefore terminate very quickly, or it risks delaying other
    /// callbacks.
    ///
    /// # Failures
    ///
    /// Any failure in `cb` will bring down the scheduler thread and
    /// progressively contaminate the Timer and the calling thread
    /// itself. You have been warned.
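    ///
    /// # Example
    ///
    /// A minimal sketch: run a callback 100 ms from now, then every
    /// 100 ms until the guard is dropped.
    ///
    /// ```
    /// extern crate monotonic_timer;
    /// use std::sync::mpsc::channel;
    /// use std::time::{Duration, Instant};
    ///
    /// let timer = monotonic_timer::Timer::new();
    /// let (tx, rx) = channel();
    ///
    /// let guard = timer.schedule(
    ///     Instant::now() + Duration::from_millis(100),
    ///     Some(Duration::from_millis(100)),
    ///     move || { let _ = tx.send(()); },
    /// );
    ///
    /// // Wait for the first two ticks, then stop repeating.
    /// rx.recv().unwrap();
    /// rx.recv().unwrap();
    /// drop(guard);
    /// ```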
    pub fn schedule<F>(&self, date: Instant, repeat: Option<Duration>, cb: F) -> Guard
    where
        F: 'static + FnMut() + Send,
    {
        self.base.schedule(date, repeat, Box::new(cb))
    }
}

impl Default for Timer {
    fn default() -> Self {
        Self::new()
    }
}

/// A monotonic timer, used to schedule delivery of messages at a later date.
///
/// In the current implementation, each timer is executed as two
/// threads. The _Scheduler_ thread is in charge of maintaining the
/// queue of messages to deliver and of actually delivering them. The
/// _Communication_ thread is in charge of communicating with the
/// _Scheduler_ thread (which requires acquiring a possibly-long-held
/// Mutex) without blocking the caller thread.
///
/// Similar functionality could be implemented using the generic Timer
/// type; however, using MessageTimer has two performance advantages
/// over doing so. First, MessageTimer does not need to heap-allocate
/// a closure for each scheduled item, since the messages to queue are
/// passed directly. Second, MessageTimer avoids the dynamic dispatch
/// overhead associated with invoking the closure functions.
pub struct MessageTimer<T>
where
    T: 'static + Send + Clone,
{
    base: TimerBase<T>,
}

impl<T> MessageTimer<T>
where
    T: 'static + Send + Clone,
{
    /// Create a message timer.
    ///
    /// This immediately launches two threads, which will remain
    /// launched until the timer is dropped. As expected, the threads
    /// spend most of their life waiting for instructions.
    pub fn new(tx: Sender<T>) -> Self {
        MessageTimer {
            base: TimerBase::new(DeliveryExecutor { tx }),
        }
    }

    /// As `new()`, but with a manually specified initial capacity.
    pub fn with_capacity(tx: Sender<T>, capacity: usize) -> Self {
        MessageTimer {
            base: TimerBase::with_capacity(DeliveryExecutor { tx }, capacity),
        }
    }

    /// Schedule a message for delivery after a delay.
    ///
    /// Messages are guaranteed to never be delivered before the
    /// delay. However, it is possible that they will be delivered a
    /// little after the delay.
    ///
    /// If the delay is zero, the message is delivered as soon as
    /// possible.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, delivery is cancelled.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate monotonic_timer;
    ///
    /// use std::sync::mpsc::channel;
    /// use std::time::Duration;
    ///
    /// let (tx, rx) = channel();
    /// let timer = monotonic_timer::MessageTimer::new(tx);
    /// let _guard = timer.schedule_with_delay(Duration::from_secs(3), 3);
    ///
    /// rx.recv().unwrap();
    /// println!("This code has been executed after 3 seconds");
    /// ```
    pub fn schedule_with_delay(&self, delay: Duration, msg: T) -> Guard {
        self.base.schedule_with_delay(delay, msg)
    }

    /// Schedule a message for delivery once per interval.
    ///
    /// Messages are guaranteed to never be delivered before their
    /// date. However, it is possible that they will be delivered a
    /// little after it.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, the repetition stops.
    ///
    /// # Performance
    ///
    /// The message is cloned on the Scheduler thread. Cloning a
    /// message should therefore complete very quickly, or it risks
    /// delaying other messages.
    ///
    /// # Failures
    ///
    /// Any failure while cloning a message occurs on the scheduler thread
    /// and will contaminate the Timer and the calling thread itself. You
    /// have been warned.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate monotonic_timer;
    /// use std::sync::mpsc::channel;
    /// use std::time::Duration;
    ///
    /// let (tx, rx) = channel();
    /// let timer = monotonic_timer::MessageTimer::new(tx);
    ///
    /// // Start repeating.
    /// let guard = timer.schedule_repeating(Duration::from_millis(5), 0);
    ///
    /// let mut count = 0;
    /// while count < 5 {
    ///     let _ = rx.recv();
    ///     println!("Prints every 5 milliseconds");
    ///     count += 1;
    /// }
    /// ```
    pub fn schedule_repeating(&self, repeat: Duration, msg: T) -> Guard {
        self.base.schedule_repeating(repeat, msg)
    }

    /// Schedule a message for delivery at a given time, then once
    /// per interval. A typical use case is to execute code once per
    /// day at 12am.
    ///
    /// Messages are guaranteed to never be delivered before their
    /// date. However, it is possible that they will be delivered a
    /// little after it.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, the repetition stops.
    ///
    /// # Performance
    ///
    /// The message is cloned on the Scheduler thread. Cloning a
    /// message should therefore complete very quickly, or it risks
    /// delaying other messages.
    ///
    /// # Failures
    ///
    /// Any failure while cloning a message occurs on the scheduler thread
    /// and will contaminate the Timer and the calling thread itself. You
    /// have been warned.
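    ///
    /// # Example
    ///
    /// A minimal sketch: deliver a message 100 ms from now, then every
    /// 100 ms until the guard is dropped.
    ///
    /// ```
    /// extern crate monotonic_timer;
    /// use std::sync::mpsc::channel;
    /// use std::time::{Duration, Instant};
    ///
    /// let (tx, rx) = channel();
    /// let timer = monotonic_timer::MessageTimer::new(tx);
    ///
    /// let guard = timer.schedule(
    ///     Instant::now() + Duration::from_millis(100),
    ///     Some(Duration::from_millis(100)),
    ///     "tick",
    /// );
    ///
    /// assert_eq!(rx.recv().unwrap(), "tick");
    /// drop(guard);
    /// ```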
    pub fn schedule(&self, date: Instant, repeat: Option<Duration>, msg: T) -> Guard {
        self.base.schedule(date, repeat, msg)
    }
}

/// A value scoping a schedule. When this value is dropped, the
/// schedule is cancelled.
#[derive(Clone)]
pub struct Guard {
    should_execute: Arc<AtomicBool>,
    ignore_drop: bool,
}
impl Guard {
    fn new() -> Self {
        Guard {
            should_execute: Arc::new(AtomicBool::new(true)),
            ignore_drop: false,
        }
    }
    fn should_execute(&self) -> bool {
        self.should_execute.load(AtomicOrdering::Relaxed)
    }

    /// Ignores the guard, preventing it from disabling the scheduled
    /// item. This can be used to avoid maintaining a Guard handle
    /// for items that will never be cancelled.
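    ///
    /// # Example
    ///
    /// A minimal sketch: fire-and-forget a callback without keeping
    /// the guard around.
    ///
    /// ```
    /// extern crate monotonic_timer;
    /// use std::sync::mpsc::channel;
    /// use std::time::Duration;
    ///
    /// let timer = monotonic_timer::Timer::new();
    /// let (tx, rx) = channel();
    /// timer
    ///     .schedule_with_delay(Duration::from_millis(10), move || {
    ///         let _ = tx.send(());
    ///     })
    ///     .ignore(); // without `ignore()`, dropping the guard here would cancel the callback
    /// rx.recv().unwrap();
    /// ```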
    pub fn ignore(mut self) {
        self.ignore_drop = true;
    }
}
impl Drop for Guard {
    /// Cancel a schedule.
    fn drop(&mut self) {
        if !self.ignore_drop {
            self.should_execute.store(false, AtomicOrdering::Relaxed)
        }
    }
}

#[cfg(test)]
mod tests {
    extern crate std;
    use super::*;
    use std::sync::mpsc::channel;
    use std::sync::{Arc, Mutex};
    use std::thread;
    use std::time::Duration;

    #[test]
    fn test_schedule_with_delay() {
        let timer = Timer::new();
        let (tx, rx) = channel();
        let mut guards = vec![];

        // Schedule a number of callbacks in an arbitrary order, make sure
        // that they are executed in the right order.
        let mut delays = vec![1, 5, 3, 0];
        let start = Instant::now();
        for i in delays.clone() {
            println!("Scheduling for execution in {} seconds", i);
            let tx = tx.clone();
            guards.push(timer.schedule_with_delay(Duration::from_secs(i), move || {
                println!("Callback {}", i);
                tx.send(i).unwrap();
            }));
        }

        delays.sort();
        for (i, msg) in (0..delays.len()).zip(rx.iter()) {
            let elapsed = start.elapsed().as_secs();
            println!("Received message {} after {} seconds", msg, elapsed);
            assert_eq!(msg, delays[i]);
            assert!(
                delays[i] <= elapsed && elapsed <= delays[i] + 3,
                "We have waited {} seconds, expecting [{}, {}]",
                elapsed,
                delays[i],
                delays[i] + 3
            );
        }

        // Now make sure that callbacks that are designed to be executed
        // immediately are executed quickly.
        let start = Instant::now();
        for i in vec![10, 0] {
            println!("Scheduling for execution in {} seconds", i);
            let tx = tx.clone();
            guards.push(timer.schedule_with_delay(Duration::from_secs(i), move || {
                println!("Callback {}", i);
                tx.send(i).unwrap();
            }));
        }

        assert_eq!(rx.recv().unwrap(), 0);
        assert!(start.elapsed() <= Duration::from_secs(1));
    }

    #[test]
    fn test_message_timer() {
        let (tx, rx) = channel();
        let timer = MessageTimer::new(tx);
        let start = Instant::now();

        let mut delays = vec![400, 300, 100, 500, 200];
        for delay in delays.clone() {
            timer
                .schedule_with_delay(Duration::from_millis(delay), delay)
                .ignore();
        }

        delays.sort();
        for delay in delays {
            assert_eq!(rx.recv().unwrap(), delay);
        }
        assert!(start.elapsed() <= Duration::from_secs(1));
    }

    #[test]
    fn test_guards() {
        println!("Testing that callbacks aren't called if the guard is dropped");
        let timer = Timer::new();
        let called = Arc::new(Mutex::new(false));

        for i in 0..10 {
            let called = called.clone();
            timer.schedule_with_delay(Duration::from_millis(i), move || {
                *called.lock().unwrap() = true;
            });
        }

        thread::sleep(Duration::from_secs(1));
        assert_eq!(*called.lock().unwrap(), false);
    }

    #[test]
    fn test_guard_ignore() {
        let timer = Timer::new();
        let called = Arc::new(Mutex::new(false));

        {
            let called = called.clone();
            timer
                .schedule_with_delay(Duration::from_millis(1), move || {
                    *called.lock().unwrap() = true;
                })
                .ignore();
        }

        thread::sleep(Duration::from_secs(1));
        assert_eq!(*called.lock().unwrap(), true);
    }

    struct NoCloneMessage;

    impl Clone for NoCloneMessage {
        fn clone(&self) -> Self {
            panic!("NoCloneMessage should not be cloned");
        }
    }

    #[test]
    fn test_no_clone() {
        // Make sure that, if no repeat schedule is supplied to a MessageTimer,
        // the message instances are not cloned.
        let (tx, rx) = channel();
        let timer = MessageTimer::new(tx);
        timer
            .schedule_with_delay(Duration::from_millis(0), NoCloneMessage)
            .ignore();
        timer
            .schedule_with_delay(Duration::from_millis(0), NoCloneMessage)
            .ignore();

        for _ in 0..2 {
            let _ = rx.recv();
        }
    }

    #[test]
    fn test_too_much_work() {
        // Make sure that even if the timer has too much work, tasks still get executed
        // and dropping the timer still kills future tasks.

        // To do this, we schedule a task that takes longer to execute than its `repeat` interval.
        let timer = Timer::new();
        let was_called = Arc::new(Mutex::new(false));
        let was_called_2 = Arc::new(Mutex::new(false));

        {
            let was_called = was_called.clone();
            // Schedule a task that takes longer than its repeat interval.
            timer
                .schedule(Instant::now(), Some(Duration::from_millis(10)), move || {
                    thread::sleep(Duration::from_millis(30));
                    *was_called.lock().unwrap() = true;
                })
                .ignore();
            let was_called_2 = was_called_2.clone();

            // Now schedule another task.
            timer
                .schedule(Instant::now(), None, move || {
                    thread::sleep(Duration::from_millis(30));
                    *was_called_2.lock().unwrap() = true;
                })
                .ignore();
        }

        // Check that both our tasks were executed.
        thread::sleep(Duration::from_millis(150));
        assert!(
            *was_called.lock().unwrap(),
            "Periodic task should have been called"
        );
        assert!(
            *was_called_2.lock().unwrap(),
            "One-time task should have been called"
        );

        // Now drop the timer. This should stop any task from being executed.
        drop(timer);

        // Check that the periodic task isn't executed anymore.
        // First, we wait in case we haven't finished executing it,
        // then we reset the flag and check that it isn't set again.
        thread::sleep(Duration::from_millis(150));
        *was_called.lock().unwrap() = false;
        thread::sleep(Duration::from_millis(200));
        assert!(
            !*was_called.lock().unwrap(),
            "Task should have been stopped when the timer was dropped"
        );
    }
}