timer/lib.rs
//! A simple timer, used to enqueue operations meant to be executed at
//! a given time or after a given delay.
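//!
//! As a quick taste of the API, here is a minimal sketch (the documented
//! examples on `Timer` and `MessageTimer` below go into more detail):
//!
//! ```
//! extern crate timer;
//! extern crate chrono;
//! use std::sync::mpsc::channel;
//!
//! let timer = timer::Timer::new();
//! let (tx, rx) = channel();
//!
//! // Deliver `()` on `tx` roughly one second from now.
//! let _guard = timer.schedule_with_delay(chrono::Duration::seconds(1), move || {
//!     let _ = tx.send(());
//! });
//!
//! rx.recv().unwrap();
//! ```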

extern crate chrono;

use std::cmp::Ordering;
use std::thread;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::mpsc::{channel, Sender};
use std::collections::BinaryHeap;
use chrono::{Duration, DateTime};
use chrono::offset::Utc;

/// An item scheduled for delayed execution.
struct Schedule<T> {
    /// The instant at which to execute.
    date: DateTime<Utc>,

    /// The schedule data.
    data : T,

    /// A mechanism to cancel execution of an item.
    guard: Guard,

    /// If `Some(d)`, the item must be repeated every interval of
    /// length `d`, until cancelled.
    repeat: Option<Duration>
}
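// Implementation note: `BinaryHeap` is a max-heap, so the comparison on
// `Schedule` is reversed below; the schedule with the *earliest* date is
// the greatest element and therefore sits at the top of the heap.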
impl <T> Ord for Schedule<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.date.cmp(&other.date).reverse()
    }
}
impl <T> PartialOrd for Schedule<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        self.date.partial_cmp(&other.date).map(|ord| ord.reverse())
    }
}
impl <T> Eq for Schedule<T> {
}
impl <T> PartialEq for Schedule<T> {
    fn eq(&self, other: &Self) -> bool {
        self.date.eq(&other.date)
    }
}

/// An operation to be sent across threads.
enum Op<T> {
    /// Schedule a new item for execution.
    Schedule(Schedule<T>),

    /// Stop the thread.
    Stop
}

/// A mutex-based kind-of-channel used to communicate between the
/// _Communication_ thread and the _Scheduler_ thread.
struct WaiterChannel<T> {
    /// Pending messages.
    messages: Mutex<Vec<Op<T>>>,
    /// A condition variable used for waiting.
    condvar: Condvar,
}
impl <T> WaiterChannel<T> {
    fn with_capacity(cap: usize) -> Self {
        WaiterChannel {
            messages: Mutex::new(Vec::with_capacity(cap)),
            condvar: Condvar::new(),
        }
    }
}

/// A trait that allows configurable execution of scheduled items
/// on the scheduler thread.
trait Executor<T> {
    // Due to differences in use between Box<FnMut()> and most other data
    // types, this trait requires implementors to provide two implementations
    // of execute. While both of these functions execute the data item,
    // they differ on whether they make an equivalent data item available
    // to the Scheduler to store in recurring schedules.
    //
    // execute() is called whenever a non-recurring data item needs
    // to be executed, and consumes the data item in the process.
    //
    // execute_clone() is called whenever a recurring data item needs
    // to be executed, and produces a new equivalent data item. This
    // function should be more or less equivalent to:
    //
    // fn execute_clone(&mut self, data : T) -> T {
    //     self.execute(data.clone());
    //     data
    // }

    fn execute(&mut self, data : T);

    fn execute_clone(&mut self, data : T) -> T;
}

/// An executor implementation for executing callbacks on the scheduler
/// thread.
struct CallbackExecutor;

impl Executor<Box<FnMut() + Send>> for CallbackExecutor {
    fn execute(&mut self, mut data : Box<FnMut() + Send>) {
        data();
    }

    fn execute_clone(&mut self, mut data : Box<FnMut() + Send>) -> Box<FnMut() + Send> {
        data();
        data
    }
}

/// An executor implementation for delivering messages to a channel.
struct DeliveryExecutor<T>
    where T : 'static + Send {
    /// The channel to deliver messages to.
    tx : Sender<T>
}

impl <T> Executor<T> for DeliveryExecutor<T>
    where T : 'static + Send + Clone {
    fn execute(&mut self, data : T) {
        let _ = self.tx.send(data);
    }

    fn execute_clone(&mut self, data : T) -> T {
        let _ = self.tx.send(data.clone());
        data
    }
}


struct Scheduler<T,E> where E : Executor<T> {
    waiter: Arc<WaiterChannel<T>>,
    heap: BinaryHeap<Schedule<T>>,
    executor: E
}

impl <T,E> Scheduler<T,E> where E : Executor<T> {
    fn with_capacity(waiter: Arc<WaiterChannel<T>>, executor : E, capacity: usize) -> Self {
        Scheduler {
            waiter: waiter,
            executor: executor,
            heap: BinaryHeap::with_capacity(capacity),
        }
    }

    fn run(&mut self) {
        enum Sleep {
            NotAtAll,
            UntilAwakened,
            AtMost(Duration)
        }

        let ref waiter = *self.waiter;
        loop {
            let mut lock = waiter.messages.lock().unwrap();

            // Pop all messages.
            for msg in lock.drain(..) {
                match msg {
                    Op::Stop => {
                        return;
                    }
                    Op::Schedule(sched) => self.heap.push(sched),
                }
            }

            // Pop all the callbacks that are ready.
            // If we don't find any, we'll sleep until we are awakened or
            // until the first scheduled item becomes ready.
            let mut sleep = Sleep::UntilAwakened;
            loop {
                let now = Utc::now();
                if let Some(sched) = self.heap.peek() {
                    if sched.date > now {
                        // First item is not ready yet, so we need to
                        // wait until it is or something happens.
                        sleep = Sleep::AtMost(sched.date.signed_duration_since(now));
                        break;
                    }
                } else {
                    // Schedule is empty, nothing to do, wait until something happens.
                    break;
                }
                // At this stage, we have an item that has reached
                // execution time. The `unwrap()` is guaranteed to
                // succeed.
                let sched = self.heap.pop().unwrap();
                if !sched.guard.should_execute() {
                    // Execution has been cancelled, skip this item.
                    continue;
                }

                if let Some(delta) = sched.repeat {
                    let data = self.executor.execute_clone(sched.data);

                    // This is a repeating timer, so we need to
                    // enqueue the next call.
                    sleep = Sleep::NotAtAll;
                    self.heap.push(Schedule {
                        date: sched.date + delta,
                        data: data,
                        guard: sched.guard,
                        repeat: Some(delta)
                    });
                } else {
                    self.executor.execute(sched.data);
                }
            }

            match sleep {
                Sleep::UntilAwakened => {
                    let _ = waiter.condvar.wait(lock);
                },
                Sleep::AtMost(delay) => {
                    let sec = delay.num_seconds();
                    // This `unwrap()` asserts that the number of ns is not
                    // > 1_000_000_000. Since we just subtracted the number
                    // of seconds, the assertion should always pass.
                    let ns = (delay - Duration::seconds(sec)).num_nanoseconds().unwrap();
                    let duration = std::time::Duration::new(sec as u64, ns as u32);
                    let _ = waiter.condvar.wait_timeout(lock, duration);
                },
                Sleep::NotAtAll => {}
            }
        }
    }
}

/// Shared coordination logic for timer threads.
pub struct TimerBase<T>
    where T : 'static + Send {
    /// Sender used to communicate with the _Communication_ thread. In
    /// turn, that thread forwards the operations to the _Scheduler_
    /// thread.
    tx: Sender<Op<T>>,
}

impl <T> Drop for TimerBase<T>
    where T : 'static + Send {
    /// Stop the timer threads.
    fn drop(&mut self) {
        self.tx.send(Op::Stop).unwrap();
    }
}

impl <T> TimerBase<T>
    where T : 'static + Send {
    /// Create a timer base.
    ///
    /// This immediately launches two threads, which will remain
    /// launched until the timer is dropped. As expected, the threads
    /// spend most of their life waiting for instructions.
    fn new<E>(executor : E) -> Self
        where E : 'static + Executor<T> + Send {
        Self::with_capacity(executor, 32)
    }

    /// As `new()`, but with a manually specified initial capacity.
    fn with_capacity<E>(executor : E, capacity: usize) -> Self
        where E : 'static + Executor<T> + Send {
        let waiter_send = Arc::new(WaiterChannel::with_capacity(capacity));
        let waiter_recv = waiter_send.clone();

        // Spawn a first thread, whose sole role is to dispatch
        // messages to the second thread without having to wait too
        // long for the mutex.
        let (tx, rx) = channel();
        thread::spawn(move || {
            use Op::*;
            let ref waiter = *waiter_send;
            for msg in rx.iter() {
                let mut vec = waiter.messages.lock().unwrap();
                match msg {
                    Schedule(sched) => {
                        vec.push(Schedule(sched));
                        waiter.condvar.notify_one();
                    }
                    Stop => {
                        vec.clear();
                        vec.push(Op::Stop);
                        waiter.condvar.notify_one();
                        return;
                    }
                }
            }
        });

        // Spawn a second thread, in charge of scheduling.
        thread::Builder::new().name("Timer thread".to_owned()).spawn(move || {
            let mut scheduler = Scheduler::with_capacity(waiter_recv, executor, capacity);
            scheduler.run()
        }).unwrap();
        TimerBase {
            tx: tx
        }
    }

    pub fn schedule_with_delay(&self, delay: Duration, data : T) -> Guard {
        self.schedule_with_date(Utc::now() + delay, data)
    }

    pub fn schedule_with_date<D>(&self, date: DateTime<D>, data : T) -> Guard
        where D : chrono::offset::TimeZone
    {
        self.schedule(date, None, data)
    }

    pub fn schedule_repeating(&self, repeat: Duration, data : T) -> Guard
    {
        self.schedule(Utc::now() + repeat, Some(repeat), data)
    }

    pub fn schedule<D>(&self, date: DateTime<D>, repeat: Option<Duration>, data : T) -> Guard
        where D : chrono::offset::TimeZone
    {
        let guard = Guard::new();
        self.tx.send(Op::Schedule(Schedule {
            date: date.with_timezone(&Utc),
            data: data,
            guard: guard.clone(),
            repeat: repeat
        })).unwrap();
        guard
    }
}

/// A timer, used to schedule execution of callbacks at a later date.
///
/// In the current implementation, each timer is executed as two
/// threads. The _Scheduler_ thread is in charge of maintaining the
/// queue of callbacks to execute and of actually executing them. The
/// _Communication_ thread is in charge of communicating with the
/// _Scheduler_ thread (which requires acquiring a possibly-long-held
/// Mutex) without blocking the caller thread.
pub struct Timer {
    base: TimerBase<Box<FnMut() + Send>>
}

impl Timer {
    /// Create a timer.
    ///
    /// This immediately launches two threads, which will remain
    /// launched until the timer is dropped. As expected, the threads
    /// spend most of their life waiting for instructions.
    pub fn new() -> Self {
        Timer { base : TimerBase::new(CallbackExecutor) }
    }

    /// As `new()`, but with a manually specified initial capacity.
    pub fn with_capacity(capacity: usize) -> Self {
        Timer { base : TimerBase::with_capacity(CallbackExecutor, capacity) }
    }

    /// Schedule a callback for execution after a delay.
    ///
    /// Callbacks are guaranteed to never be called before the
    /// delay. However, it is possible that they will be called a
    /// little after the delay.
    ///
    /// If the delay is negative or 0, the callback is executed as
    /// soon as possible.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, execution is cancelled.
    ///
    /// # Performance
    ///
    /// The callback is executed on the Scheduler thread. It should
    /// therefore terminate very quickly, or it risks delaying other
    /// callbacks.
    ///
    /// # Failures
    ///
    /// Any failure in `cb` will bring down the scheduler thread and
    /// progressively contaminate the Timer and the calling thread
    /// itself. You have been warned.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::sync::mpsc::channel;
    ///
    /// let timer = timer::Timer::new();
    /// let (tx, rx) = channel();
    ///
    /// let _guard = timer.schedule_with_delay(chrono::Duration::seconds(3), move || {
    ///     // This closure is executed on the scheduler thread,
    ///     // so we want to move it away asap.
    ///
    ///     let _ignored = tx.send(()); // Avoid unwrapping here.
    /// });
    ///
    /// rx.recv().unwrap();
    /// println!("This code has been executed after 3 seconds");
    /// ```
    pub fn schedule_with_delay<F>(&self, delay: Duration, cb: F) -> Guard
        where F: 'static + FnMut() + Send {
        self.base.schedule_with_delay(delay, Box::new(cb))
    }

    /// Schedule a callback for execution at a given date.
    ///
    /// Callbacks are guaranteed to never be called before their
    /// date. However, it is possible that they will be called a
    /// little after it.
    ///
    /// If the date is in the past, the callback is executed as soon
    /// as possible.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, execution is cancelled.
    ///
    /// # Performance
    ///
    /// The callback is executed on the Scheduler thread. It should
    /// therefore terminate very quickly, or it risks delaying other
    /// callbacks.
    ///
    /// # Failures
    ///
    /// Any failure in `cb` will bring down the scheduler thread and
    /// progressively contaminate the Timer and the calling thread
    /// itself. You have been warned.
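    ///
    /// # Example
    ///
    /// A minimal sketch, analogous to the `schedule_with_delay` example,
    /// using an explicit `chrono::Utc` date:
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::sync::mpsc::channel;
    ///
    /// let timer = timer::Timer::new();
    /// let (tx, rx) = channel();
    ///
    /// let date = chrono::Utc::now() + chrono::Duration::seconds(1);
    /// let _guard = timer.schedule_with_date(date, move || {
    ///     let _ignored = tx.send(()); // Avoid unwrapping on the scheduler thread.
    /// });
    ///
    /// rx.recv().unwrap();
    /// println!("This code has been executed around the given date");
    /// ```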
    pub fn schedule_with_date<F, T>(&self, date: DateTime<T>, cb: F) -> Guard
        where F: 'static + FnMut() + Send, T : chrono::offset::TimeZone
    {
        self.base.schedule_with_date(date, Box::new(cb))
    }

    /// Schedule a callback for execution once per interval.
    ///
    /// Callbacks are guaranteed to never be called before their
    /// date. However, it is possible that they will be called a
    /// little after it.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, repeat is stopped.
    ///
    /// # Performance
    ///
    /// The callback is executed on the Scheduler thread. It should
    /// therefore terminate very quickly, or it risks delaying other
    /// callbacks.
    ///
    /// # Failures
    ///
    /// Any failure in `cb` will bring down the scheduler thread and
    /// progressively contaminate the Timer and the calling thread
    /// itself. You have been warned.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::thread;
    /// use std::sync::{Arc, Mutex};
    ///
    /// let timer = timer::Timer::new();
    /// // Number of times the callback has been called.
    /// let count = Arc::new(Mutex::new(0));
    ///
    /// // Start repeating. Each callback increases `count`.
    /// let guard = {
    ///     let count = count.clone();
    ///     timer.schedule_repeating(chrono::Duration::milliseconds(5), move || {
    ///         *count.lock().unwrap() += 1;
    ///     })
    /// };
    ///
    /// // Sleep one second. The callback should be called ~200 times.
    /// thread::sleep(std::time::Duration::new(1, 0));
    /// let count_result = *count.lock().unwrap();
    /// assert!(190 <= count_result && count_result <= 210,
    ///     "The timer was called {} times", count_result);
    ///
    /// // Now drop the guard. This should stop the timer.
    /// drop(guard);
    /// thread::sleep(std::time::Duration::new(0, 100));
    ///
    /// // Let's check that the count stops increasing.
    /// let count_start = *count.lock().unwrap();
    /// thread::sleep(std::time::Duration::new(1, 0));
    /// let count_stop = *count.lock().unwrap();
    /// assert_eq!(count_start, count_stop);
    /// ```
    pub fn schedule_repeating<F>(&self, repeat: Duration, cb: F) -> Guard
        where F: 'static + FnMut() + Send
    {
        self.base.schedule_repeating(repeat, Box::new(cb))
    }

    /// Schedule a callback for execution at a given time, then once
    /// per interval. A typical use case is to execute code once per
    /// day at 12am.
    ///
    /// Callbacks are guaranteed to never be called before their
    /// date. However, it is possible that they will be called a
    /// little after it.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, repeat is stopped.
    ///
    /// # Performance
    ///
    /// The callback is executed on the Scheduler thread. It should
    /// therefore terminate very quickly, or it risks delaying other
    /// callbacks.
    ///
    /// # Failures
    ///
    /// Any failure in `cb` will bring down the scheduler thread and
    /// progressively contaminate the Timer and the calling thread
    /// itself. You have been warned.
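    ///
    /// # Example
    ///
    /// A minimal sketch, combining a start date with a repeat interval:
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::sync::mpsc::channel;
    ///
    /// let timer = timer::Timer::new();
    /// let (tx, rx) = channel();
    ///
    /// // First execution in 500ms, then every 500ms until the guard is dropped.
    /// let guard = timer.schedule(
    ///     chrono::Utc::now() + chrono::Duration::milliseconds(500),
    ///     Some(chrono::Duration::milliseconds(500)),
    ///     move || { let _ = tx.send(()); }
    /// );
    ///
    /// // Wait for the first two executions, then stop repeating.
    /// rx.recv().unwrap();
    /// rx.recv().unwrap();
    /// drop(guard);
    /// ```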
    pub fn schedule<F, T>(&self, date: DateTime<T>, repeat: Option<Duration>, cb: F) -> Guard
        where F: 'static + FnMut() + Send, T : chrono::offset::TimeZone
    {
        self.base.schedule(date, repeat, Box::new(cb))
    }
}

/// A timer, used to schedule delivery of messages at a later date.
///
/// In the current implementation, each timer is executed as two
/// threads. The _Scheduler_ thread is in charge of maintaining the
/// queue of messages to deliver and of actually delivering them. The
/// _Communication_ thread is in charge of communicating with the
/// _Scheduler_ thread (which requires acquiring a possibly-long-held
/// Mutex) without blocking the caller thread.
///
/// Similar functionality could be implemented with the generic `Timer`
/// type; however, `MessageTimer` has two performance advantages over
/// doing so. First, `MessageTimer` does not need to heap-allocate a
/// closure for each scheduled item, since the messages to queue are
/// passed directly. Second, `MessageTimer` avoids the dynamic dispatch
/// overhead associated with invoking the boxed closures.
pub struct MessageTimer<T>
    where T : 'static + Send + Clone {
    base: TimerBase<T>
}

impl <T> MessageTimer<T>
    where T : 'static + Send + Clone {
    /// Create a message timer.
    ///
    /// This immediately launches two threads, which will remain
    /// launched until the timer is dropped. As expected, the threads
    /// spend most of their life waiting for instructions.
    pub fn new(tx: Sender<T>) -> Self {
        MessageTimer {
            base : TimerBase::new(DeliveryExecutor { tx : tx })
        }
    }

    /// As `new()`, but with a manually specified initial capacity.
    pub fn with_capacity(tx: Sender<T>, capacity: usize) -> Self {
        MessageTimer {
            base : TimerBase::with_capacity(DeliveryExecutor { tx : tx }, capacity)
        }
    }

    /// Schedule a message for delivery after a delay.
    ///
    /// Messages are guaranteed to never be delivered before the
    /// delay. However, it is possible that they will be delivered a
    /// little after the delay.
    ///
    /// If the delay is negative or 0, the message is delivered as
    /// soon as possible.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, delivery is cancelled.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    /// let timer = timer::MessageTimer::new(tx);
    /// let _guard = timer.schedule_with_delay(chrono::Duration::seconds(3), 3);
    ///
    /// rx.recv().unwrap();
    /// println!("This code has been executed after 3 seconds");
    /// ```
    pub fn schedule_with_delay(&self, delay: Duration, msg : T) -> Guard {
        self.base.schedule_with_delay(delay, msg)
    }

    /// Schedule a message for delivery at a given date.
    ///
    /// Messages are guaranteed to never be delivered before their
    /// date. However, it is possible that they will be delivered a
    /// little after it.
    ///
    /// If the date is in the past, the message is delivered as soon
    /// as possible.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, delivery is cancelled.
    ///
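    /// # Example
    ///
    /// A minimal sketch, delivering a message at a date one second from now:
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    /// let timer = timer::MessageTimer::new(tx);
    ///
    /// let date = chrono::Utc::now() + chrono::Duration::seconds(1);
    /// let _guard = timer.schedule_with_date(date, 42);
    ///
    /// assert_eq!(rx.recv().unwrap(), 42);
    /// ```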
    pub fn schedule_with_date<D>(&self, date: DateTime<D>, msg : T) -> Guard
        where D : chrono::offset::TimeZone
    {
        self.base.schedule_with_date(date, msg)
    }

    /// Schedule a message for delivery once per interval.
    ///
    /// Messages are guaranteed to never be delivered before their
    /// date. However, it is possible that they will be delivered a
    /// little after it.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, repeat is stopped.
    ///
    /// # Performance
    ///
    /// The message is cloned on the Scheduler thread. Cloning of
    /// messages should therefore succeed very quickly, or risk
    /// delaying other messages.
    ///
    /// # Failures
    ///
    /// Any failure in cloning of messages will occur on the scheduler thread
    /// and will contaminate the Timer and the calling thread itself. You have
    /// been warned.
    ///
    /// # Example
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    /// let timer = timer::MessageTimer::new(tx);
    ///
    /// // Start repeating.
    /// let guard = timer.schedule_repeating(chrono::Duration::milliseconds(5), 0);
    ///
    /// let mut count = 0;
    /// while count < 5 {
    ///     let _ = rx.recv();
    ///     println!("Prints every 5 milliseconds");
    ///     count += 1;
    /// }
    /// ```
    pub fn schedule_repeating(&self, repeat: Duration, msg : T) -> Guard
    {
        self.base.schedule_repeating(repeat, msg)
    }

    /// Schedule a message for delivery at a given time, then once
    /// per interval. A typical use case is to execute code once per
    /// day at 12am.
    ///
    /// Messages are guaranteed to never be delivered before their
    /// date. However, it is possible that they will be delivered a
    /// little after it.
    ///
    /// This method returns a `Guard` object. If that `Guard` is
    /// dropped, repeat is stopped.
    ///
    /// # Performance
    ///
    /// The message is cloned on the Scheduler thread. Cloning of
    /// messages should therefore succeed very quickly, or risk
    /// delaying other messages.
    ///
    /// # Failures
    ///
    /// Any failure in cloning of messages will occur on the scheduler thread
    /// and will contaminate the Timer and the calling thread itself. You have
    /// been warned.
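    ///
    /// # Example
    ///
    /// A minimal sketch, delivering a message on a start date and then
    /// once per interval:
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    /// let timer = timer::MessageTimer::new(tx);
    ///
    /// // First delivery in 5ms, then every 5ms until the guard is dropped.
    /// let guard = timer.schedule(
    ///     chrono::Utc::now() + chrono::Duration::milliseconds(5),
    ///     Some(chrono::Duration::milliseconds(5)),
    ///     0
    /// );
    ///
    /// for _ in 0..5 {
    ///     let _ = rx.recv();
    ///     println!("Received a message");
    /// }
    /// drop(guard);
    /// ```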
    pub fn schedule<D>(&self, date: DateTime<D>, repeat: Option<Duration>, msg : T) -> Guard
        where D : chrono::offset::TimeZone
    {
        self.base.schedule(date, repeat, msg)
    }
}

/// A value scoping a schedule. When this value is dropped, the
/// schedule is cancelled.
#[derive(Clone)]
pub struct Guard {
    should_execute: Arc<AtomicBool>,
    ignore_drop: bool
}
impl Guard {
    fn new() -> Self {
        Guard {
            should_execute: Arc::new(AtomicBool::new(true)),
            ignore_drop: false
        }
    }
    fn should_execute(&self) -> bool {
        self.should_execute.load(AtomicOrdering::Relaxed)
    }

    /// Ignores the guard, preventing it from disabling the scheduled
    /// item. This can be used to avoid maintaining a Guard handle
    /// for items that will never be cancelled.
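    ///
    /// # Example
    ///
    /// A minimal sketch, scheduling a message without keeping the guard:
    ///
    /// ```
    /// extern crate timer;
    /// extern crate chrono;
    /// use std::sync::mpsc::channel;
    ///
    /// let (tx, rx) = channel();
    /// let timer = timer::MessageTimer::new(tx);
    ///
    /// // `ignore()` the guard so the schedule survives even though we
    /// // do not keep a handle to it.
    /// timer.schedule_with_delay(chrono::Duration::milliseconds(1), ()).ignore();
    ///
    /// rx.recv().unwrap();
    /// ```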
    pub fn ignore(mut self) {
        self.ignore_drop = true;
    }
}
impl Drop for Guard {
    /// Cancel a schedule.
    fn drop(&mut self) {
        if !self.ignore_drop {
            self.should_execute.store(false, AtomicOrdering::Relaxed)
        }
    }
}

#[cfg(test)]
mod tests {
    extern crate std;
    use super::*;
    use std::sync::mpsc::channel;
    use std::sync::{Arc, Mutex};
    use std::thread;
    use chrono::{Duration, Utc};

    #[test]
    fn test_schedule_with_delay() {
        let timer = Timer::new();
        let (tx, rx) = channel();
        let mut guards = vec![];

        // Schedule a number of callbacks in an arbitrary order, make sure
        // that they are executed in the right order.
        let mut delays = vec![1, 5, 3, -1];
        let start = Utc::now();
        for i in delays.clone() {
            println!("Scheduling for execution in {} seconds", i);
            let tx = tx.clone();
            guards.push(timer.schedule_with_delay(Duration::seconds(i), move || {
                println!("Callback {}", i);
                tx.send(i).unwrap();
            }));
        }

        delays.sort();
        for (i, msg) in (0..delays.len()).zip(rx.iter()) {
            let elapsed = Utc::now().signed_duration_since(start).num_seconds();
            println!("Received message {} after {} seconds", msg, elapsed);
            assert_eq!(msg, delays[i]);
            assert!(delays[i] <= elapsed && elapsed <= delays[i] + 3, "We have waited {} seconds, expecting [{}, {}]", elapsed, delays[i], delays[i] + 3);
        }

        // Now make sure that callbacks that are designed to be executed
        // immediately are executed quickly.
        let start = Utc::now();
        for i in vec![10, 0] {
            println!("Scheduling for execution in {} seconds", i);
            let tx = tx.clone();
            guards.push(timer.schedule_with_delay(Duration::seconds(i), move || {
                println!("Callback {}", i);
                tx.send(i).unwrap();
            }));
        }

        assert_eq!(rx.recv().unwrap(), 0);
        assert!(Utc::now().signed_duration_since(start) <= Duration::seconds(1));
    }

    #[test]
    fn test_message_timer() {
        let (tx, rx) = channel();
        let timer = MessageTimer::new(tx);
        let start = Utc::now();

        let mut delays = vec!(400, 300, 100, 500, 200);
        for delay in delays.clone() {
            timer.schedule_with_delay(Duration::milliseconds(delay), delay).ignore();
        }

        delays.sort();
        for delay in delays {
            assert_eq!(rx.recv().unwrap(), delay);
        }
        assert!(Utc::now().signed_duration_since(start) <= Duration::seconds(1));
    }

    #[test]
    fn test_guards() {
        println!("Testing that callbacks aren't called if the guard is dropped");
        let timer = Timer::new();
        let called = Arc::new(Mutex::new(false));

        for i in 0..10 {
            let called = called.clone();
            timer.schedule_with_delay(Duration::milliseconds(i), move || {
                *called.lock().unwrap() = true;
            });
        }

        thread::sleep(std::time::Duration::new(1, 0));
        assert_eq!(*called.lock().unwrap(), false);
    }

    #[test]
    fn test_guard_ignore() {
        let timer = Timer::new();
        let called = Arc::new(Mutex::new(false));

        {
            let called = called.clone();
            timer.schedule_with_delay(Duration::milliseconds(1), move || {
                *called.lock().unwrap() = true;
            }).ignore();
        }

        thread::sleep(std::time::Duration::new(1, 0));
        assert_eq!(*called.lock().unwrap(), true);
    }

    struct NoCloneMessage;

    impl Clone for NoCloneMessage {
        fn clone(&self) -> Self {
            panic!("NoCloneMessage should not be cloned");
        }
    }

    #[test]
    fn test_no_clone() {
        // Make sure that, if no repeat schedule is supplied to a MessageTimer,
        // the message instances are not cloned.
        let (tx, rx) = channel();
        let timer = MessageTimer::new(tx);
        timer.schedule_with_delay(Duration::milliseconds(0), NoCloneMessage).ignore();
        timer.schedule_with_delay(Duration::milliseconds(0), NoCloneMessage).ignore();

        for _ in 0..2 {
            let _ = rx.recv();
        }
    }
}