tokio_periodic/
lib.rs

1#[macro_use]
2extern crate tokio_core;
3extern crate mio;
4extern crate futures;
5
6use std::time::Duration;
7use tokio_core::reactor;
8use std::io::{Result, Read};
9
10/// Asynchronous, cross-platform periodic timer
11///
12/// This timer, when `reset` for the first time will become ready to `poll` every specified
13/// interval until it is dropped or `reset` again.
14///
15/// # Example
16///
17/// `PeriodicTimer` implements `futures::stream::Stream` and therefore interoperates with other
18/// futures well. In this example `PeriodicTimer` is used to print integers every second:
19///
20/// ```rust
21/// extern crate futures;
22/// extern crate tokio_core;
23/// extern crate tokio_periodic;
24///
25/// let mut core = tokio_core::reactor::Core::new().unwrap();
26/// let handle = core.handle();
27/// let timer = tokio_periodic::PeriodicTimer::new(&handle).unwrap();
28/// timer.reset(::std::time::Duration::new(1, 0));
29/// let digits = futures::stream::unfold(1, |v| if v < 3 { // for demonstration purposes stop at 3
30///     Some(futures::future::ok((v, v + 1)))
31/// } else {
32///     None
33/// });
34/// let mut timer_stream = futures::Stream::zip(timer, digits);
35/// while let Ok((Some(item), stream)) = core.run(futures::Stream::into_future(timer_stream)) {
36///     println!("{:?}", item);
37///     timer_stream = stream;
38/// }
39/// ```
40pub struct PeriodicTimer {
41    poll: reactor::PollEvented<imp::Timer>
42}
43
44impl PeriodicTimer {
45    /// Create a new `PeriodicTimer` associated with specified event loop handle
46    pub fn new(handle: &reactor::Handle) -> Result<PeriodicTimer> {
47        Ok(PeriodicTimer {
48            poll: try!(reactor::PollEvented::new(try!(imp::Timer::new()), handle))
49        })
50    }
51
52    /// Reset the timer to specified `interval`
53    ///
54    /// The previously active timer, if any, is cancelled before the new one is set. Once this
55    /// method returns, this timer will become ready to `poll` every `interval` duration.
56    ///
57    /// Due to some platforms not supporting nanosecond precision the interval duration is rounded
58    /// up to the nearest precision supported by the platform.
59    ///
60    /// If the `interval` is zero-duration, the timer is deactivated.
61    ///
62    /// # Errors
63    ///
64    /// In addition to the errors which may occur when interacting with system APIs, an error
65    /// condition may also occur when `interval` duration exceeds the maximum duration that can be
66    /// specified to the system.
67    pub fn reset(&self, interval: Duration) -> Result<()> {
68        imp::Timer::reset(self.poll.get_ref(), interval)
69    }
70}
71
72impl futures::stream::Stream for PeriodicTimer {
73    type Item = u64;
74    type Error = std::io::Error;
75
76    /// Poll the timer
77    ///
78    /// If the timer has fired at least once since previous call to `poll` or `reset`, this
79    /// resolves to a number of times the timer has fired.
80    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
81        let mut buf = [0xff; 8];
82        let bs = try_nb!(self.poll.read(&mut buf));
83        debug_assert_eq!(bs, 8);
84        Ok(futures::Async::Ready(Some(unsafe { *(buf.as_ptr() as *const u64) })))
85    }
86}
87
88#[cfg(target_os="linux")]
89mod imp {
90    extern crate libc;
91    use std::os::unix::io::RawFd;
92    use std::os::raw::c_int;
93    use std::io::Result;
94    use std::time::Duration;
95    use mio::{Poll, Token, Ready, PollOpt, Evented};
96    use mio::unix::{EventedFd, UnixReady};
97
98    pub struct Timer { fd: RawFd }
99
100    const TFD_CLOEXEC: c_int = libc::O_CLOEXEC;
101    const TFD_NONBLOCK: c_int = libc::O_NONBLOCK;
102
103    impl Timer {
104        pub fn new() -> Result<Timer> {
105            unsafe {
106                let ret = timerfd_create(libc::CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
107                if ret == -1 { return Err(::std::io::Error::last_os_error()); }
108                Ok(Timer {
109                    fd: ret
110                })
111            }
112        }
113
114        pub fn reset(&self, interval: Duration) -> Result<()> {
115            // This seriously needs a nicer way to convert.
116            let tspec = libc::timespec {
117                tv_sec: interval.as_secs() as _,
118                tv_nsec: interval.subsec_nanos() as _,
119            };
120            let itspec = itimerspec {
121                interval: tspec,
122                initial: tspec,
123            };
124            unsafe {
125                let ret = timerfd_settime(self.fd, 0, &itspec, ::std::ptr::null_mut());
126                if ret == -1 { return Err(::std::io::Error::last_os_error()); }
127                Ok(())
128            }
129        }
130    }
131
132    impl Evented for Timer {
133        fn register(&self, poll: &Poll, token: Token, _: Ready, opts: PollOpt) -> Result<()> {
134            EventedFd(&self.fd).register(poll, token, Ready::readable() | UnixReady::error(), opts)
135        }
136
137        fn reregister(&self, poll: &Poll, token: Token, _: Ready, opts: PollOpt) -> Result<()> {
138            EventedFd(&self.fd).reregister(poll, token, Ready::readable() | UnixReady::error(), opts)
139        }
140
141        fn deregister(&self, poll: &Poll) -> Result<()> {
142            EventedFd(&self.fd).deregister(poll)
143        }
144    }
145
146    impl ::std::io::Read for Timer {
147        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
148            let ret = unsafe { libc::read(self.fd, buf.as_mut_ptr() as *mut _, buf.len()) };
149            if ret == -1 { return Err(::std::io::Error::last_os_error()); }
150            debug_assert_eq!(ret, 8);
151            Ok(ret as usize)
152        }
153    }
154
155    impl Drop for Timer {
156        fn drop(&mut self) {
157            unsafe {
158                libc::close(self.fd);
159            }
160        }
161    }
162
163    #[repr(C)]
164    struct itimerspec {
165        interval: libc::timespec,
166        initial: libc::timespec,
167    }
168    extern {
169        fn timerfd_create(clockid: c_int, flags: c_int) -> RawFd;
170        fn timerfd_settime(fd: RawFd, flags: c_int,
171                           new_ts: *const itimerspec,
172                           old_ts: *mut itimerspec) -> c_int;
173    }
174}
175
176#[cfg(any(target_os = "bitrig", target_os = "dragonfly", target_os = "freebsd", target_os = "ios",
177          target_os = "macos", target_os = "netbsd", target_os = "openbsd"))]
178mod imp {
179    extern crate libc;
180    use std::os::unix::io::RawFd;
181    use std::io::Result;
182    use std::time::Duration;
183    use self::libc::{intptr_t, kevent, kqueue};
184    use mio::{Poll, Token, Ready, PollOpt, Evented};
185    use mio::unix::{EventedFd, UnixReady};
186
187    pub struct Timer { fd: RawFd }
188
189    impl Timer {
190        pub fn new() -> Result<Timer> {
191            unsafe {
192                let ret = kqueue();
193                if ret == -1 { return Err(::std::io::Error::last_os_error()); }
194                if libc::ioctl(ret, libc::FIOCLEX) == -1 {
195                    libc::fcntl(ret, libc::F_SETFD, libc::FD_CLOEXEC);
196                }
197                Ok(Timer { fd: ret })
198            }
199        }
200
201        pub fn reset(&self, interval: Duration) -> Result<()> {
202            let (ty, dur) = try!(Timer::duration_to_units(interval));
203            let en_flag = if dur == 0 { libc::EV_DISABLE } else { libc::EV_ENABLE };
204            let kevt = kevent {
205                ident: 0,
206                filter: libc::EVFILT_TIMER,
207                flags: libc::EV_ADD | en_flag,
208                fflags: ty,
209                data: dur,
210                udata: ::std::ptr::null_mut()
211            };
212            unsafe {
213                let ret = kevent(self.fd, &kevt, 1,
214                                       ::std::ptr::null_mut(), 0,
215                                       ::std::ptr::null());
216                if ret == -1 { return Err(::std::io::Error::last_os_error()); }
217                Ok(())
218            }
219        }
220
221        // intptr_t for time lolz?
222        fn duration_to_units(interval: Duration) -> Result<(u32, intptr_t)> {
223            let secs = interval.as_secs();
224            let nanos = interval.subsec_nanos() as u64;
225            let max = intptr_t::max_value() as u64;
226
227            if nanos > max || secs > max {
228                return Err(::std::io::Error::from_raw_os_error(libc::EINVAL));
229            }
230
231            let secs = secs as intptr_t;
232            let nanos = nanos as intptr_t;
233
234            if let Some(val) = secs.checked_mul(1_000_000_000)
235                                   .and_then(|v| v.checked_add(nanos)) {
236                return Ok((libc::NOTE_NSECONDS, val));
237            }
238
239            if nanos % 1_000 != 0 {
240                return Err(::std::io::Error::from_raw_os_error(libc::EINVAL));
241            }
242
243            let micros = nanos / 1_000;
244
245            if let Some(val) = secs.checked_mul(1_000_000)
246                                   .and_then(|v| v.checked_add(micros)) {
247                return Ok((libc::NOTE_USECONDS, val));
248            }
249
250            if micros % 1_000 != 0 {
251                return Err(::std::io::Error::from_raw_os_error(libc::EINVAL));
252            }
253
254            let millis = micros / 1_000;
255
256            if let Some(val) = secs.checked_mul(1_000)
257                                   .and_then(|v| v.checked_add(millis)) {
258                return Ok((NOTE_MSECONDS, val));
259            }
260
261            if millis % 1_000 != 0 {
262                return Err(::std::io::Error::from_raw_os_error(libc::EINVAL));
263            }
264
265            Ok((libc::NOTE_SECONDS, secs))
266        }
267    }
268
269    // On OS X MSECONDS is the default if nothing else is specified.
270    #[cfg(any(target_os = "ios", target_os = "macos"))]
271    const NOTE_MSECONDS: u32 = 0;
272    #[cfg(not(any(target_os = "ios", target_os = "macos")))]
273    const NOTE_MSECONDS: u32 = libc::NOTE_MSECONDS;
274
275    impl Evented for Timer {
276        fn register(&self, poll: &Poll, token: Token, _: Ready, opts: PollOpt) -> Result<()> {
277            EventedFd(&self.fd).register(poll, token, Ready::readable() | UnixReady::error(), opts)
278        }
279
280        fn reregister(&self, poll: &Poll, token: Token, _: Ready, opts: PollOpt) -> Result<()> {
281            EventedFd(&self.fd).reregister(poll, token, Ready::readable() | UnixReady::error(), opts)
282        }
283
284        fn deregister(&self, poll: &Poll) -> Result<()> {
285            EventedFd(&self.fd).deregister(poll)
286        }
287    }
288
289    impl ::std::io::Read for Timer {
290        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
291            unsafe {
292                // Need at least 8 bytes of buffer
293                if buf.len() < 8 { return Err(::std::io::Error::from_raw_os_error(libc::EINVAL)); }
294                let mut kevt: kevent = ::std::mem::uninitialized();
295                let tspec = libc::timespec { tv_sec: 0, tv_nsec: 0 };
296                let evts = kevent(self.fd, ::std::ptr::null(), 0,
297                                        &mut kevt, 1, &tspec);
298                if evts == -1 {
299                    return Err(::std::io::Error::last_os_error());
300                } else if evts == 1 {
301                    if kevt.filter != libc::EVFILT_TIMER { return Ok(0) } // wtf?
302                    let ptr = buf.as_mut_ptr() as *mut u64;
303                    *ptr = kevt.data as u64;
304                    return Ok(8)
305                } else {
306                    return Err(::mio::would_block())
307                }
308            }
309        }
310    }
311
312    impl Drop for Timer {
313        fn drop(&mut self) {
314            unsafe {
315                libc::close(self.fd);
316            }
317        }
318    }
319}
320
321#[cfg(target_os="windows")]
322mod imp {
323    extern crate winapi;
324    extern crate kernel32;
325    use std::sync::atomic;
326    use std::ptr;
327    use std::io::Result;
328    use std::time::Duration;
329    use mio::{Poll, Token, Ready, PollOpt, Evented, Registration, SetReadiness};
330
331    pub struct Timer {
332        inner: *mut Inner
333    }
334
335    pub struct Inner {
336        active: atomic::AtomicPtr<winapi::c_void>,
337        times_fired: atomic::AtomicUsize,
338        registration: ::std::sync::Mutex<Option<(Registration, SetReadiness)>>,
339
340    }
341
342    impl Timer {
343        pub fn new() -> Result<Timer> {
344            Ok(Timer {
345                inner: Box::into_raw(Box::new(Inner {
346                    active: atomic::AtomicPtr::new(ptr::null_mut()),
347                    times_fired: atomic::AtomicUsize::new(0),
348                    registration: ::std::sync::Mutex::new(None)
349                }))
350            })
351        }
352
353        pub fn reset(&self, interval: Duration) -> Result<()> {
354            unsafe extern "system" fn callback(data: winapi::PVOID,
355                                               _: winapi::BOOLEAN) {
356                let this: &Inner = &*(data as *mut _);
357                let _ = this.times_fired.fetch_add(1, atomic::Ordering::SeqCst);
358                match this.registration.lock().unwrap().as_ref() {
359                    Some(&(_, ref s)) => {
360                        // Can’t error from here, sadly.
361                        let _ = s.set_readiness(Ready::readable());
362                    },
363                    None => {}
364                }
365            }
366            unsafe {
367                let mut out = (*self.inner).active.swap(ptr::null_mut(),
368                                                        atomic::Ordering::SeqCst);
369                if !out.is_null() {
370                    let ret = kernel32::DeleteTimerQueueTimer(ptr::null_mut(), out,
371                                                              winapi::INVALID_HANDLE_VALUE);
372                    if ret == 0 { return Err(::std::io::Error::last_os_error()); }
373                }
374                (*self.inner).times_fired.store(0, atomic::Ordering::SeqCst);
375                let time_in_ms = try!(Timer::interval_to_millis(interval));
376                if time_in_ms == 0 { return Ok(()); }
377                let ret = kernel32::CreateTimerQueueTimer(&mut out, ptr::null_mut(),
378                                                          Some(callback),
379                                                          // not mutated, so its fine
380                                                          self.inner as *mut _,
381                                                          time_in_ms, time_in_ms,
382                                                          0);
383                if ret == 0 { return Err(::std::io::Error::last_os_error()); }
384                // FIXME: probably should just loop (or deregister)
385                (*self.inner).active.compare_exchange(ptr::null_mut(), out,
386                                                      atomic::Ordering::SeqCst,
387                                                      atomic::Ordering::SeqCst)
388                .expect("invariant broken");
389                Ok(())
390            }
391        }
392
393        pub fn interval_to_millis(interval: Duration) -> Result<winapi::DWORD> {
394            let max = winapi::DWORD::max_value();
395            // Round up
396            let subsec_ns = interval.subsec_nanos() as u64 + 999_999;
397            let ms = interval.as_secs().checked_mul(1_000)
398                .and_then(|v| v.checked_add(subsec_ns / 1_000_000));
399            if let Some(ms) = ms {
400                if ms <= max as _ {
401                    return Ok(ms as _);
402                }
403            }
404            Err(::std::io::Error::from_raw_os_error(winapi::ERROR_INVALID_PARAMETER as i32))
405        }
406    }
407
408    impl Evented for Timer {
409        fn register(&self, poll: &Poll, token: Token, rdy: Ready, opts: PollOpt) -> Result<()> {
410            unsafe {
411                let val = Some(Registration::new(poll, token, rdy, opts));
412                *(*self.inner).registration.lock().unwrap() = val;
413                Ok(())
414            }
415        }
416
417        fn reregister(&self, poll: &Poll, token: Token, rdy: Ready, opts: PollOpt) -> Result<()> {
418            unsafe {
419                (*self.inner).registration.lock().unwrap().as_mut().unwrap().0
420                .update(poll, token, rdy, opts)
421            }
422        }
423
424        fn deregister(&self, poll: &Poll) -> Result<()> {
425            unsafe {
426                (*self.inner).registration.lock().unwrap().take().unwrap().0.deregister(poll)
427            }
428        }
429    }
430
431    impl ::std::io::Read for Timer {
432        fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
433            unsafe {
434                if buf.len() < 8 { return Err(
435                    ::std::io::Error::from_raw_os_error(winapi::ERROR_INSUFFICIENT_BUFFER as i32)
436                ); }
437                let ptr = buf.as_mut_ptr() as *mut u64;
438                *ptr = (*self.inner).times_fired.swap(0, atomic::Ordering::SeqCst) as u64;
439                if *ptr == 0 {
440                    return Err(::mio::would_block());
441                }
442                Ok(8)
443            }
444        }
445    }
446
447    impl Drop for Timer {
448        fn drop(&mut self) {
449            unsafe {
450                let out = (*self.inner).active.swap(ptr::null_mut(), atomic::Ordering::SeqCst);
451                if !out.is_null() {
452                    let ret = kernel32::DeleteTimerQueueTimer(ptr::null_mut(), out,
453                                                              winapi::INVALID_HANDLE_VALUE);
454                    if ret == 0 {
455                        panic!("DeleteTimerQueueTimer failed with {:?}",
456                               ::std::io::Error::last_os_error());
457                    }
458                }
459                Box::from_raw(self.inner);
460            }
461        }
462    }
463}
464
465#[cfg(test)]
466mod tests {
467    use std::time;
468    use tokio_core::reactor;
469    use futures::stream::Stream;
470    use futures::future::{self, Future};
471
472    #[cfg(windows)]
473    fn allowed_delta() -> time::Duration {
474        // Windows has horrendous scheduling wherein a thread could take 16ms to wake up or
475        // something. Give it 20ms to figure out what’s happening :)
476        time::Duration::new(0, 20_000_000)
477    }
478
479    #[cfg(any(target_os = "ios", target_os = "macos"))]
480    fn allowed_delta() -> time::Duration {
481        // macOS seems to better at schedulling than windows, but still sucks terribly. Lets give
482        // them 10ms of leeway.
483        time::Duration::new(0, 10_000_000)
484    }
485
486    #[cfg(not(any(windows, target_os = "ios", target_os = "macos")))]
487    fn allowed_delta() -> time::Duration {
488        // Every other OS is swifter and can handle 1ms and probably even smaller deltas
489        // comfortably.
490        time::Duration::new(0, 1_000_000)
491    }
492
493    #[test]
494    fn works() {
495        let mut core = reactor::Core::new().unwrap();
496        let handle = core.handle();
497        let mut timer = super::PeriodicTimer::new(&handle)
498            .expect("periodic timer can be created");
499        let interval = time::Duration::new(0, 32_000_000);
500        timer.reset(interval)
501            .expect("reset works");
502        for _ in 0..3 {
503            let start = time::Instant::now();
504            let res = core.run(timer.into_future());
505            timer = match res {
506                Ok((Some(1), timer)) => timer,
507                Ok((x, _)) => panic!("expected Ok((Some(1), _)), got Ok(({:?}, _))", x),
508                Err((x, _)) => panic!("expected Ok((Some(1), _)), got Err(({:?}, _))", x),
509            };
510            let duration = time::Instant::now().duration_since(start);
511            let absdiff = if duration < interval {
512                interval - duration
513            } else {
514                duration - interval
515            };
516            assert!(absdiff < allowed_delta(), "absdiff is {:?}", absdiff);
517        }
518    }
519
520    #[test]
521    fn reset_works() {
522        let mut core = reactor::Core::new().unwrap();
523        let handle = core.handle();
524        let mut timer = super::PeriodicTimer::new(&handle)
525            .expect("periodic timer can be created");
526        timer.reset(time::Duration::new(0, 5_000_000))
527            .expect("reset works");
528        // run one iteration, so in case reset worked incorrectly and set up two timers,
529        // timer would then fire in a pattern of `a-b-[32ms of a]-b-[32ms of a]-b-...` and test
530        // would fail
531        timer = core.run(timer.into_future()).map_err(|(a, _)| a).unwrap().1;
532        let interval = time::Duration::new(0, 32_000_000);
533        timer.reset(interval)
534            .expect("reset works");
535        for _ in 0..3 {
536            let start = time::Instant::now();
537            let res = core.run(timer.into_future());
538            timer = match res {
539                Ok((Some(1), timer)) => timer,
540                Ok((x, _)) => panic!("expected Ok((Some(1), _)), got Ok(({:?}, _))", x),
541                Err((x, _)) => panic!("expected Ok((Some(1), _)), got Err(({:?}, _))", x),
542            };
543            let duration = time::Instant::now().duration_since(start);
544            let absdiff = if duration < interval {
545                interval - duration
546            } else {
547                duration - interval
548            };
549            assert!(absdiff < allowed_delta(), "absdiff is {:?}", absdiff);
550        }
551    }
552
553    #[test]
554    fn drop_unset_works() {
555        let core = reactor::Core::new().unwrap();
556        let handle = core.handle();
557        super::PeriodicTimer::new(&handle).expect("periodic timer can be created");
558    }
559
560    #[test]
561    fn reset_zero_works() {
562        let mut core = reactor::Core::new().unwrap();
563        let handle = core.handle();
564        let timer = super::PeriodicTimer::new(&handle).expect("periodic timer can be created");
565        timer.reset(time::Duration::new(0, 1_000_000)).expect("reset works");
566        timer.reset(time::Duration::new(0, 0)).expect("reset zero works");
567        ::std::thread::sleep(time::Duration::new(0, 100_000_000));
568        let future = timer.into_future().map(|(a, _)| a).map_err(|(a, _)| a)
569                          .select(future::ok(Some(9876)));
570        let resets = core.run(future).map_err(|(a, _)| a).unwrap().0;
571        assert_eq!(resets, Some(9876)); // timer never fired, so the other future got selected.
572    }
573}