timer_deque_rs/timer_portable/linux/
timer_poll.rs1use std::
19{
20 collections::BTreeMap, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::{Arc, RwLock, atomic::{AtomicBool, Ordering}}
21};
22
23use nix::
24{
25 errno::Errno, poll::PollTimeout, sys::
26 {
27 epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags},
28 eventfd::{EfdFlags, EventFd}
29 }
30};
31
32use crate::
33{
34 error::{TimerErrorType, TimerResult},
35 map_timer_err,
36 timer_err,
37 timer_portable::
38 {
39 TimerFd,
40 poll::{ PollEventType, PollInterrupt, TimerPollOps},
41 timer::FdTimerRead
42 }
43};
44
45
46
47#[derive(Debug)]
49pub struct TimerEventWatch
50{
51 epoll: Epoll,
53
54 wakeup_event: Arc<EventFd>,
56
57 polling_flag: AtomicBool,
59
60 timers: RwLock<BTreeMap<RawFd, TimerFd>>,
62}
63
64impl fmt::Display for TimerEventWatch
65{
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
67 {
68 write!(f, "fd:{}, cnt:{}", self.epoll.0.as_fd().as_raw_fd(),
69 self.timers.try_read().map_or("locked".to_string(), |f| f.len().to_string())
70 ) }
72}
73
74impl Eq for TimerEventWatch {}
75
76impl PartialEq for TimerEventWatch
77{
78 fn eq(&self, other: &Self) -> bool
79 {
80 return self.epoll.0.as_raw_fd() == other.epoll.0.as_raw_fd();
81 }
82}
83
84impl TimerPollOps for TimerEventWatch
85{
86 fn new() -> TimerResult<Self>
87 {
88 let epoll =
89 Epoll::new(EpollCreateFlags::empty())
90 .map_err(|e|
91 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
92 )?;
93
94 let wakeup_event =
95 EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
96 .map_err(|e|
97 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
98 )?;
99
100 epoll
101 .add(
102 &wakeup_event,
103 EpollEvent::new(EpollFlags::EPOLLIN,wakeup_event.as_raw_fd() as u64)
104 )
105 .map_err(|e|
106 map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
107 )?;
108
109 return Ok(
110 Self
111 {
112 epoll: epoll,
113 wakeup_event: Arc::new(wakeup_event),
114 polling_flag: AtomicBool::new(false),
115 timers: RwLock::new(BTreeMap::new())
116 }
117 );
118 }
119
120 fn add(&self, timer: TimerFd) -> TimerResult<()>
121 {
122 let mut timers_lock =
124 self
125 .timers
126 .write()
127 .map_or_else(|e| e.into_inner(), |v| v);
128
129 let false =
131 timers_lock.contains_key(&timer.as_raw_fd())
132 else
133 {
134 timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate",
135 timer)
136 };
137
138 self
140 .epoll
141 .add(
142 &timer,
143 EpollEvent::new(EpollFlags::EPOLLIN, timer.as_fd().as_raw_fd() as u64)
144 )
145 .map_err(|e|
146 map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to epoll", timer)
147 )?;
148
149 timers_lock.insert(timer.as_fd().as_raw_fd(), timer);
151
152 return Ok(());
153 }
154
155 fn delete<FD: AsFd>(&self, timer: FD) -> TimerResult<()>
156 {
157 let mut timers_lock =
158 self
159 .timers
160 .write()
161 .map_or_else(|e| e.into_inner(), |v| v);
162
163 let true =
165 timers_lock.contains_key(&timer.as_fd().as_raw_fd())
166 else
167 {
168 timer_err!(TimerErrorType::Duplicate, "can not remove timer {} to epoll, reason duplicate",
169 timer.as_fd().as_raw_fd())
170 };
171
172 if let Err(ern) =
173 self
174 .epoll
175 .delete(&timer)
176 {
177 if ern != Errno::ENOENT
178 {
179 timer_err!(TimerErrorType::EPoll(ern), "can not delete timer {} from epoll",
180 timer.as_fd().as_raw_fd());
181 }
182 }
183
184 let Some(_) = timers_lock.remove(&timer.as_fd().as_raw_fd())
185 else
186 {
187 timer_err!(TimerErrorType::NotFound, "timer {} not found in the list", timer.as_fd().as_raw_fd())
188 };
189
190 return Ok(());
191 }
192
193 fn poll(&self, timeout: Option<i32>) -> TimerResult<Option<Vec<PollEventType>>>
194 {
195 if self.polling_flag.swap(true, Ordering::SeqCst) == true
196 {
197 timer_err!(TimerErrorType::EPollAlreadyPolling,
198 "epoll fd: '{}' other thread already polling", self.epoll.0.as_raw_fd());
199 }
200
201 let mut events = vec![EpollEvent::empty(); self.get_count() + 1];
202
203 let poll_timeout =
204 timeout
205 .map_or(Ok(PollTimeout::NONE), |f| PollTimeout::try_from(f))
206 .map_err(|e|
207 map_timer_err!(TimerErrorType::Conversion, "timeout value: '{:?}' is incorrect: '{}'", timeout, e)
208 )?;
209
210 let evs_res =
211 self
212 .epoll
213 .wait(events.as_mut_slice(), poll_timeout)
214 .map_err(|e|
215 map_timer_err!(TimerErrorType::EPoll(e), "poll error")
216 );
217
218 self.polling_flag.store(false, Ordering::SeqCst);
219
220 if let Err(err) = evs_res.as_ref() && err.get_error_type() == TimerErrorType::EPoll(Errno::EINTR)
222 {
223 return Ok(None);
224 }
225
226 let evs = evs_res?;
227 if evs == 0
228 {
229 return Ok(None);
230 }
231 else
232 {
233 let timers_read =
235 self
236 .timers
237 .read()
238 .map_or_else(|e| e.into_inner(), |v| v);
239
240 let mut poll_res: Vec<PollEventType> = Vec::with_capacity(evs);
241
242 for event
243 in events[..evs].iter().filter(|ev| ev.events().intersects(EpollFlags::EPOLLIN))
244 {
245 if event.data() as RawFd == self.wakeup_event.as_raw_fd()
246 {
247 let _ = self.wakeup_event.read();
248 }
249 else
250 {
251 let Some(timer) =
252 timers_read
253 .get(&(event.data() as RawFd)).map(|c| c)
254 else
255 {
256 continue;
258 };
259
260 match timer.read()
261 {
262 Ok(res) =>
263 poll_res.push(PollEventType::TimerRes(timer.as_raw_fd(), res)),
264 Err(e) =>
265 poll_res.push(PollEventType::SubError(timer.as_raw_fd(), e)),
266 }
267 }
268 }
269
270 if poll_res.is_empty() == false
271 {
272 return Ok(Some(poll_res));
273 }
274 else
275 {
276 return Ok(None);
277 }
278 }
279 }
280
281 fn get_count(&self) -> usize
282 {
283 return
285 self
286 .timers
287 .read()
288 .map_or_else(|e| e.into_inner(), |v| v)
289 .len();
290 }
291
292 fn get_poll_interruptor(&self) -> PollInterrupt
293 {
294 return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
295 }
296
297 fn interrupt_poll(&self) -> bool
299 {
300 return self.wakeup_event.write(self.wakeup_event.as_raw_fd() as u64).is_ok();
301 }
302}
303