timer/
lib.rs

1//! A simple timer, used to enqueue operations meant to be executed at
2//! a given time or after a given delay.
3
4extern crate chrono;
5
6use std::cmp::Ordering;
7use std::thread;
8use std::sync::atomic::AtomicBool;
9use std::sync::atomic::Ordering as AtomicOrdering;
10use std::sync::{Arc, Mutex, Condvar};
11use std::sync::mpsc::{channel, Sender};
12use std::collections::BinaryHeap;
13use chrono::{Duration, DateTime};
14use chrono::offset::Utc;
15
16/// An item scheduled for delayed execution.
17struct Schedule<T> {
18    /// The instant at which to execute.
19    date: DateTime<Utc>,
20
21    /// The schedule data.
22    data : T,
23
24    /// A mechanism to cancel execution of an item.
25    guard: Guard,
26
27    /// If `Some(d)`, the item must be repeated every interval of
28    /// length `d`, until cancelled.
29    repeat: Option<Duration>
30}
31impl <T> Ord for Schedule<T> {
32    fn cmp(&self, other: &Self) -> Ordering {
33        self.date.cmp(&other.date).reverse()
34    }
35}
36impl <T> PartialOrd for Schedule<T> {
37    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
38        self.date.partial_cmp(&other.date).map(|ord| ord.reverse())
39    }
40}
41impl <T> Eq for Schedule<T> {
42}
43impl <T> PartialEq for Schedule<T> {
44    fn eq(&self, other: &Self) -> bool {
45        self.date.eq(&other.date)
46    }
47}
48
49/// An operation to be sent across threads.
50enum Op<T> {
51    /// Schedule a new item for execution.
52    Schedule(Schedule<T>),
53
54    /// Stop the thread.
55    Stop
56}
57
58/// A mutex-based kind-of-channel used to communicate between the
59/// Communication thread and the Scheuler thread.
60struct WaiterChannel<T> {
61    /// Pending messages.
62    messages: Mutex<Vec<Op<T>>>,
63    /// A condition variable used for waiting.
64    condvar: Condvar,
65}
66impl <T> WaiterChannel<T> {
67    fn with_capacity(cap: usize) -> Self {
68        WaiterChannel {
69            messages: Mutex::new(Vec::with_capacity(cap)),
70            condvar: Condvar::new(),
71        }
72    }
73}
74
75/// A trait that allows configurable execution of scheduled item
76/// on the scheduler thread.
77trait Executor<T> {
78    // Due to difference in use between Box<FnMut()> and most other data
79    // types, this trait requires implementors to provide two implementations
80    // of execute. While both of these functions execute the data item
81    // they differ on whether they make an equivalent data item available
82    // to the Scheduler to store in recurring schedules.
83    //
84    // execute() is called whenever a non-recurring data item needs
85    // to be executed, and consumes the data item in the process.
86    //
87    // execute_clone() is called whenever a recurring data item needs
88    // to be executed, and produces a new equivalent data item. This
89    // function should be more or less equivalent to:
90    //
91    // fn execute_clone(&mut self, data : T) -> T {
92    //   self.execute(data.clone());
93    //   data
94    // }
95
96    fn execute(&mut self, data : T);
97
98    fn execute_clone(&mut self, data : T) -> T;
99}
100
101/// An executor implementation for executing callbacks on the scheduler
102/// thread.
103struct CallbackExecutor;
104
105impl Executor<Box<FnMut() + Send>> for CallbackExecutor {
106    fn execute(&mut self, mut data : Box<FnMut() + Send>) {
107        data();
108    }
109
110    fn execute_clone(&mut self, mut data : Box<FnMut() + Send>) -> Box<FnMut() + Send> {
111        data();
112        data
113    }
114}
115
116/// An executor implementation for delivering messages to a channel.
117struct DeliveryExecutor<T>
118    where T : 'static + Send {
119    /// The channel to deliver messages to.
120    tx : Sender<T>
121}
122
123impl <T> Executor<T> for DeliveryExecutor<T>
124    where T : 'static + Send + Clone {
125    fn execute(&mut self, data : T) {
126        let _ = self.tx.send(data);
127    }
128
129    fn execute_clone(&mut self, data : T) -> T {
130        let _ = self.tx.send(data.clone());
131        data
132    }
133}
134
135
136struct Scheduler<T,E> where E : Executor<T> {
137    waiter: Arc<WaiterChannel<T>>,
138    heap: BinaryHeap<Schedule<T>>,
139    executor: E
140}
141
142impl <T,E> Scheduler<T,E> where E : Executor<T> {
143    fn with_capacity(waiter: Arc<WaiterChannel<T>>, executor : E, capacity: usize) -> Self {
144        Scheduler {
145            waiter: waiter,
146            executor: executor,
147            heap: BinaryHeap::with_capacity(capacity),
148        }
149    }
150
151    fn run(&mut self) {
152        enum Sleep {
153            NotAtAll,
154            UntilAwakened,
155            AtMost(Duration)
156        }
157
158        let ref waiter = *self.waiter;
159        loop {
160            let mut lock = waiter.messages.lock().unwrap();
161
162            // Pop all messages.
163            for msg in lock.drain(..) {
164                match msg {
165                    Op::Stop => {
166                        return;
167                    }
168                    Op::Schedule(sched) => self.heap.push(sched),
169                }
170            }
171
172            // Pop all the callbacks that are ready.
173
174            // If we don't find
175            let mut sleep = Sleep::UntilAwakened;
176            loop {
177                let now = Utc::now();
178                if let Some(sched) = self.heap.peek() {
179                    if sched.date > now {
180                        // First item is not ready yet, so we need to
181                        // wait until it is or something happens.
182                        sleep = Sleep::AtMost(sched.date.signed_duration_since(now));
183                        break;
184                    }
185                } else {
186                    // Schedule is empty, nothing to do, wait until something happens.
187                    break;
188                }
189                // At this stage, we have an item that has reached
190                // execution time. The `unwrap()` is guaranteed to
191                // succeed.
192                let sched = self.heap.pop().unwrap();
193                if !sched.guard.should_execute() {
194                    // Execution has been cancelled, skip this item.
195                    continue;
196                }
197
198                if let Some(delta) = sched.repeat {
199                    let data = self.executor.execute_clone(sched.data);
200
201                    // This is a repeating timer, so we need to
202                    // enqueue the next call.
203                    sleep = Sleep::NotAtAll;
204                    self.heap.push(Schedule {
205                        date: sched.date + delta,
206                        data: data,
207                        guard: sched.guard,
208                        repeat: Some(delta)
209                    });
210                } else {
211                    self.executor.execute(sched.data);
212                }
213            }
214
215            match sleep {
216                Sleep::UntilAwakened => {
217                    let _ = waiter.condvar.wait(lock);
218                },
219                Sleep::AtMost(delay) => {
220                    let sec = delay.num_seconds();
221                    let ns = (delay - Duration::seconds(sec)).num_nanoseconds().unwrap(); // This `unwrap()` asserts that the number of ns is not > 1_000_000_000. Since we just substracted the number of seconds, the assertion should always pass.
222                    let duration = std::time::Duration::new(sec as u64, ns as u32);
223                    let _ = waiter.condvar.wait_timeout(lock, duration);
224                },
225                Sleep::NotAtAll => {}
226            }
227        }
228    }
229}
230
231/// Shared coordination logic for timer threads.
232pub struct TimerBase<T>
233    where T : 'static + Send {
234    /// Sender used to communicate with the _Communication_ thread. In
235    /// turn, this thread will send 
236    tx: Sender<Op<T>>,
237}
238
239impl <T> Drop for TimerBase<T>
240    where T : 'static + Send {
241    /// Stop the timer threads.
242    fn drop(&mut self) {
243        self.tx.send(Op::Stop).unwrap();
244    }
245}
246
247impl <T> TimerBase<T>
248    where T : 'static + Send {
249    /// Create a timer base.
250    ///
251    /// This immediatey launches two threads, which will remain
252    /// launched until the timer is dropped. As expected, the threads
253    /// spend most of their life waiting for instructions.
254    fn new<E>(executor : E) -> Self
255        where E : 'static + Executor<T> + Send {
256        Self::with_capacity(executor, 32)
257    }
258
259    /// As `new()`, but with a manually specified initial capaicty.
260    fn with_capacity<E>(executor : E, capacity: usize) -> Self
261        where E : 'static + Executor<T> + Send {
262        let waiter_send = Arc::new(WaiterChannel::with_capacity(capacity));
263        let waiter_recv = waiter_send.clone();
264
265        // Spawn a first thread, whose sole role is to dispatch
266        // messages to the second thread without having to wait too
267        // long for the mutex.
268        let (tx, rx) = channel();
269        thread::spawn(move || {
270            use Op::*;
271            let ref waiter = *waiter_send;
272            for msg in rx.iter() {
273                let mut vec = waiter.messages.lock().unwrap();
274                match msg {
275                    Schedule(sched) => {
276                        vec.push(Schedule(sched));
277                        waiter.condvar.notify_one();
278                    }
279                    Stop => {
280                        vec.clear();
281                        vec.push(Op::Stop);
282                        waiter.condvar.notify_one();
283                        return;
284                    }
285                }
286            }
287        });
288
289        // Spawn a second thread, in charge of scheduling.
290        thread::Builder::new().name("Timer thread".to_owned()).spawn(move || {
291            let mut scheduler = Scheduler::with_capacity(waiter_recv, executor, capacity);
292            scheduler.run()
293        }).unwrap();
294        TimerBase {
295            tx: tx
296        }
297    }
298
299    pub fn schedule_with_delay(&self, delay: Duration, data : T) -> Guard {
300        self.schedule_with_date(Utc::now() + delay, data)
301    }
302
303    pub fn schedule_with_date<D>(&self, date: DateTime<D>, data : T) -> Guard
304        where D : chrono::offset::TimeZone
305    {
306        self.schedule(date, None, data)
307    }
308
309    pub fn schedule_repeating(&self, repeat: Duration, data : T) -> Guard
310    {
311        self.schedule(Utc::now() + repeat, Some(repeat), data)
312    }
313
314    pub fn schedule<D>(&self, date: DateTime<D>, repeat: Option<Duration>, data : T) -> Guard
315        where D : chrono::offset::TimeZone
316    {
317        let guard = Guard::new();
318        self.tx.send(Op::Schedule(Schedule {
319            date: date.with_timezone(&Utc),
320            data: data,
321            guard: guard.clone(),
322            repeat: repeat
323        })).unwrap();
324        guard
325    }
326}
327
328/// A timer, used to schedule execution of callbacks at a later date.
329///
330/// In the current implementation, each timer is executed as two
331/// threads. The _Scheduler_ thread is in charge of maintaining the
332/// queue of callbacks to execute and of actually executing them. The
333/// _Communication_ thread is in charge of communicating with the
334/// _Scheduler_ thread (which requires acquiring a possibly-long-held
335/// Mutex) without blocking the caller thread.
336pub struct Timer {
337    base: TimerBase<Box<FnMut() + Send>>
338}
339
340impl Timer {
341    /// Create a timer.
342    ///
343    /// This immediatey launches two threads, which will remain
344    /// launched until the timer is dropped. As expected, the threads
345    /// spend most of their life waiting for instructions.
346    pub fn new() -> Self {
347        Timer { base : TimerBase::new(CallbackExecutor) }
348    }
349
350    /// As `new()`, but with a manually specified initial capaicty.
351    pub fn with_capacity(capacity: usize) -> Self {
352        Timer { base : TimerBase::with_capacity(CallbackExecutor, capacity) }
353    }
354
355    /// Schedule a callback for execution after a delay.
356    ///
357    /// Callbacks are guaranteed to never be called before the
358    /// delay. However, it is possible that they will be called a
359    /// little after the delay.
360    ///
361    /// If the delay is negative or 0, the callback is executed as
362    /// soon as possible.
363    ///
364    /// This method returns a `Guard` object. If that `Guard` is
365    /// dropped, execution is cancelled.
366    ///
367    /// # Performance
368    ///
369    /// The callback is executed on the Scheduler thread. It should
370    /// therefore terminate very quickly, or risk causing delaying
371    /// other callbacks.
372    ///
373    /// # Failures
374    ///
375    /// Any failure in `cb` will scheduler thread and progressively
376    /// contaminate the Timer and the calling thread itself. You have
377    /// been warned.
378    ///
379    /// # Example
380    ///
381    /// ```
382    /// extern crate timer;
383    /// extern crate chrono;
384    /// use std::sync::mpsc::channel;
385    ///
386    /// let timer = timer::Timer::new();
387    /// let (tx, rx) = channel();
388    ///
389    /// let _guard = timer.schedule_with_delay(chrono::Duration::seconds(3), move || {
390    ///   // This closure is executed on the scheduler thread,
391    ///   // so we want to move it away asap.
392    ///
393    ///   let _ignored = tx.send(()); // Avoid unwrapping here.
394    /// });
395    ///
396    /// rx.recv().unwrap();
397    /// println!("This code has been executed after 3 seconds");
398    /// ```
399    pub fn schedule_with_delay<F>(&self, delay: Duration, cb: F) -> Guard
400        where F: 'static + FnMut() + Send {
401        self.base.schedule_with_delay(delay, Box::new(cb))
402    }
403
404    /// Schedule a callback for execution at a given date.
405    ///
406    /// Callbacks are guaranteed to never be called before their
407    /// date. However, it is possible that they will be called a
408    /// little after it.
409    ///
410    /// If the date is in the past, the callback is executed as soon
411    /// as possible.
412    ///
413    /// This method returns a `Guard` object. If that `Guard` is
414    /// dropped, execution is cancelled.
415    ///
416    ///
417    /// # Performance
418    ///
419    /// The callback is executed on the Scheduler thread. It should
420    /// therefore terminate very quickly, or risk causing delaying
421    /// other callbacks.
422    ///
423    /// # Failures
424    ///
425    /// Any failure in `cb` will scheduler thread and progressively
426    /// contaminate the Timer and the calling thread itself. You have
427    /// been warned.
428    pub fn schedule_with_date<F, T>(&self, date: DateTime<T>, cb: F) -> Guard
429        where F: 'static + FnMut() + Send, T : chrono::offset::TimeZone
430    {
431        self.base.schedule_with_date(date, Box::new(cb))
432    }
433
434    /// Schedule a callback for execution once per interval.
435    ///
436    /// Callbacks are guaranteed to never be called before their
437    /// date. However, it is possible that they will be called a
438    /// little after it.
439    ///
440    /// This method returns a `Guard` object. If that `Guard` is
441    /// dropped, repeat is stopped.
442    ///
443    ///
444    /// # Performance
445    ///
446    /// The callback is executed on the Scheduler thread. It should
447    /// therefore terminate very quickly, or risk causing delaying
448    /// other callbacks.
449    ///
450    /// # Failures
451    ///
452    /// Any failure in `cb` will scheduler thread and progressively
453    /// contaminate the Timer and the calling thread itself. You have
454    /// been warned.
455    ///
456    /// # Example
457    ///
458    /// ```
459    /// extern crate timer;
460    /// extern crate chrono;
461    /// use std::thread;
462    /// use std::sync::{Arc, Mutex};
463    ///
464    /// let timer = timer::Timer::new();
465    /// // Number of times the callback has been called.
466    /// let count = Arc::new(Mutex::new(0));
467    ///
468    /// // Start repeating. Each callback increases `count`.
469    /// let guard = {
470    ///   let count = count.clone();
471    ///   timer.schedule_repeating(chrono::Duration::milliseconds(5), move || {
472    ///     *count.lock().unwrap() += 1;
473    ///   })
474    /// };
475    ///
476    /// // Sleep one second. The callback should be called ~200 times.
477    /// thread::sleep(std::time::Duration::new(1, 0));
478    /// let count_result = *count.lock().unwrap();
479    /// assert!(190 <= count_result && count_result <= 210,
480    ///   "The timer was called {} times", count_result);
481    ///
482    /// // Now drop the guard. This should stop the timer.
483    /// drop(guard);
484    /// thread::sleep(std::time::Duration::new(0, 100));
485    ///
486    /// // Let's check that the count stops increasing.
487    /// let count_start = *count.lock().unwrap();
488    /// thread::sleep(std::time::Duration::new(1, 0));
489    /// let count_stop =  *count.lock().unwrap();
490    /// assert_eq!(count_start, count_stop);
491    /// ```
492    pub fn schedule_repeating<F>(&self, repeat: Duration, cb: F) -> Guard
493        where F: 'static + FnMut() + Send
494    {
495        self.base.schedule_repeating(repeat, Box::new(cb))
496    }
497
498    /// Schedule a callback for execution at a given time, then once
499    /// per interval. A typical use case is to execute code once per
500    /// day at 12am.
501    ///
502    /// Callbacks are guaranteed to never be called before their
503    /// date. However, it is possible that they will be called a
504    /// little after it.
505    ///
506    /// This method returns a `Guard` object. If that `Guard` is
507    /// dropped, repeat is stopped.
508    ///
509    ///
510    /// # Performance
511    ///
512    /// The callback is executed on the Scheduler thread. It should
513    /// therefore terminate very quickly, or risk causing delaying
514    /// other callbacks.
515    ///
516    /// # Failures
517    ///
518    /// Any failure in `cb` will scheduler thread and progressively
519    /// contaminate the Timer and the calling thread itself. You have
520    /// been warned.
521    pub fn schedule<F, T>(&self, date: DateTime<T>, repeat: Option<Duration>, cb: F) -> Guard
522        where F: 'static + FnMut() + Send, T : chrono::offset::TimeZone
523    {
524        self.base.schedule(date, repeat, Box::new(cb))
525    }
526}
527
528/// A timer, used to schedule delivery of messages at a later date.
529///
530/// In the current implementation, each timer is executed as two
531/// threads. The _Scheduler_ thread is in charge of maintaining the
532/// queue of messages to deliver and of actually deliverying them. The
533/// _Communication_ thread is in charge of communicating with the
534/// _Scheduler_ thread (which requires acquiring a possibly-long-held
535/// Mutex) without blocking the caller thread.
536///
537/// Similar functionality could be implemented using the generic Timer
538/// type, however, using MessageTimer has two performance advantages
539/// over doing so. First, MessageTimer does not need to heap allocate
540/// a closure for each scheduled item, since the messages to queue are
541/// passed directly. Second, MessageTimer avoids the dynamic dispatch
542/// overhead associated with invoking the closure functions.
543pub struct MessageTimer<T>
544    where T : 'static + Send + Clone {
545    base: TimerBase<T>
546}
547
548impl <T> MessageTimer<T>
549    where T : 'static + Send + Clone {
550    /// Create a message timer.
551    ///
552    /// This immediatey launches two threads, which will remain
553    /// launched until the timer is dropped. As expected, the threads
554    /// spend most of their life waiting for instructions.
555    pub fn new(tx: Sender<T>) -> Self {
556        MessageTimer {
557            base : TimerBase::new(DeliveryExecutor { tx : tx })
558        }
559    }
560
561    /// As `new()`, but with a manually specified initial capaicty.
562    pub fn with_capacity(tx: Sender<T>, capacity: usize) -> Self {
563        MessageTimer {
564            base : TimerBase::with_capacity(DeliveryExecutor { tx : tx }, capacity)
565        }
566    }
567
568    /// Schedule a message for delivery after a delay.
569    ///
570    /// Messages are guaranteed to never be delivered before the
571    /// delay. However, it is possible that they will be delivered a
572    /// little after the delay.
573    ///
574    /// If the delay is negative or 0, the message is delivered as
575    /// soon as possible.
576    ///
577    /// This method returns a `Guard` object. If that `Guard` is
578    /// dropped, delivery is cancelled.
579    ///
580    ///
581    /// # Example
582    ///
583    /// ```
584    /// extern crate timer;
585    /// extern crate chrono;
586    /// use std::sync::mpsc::channel;
587    ///
588    /// let (tx, rx) = channel();
589    /// let timer = timer::MessageTimer::new(tx);
590    /// let _guard = timer.schedule_with_delay(chrono::Duration::seconds(3), 3);
591    ///
592    /// rx.recv().unwrap();
593    /// println!("This code has been executed after 3 seconds");
594    /// ```
595    pub fn schedule_with_delay(&self, delay: Duration, msg : T) -> Guard {
596        self.base.schedule_with_delay(delay, msg)
597    }
598
599    /// Schedule a message for delivery at a given date.
600    ///
601    /// Messages are guaranteed to never be delivered before their
602    /// date. However, it is possible that they will be delivered a
603    /// little after it.
604    ///
605    /// If the date is in the past, the message is delivered as soon
606    /// as possible.
607    ///
608    /// This method returns a `Guard` object. If that `Guard` is
609    /// dropped, delivery is cancelled.
610    ///
611    pub fn schedule_with_date<D>(&self, date: DateTime<D>, msg : T) -> Guard
612        where D : chrono::offset::TimeZone
613    {
614        self.base.schedule_with_date(date, msg)
615    }
616
617    /// Schedule a message for delivery once per interval.
618    ///
619    /// Messages are guaranteed to never be delivered before their
620    /// date. However, it is possible that they will be delivered a
621    /// little after it.
622    ///
623    /// This method returns a `Guard` object. If that `Guard` is
624    /// dropped, repeat is stopped.
625    ///
626    ///
627    /// # Performance
628    ///
629    /// The message is cloned on the Scheduler thread. Cloning of
630    /// messages should therefore succeed very quickly, or risk
631    /// delaying other messages.
632    ///
633    /// # Failures
634    ///
635    /// Any failure in cloning of messages will occur on the scheduler thread
636    /// and will contaminate the Timer and the calling thread itself. You have
637    /// been warned.
638    ///
639    /// # Example
640    ///
641    /// ```
642    /// extern crate timer;
643    /// extern crate chrono;
644    /// use std::sync::mpsc::channel;
645    ///
646    /// let (tx, rx) = channel();
647    /// let timer = timer::MessageTimer::new(tx);
648    ///
649    /// // Start repeating.
650    /// let guard = timer.schedule_repeating(chrono::Duration::milliseconds(5), 0);
651    ///
652    /// let mut count = 0;
653    /// while count < 5 {
654    ///   let _ = rx.recv();
655    ///   println!("Prints every 5 milliseconds");
656    ///   count += 1;
657    /// }
658    /// ```
659    pub fn schedule_repeating(&self, repeat: Duration, msg : T) -> Guard
660    {
661        self.base.schedule_repeating(repeat, msg)
662    }
663
664    /// Schedule a message for delivery at a given time, then once
665    /// per interval. A typical use case is to execute code once per
666    /// day at 12am.
667    ///
668    /// Messages are guaranteed to never be delivered before their
669    /// date. However, it is possible that they will be delivered a
670    /// little after it.
671    ///
672    /// This method returns a `Guard` object. If that `Guard` is
673    /// dropped, repeat is stopped.
674    ///
675    /// # Performance
676    ///
677    /// The message is cloned on the Scheduler thread. Cloning of
678    /// messages should therefore succeed very quickly, or risk
679    /// delaying other messages.
680    ///
681    /// # Failures
682    ///
683    /// Any failure in cloning of messages will occur on the scheduler thread
684    /// and will contaminate the Timer and the calling thread itself. You have
685    /// been warned.
686    pub fn schedule<D>(&self, date: DateTime<D>, repeat: Option<Duration>, msg : T) -> Guard
687        where D : chrono::offset::TimeZone
688    {
689        self.base.schedule(date, repeat, msg)
690    }
691}
692
693/// A value scoping a schedule. When this value is dropped, the
694/// schedule is cancelled.
695#[derive(Clone)]
696pub struct Guard {
697    should_execute: Arc<AtomicBool>,
698    ignore_drop: bool
699}
700impl Guard {
701    fn new() -> Self {
702        Guard {
703            should_execute: Arc::new(AtomicBool::new(true)),
704            ignore_drop: false
705        }
706    }
707    fn should_execute(&self) -> bool {
708        self.should_execute.load(AtomicOrdering::Relaxed)
709    }
710
711    /// Ignores the guard, preventing it from disabling the scheduled
712    /// item. This can be used to avoid maintaining a Guard handle
713    /// for items that will never be cancelled.
714    pub fn ignore(mut self) {
715        self.ignore_drop = true;
716    }
717}
718impl Drop for Guard {
719    /// Cancel a schedule.
720    fn drop(&mut self) {
721        if !self.ignore_drop {
722            self.should_execute.store(false, AtomicOrdering::Relaxed)
723        }
724    }
725}
726
727#[cfg(test)]
728mod tests {
729    extern crate std;
730    use super::*;
731    use std::sync::mpsc::channel;
732    use std::sync::{Arc, Mutex};
733    use std::thread;
734    use chrono::{Duration, Utc};
735
736    #[test]
737    fn test_schedule_with_delay() {
738        let timer = Timer::new();
739        let (tx, rx) = channel();
740        let mut guards = vec![];
741
742        // Schedule a number of callbacks in an arbitrary order, make sure
743        // that they are executed in the right order.
744        let mut delays = vec![1, 5, 3, -1];
745        let start = Utc::now();
746        for i in delays.clone() {
747            println!("Scheduling for execution in {} seconds", i);
748            let tx = tx.clone();
749            guards.push(timer.schedule_with_delay(Duration::seconds(i), move || {
750                println!("Callback {}", i);
751                tx.send(i).unwrap();
752            }));
753        }
754
755        delays.sort();
756        for (i, msg) in (0..delays.len()).zip(rx.iter()) {
757            let elapsed = Utc::now().signed_duration_since(start).num_seconds();
758            println!("Received message {} after {} seconds", msg, elapsed);
759            assert_eq!(msg, delays[i]);
760            assert!(delays[i] <= elapsed && elapsed <= delays[i] + 3, "We have waited {} seconds, expecting [{}, {}]", elapsed, delays[i], delays[i] + 3);
761        }
762
763        // Now make sure that callbacks that are designed to be executed
764        // immediately are executed quickly.
765        let start = Utc::now();
766        for i in vec![10, 0] {
767            println!("Scheduling for execution in {} seconds", i);
768            let tx = tx.clone();
769            guards.push(timer.schedule_with_delay(Duration::seconds(i), move || {
770                println!("Callback {}", i);
771                tx.send(i).unwrap();
772            }));
773        }
774
775        assert_eq!(rx.recv().unwrap(), 0);
776        assert!(Utc::now().signed_duration_since(start) <= Duration::seconds(1));
777    }
778
779    #[test]
780    fn test_message_timer() {
781        let (tx, rx) = channel();
782        let timer = MessageTimer::new(tx);
783        let start = Utc::now();
784
785        let mut delays = vec!(400, 300, 100, 500, 200);
786        for delay in delays.clone() {
787            timer.schedule_with_delay(Duration::milliseconds(delay), delay).ignore();
788        }
789
790        delays.sort();
791        for delay in delays {
792            assert_eq!(rx.recv().unwrap(), delay);
793        }
794        assert!(Utc::now().signed_duration_since(start) <= Duration::seconds(1));
795    }
796
797    #[test]
798    fn test_guards() {
799        println!("Testing that callbacks aren't called if the guard is dropped");
800        let timer = Timer::new();
801        let called = Arc::new(Mutex::new(false));
802
803        for i in 0..10 {
804            let called = called.clone();
805            timer.schedule_with_delay(Duration::milliseconds(i), move || {
806                *called.lock().unwrap() = true;
807            });
808        }
809
810        thread::sleep(std::time::Duration::new(1, 0));
811        assert_eq!(*called.lock().unwrap(), false);
812    }
813
814    #[test]
815    fn test_guard_ignore() {
816        let timer = Timer::new();
817        let called = Arc::new(Mutex::new(false));
818
819        {
820            let called = called.clone();
821            timer.schedule_with_delay(Duration::milliseconds(1), move || {
822                *called.lock().unwrap() = true;
823            }).ignore();
824        }
825
826        thread::sleep(std::time::Duration::new(1, 0));
827        assert_eq!(*called.lock().unwrap(), true);
828    }
829
830    struct NoCloneMessage;
831
832    impl Clone for NoCloneMessage {
833        fn clone(&self) -> Self {
834            panic!("TestMessage should not be cloned");
835        }
836    }
837
838    #[test]
839    fn test_no_clone() {
840        // Make sure that, if no schedule is supplied to a MessageTimer
841        // the message instances are not cloned.
842        let (tx, rx) = channel();
843        let timer = MessageTimer::new(tx);
844        timer.schedule_with_delay(Duration::milliseconds(0), NoCloneMessage).ignore();
845        timer.schedule_with_delay(Duration::milliseconds(0), NoCloneMessage).ignore();
846        
847        for _ in 0..2 {
848            let _  = rx.recv();
849        }
850    }
851}