timer_deque_rs/timer_portable/unix/linux/
timer_poll.rs1use std::
19{
20 collections::BTreeMap, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::{Arc, RwLock, atomic::{AtomicBool, Ordering}}
21};
22
23use nix::
24{
25 errno::Errno, poll::PollTimeout, sys::
26 {
27 epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags},
28 eventfd::{EfdFlags, EventFd}
29 }
30};
31
32use crate::
33{
34 error::{TimerError, TimerErrorType, TimerResult},
35 map_timer_err,
36 timer_err,
37 timer_portable::
38 {
39 AsTimerId, FdTimerMarker, TimerFd, TimerId, poll::{ PollEventType, PollInterrupt, TimerPollOps}, timer::FdTimerRead
40 }
41};
42
43
44
45#[derive(Debug)]
47pub struct TimerEventWatch
48{
49 epoll: Epoll,
51
52 wakeup_event: Arc<EventFd>,
54
55 polling_flag: AtomicBool,
57
58 timers: RwLock<BTreeMap<TimerId, TimerFd>>,
60}
61
62impl fmt::Display for TimerEventWatch
63{
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
65 {
66 write!(f, "fd:{}, cnt:{}", self.epoll.0.as_fd().as_raw_fd(),
67 self.timers.try_read().map_or("locked".to_string(), |f| f.len().to_string())
68 ) }
70}
71
72impl Eq for TimerEventWatch {}
73
74impl PartialEq for TimerEventWatch
75{
76 fn eq(&self, other: &Self) -> bool
77 {
78 return self.epoll.0.as_raw_fd() == other.epoll.0.as_raw_fd();
79 }
80}
81
82impl TimerPollOps for TimerEventWatch
83{
84 fn new() -> TimerResult<Self>
85 {
86 let epoll =
87 Epoll::new(EpollCreateFlags::empty())
88 .map_err(|e|
89 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
90 )?;
91
92 let wakeup_event =
93 EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
94 .map_err(|e|
95 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
96 )?;
97
98 epoll
99 .add(
100 &wakeup_event,
101 EpollEvent::new(EpollFlags::EPOLLIN,wakeup_event.as_raw_fd() as u64)
102 )
103 .map_err(|e|
104 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
105 )?;
106
107 return Ok(
108 Self
109 {
110 epoll: epoll,
111 wakeup_event: Arc::new(wakeup_event),
112 polling_flag: AtomicBool::new(false),
113 timers: RwLock::new(BTreeMap::new())
114 }
115 );
116 }
117
118 fn add(&self, timer: TimerFd) -> TimerResult<()>
119 {
120 let mut timers_lock =
122 self
123 .timers
124 .write()
125 .map_or_else(|e| e.into_inner(), |v| v);
126
127 let false =
129 timers_lock.contains_key(&timer.as_timer_id())
130 else
131 {
132 timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate",
133 timer)
134 };
135
136 self
138 .epoll
139 .add(
140 &timer,
141 EpollEvent::new(EpollFlags::EPOLLIN, timer.as_timer_id().0 as u64)
142 )
143 .map_err(|e|
144 map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to epoll", timer)
145 )?;
146
147 timers_lock.insert(timer.as_timer_id(), timer);
149
150 return Ok(());
151 }
152
153 fn delete<FD: FdTimerMarker>(&self, timer: &FD) -> TimerResult<()>
154 {
155 let mut timers_lock =
156 self
157 .timers
158 .write()
159 .map_or_else(|e| e.into_inner(), |v| v);
160
161 let true =
163 timers_lock.contains_key(&timer.as_timer_id())
164 else
165 {
166 timer_err!(TimerErrorType::Duplicate, "can not remove timer {} to epoll, reason duplicate",
167 timer.as_fd().as_raw_fd())
168 };
169
170 if let Err(ern) =
171 self
172 .epoll
173 .delete(&timer)
174 {
175 if ern != Errno::ENOENT
176 {
177 timer_err!(TimerErrorType::EPoll(ern), "can not delete timer {} from epoll",
178 timer.as_timer_id());
179 }
180 }
181
182 let Some(_) = timers_lock.remove(&timer.as_timer_id())
183 else
184 {
185 timer_err!(TimerErrorType::NotFound, "timer {} not found in the list", timer.as_fd().as_raw_fd())
186 };
187
188 return Ok(());
189 }
190
191 fn poll(&self, timeout: Option<i32>) -> TimerResult<Option<Vec<PollEventType>>>
192 {
193 if self.polling_flag.swap(true, Ordering::SeqCst) == true
194 {
195 timer_err!(TimerErrorType::EPollAlreadyPolling,
196 "epoll fd: '{}' other thread already polling", self.epoll.0.as_raw_fd());
197 }
198
199 let mut events = vec![EpollEvent::empty(); self.get_count() + 1];
200
201 let mut poll_funct = ||
202 {
203 let poll_timeout =
204 timeout
205 .map_or(Ok(PollTimeout::NONE), |f| PollTimeout::try_from(f))
206 .map_err(|e|
207 map_timer_err!(TimerErrorType::Conversion, "timeout value: '{:?}' is incorrect: '{}'", timeout, e)
208 )?;
209
210 let evs_res =
211 self
212 .epoll
213 .wait(events.as_mut_slice(), poll_timeout)
214 .map_err(|e|
215 map_timer_err!(TimerErrorType::EPoll(e), "poll error")
216 )?;
217
218 return Ok(evs_res);
219 };
220
221 let poll_res: Result<usize, TimerError> = poll_funct();
222
223 self.polling_flag.store(false, Ordering::SeqCst);
224
225 if let Err(err) = poll_res.as_ref() && err.get_error_type() == TimerErrorType::EPoll(Errno::EINTR)
227 {
228 return Ok(None);
229 }
230
231 let evs = poll_res?;
232 if evs == 0
233 {
234 return Ok(None);
235 }
236 else
237 {
238 let timers_read =
240 self
241 .timers
242 .read()
243 .map_or_else(|e| e.into_inner(), |v| v);
244
245 let mut poll_res: Vec<PollEventType> = Vec::with_capacity(evs);
246
247 for event
248 in events[..evs].iter().filter(|ev| ev.events().intersects(EpollFlags::EPOLLIN))
249 {
250 if event.data() as RawFd == self.wakeup_event.as_raw_fd()
251 {
252 let _ = self.wakeup_event.read();
253 }
254 else
255 {
256 let Some(timer) =
257 timers_read
258 .get(&(TimerId::from(event.data()))).map(|c| c)
259 else
260 {
261 continue;
263 };
264
265 match timer.read()
266 {
267 Ok(res) =>
268 poll_res.push(PollEventType::TimerRes(timer.as_timer_id(), res)),
269 Err(e) =>
270 poll_res.push(PollEventType::SubError(timer.as_timer_id(), e)),
271 }
272 }
273 }
274
275 if poll_res.is_empty() == false
276 {
277 return Ok(Some(poll_res));
278 }
279 else
280 {
281 return Ok(None);
282 }
283 }
284 }
285
286 fn get_count(&self) -> usize
287 {
288 return
290 self
291 .timers
292 .read()
293 .map_or_else(|e| e.into_inner(), |v| v)
294 .len();
295 }
296
297 fn get_poll_interruptor(&self) -> PollInterrupt
298 {
299 return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
300 }
301
302 fn interrupt_poll(&self) -> bool
304 {
305 return self.wakeup_event.write(self.wakeup_event.as_raw_fd() as u64).is_ok();
306 }
307}
308