timer_deque_rs/timer_portable/linux/
timer_fd_linux.rs1use std::borrow::Cow;
19use std::{fmt};
20use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd};
21use std::os::unix::prelude::RawFd;
22use std::sync::{Arc, RwLock, TryLockResult, Weak};
23use std::task::Poll;
24
25use nix::fcntl::{FcntlArg, OFlag};
26use nix::{fcntl, libc};
27use nix::libc::itimerspec;
28
29
30use crate::nix::errno::Errno;
31use crate::timer_portable::poll::{AsTimerFd, TimerPollOpsUnregister};
32use crate::timer_portable::portable_error::TimerPortResult;
33use crate::timer_portable::timer::{ModeTimeType, TimerReadRes};
34use crate::{map_portable_err, portable_err, AbsoluteTime};
35use crate::timer_portable::
36{
37 DefaultEventWatch, FdTimerCom, TimerExpMode, TimerFlags, TimerType
38};
39
40
41#[derive(Debug)]
47pub struct TimerFd
48{
49 label: Cow<'static, str>,
51
52 timer_fd: OwnedFd,
54
55 poll_bind: RwLock<Option<Weak<DefaultEventWatch>>>,
59}
60
61impl fmt::Display for TimerFd
62{
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
64 {
65 let poller =
66 if let TryLockResult::Ok(val) = self.poll_bind.try_read()
67 {
68 val
69 .as_ref()
70 .map_or(
71 Cow::Borrowed("instance removed"),
72 |ev_watch|
73 ev_watch
74 .upgrade()
75 .map_or(
76 Cow::Borrowed("instance removed"),
77 |f| Cow::Owned(f.as_ref().to_string())
78 )
79 )
80 }
81 else
82 {
83 Cow::Borrowed("RWlock would block")
84 };
85
86 write!(f, "{}({}) watch: {}", self.label, self.timer_fd.as_raw_fd(), poller)
87 }
88}
89
90impl AsFd for TimerFd
91{
92 fn as_fd(&self) -> BorrowedFd<'_>
93 {
94 return self.timer_fd.as_fd();
95 }
96}
97
98impl AsRawFd for TimerFd
99{
100 fn as_raw_fd(&self) -> RawFd
101 {
102 return self.timer_fd.as_raw_fd();
103 }
104}
105
106impl AsTimerFd for TimerFd
107{
108 fn get_bind(&self) -> Option<Arc<DefaultEventWatch>>
109 {
110 return
111 self
112 .poll_bind
113 .read()
114 .unwrap()
115 .as_ref()
116 .map_or(None, |f| f.upgrade());
117 }
118
119 fn bind_poll(&self, timer_weak_ref: Weak<DefaultEventWatch>)
120 {
121 self.poll_bind.write().unwrap().replace(timer_weak_ref);
122
123 return;
124 }
125
126 fn unbind_poll(&self)
127 {
128 let _ = self.poll_bind.write().unwrap().take();
129
130 return;
131 }
132}
133
134impl Drop for TimerFd
135{
136 fn drop(&mut self)
137 {
138 let _ = self.unset_time();
139
140 let Some(bind) = self.poll_bind.write().unwrap().take()
141 else { return };
142
143 let Some(bind_arc) = bind.upgrade()
144 else {return };
145
146 let _ = bind_arc.unregister(self);
147
148 return;
149 }
150}
151
152impl Eq for TimerFd {}
153
154impl PartialEq for TimerFd
155{
156 fn eq(&self, other: &Self) -> bool
157 {
158 return self.timer_fd.as_raw_fd() == other.timer_fd.as_raw_fd();
159 }
160}
161
162impl PartialEq<RawFd> for TimerFd
163{
164 fn eq(&self, other: &RawFd) -> bool
165 {
166 return self.timer_fd.as_raw_fd() == *other;
167 }
168}
169
170impl PartialEq<str> for TimerFd
171{
172 fn eq(&self, other: &str) -> bool
173 {
174 return self.label == other;
175 }
176}
177
178impl AsRef<str> for TimerFd
179{
180 fn as_ref(&self) -> &str
181 {
182 return &self.label;
183 }
184}
185
186
187impl Ord for TimerFd
188{
189 fn cmp(&self, other: &Self) -> std::cmp::Ordering
190 {
191 return self.timer_fd.as_raw_fd().cmp(&other.timer_fd.as_raw_fd());
192 }
193}
194
195impl PartialOrd for TimerFd
196{
197 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
198 {
199 return Some(self.cmp(other));
200 }
201}
202
203impl FdTimerCom for TimerFd
204{
205 fn new(label: Cow<'static, str>, timer_type: TimerType, timer_flags: TimerFlags) -> TimerPortResult<Self>
206 {
207 let timer_fd =
208 unsafe { libc::timerfd_create(timer_type.into(), timer_flags.bits()) };
209
210 if timer_fd == -1
211 {
212 portable_err!(Errno::last(), "timer: '{}', timerfd_create failed with error", label);
213 }
214
215 return Ok(
216 Self
217 {
218 label:
219 label,
220 timer_fd:
221 unsafe { OwnedFd::from_raw_fd(timer_fd) },
222 poll_bind:
223 RwLock::new(None),
224 }
225 );
226 }
227
228 fn read(&self) -> TimerPortResult<TimerReadRes<u64>>
247 {
248 let mut timer_overfl = [0u8; size_of::<u64>()];
249
250 loop
251 {
252 let ret =
253 nix::unistd::read(&self.timer_fd, &mut timer_overfl);
254 if let Ok(_) = ret
259 {
260 let overl: u64 = u64::from_ne_bytes(timer_overfl);
261
262 return Ok(TimerReadRes::Ok(overl));
263 }
264 else if let Err(Errno::EINTR) = ret
265 {
266 continue;
267 }
268 else if let Err(Errno::EAGAIN) = ret
269 {
270 return Ok(TimerReadRes::WouldBlock);
272 }
273 else if let Err(Errno::ECANCELED) = ret
274 {
275 return Ok(TimerReadRes::Cancelled);
276 }
277 else if let Err(e) = ret
278 {
279 portable_err!(e, "read timer overflow error for timer: '{}'", self.label)
280 }
281 }
282
283
284 }
285
286 fn set_time<TIMERTYPE: ModeTimeType>(&self, timer_exp: TimerExpMode<TIMERTYPE>) -> TimerPortResult<()>
313 {
314 let flags = TIMERTYPE::get_flags().bits();
318 let timer_value: itimerspec = timer_exp.into();
319
320 let res =
321 unsafe
322 {
323 libc::timerfd_settime(self.timer_fd.as_raw_fd(), flags,
324 &timer_value, core::ptr::null_mut())
325 };
326
327 if res == -1
328 {
329 portable_err!(Errno::last(), "can not init timer: '{}'", self.label);
330 }
331
332 return Ok(());
333 }
334
335 fn unset_time(&self) -> TimerPortResult<()>
336 {
337 let mut timer_value: itimerspec = unsafe { std::mem::zeroed() };
339
340 let res =
341 unsafe
342 {
343 libc::timerfd_gettime(self.timer_fd.as_raw_fd(),
344 &mut timer_value as *mut _ as *mut itimerspec)
345 };
346
347 if res == -1
348 {
349 portable_err!(Errno::last(), "can not timerfd_gettime for timer: '{}'", self.label);
350 }
351
352 let timer_mode = TimerExpMode::<AbsoluteTime>::from(timer_value);
353
354 if TimerExpMode::<AbsoluteTime>::reset() == timer_mode
355 {
356 return Ok(());
358 }
359
360 let timer_value: itimerspec = TimerExpMode::<AbsoluteTime>::reset().into();
363
364 let res =
365 unsafe
366 {
367 libc::timerfd_settime(self.timer_fd.as_raw_fd(), 0,
368 &timer_value, core::ptr::null_mut())
369 };
370
371 if res == -1
372 {
373 portable_err!(Errno::last(), "can not unset timer: '{}'", self.label);
374 }
375
376 return Ok(());
377 }
378}
379
380
381impl TimerFd
382{
383 pub
386 fn set_nonblocking(&self) -> TimerPortResult<()>
387 {
388 let fl = OFlag::from_bits_retain(
389 fcntl::fcntl(&self.timer_fd, FcntlArg::F_GETFL)
390 .map_err(|e|
391 map_portable_err!(e, "timer: '{}', fcntl F_GETFL failed", self.label)
392 )?
393 );
394
395 fcntl::fcntl(&self.timer_fd, FcntlArg::F_SETFL(fl | OFlag::O_NONBLOCK))
396 .map_err(|e|
397 map_portable_err!(e, "timer: '{}', fcntl F_SETFL failed", self.label)
398 )?;
399
400 return Ok(());
401 }
402}
403
404impl Future for &TimerFd
405{
406 type Output = TimerPortResult<TimerReadRes<u64>>;
407
408 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>
409 {
410 let res = self.read();
411
412 if let Ok(TimerReadRes::WouldBlock) = res
413 {
414 cx.waker().wake_by_ref();
415
416 return Poll::Pending;
417 }
418 else
419 {
420 return Poll::Ready(res);
421 }
422 }
423}
424
425impl Future for TimerFd
426{
427 type Output = TimerPortResult<TimerReadRes<u64>>;
428
429 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output>
430 {
431 let res = self.read();
432
433 if let Ok(TimerReadRes::WouldBlock) = res
434 {
435 cx.waker().wake_by_ref();
436
437 return Poll::Pending;
438 }
439 else
440 {
441 return Poll::Ready(res);
442 }
443 }
444}
445
446
#[cfg(test)]
mod tests {
    use std::time::Instant;

    use tokio::io::{unix::AsyncFd, Interest};

    use crate::timer_portable::timer::AbsoluteTime;

    use super::*;

    /// Arms a blocking one-shot timer 3 seconds ahead (wall clock) and checks
    /// that a blocking `read` returns exactly one expiration at that second.
    #[test]
    fn test1() {
        let timer = TimerFd::new(
            Cow::Borrowed("test"),
            TimerType::CLOCK_REALTIME,
            TimerFlags::empty(),
        )
        .unwrap();

        let now = chrono::offset::Local::now().timestamp();
        let snow = now + 3;
        let s = Instant::now();

        let timer_mode1 =
            TimerExpMode::<AbsoluteTime>::new_oneshot(AbsoluteTime::new_time(snow, 0).unwrap());

        let res = timer.set_time(timer_mode1);
        println!("timer was set: '{}' '{}'", now, snow);

        // The message is only evaluated when the assertion fails.
        assert!(res.is_ok(), "{}", res.err().unwrap());

        let ovf = timer.read().unwrap().unwrap();

        let ts = chrono::offset::Local::now().timestamp();
        let e = s.elapsed();

        assert_eq!(ovf, 1);
        assert_eq!(ts, snow);

        println!("elapsed: {:?}, ts: {}", e, ts);

        assert!(e.as_millis() <= 3100);

        println!("Success");
    }

    /// Awaits the timer through its own `Future` implementation.
    #[tokio::test]
    async fn test2_fut() {
        let timer = TimerFd::new(
            Cow::Borrowed("test"),
            TimerType::CLOCK_REALTIME,
            TimerFlags::empty(),
        )
        .unwrap();

        let now = chrono::offset::Local::now().timestamp();
        let snow = now + 3;
        let s = Instant::now();

        let timer_mode1 =
            TimerExpMode::<AbsoluteTime>::new_oneshot(AbsoluteTime::new_time(snow, 0).unwrap());

        let res = timer.set_time(timer_mode1);

        println!("timer was set: '{}' '{}'", now, snow);

        assert!(res.is_ok(), "{}", res.err().unwrap());

        tokio::select! {
            ovf = timer => {
                let ts = chrono::offset::Local::now().timestamp();
                let e = s.elapsed();

                assert_eq!(ovf, Ok(TimerReadRes::Ok(1)));
                assert_eq!(ts, snow);

                println!("timeout e: {:?}, ts:{} snow:{}", e, ts, snow);
            }
        }
    }

    /// Waits for readiness through tokio's `AsyncFd` and then reads the timer.
    #[tokio::test]
    async fn test3_tokio() {
        let timer: AsyncFd<TimerFd> = AsyncFd::with_interest(
            TimerFd::new(
                Cow::Borrowed("test"),
                TimerType::CLOCK_REALTIME,
                TimerFlags::empty(),
            )
            .unwrap(),
            Interest::READABLE,
        )
        .unwrap();

        let now = chrono::offset::Local::now().timestamp();
        let snow = now + 3;
        let s = Instant::now();

        let timer_mode1 =
            TimerExpMode::<AbsoluteTime>::new_oneshot(AbsoluteTime::new_time(snow, 0).unwrap());

        // `set_time` takes `&self`, so a shared reference is sufficient.
        let res = timer.get_ref().set_time(timer_mode1);

        println!("timer was set: '{}' '{}'", now, snow);

        assert!(res.is_ok(), "{}", res.err().unwrap());

        tokio::select! {
            read_guard_res = timer.ready(Interest::READABLE) => {
                let read_guard = read_guard_res.unwrap();

                let res = read_guard.get_inner().read();

                let ts = chrono::offset::Local::now().timestamp();
                let e = s.elapsed();

                assert_eq!(res, Ok(TimerReadRes::Ok(1)));
                assert_eq!(ts, snow);

                println!("timeout e: {:?}, ts:{} snow:{}", e, ts, snow);
            }
        }
    }

    /// Reads a non-blocking timer before it expires and again after it was
    /// disarmed; both reads must report `WouldBlock`.
    #[test]
    fn test4_cancel() {
        let timer = TimerFd::new(
            Cow::Borrowed("test"),
            TimerType::CLOCK_REALTIME,
            TimerFlags::TFD_NONBLOCK,
        )
        .unwrap();

        let now = chrono::offset::Local::now().timestamp();
        let snow = now + 3;

        let timer_mode1 =
            TimerExpMode::<AbsoluteTime>::new_oneshot(AbsoluteTime::new_time(snow, 0).unwrap());

        let res = timer.set_time(timer_mode1);
        println!("timer was set: '{}' '{}'", now, snow);

        assert!(res.is_ok(), "{}", res.err().unwrap());

        // The timer is armed 3 seconds ahead, so nothing has expired yet.
        let ovf = timer.read().unwrap();

        println!("{}", ovf);
        assert_eq!(ovf, TimerReadRes::WouldBlock);

        timer.unset_time().unwrap();

        // A disarmed timer never expires, so the read still would block.
        let ovf = timer.read().unwrap();

        println!("{}", ovf);
        assert_eq!(ovf, TimerReadRes::WouldBlock);
    }
}