timer_deque_rs/timer_portable/linux/
timer_poll.rs1use std::
17{
18 fmt,
19 os::fd::{AsFd, AsRawFd, RawFd},
20 sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc}
21};
22
23use nix::
24{
25 poll::PollTimeout,
26 sys::
27 {
28 epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags},
29 eventfd::{EfdFlags, EventFd}
30 }
31};
32
33use crate::
34{
35 error::{TimerErrorType, TimerResult},
36 map_timer_err,
37 timer_err,
38 timer_portable::poll::{AsTimerFd, PollEventType, PollInterrupt, PollResult, TimerPollOps, TimerPollOpsUnregister}
39};
40
41#[derive(Debug)]
47pub struct TimerEventWatch
48{
49 epoll: Epoll,
50
51 wakeup_event: Arc<EventFd>,
52
53 add_event: EventFd,
54
55 cancel_event: EventFd,
56
57 polling_flag: AtomicBool,
58
59 epoll_fds: AtomicUsize
61}
62
63impl TimerPollOpsUnregister for TimerEventWatch
64{
65 #[inline]
66 fn unregister<T: AsTimerFd>(&self, timer: &T) -> TimerResult<()>
67 {
68 return <Self as TimerPollOps>::delete(&self, timer);
69 }
70}
71
72impl fmt::Display for TimerEventWatch
73{
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
75 {
76 write!(f, "fd:{}, cnt:{}", self.epoll.0.as_fd().as_raw_fd(), self.epoll_fds.load(Ordering::Acquire))
77 }
78}
79
80impl Eq for TimerEventWatch{}
81
82impl PartialEq for TimerEventWatch
83{
84 fn eq(&self, other: &Self) -> bool
85 {
86 return self.epoll.0.as_raw_fd() == other.epoll.0.as_raw_fd();
87 }
88}
89
90impl TimerPollOps for TimerEventWatch
91{
92 fn new() -> TimerResult<Self>
93 {
94 let epoll =
95 Epoll::new(EpollCreateFlags::empty())
96 .map_err(|e|
97 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
98 )?;
99
100 let add_event =
101 EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
102 .map_err(|e|
103 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
104 )?;
105
106 let cancel_event =
107 EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
108 .map_err(|e|
109 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
110 )?;
111
112 let wakeup_event =
113 EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
114 .map_err(|e|
115 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
116 )?;
117
118 epoll
119 .add(
120 &add_event,
121 EpollEvent::new(EpollFlags::EPOLLIN,add_event.as_raw_fd() as u64)
122 )
123 .map_err(|e|
124 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
125 )?;
126
127 epoll
128 .add(
129 &cancel_event,
130 EpollEvent::new(EpollFlags::EPOLLIN,cancel_event.as_raw_fd() as u64)
131 )
132 .map_err(|e|
133 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
134 )?;
135
136 epoll
137 .add(
138 &wakeup_event,
139 EpollEvent::new(EpollFlags::EPOLLIN,wakeup_event.as_raw_fd() as u64)
140 )
141 .map_err(|e|
142 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
143 )?;
144
145 return Ok(
146 Self
147 {
148 epoll: epoll,
149 wakeup_event: Arc::new(wakeup_event),
150 add_event: add_event,
151 cancel_event: cancel_event,
152 polling_flag: AtomicBool::new(false),
153 epoll_fds: AtomicUsize::new(0)
154 }
155 );
156 }
157
158 fn add<T: AsTimerFd>(&self, timer: &T) -> TimerResult<()>
159 {
160 if self.polling_flag.load(Ordering::Acquire) == true
162 {
163 self
164 .add_event
165 .write(timer.as_fd().as_raw_fd() as u64)
166 .map_err(|e|
167 map_timer_err!(TimerErrorType::EPoll(e), "error while cancel_event for timer {} to epoll", timer)
168 )?;
169 }
170
171 self
172 .epoll
173 .add(
174 timer,
175 EpollEvent::new(EpollFlags::EPOLLIN, timer.as_fd().as_raw_fd() as u64)
176 )
177 .map_err(|e|
178 map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to epoll", timer)
179 )?;
180
181 self.epoll_fds.fetch_add(1, Ordering::AcqRel);
182
183 return Ok(());
184 }
185
186 fn delete<T: AsFd + fmt::Display>(&self, timer: &T) -> TimerResult<()>
187 {
188 if self.polling_flag.load(Ordering::Acquire) == true
190 {
191 self
192 .cancel_event
193 .write(timer.as_fd().as_raw_fd() as u64)
194 .map_err(|e|
195 map_timer_err!(TimerErrorType::EPoll(e), "error while cancel_event for timer {} to epoll", timer)
196 )?;
197 }
198
199 self
200 .epoll
201 .delete(timer)
202 .map_err(|e|
203 map_timer_err!(TimerErrorType::EPoll(e), "can not delete timer {} to epoll", timer)
204 )?;
205
206 self.epoll_fds.fetch_sub(1, Ordering::AcqRel);
207
208 return Ok(());
209 }
210
211 fn poll(&self, timeout: Option<i32>) -> TimerResult<PollResult>
212 {
213 if self.polling_flag.swap(true, Ordering::AcqRel) == true
214 {
215 timer_err!(TimerErrorType::EPollAlreadyPolling,
216 "epoll fd: '{}' other thread already polling", self.epoll.0.as_raw_fd());
217 }
218
219 let mut events = vec![EpollEvent::empty(); self.epoll_fds.load(Ordering::Acquire) + 1];
220
221 let poll_timeout =
222 timeout
223 .map_or(Ok(PollTimeout::NONE), |f| PollTimeout::try_from(f))
224 .map_err(|e|
225 map_timer_err!(TimerErrorType::Conversion, "timeout value: '{:?}' is incorrect: '{}'", timeout, e)
226 )?;
227
228 let evs_res =
232 self
233 .epoll
234 .wait(events.as_mut_slice(), poll_timeout)
235 .map_err(|e|
236 map_timer_err!(TimerErrorType::EPoll(e), "poll error")
237 );
238
239 self.polling_flag.store(false, Ordering::Release);
242
243 let evs = evs_res?;
244 if evs == 0
245 {
246 return Ok(PollResult::new_none());
247 }
248 else
249 {
250 let mut poll_res = PollResult::new(evs);
251
252 for event in events[..evs].iter()
253 {
254 if event.data() as RawFd == self.cancel_event.as_raw_fd()
255 {
256 let ev =
257 self
258 .cancel_event
259 .read()
260 .map_or_else(
261 |err| PollEventType::SubError(map_timer_err!(TimerErrorType::EPoll(err), "cancel_event read error")),
262 |timer_fd| PollEventType::TimerRemoved(timer_fd as RawFd)
263 );
264
265 poll_res.push(ev);
266 }
267 else if event.data() as RawFd == self.wakeup_event.as_raw_fd()
268 {
269 let _ = self.wakeup_event.read();
270 }
272 else if event.data() as RawFd == self.add_event.as_raw_fd()
273 {
274 let _ = self.add_event.read();
275 }
277 else
278 {
279 poll_res.push(PollEventType::Some(event.data() as RawFd));
280 }
281 }
282
283 return Ok(poll_res);
284 }
285 }
286
287 fn get_count(&self) -> usize
288 {
289 return self.epoll_fds.load(Ordering::Acquire);
290 }
291
292 fn get_poll_interruptor(&self) -> PollInterrupt
293 {
294 return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
295 }
296
297 fn interrupt_poll(&self) -> bool
298 {
299 return self.wakeup_event.write(1).is_ok();
300 }
301}
302
303impl TimerEventWatch
304{
305
306
307}
308
309#[cfg(test)]
310mod tests
311{
312 use std::{cmp::Ordering, os::fd::{AsFd, AsRawFd}, time::Instant};
313
314 use crate::{deque_timeout::OrderdTimerDequeOnce, timer_portable::timer::AbsoluteTime, OrderedTimerDeque, TimerDequeueConsumer};
315
316 use super::*;
317
318 #[test]
319 fn test_epoll0()
320 {
321 let mut time_list =
322 OrderedTimerDeque
323 ::<TimerDequeueConsumer<i32, OrderdTimerDequeOnce>>
324 ::new("test_label".into(), 4, false).unwrap();
325
326 let epoll = TimerEventWatch::new().unwrap();
327
328 epoll.add(&time_list).unwrap();
329
330 let s = Instant::now();
331
332 let tss_set = AbsoluteTime::now().add_sec(2);
333 let ent1: i32 = 1;
334
335 time_list.add_to_timer(ent1, tss_set).unwrap();
336
337 let res = epoll.poll(Some(2001)).unwrap();
338
339 let e = s.elapsed();
340 let abs_now = AbsoluteTime::now();
341
342 println!("elapsed: {:?}, now: {}, set: {}", e, abs_now, tss_set);
343
344 println!("{:?}", res);
345
346 assert_eq!(res.is_none(), false);
347 assert_eq!(res[0], PollEventType::Some(time_list.as_fd().as_raw_fd()));
348 assert_eq!(abs_now.seconds_cmp(&tss_set) == Ordering::Equal, true);
349 }
350}