monotonic_timer/
lib.rs

1//! A simple timer, used to enqueue operations meant to be executed at
2//! a given time or after a given delay.
3
4use std::cmp::Ordering;
5use std::collections::BinaryHeap;
6use std::sync::atomic::AtomicBool;
7use std::sync::atomic::Ordering as AtomicOrdering;
8use std::sync::mpsc::{channel, Sender};
9use std::sync::{Arc, Condvar, Mutex};
10use std::thread;
11use std::time::{Duration, Instant};
12
13/// An item scheduled for delayed execution.
14struct Schedule<T> {
15    /// The instant at which to execute.
16    date: Instant,
17
18    /// The schedule data.
19    data: T,
20
21    /// A mechanism to cancel execution of an item.
22    guard: Guard,
23
24    /// If `Some(d)`, the item must be repeated every interval of
25    /// length `d`, until cancelled.
26    repeat: Option<Duration>,
27}
28impl<T> Ord for Schedule<T> {
29    fn cmp(&self, other: &Self) -> Ordering {
30        self.date.cmp(&other.date).reverse()
31    }
32}
33impl<T> PartialOrd for Schedule<T> {
34    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
35        Some(self.cmp(other))
36    }
37}
38impl<T> Eq for Schedule<T> {}
39impl<T> PartialEq for Schedule<T> {
40    fn eq(&self, other: &Self) -> bool {
41        self.date.eq(&other.date)
42    }
43}
44
45/// An operation to be sent across threads.
46enum Op<T> {
47    /// Schedule a new item for execution.
48    Schedule(Schedule<T>),
49
50    /// Stop the thread.
51    Stop,
52}
53
54/// A mutex-based kind-of-channel used to communicate between the
55/// Communication thread and the Scheduler thread.
56struct WaiterChannel<T> {
57    /// Pending messages.
58    messages: Mutex<Vec<Op<T>>>,
59    /// A condition variable used for waiting.
60    condvar: Condvar,
61}
62impl<T> WaiterChannel<T> {
63    fn with_capacity(cap: usize) -> Self {
64        WaiterChannel {
65            messages: Mutex::new(Vec::with_capacity(cap)),
66            condvar: Condvar::new(),
67        }
68    }
69}
70
71/// A trait that allows configurable execution of scheduled item
72/// on the scheduler thread.
73trait Executor<T> {
74    // Due to difference in use between Box<FnMut()> and most other data
75    // types, this trait requires implementors to provide two implementations
76    // of execute. While both of these functions execute the data item
77    // they differ on whether they make an equivalent data item available
78    // to the Scheduler to store in recurring schedules.
79    //
80    // execute() is called whenever a non-recurring data item needs
81    // to be executed, and consumes the data item in the process.
82    //
83    // execute_clone() is called whenever a recurring data item needs
84    // to be executed, and produces a new equivalent data item. This
85    // function should be more or less equivalent to:
86    //
87    // fn execute_clone(&mut self, data : T) -> T {
88    //   self.execute(data.clone());
89    //   data
90    // }
91
92    fn execute(&mut self, data: T);
93
94    fn execute_clone(&mut self, data: T) -> T;
95}
96
97/// An executor implementation for executing callbacks on the scheduler
98/// thread.
99struct CallbackExecutor;
100
101impl Executor<Box<dyn FnMut() + Send>> for CallbackExecutor {
102    fn execute(&mut self, mut data: Box<dyn FnMut() + Send>) {
103        data();
104    }
105
106    fn execute_clone(&mut self, mut data: Box<dyn FnMut() + Send>) -> Box<dyn FnMut() + Send> {
107        data();
108        data
109    }
110}
111
112/// An executor implementation for delivering messages to a channel.
113struct DeliveryExecutor<T>
114where
115    T: 'static + Send,
116{
117    /// The channel to deliver messages to.
118    tx: Sender<T>,
119}
120
121impl<T> Executor<T> for DeliveryExecutor<T>
122where
123    T: 'static + Send + Clone,
124{
125    fn execute(&mut self, data: T) {
126        let _ = self.tx.send(data);
127    }
128
129    fn execute_clone(&mut self, data: T) -> T {
130        let _ = self.tx.send(data.clone());
131        data
132    }
133}
134
135struct Scheduler<T, E>
136where
137    E: Executor<T>,
138{
139    waiter: Arc<WaiterChannel<T>>,
140    heap: BinaryHeap<Schedule<T>>,
141    executor: E,
142}
143
144impl<T, E> Scheduler<T, E>
145where
146    E: Executor<T>,
147{
148    fn with_capacity(waiter: Arc<WaiterChannel<T>>, executor: E, capacity: usize) -> Self {
149        Scheduler {
150            waiter,
151            executor,
152            heap: BinaryHeap::with_capacity(capacity),
153        }
154    }
155
156    fn run(&mut self) {
157        enum Sleep {
158            NotAtAll,
159            UntilAwakened,
160            AtMost(Duration),
161        }
162
163        let waiter = &(*self.waiter);
164        loop {
165            let mut sleep = if let Some(sched) = self.heap.peek() {
166                let now = Instant::now();
167                if sched.date > now {
168                    // First item is not ready yet, so we need to
169                    // wait until it is or something happens.
170                    Sleep::AtMost(sched.date.duration_since(now))
171                } else {
172                    // At this stage, we have an item that has reached
173                    // execution time. The `unwrap()` is guaranteed to
174                    // succeed.
175                    let sched = self.heap.pop().unwrap();
176
177                    // The item we just popped might have been killed.
178                    // Let's check that before executing.
179                    if sched.guard.should_execute() {
180                        // We have something to do.
181                        if let Some(delta) = sched.repeat {
182                            let data = self.executor.execute_clone(sched.data);
183
184                            // This is a repeating timer, so we need to
185                            // enqueue the next call.
186                            self.heap.push(Schedule {
187                                date: sched.date + delta,
188                                data,
189                                guard: sched.guard,
190                                repeat: Some(delta),
191                            });
192                        } else {
193                            self.executor.execute(sched.data);
194                        }
195                    }
196
197                    // We have just popped an item, but it might be too early
198                    // to go back to sleep. Maybe the next item will need to
199                    // be executed immediately.
200                    // We do not `continue`, to ensure the `waiter.messages`
201                    // are checked before next item is executed.
202                    Sleep::NotAtAll
203                }
204            } else {
205                // Nothing to do
206                Sleep::UntilAwakened
207            };
208
209            let mut lock = waiter.messages.lock().unwrap();
210            // Pop all messages.
211            for msg in lock.drain(..) {
212                match msg {
213                    Op::Stop => {
214                        // Stop immediately, even if there are any pending timer actions.
215                        return;
216                    }
217                    Op::Schedule(sched) => {
218                        self.heap.push(sched);
219                        // New item was added to heap, we must check if sleep
220                        // is needed or not, hence we cannot sleep
221                        sleep = Sleep::NotAtAll;
222                    }
223                }
224            }
225
226            match sleep {
227                Sleep::UntilAwakened => {
228                    let _ = waiter.condvar.wait(lock);
229                }
230                Sleep::AtMost(delay) => {
231                    let sec = delay.as_secs();
232                    let ns = delay.subsec_nanos();
233                    let duration = Duration::new(sec, ns);
234                    let _ = waiter.condvar.wait_timeout(lock, duration);
235                }
236                Sleep::NotAtAll => {}
237            }
238        }
239    }
240}
241
242/// Shared coordination logic for timer threads.
243pub struct TimerBase<T>
244where
245    T: 'static + Send,
246{
247    /// Sender used to communicate with the _Communication_ thread. In
248    /// turn, this thread will send
249    tx: Sender<Op<T>>,
250}
251
252impl<T> Drop for TimerBase<T>
253where
254    T: 'static + Send,
255{
256    /// Stop the timer threads.
257    fn drop(&mut self) {
258        self.tx.send(Op::Stop).unwrap();
259    }
260}
261
262impl<T> TimerBase<T>
263where
264    T: 'static + Send,
265{
266    /// Create a timer base.
267    ///
268    /// This immediately launches two threads, which will remain
269    /// launched until the timer is dropped. As expected, the threads
270    /// spend most of their life waiting for instructions.
271    fn new<E>(executor: E) -> Self
272    where
273        E: 'static + Executor<T> + Send,
274    {
275        Self::with_capacity(executor, 32)
276    }
277
278    /// As `new()`, but with a manually specified initial capacity.
279    fn with_capacity<E>(executor: E, capacity: usize) -> Self
280    where
281        E: 'static + Executor<T> + Send,
282    {
283        let waiter_send = Arc::new(WaiterChannel::with_capacity(capacity));
284        let waiter_recv = waiter_send.clone();
285
286        // Spawn a first thread, whose sole role is to dispatch
287        // messages to the second thread without having to wait too
288        // long for the mutex.
289        let (tx, rx) = channel();
290        thread::spawn(move || {
291            use Op::*;
292            let waiter = &(*waiter_send);
293            for msg in rx.iter() {
294                let mut vec = waiter.messages.lock().unwrap();
295                match msg {
296                    Schedule(sched) => {
297                        vec.push(Schedule(sched));
298                        waiter.condvar.notify_one();
299                    }
300                    Stop => {
301                        vec.clear();
302                        vec.push(Stop);
303                        waiter.condvar.notify_one();
304                        return;
305                    }
306                }
307            }
308        });
309
310        // Spawn a second thread, in charge of scheduling.
311        thread::Builder::new()
312            .name("Timer thread".to_owned())
313            .spawn(move || {
314                let mut scheduler = Scheduler::with_capacity(waiter_recv, executor, capacity);
315                scheduler.run()
316            })
317            .unwrap();
318        TimerBase { tx }
319    }
320
321    pub fn schedule_with_delay(&self, delay: Duration, data: T) -> Guard {
322        self.schedule(Instant::now() + delay, None, data)
323    }
324
325    pub fn schedule_repeating(&self, repeat: Duration, data: T) -> Guard {
326        self.schedule(Instant::now() + repeat, Some(repeat), data)
327    }
328
329    pub fn schedule(&self, date: Instant, repeat: Option<Duration>, data: T) -> Guard {
330        let guard = Guard::new();
331        self.tx
332            .send(Op::Schedule(Schedule {
333                date,
334                data,
335                guard: guard.clone(),
336                repeat,
337            }))
338            .unwrap();
339        guard
340    }
341}
342
343/// A monotonic timer, used to schedule execution of callbacks at a later date.
344///
345/// In the current implementation, each timer is executed as two
346/// threads. The _Scheduler_ thread is in charge of maintaining the
347/// queue of callbacks to execute and of actually executing them. The
348/// _Communication_ thread is in charge of communicating with the
349/// _Scheduler_ thread (which requires acquiring a possibly-long-held
350/// Mutex) without blocking the caller thread.
351pub struct Timer {
352    base: TimerBase<Box<dyn FnMut() + Send>>,
353}
354
355impl Timer {
356    /// Create a timer.
357    ///
358    /// This immediately launches two threads, which will remain
359    /// launched until the timer is dropped. As expected, the threads
360    /// spend most of their life waiting for instructions.
361    pub fn new() -> Self {
362        Timer {
363            base: TimerBase::new(CallbackExecutor),
364        }
365    }
366
367    /// As `new()`, but with a manually specified initial capacity.
368    pub fn with_capacity(capacity: usize) -> Self {
369        Timer {
370            base: TimerBase::with_capacity(CallbackExecutor, capacity),
371        }
372    }
373
374    /// Schedule a callback for execution after a delay.
375    ///
376    /// Callbacks are guaranteed to never be called before the
377    /// delay. However, it is possible that they will be called a
378    /// little after the delay.
379    ///
380    /// If the delay is negative or 0, the callback is executed as
381    /// soon as possible.
382    ///
383    /// This method returns a `Guard` object. If that `Guard` is
384    /// dropped, execution is cancelled.
385    ///
386    /// # Performance
387    ///
388    /// The callback is executed on the Scheduler thread. It should
389    /// therefore terminate very quickly, or risk causing delaying
390    /// other callbacks.
391    ///
392    /// # Failures
393    ///
394    /// Any failure in `cb` will scheduler thread and progressively
395    /// contaminate the Timer and the calling thread itself. You have
396    /// been warned.
397    ///
398    /// # Example
399    ///
400    /// ```
401    /// extern crate monotonic_timer;
402    /// use std::sync::mpsc::channel;
403    /// use std::time::Duration;
404    /// let timer = monotonic_timer::Timer::new();
405    /// let (tx, rx) = channel();
406    ///
407    /// let _guard = timer.schedule_with_delay(Duration::from_secs(3), move || {
408    ///   // This closure is executed on the scheduler thread,
409    ///   // so we want to move it away asap.
410    ///
411    ///   let _ignored = tx.send(()); // Avoid unwrapping here.
412    /// });
413    ///
414    /// rx.recv().unwrap();
415    /// println!("This code has been executed after 3 seconds");
416    /// ```
417    pub fn schedule_with_delay<F>(&self, delay: Duration, cb: F) -> Guard
418    where
419        F: 'static + FnMut() + Send,
420    {
421        self.base.schedule_with_delay(delay, Box::new(cb))
422    }
423
424    /// Schedule a callback for execution once per interval.
425    ///
426    /// Callbacks are guaranteed to never be called before their
427    /// date. However, it is possible that they will be called a
428    /// little after it.
429    ///
430    /// This method returns a `Guard` object. If that `Guard` is
431    /// dropped, repeat is stopped.
432    ///
433    ///
434    /// # Performance
435    ///
436    /// The callback is executed on the Scheduler thread. It should
437    /// therefore terminate very quickly, or risk causing delaying
438    /// other callbacks.
439    ///
440    /// # Failures
441    ///
442    /// Any failure in `cb` will scheduler thread and progressively
443    /// contaminate the Timer and the calling thread itself. You have
444    /// been warned.
445    ///
446    /// # Example
447    ///
448    /// ```
449    /// extern crate monotonic_timer;
450    /// use std::thread;
451    /// use std::sync::{Arc, Mutex};
452    /// use std::time::Duration;
453    /// let timer = monotonic_timer::Timer::new();
454    /// // Number of times the callback has been called.
455    /// let count = Arc::new(Mutex::new(0));
456    ///
457    /// // Start repeating. Each callback increases `count`.
458    /// let guard = {
459    ///   let count = count.clone();
460    ///   timer.schedule_repeating(Duration::from_millis(5), move || {
461    ///     *count.lock().unwrap() += 1;
462    ///   })
463    /// };
464    ///
465    /// // Sleep one second. The callback should be called ~200 times.
466    /// thread::sleep(Duration::from_secs(1));
467    /// let count_result = *count.lock().unwrap();
468    /// assert!(190 <= count_result && count_result <= 210,
469    ///   "The timer was called {} times", count_result);
470    ///
471    /// // Now drop the guard. This should stop the timer.
472    /// drop(guard);
473    /// thread::sleep(Duration::new(0, 100));
474    ///
475    /// // Let's check that the count stops increasing.
476    /// let count_start = *count.lock().unwrap();
477    /// thread::sleep(Duration::from_secs(1));
478    /// let count_stop =  *count.lock().unwrap();
479    /// assert_eq!(count_start, count_stop);
480    /// ```
481    pub fn schedule_repeating<F>(&self, repeat: Duration, cb: F) -> Guard
482    where
483        F: 'static + FnMut() + Send,
484    {
485        self.base.schedule_repeating(repeat, Box::new(cb))
486    }
487
488    /// Schedule a callback for execution at a given time, then once
489    /// per interval. A typical use case is to execute code once per
490    /// day at 12am.
491    ///
492    /// Callbacks are guaranteed to never be called before their
493    /// date. However, it is possible that they will be called a
494    /// little after it.
495    ///
496    /// This method returns a `Guard` object. If that `Guard` is
497    /// dropped, repeat is stopped.
498    ///
499    ///
500    /// # Performance
501    ///
502    /// The callback is executed on the Scheduler thread. It should
503    /// therefore terminate very quickly, or risk causing delaying
504    /// other callbacks.
505    ///
506    /// # Failures
507    ///
508    /// Any failure in `cb` will scheduler thread and progressively
509    /// contaminate the Timer and the calling thread itself. You have
510    /// been warned.
511    pub fn schedule<F>(&self, date: Instant, repeat: Option<Duration>, cb: F) -> Guard
512    where
513        F: 'static + FnMut() + Send,
514    {
515        self.base.schedule(date, repeat, Box::new(cb))
516    }
517}
518
519impl Default for Timer {
520    fn default() -> Self {
521        Self::new()
522    }
523}
524
525/// A monotonic timer, used to schedule delivery of messages at a later date.
526///
527/// In the current implementation, each timer is executed as two
528/// threads. The _Scheduler_ thread is in charge of maintaining the
529/// queue of messages to deliver and of actually deliverying them. The
530/// _Communication_ thread is in charge of communicating with the
531/// _Scheduler_ thread (which requires acquiring a possibly-long-held
532/// Mutex) without blocking the caller thread.
533///
534/// Similar functionality could be implemented using the generic Timer
535/// type, however, using MessageTimer has two performance advantages
536/// over doing so. First, MessageTimer does not need to heap allocate
537/// a closure for each scheduled item, since the messages to queue are
538/// passed directly. Second, MessageTimer avoids the dynamic dispatch
539/// overhead associated with invoking the closure functions.
540pub struct MessageTimer<T>
541where
542    T: 'static + Send + Clone,
543{
544    base: TimerBase<T>,
545}
546
547impl<T> MessageTimer<T>
548where
549    T: 'static + Send + Clone,
550{
551    /// Create a message timer.
552    ///
553    /// This immediately launches two threads, which will remain
554    /// launched until the timer is dropped. As expected, the threads
555    /// spend most of their life waiting for instructions.
556    pub fn new(tx: Sender<T>) -> Self {
557        MessageTimer {
558            base: TimerBase::new(DeliveryExecutor { tx }),
559        }
560    }
561
562    /// As `new()`, but with a manually specified initial capacity.
563    pub fn with_capacity(tx: Sender<T>, capacity: usize) -> Self {
564        MessageTimer {
565            base: TimerBase::with_capacity(DeliveryExecutor { tx }, capacity),
566        }
567    }
568
569    /// Schedule a message for delivery after a delay.
570    ///
571    /// Messages are guaranteed to never be delivered before the
572    /// delay. However, it is possible that they will be delivered a
573    /// little after the delay.
574    ///
575    /// If the delay is negative or 0, the message is delivered as
576    /// soon as possible.
577    ///
578    /// This method returns a `Guard` object. If that `Guard` is
579    /// dropped, delivery is cancelled.
580    ///
581    ///
582    /// # Example
583    ///
584    /// ```
585    /// extern crate monotonic_timer;
586    ///
587    /// use std::sync::mpsc::channel;
588    /// use std::time::Duration;
589    ///
590    /// let (tx, rx) = channel();
591    /// let timer = monotonic_timer::MessageTimer::new(tx);
592    /// let _guard = timer.schedule_with_delay(Duration::from_secs(3), 3);
593    ///
594    /// rx.recv().unwrap();
595    /// println!("This code has been executed after 3 seconds");
596    /// ```
597    pub fn schedule_with_delay(&self, delay: Duration, msg: T) -> Guard {
598        self.base.schedule_with_delay(delay, msg)
599    }
600
601    /// Schedule a message for delivery once per interval.
602    ///
603    /// Messages are guaranteed to never be delivered before their
604    /// date. However, it is possible that they will be delivered a
605    /// little after it.
606    ///
607    /// This method returns a `Guard` object. If that `Guard` is
608    /// dropped, repeat is stopped.
609    ///
610    ///
611    /// # Performance
612    ///
613    /// The message is cloned on the Scheduler thread. Cloning of
614    /// messages should therefore succeed very quickly, or risk
615    /// delaying other messages.
616    ///
617    /// # Failures
618    ///
619    /// Any failure in cloning of messages will occur on the scheduler thread
620    /// and will contaminate the Timer and the calling thread itself. You have
621    /// been warned.
622    ///
623    /// # Example
624    ///
625    /// ```
626    /// extern crate monotonic_timer;
627    /// use std::sync::mpsc::channel;
628    /// use std::time::Duration;
629    ///
630    /// let (tx, rx) = channel();
631    /// let timer = monotonic_timer::MessageTimer::new(tx);
632    ///
633    /// // Start repeating.
634    /// let guard = timer.schedule_repeating(Duration::from_millis(5), 0);
635    ///
636    /// let mut count = 0;
637    /// while count < 5 {
638    ///   let _ = rx.recv();
639    ///   println!("Prints every 5 milliseconds");
640    ///   count += 1;
641    /// }
642    /// ```
643    pub fn schedule_repeating(&self, repeat: Duration, msg: T) -> Guard {
644        self.base.schedule_repeating(repeat, msg)
645    }
646
647    /// Schedule a message for delivery at a given time, then once
648    /// per interval. A typical use case is to execute code once per
649    /// day at 12am.
650    ///
651    /// Messages are guaranteed to never be delivered before their
652    /// date. However, it is possible that they will be delivered a
653    /// little after it.
654    ///
655    /// This method returns a `Guard` object. If that `Guard` is
656    /// dropped, repeat is stopped.
657    ///
658    /// # Performance
659    ///
660    /// The message is cloned on the Scheduler thread. Cloning of
661    /// messages should therefore succeed very quickly, or risk
662    /// delaying other messages.
663    ///
664    /// # Failures
665    ///
666    /// Any failure in cloning of messages will occur on the scheduler thread
667    /// and will contaminate the Timer and the calling thread itself. You have
668    /// been warned.
669    pub fn schedule<D>(&self, date: Instant, repeat: Option<Duration>, msg: T) -> Guard {
670        self.base.schedule(date, repeat, msg)
671    }
672}
673
674/// A value scoping a schedule. When this value is dropped, the
675/// schedule is cancelled.
676#[derive(Clone)]
677pub struct Guard {
678    should_execute: Arc<AtomicBool>,
679    ignore_drop: bool,
680}
681impl Guard {
682    fn new() -> Self {
683        Guard {
684            should_execute: Arc::new(AtomicBool::new(true)),
685            ignore_drop: false,
686        }
687    }
688    fn should_execute(&self) -> bool {
689        self.should_execute.load(AtomicOrdering::Relaxed)
690    }
691
692    /// Ignores the guard, preventing it from disabling the scheduled
693    /// item. This can be used to avoid maintaining a Guard handle
694    /// for items that will never be cancelled.
695    pub fn ignore(mut self) {
696        self.ignore_drop = true;
697    }
698}
699impl Drop for Guard {
700    /// Cancel a schedule.
701    fn drop(&mut self) {
702        if !self.ignore_drop {
703            self.should_execute.store(false, AtomicOrdering::Relaxed)
704        }
705    }
706}
707
708#[cfg(test)]
709mod tests {
710    extern crate std;
711    use super::*;
712    use std::sync::mpsc::channel;
713    use std::sync::{Arc, Mutex};
714    use std::thread;
715    use std::time::Duration;
716
717    #[test]
718    fn test_schedule_with_delay() {
719        let timer = Timer::new();
720        let (tx, rx) = channel();
721        let mut guards = vec![];
722
723        // Schedule a number of callbacks in an arbitrary order, make sure
724        // that they are executed in the right order.
725        let mut delays = vec![1, 5, 3, 0];
726        let start = Instant::now();
727        for i in delays.clone() {
728            println!("Scheduling for execution in {} seconds", i);
729            let tx = tx.clone();
730            guards.push(timer.schedule_with_delay(Duration::from_secs(i), move || {
731                println!("Callback {}", i);
732                tx.send(i).unwrap();
733            }));
734        }
735
736        delays.sort();
737        for (i, msg) in (0..delays.len()).zip(rx.iter()) {
738            let elapsed = start.elapsed().as_secs();
739            println!("Received message {} after {} seconds", msg, elapsed);
740            assert_eq!(msg, delays[i]);
741            assert!(
742                delays[i] <= elapsed && elapsed <= delays[i] + 3,
743                "We have waited {} seconds, expecting [{}, {}]",
744                elapsed,
745                delays[i],
746                delays[i] + 3
747            );
748        }
749
750        // Now make sure that callbacks that are designed to be executed
751        // immediately are executed quickly.
752        let start = Instant::now();
753        for i in vec![10, 0] {
754            println!("Scheduling for execution in {} seconds", i);
755            let tx = tx.clone();
756            guards.push(timer.schedule_with_delay(Duration::from_secs(i), move || {
757                println!("Callback {}", i);
758                tx.send(i).unwrap();
759            }));
760        }
761
762        assert_eq!(rx.recv().unwrap(), 0);
763        assert!(start.elapsed() <= Duration::from_secs(1));
764    }
765
766    #[test]
767    fn test_message_timer() {
768        let (tx, rx) = channel();
769        let timer = MessageTimer::new(tx);
770        let start = Instant::now();
771
772        let mut delays = vec![400, 300, 100, 500, 200];
773        for delay in delays.clone() {
774            timer
775                .schedule_with_delay(Duration::from_millis(delay), delay)
776                .ignore();
777        }
778
779        delays.sort();
780        for delay in delays {
781            assert_eq!(rx.recv().unwrap(), delay);
782        }
783        assert!(start.elapsed() <= Duration::from_secs(1));
784    }
785
786    #[test]
787    fn test_guards() {
788        println!("Testing that callbacks aren't called if the guard is dropped");
789        let timer = Timer::new();
790        let called = Arc::new(Mutex::new(false));
791
792        for i in 0..10 {
793            let called = called.clone();
794            timer.schedule_with_delay(Duration::from_millis(i), move || {
795                *called.lock().unwrap() = true;
796            });
797        }
798
799        thread::sleep(Duration::from_secs(1));
800        assert_eq!(*called.lock().unwrap(), false);
801    }
802
803    #[test]
804    fn test_guard_ignore() {
805        let timer = Timer::new();
806        let called = Arc::new(Mutex::new(false));
807
808        {
809            let called = called.clone();
810            timer
811                .schedule_with_delay(Duration::from_millis(1), move || {
812                    *called.lock().unwrap() = true;
813                })
814                .ignore();
815        }
816
817        thread::sleep(Duration::from_secs(1));
818        assert_eq!(*called.lock().unwrap(), true);
819    }
820
821    struct NoCloneMessage;
822
823    impl Clone for NoCloneMessage {
824        fn clone(&self) -> Self {
825            panic!("TestMessage should not be cloned");
826        }
827    }
828
829    #[test]
830    fn test_no_clone() {
831        // Make sure that, if no schedule is supplied to a MessageTimer
832        // the message instances are not cloned.
833        let (tx, rx) = channel();
834        let timer = MessageTimer::new(tx);
835        timer
836            .schedule_with_delay(Duration::from_millis(0), NoCloneMessage)
837            .ignore();
838        timer
839            .schedule_with_delay(Duration::from_millis(0), NoCloneMessage)
840            .ignore();
841
842        for _ in 0..2 {
843            let _ = rx.recv();
844        }
845    }
846
847    #[test]
848    fn test_too_much_work() {
849        // Make sure that even if the timer has too much work, tasks still get executed
850        // and dropping the timer still kills future tasks.
851
852        // To do this, we schedule a task longer to execute than its `repeat` interval.
853        let timer = Timer::new();
854        let was_called = Arc::new(Mutex::new(false));
855        let was_called_2 = Arc::new(Mutex::new(false));
856
857        {
858            let was_called = was_called.clone();
859            // Schedule a task longer than repeat time
860            timer
861                .schedule(Instant::now(), Some(Duration::from_millis(10)), move || {
862                    thread::sleep(Duration::from_millis(30));
863                    *was_called.lock().unwrap() = true;
864                })
865                .ignore();
866            let was_called_2 = was_called_2.clone();
867
868            // Now schedule another task.
869            timer
870                .schedule(Instant::now(), None, move || {
871                    thread::sleep(Duration::from_millis(30));
872                    *was_called_2.lock().unwrap() = true;
873                })
874                .ignore();
875        }
876
877        // Check that both our tasks were executed.
878        thread::sleep(Duration::from_millis(150));
879        assert!(
880            *was_called.lock().unwrap(),
881            "Periodic task should have been called"
882        );
883        assert!(
884            *was_called_2.lock().unwrap(),
885            "One-time task should have been called"
886        );
887
888        // Now drop the timer. This should stop any task from being executed.
889        drop(timer);
890
891        // Check that the periodic task isn't executed anymore.
892        // First, we wait in case we haven't finished executing it,
893        // then we reset it and check that it isn't executed.
894        thread::sleep(Duration::from_millis(150));
895        *was_called.lock().unwrap() = false;
896        thread::sleep(Duration::from_millis(200));
897        assert!(
898            !*was_called.lock().unwrap(),
899            "Task should have been stopped when the timer dropped"
900        );
901    }
902}