1#[macro_use]
2extern crate tokio_core;
3extern crate mio;
4extern crate futures;
5
6use std::time::Duration;
7use tokio_core::reactor;
8use std::io::{Result, Read};
9
10pub struct PeriodicTimer {
41 poll: reactor::PollEvented<imp::Timer>
42}
43
44impl PeriodicTimer {
45 pub fn new(handle: &reactor::Handle) -> Result<PeriodicTimer> {
47 Ok(PeriodicTimer {
48 poll: try!(reactor::PollEvented::new(try!(imp::Timer::new()), handle))
49 })
50 }
51
52 pub fn reset(&self, interval: Duration) -> Result<()> {
68 imp::Timer::reset(self.poll.get_ref(), interval)
69 }
70}
71
72impl futures::stream::Stream for PeriodicTimer {
73 type Item = u64;
74 type Error = std::io::Error;
75
76 fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
81 let mut buf = [0xff; 8];
82 let bs = try_nb!(self.poll.read(&mut buf));
83 debug_assert_eq!(bs, 8);
84 Ok(futures::Async::Ready(Some(unsafe { *(buf.as_ptr() as *const u64) })))
85 }
86}
87
88#[cfg(target_os="linux")]
89mod imp {
90 extern crate libc;
91 use std::os::unix::io::RawFd;
92 use std::os::raw::c_int;
93 use std::io::Result;
94 use std::time::Duration;
95 use mio::{Poll, Token, Ready, PollOpt, Evented};
96 use mio::unix::{EventedFd, UnixReady};
97
98 pub struct Timer { fd: RawFd }
99
100 const TFD_CLOEXEC: c_int = libc::O_CLOEXEC;
101 const TFD_NONBLOCK: c_int = libc::O_NONBLOCK;
102
103 impl Timer {
104 pub fn new() -> Result<Timer> {
105 unsafe {
106 let ret = timerfd_create(libc::CLOCK_MONOTONIC, TFD_CLOEXEC | TFD_NONBLOCK);
107 if ret == -1 { return Err(::std::io::Error::last_os_error()); }
108 Ok(Timer {
109 fd: ret
110 })
111 }
112 }
113
114 pub fn reset(&self, interval: Duration) -> Result<()> {
115 let tspec = libc::timespec {
117 tv_sec: interval.as_secs() as _,
118 tv_nsec: interval.subsec_nanos() as _,
119 };
120 let itspec = itimerspec {
121 interval: tspec,
122 initial: tspec,
123 };
124 unsafe {
125 let ret = timerfd_settime(self.fd, 0, &itspec, ::std::ptr::null_mut());
126 if ret == -1 { return Err(::std::io::Error::last_os_error()); }
127 Ok(())
128 }
129 }
130 }
131
132 impl Evented for Timer {
133 fn register(&self, poll: &Poll, token: Token, _: Ready, opts: PollOpt) -> Result<()> {
134 EventedFd(&self.fd).register(poll, token, Ready::readable() | UnixReady::error(), opts)
135 }
136
137 fn reregister(&self, poll: &Poll, token: Token, _: Ready, opts: PollOpt) -> Result<()> {
138 EventedFd(&self.fd).reregister(poll, token, Ready::readable() | UnixReady::error(), opts)
139 }
140
141 fn deregister(&self, poll: &Poll) -> Result<()> {
142 EventedFd(&self.fd).deregister(poll)
143 }
144 }
145
146 impl ::std::io::Read for Timer {
147 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
148 let ret = unsafe { libc::read(self.fd, buf.as_mut_ptr() as *mut _, buf.len()) };
149 if ret == -1 { return Err(::std::io::Error::last_os_error()); }
150 debug_assert_eq!(ret, 8);
151 Ok(ret as usize)
152 }
153 }
154
155 impl Drop for Timer {
156 fn drop(&mut self) {
157 unsafe {
158 libc::close(self.fd);
159 }
160 }
161 }
162
163 #[repr(C)]
164 struct itimerspec {
165 interval: libc::timespec,
166 initial: libc::timespec,
167 }
168 extern {
169 fn timerfd_create(clockid: c_int, flags: c_int) -> RawFd;
170 fn timerfd_settime(fd: RawFd, flags: c_int,
171 new_ts: *const itimerspec,
172 old_ts: *mut itimerspec) -> c_int;
173 }
174}
175
176#[cfg(any(target_os = "bitrig", target_os = "dragonfly", target_os = "freebsd", target_os = "ios",
177 target_os = "macos", target_os = "netbsd", target_os = "openbsd"))]
178mod imp {
179 extern crate libc;
180 use std::os::unix::io::RawFd;
181 use std::io::Result;
182 use std::time::Duration;
183 use self::libc::{intptr_t, kevent, kqueue};
184 use mio::{Poll, Token, Ready, PollOpt, Evented};
185 use mio::unix::{EventedFd, UnixReady};
186
187 pub struct Timer { fd: RawFd }
188
189 impl Timer {
190 pub fn new() -> Result<Timer> {
191 unsafe {
192 let ret = kqueue();
193 if ret == -1 { return Err(::std::io::Error::last_os_error()); }
194 if libc::ioctl(ret, libc::FIOCLEX) == -1 {
195 libc::fcntl(ret, libc::F_SETFD, libc::FD_CLOEXEC);
196 }
197 Ok(Timer { fd: ret })
198 }
199 }
200
201 pub fn reset(&self, interval: Duration) -> Result<()> {
202 let (ty, dur) = try!(Timer::duration_to_units(interval));
203 let en_flag = if dur == 0 { libc::EV_DISABLE } else { libc::EV_ENABLE };
204 let kevt = kevent {
205 ident: 0,
206 filter: libc::EVFILT_TIMER,
207 flags: libc::EV_ADD | en_flag,
208 fflags: ty,
209 data: dur,
210 udata: ::std::ptr::null_mut()
211 };
212 unsafe {
213 let ret = kevent(self.fd, &kevt, 1,
214 ::std::ptr::null_mut(), 0,
215 ::std::ptr::null());
216 if ret == -1 { return Err(::std::io::Error::last_os_error()); }
217 Ok(())
218 }
219 }
220
221 fn duration_to_units(interval: Duration) -> Result<(u32, intptr_t)> {
223 let secs = interval.as_secs();
224 let nanos = interval.subsec_nanos() as u64;
225 let max = intptr_t::max_value() as u64;
226
227 if nanos > max || secs > max {
228 return Err(::std::io::Error::from_raw_os_error(libc::EINVAL));
229 }
230
231 let secs = secs as intptr_t;
232 let nanos = nanos as intptr_t;
233
234 if let Some(val) = secs.checked_mul(1_000_000_000)
235 .and_then(|v| v.checked_add(nanos)) {
236 return Ok((libc::NOTE_NSECONDS, val));
237 }
238
239 if nanos % 1_000 != 0 {
240 return Err(::std::io::Error::from_raw_os_error(libc::EINVAL));
241 }
242
243 let micros = nanos / 1_000;
244
245 if let Some(val) = secs.checked_mul(1_000_000)
246 .and_then(|v| v.checked_add(micros)) {
247 return Ok((libc::NOTE_USECONDS, val));
248 }
249
250 if micros % 1_000 != 0 {
251 return Err(::std::io::Error::from_raw_os_error(libc::EINVAL));
252 }
253
254 let millis = micros / 1_000;
255
256 if let Some(val) = secs.checked_mul(1_000)
257 .and_then(|v| v.checked_add(millis)) {
258 return Ok((NOTE_MSECONDS, val));
259 }
260
261 if millis % 1_000 != 0 {
262 return Err(::std::io::Error::from_raw_os_error(libc::EINVAL));
263 }
264
265 Ok((libc::NOTE_SECONDS, secs))
266 }
267 }
268
269 #[cfg(any(target_os = "ios", target_os = "macos"))]
271 const NOTE_MSECONDS: u32 = 0;
272 #[cfg(not(any(target_os = "ios", target_os = "macos")))]
273 const NOTE_MSECONDS: u32 = libc::NOTE_MSECONDS;
274
275 impl Evented for Timer {
276 fn register(&self, poll: &Poll, token: Token, _: Ready, opts: PollOpt) -> Result<()> {
277 EventedFd(&self.fd).register(poll, token, Ready::readable() | UnixReady::error(), opts)
278 }
279
280 fn reregister(&self, poll: &Poll, token: Token, _: Ready, opts: PollOpt) -> Result<()> {
281 EventedFd(&self.fd).reregister(poll, token, Ready::readable() | UnixReady::error(), opts)
282 }
283
284 fn deregister(&self, poll: &Poll) -> Result<()> {
285 EventedFd(&self.fd).deregister(poll)
286 }
287 }
288
289 impl ::std::io::Read for Timer {
290 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
291 unsafe {
292 if buf.len() < 8 { return Err(::std::io::Error::from_raw_os_error(libc::EINVAL)); }
294 let mut kevt: kevent = ::std::mem::uninitialized();
295 let tspec = libc::timespec { tv_sec: 0, tv_nsec: 0 };
296 let evts = kevent(self.fd, ::std::ptr::null(), 0,
297 &mut kevt, 1, &tspec);
298 if evts == -1 {
299 return Err(::std::io::Error::last_os_error());
300 } else if evts == 1 {
301 if kevt.filter != libc::EVFILT_TIMER { return Ok(0) } let ptr = buf.as_mut_ptr() as *mut u64;
303 *ptr = kevt.data as u64;
304 return Ok(8)
305 } else {
306 return Err(::mio::would_block())
307 }
308 }
309 }
310 }
311
312 impl Drop for Timer {
313 fn drop(&mut self) {
314 unsafe {
315 libc::close(self.fd);
316 }
317 }
318 }
319}
320
321#[cfg(target_os="windows")]
322mod imp {
323 extern crate winapi;
324 extern crate kernel32;
325 use std::sync::atomic;
326 use std::ptr;
327 use std::io::Result;
328 use std::time::Duration;
329 use mio::{Poll, Token, Ready, PollOpt, Evented, Registration, SetReadiness};
330
331 pub struct Timer {
332 inner: *mut Inner
333 }
334
335 pub struct Inner {
336 active: atomic::AtomicPtr<winapi::c_void>,
337 times_fired: atomic::AtomicUsize,
338 registration: ::std::sync::Mutex<Option<(Registration, SetReadiness)>>,
339
340 }
341
342 impl Timer {
343 pub fn new() -> Result<Timer> {
344 Ok(Timer {
345 inner: Box::into_raw(Box::new(Inner {
346 active: atomic::AtomicPtr::new(ptr::null_mut()),
347 times_fired: atomic::AtomicUsize::new(0),
348 registration: ::std::sync::Mutex::new(None)
349 }))
350 })
351 }
352
353 pub fn reset(&self, interval: Duration) -> Result<()> {
354 unsafe extern "system" fn callback(data: winapi::PVOID,
355 _: winapi::BOOLEAN) {
356 let this: &Inner = &*(data as *mut _);
357 let _ = this.times_fired.fetch_add(1, atomic::Ordering::SeqCst);
358 match this.registration.lock().unwrap().as_ref() {
359 Some(&(_, ref s)) => {
360 let _ = s.set_readiness(Ready::readable());
362 },
363 None => {}
364 }
365 }
366 unsafe {
367 let mut out = (*self.inner).active.swap(ptr::null_mut(),
368 atomic::Ordering::SeqCst);
369 if !out.is_null() {
370 let ret = kernel32::DeleteTimerQueueTimer(ptr::null_mut(), out,
371 winapi::INVALID_HANDLE_VALUE);
372 if ret == 0 { return Err(::std::io::Error::last_os_error()); }
373 }
374 (*self.inner).times_fired.store(0, atomic::Ordering::SeqCst);
375 let time_in_ms = try!(Timer::interval_to_millis(interval));
376 if time_in_ms == 0 { return Ok(()); }
377 let ret = kernel32::CreateTimerQueueTimer(&mut out, ptr::null_mut(),
378 Some(callback),
379 self.inner as *mut _,
381 time_in_ms, time_in_ms,
382 0);
383 if ret == 0 { return Err(::std::io::Error::last_os_error()); }
384 (*self.inner).active.compare_exchange(ptr::null_mut(), out,
386 atomic::Ordering::SeqCst,
387 atomic::Ordering::SeqCst)
388 .expect("invariant broken");
389 Ok(())
390 }
391 }
392
393 pub fn interval_to_millis(interval: Duration) -> Result<winapi::DWORD> {
394 let max = winapi::DWORD::max_value();
395 let subsec_ns = interval.subsec_nanos() as u64 + 999_999;
397 let ms = interval.as_secs().checked_mul(1_000)
398 .and_then(|v| v.checked_add(subsec_ns / 1_000_000));
399 if let Some(ms) = ms {
400 if ms <= max as _ {
401 return Ok(ms as _);
402 }
403 }
404 Err(::std::io::Error::from_raw_os_error(winapi::ERROR_INVALID_PARAMETER as i32))
405 }
406 }
407
408 impl Evented for Timer {
409 fn register(&self, poll: &Poll, token: Token, rdy: Ready, opts: PollOpt) -> Result<()> {
410 unsafe {
411 let val = Some(Registration::new(poll, token, rdy, opts));
412 *(*self.inner).registration.lock().unwrap() = val;
413 Ok(())
414 }
415 }
416
417 fn reregister(&self, poll: &Poll, token: Token, rdy: Ready, opts: PollOpt) -> Result<()> {
418 unsafe {
419 (*self.inner).registration.lock().unwrap().as_mut().unwrap().0
420 .update(poll, token, rdy, opts)
421 }
422 }
423
424 fn deregister(&self, poll: &Poll) -> Result<()> {
425 unsafe {
426 (*self.inner).registration.lock().unwrap().take().unwrap().0.deregister(poll)
427 }
428 }
429 }
430
431 impl ::std::io::Read for Timer {
432 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
433 unsafe {
434 if buf.len() < 8 { return Err(
435 ::std::io::Error::from_raw_os_error(winapi::ERROR_INSUFFICIENT_BUFFER as i32)
436 ); }
437 let ptr = buf.as_mut_ptr() as *mut u64;
438 *ptr = (*self.inner).times_fired.swap(0, atomic::Ordering::SeqCst) as u64;
439 if *ptr == 0 {
440 return Err(::mio::would_block());
441 }
442 Ok(8)
443 }
444 }
445 }
446
447 impl Drop for Timer {
448 fn drop(&mut self) {
449 unsafe {
450 let out = (*self.inner).active.swap(ptr::null_mut(), atomic::Ordering::SeqCst);
451 if !out.is_null() {
452 let ret = kernel32::DeleteTimerQueueTimer(ptr::null_mut(), out,
453 winapi::INVALID_HANDLE_VALUE);
454 if ret == 0 {
455 panic!("DeleteTimerQueueTimer failed with {:?}",
456 ::std::io::Error::last_os_error());
457 }
458 }
459 Box::from_raw(self.inner);
460 }
461 }
462 }
463}
464
465#[cfg(test)]
466mod tests {
467 use std::time;
468 use tokio_core::reactor;
469 use futures::stream::Stream;
470 use futures::future::{self, Future};
471
472 #[cfg(windows)]
473 fn allowed_delta() -> time::Duration {
474 time::Duration::new(0, 20_000_000)
477 }
478
479 #[cfg(any(target_os = "ios", target_os = "macos"))]
480 fn allowed_delta() -> time::Duration {
481 time::Duration::new(0, 10_000_000)
484 }
485
486 #[cfg(not(any(windows, target_os = "ios", target_os = "macos")))]
487 fn allowed_delta() -> time::Duration {
488 time::Duration::new(0, 1_000_000)
491 }
492
493 #[test]
494 fn works() {
495 let mut core = reactor::Core::new().unwrap();
496 let handle = core.handle();
497 let mut timer = super::PeriodicTimer::new(&handle)
498 .expect("periodic timer can be created");
499 let interval = time::Duration::new(0, 32_000_000);
500 timer.reset(interval)
501 .expect("reset works");
502 for _ in 0..3 {
503 let start = time::Instant::now();
504 let res = core.run(timer.into_future());
505 timer = match res {
506 Ok((Some(1), timer)) => timer,
507 Ok((x, _)) => panic!("expected Ok((Some(1), _)), got Ok(({:?}, _))", x),
508 Err((x, _)) => panic!("expected Ok((Some(1), _)), got Err(({:?}, _))", x),
509 };
510 let duration = time::Instant::now().duration_since(start);
511 let absdiff = if duration < interval {
512 interval - duration
513 } else {
514 duration - interval
515 };
516 assert!(absdiff < allowed_delta(), "absdiff is {:?}", absdiff);
517 }
518 }
519
520 #[test]
521 fn reset_works() {
522 let mut core = reactor::Core::new().unwrap();
523 let handle = core.handle();
524 let mut timer = super::PeriodicTimer::new(&handle)
525 .expect("periodic timer can be created");
526 timer.reset(time::Duration::new(0, 5_000_000))
527 .expect("reset works");
528 timer = core.run(timer.into_future()).map_err(|(a, _)| a).unwrap().1;
532 let interval = time::Duration::new(0, 32_000_000);
533 timer.reset(interval)
534 .expect("reset works");
535 for _ in 0..3 {
536 let start = time::Instant::now();
537 let res = core.run(timer.into_future());
538 timer = match res {
539 Ok((Some(1), timer)) => timer,
540 Ok((x, _)) => panic!("expected Ok((Some(1), _)), got Ok(({:?}, _))", x),
541 Err((x, _)) => panic!("expected Ok((Some(1), _)), got Err(({:?}, _))", x),
542 };
543 let duration = time::Instant::now().duration_since(start);
544 let absdiff = if duration < interval {
545 interval - duration
546 } else {
547 duration - interval
548 };
549 assert!(absdiff < allowed_delta(), "absdiff is {:?}", absdiff);
550 }
551 }
552
553 #[test]
554 fn drop_unset_works() {
555 let core = reactor::Core::new().unwrap();
556 let handle = core.handle();
557 super::PeriodicTimer::new(&handle).expect("periodic timer can be created");
558 }
559
560 #[test]
561 fn reset_zero_works() {
562 let mut core = reactor::Core::new().unwrap();
563 let handle = core.handle();
564 let timer = super::PeriodicTimer::new(&handle).expect("periodic timer can be created");
565 timer.reset(time::Duration::new(0, 1_000_000)).expect("reset works");
566 timer.reset(time::Duration::new(0, 0)).expect("reset zero works");
567 ::std::thread::sleep(time::Duration::new(0, 100_000_000));
568 let future = timer.into_future().map(|(a, _)| a).map_err(|(a, _)| a)
569 .select(future::ok(Some(9876)));
570 let resets = core.run(future).map_err(|(a, _)| a).unwrap().0;
571 assert_eq!(resets, Some(9876)); }
573}