timer_deque_rs/timer_portable/linux/
timer_poll.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16use std::
17{
18    fmt, 
19    os::fd::{AsFd, AsRawFd, RawFd}, 
20    sync::{atomic::{AtomicBool, AtomicUsize, Ordering}, Arc}
21};
22
23use nix::
24{
25    poll::PollTimeout, 
26    sys::
27    {
28        epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags}, 
29        eventfd::{EfdFlags, EventFd}
30    }
31};
32
33use crate::
34{
35    error::{TimerErrorType, TimerResult}, 
36    map_timer_err, 
37    timer_err, 
38    timer_portable::poll::{AsTimerFd, PollEventType, PollInterrupt, PollResult, TimerPollOps, TimerPollOpsUnregister}
39};
40
/// An [Epoll] based event monitor for timer file descriptors.
/// 
/// Besides the [Epoll] instance it owns three internal [EventFd]s which are 
/// registered in the [Epoll] instance on creation. They are written by 
/// `add`, `delete` and `interrupt_poll` in order to wake up a thread which is 
/// blocked in `poll`.
#[derive(Debug)]
pub struct TimerEventWatch
{
    /// The [Epoll] instance where the timers and the internal [EventFd]s 
    /// are registered.
    epoll: Epoll, 

    /// Written by `interrupt_poll` and handed out (as a weak reference) by
    /// `get_poll_interruptor`. `poll` drains it without reporting an event.
    wakeup_event: Arc<EventFd>,

    /// Written by `add` with the raw FD of the added timer when a poll is 
    /// in progress. `poll` drains it without reporting an event.
    add_event: EventFd,

    /// Written by `delete` with the raw FD of the removed timer when a poll 
    /// is in progress. `poll` reports the read value as a removed timer.
    cancel_event: EventFd,

    /// Set to `true` by `poll` before waiting on the [Epoll] and reset to 
    /// `false` once the wait has returned.
    polling_flag: AtomicBool,

    /// Amount of added FDs, not counting the internal [EventFd]s.
    epoll_fds: AtomicUsize
}
62
63impl TimerPollOpsUnregister for TimerEventWatch
64{
65    #[inline]
66    fn unregister<T: AsTimerFd>(&self, timer: &T) -> TimerResult<()> 
67    {
68        return <Self as TimerPollOps>::delete(&self, timer);
69    }
70}
71
72impl fmt::Display for TimerEventWatch
73{
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
75    {
76        write!(f, "fd:{}, cnt:{}", self.epoll.0.as_fd().as_raw_fd(), self.epoll_fds.load(Ordering::Acquire))
77    }
78}
79
80impl Eq for TimerEventWatch{}
81
82impl PartialEq for TimerEventWatch
83{
84    fn eq(&self, other: &Self) -> bool 
85    {
86        return self.epoll.0.as_raw_fd() == other.epoll.0.as_raw_fd();
87    }
88}
89
90impl TimerPollOps for TimerEventWatch
91{
92    fn new() -> TimerResult<Self>
93    {
94        let epoll = 
95            Epoll::new(EpollCreateFlags::empty())
96                .map_err(|e|
97                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
98                )?;
99
100        let add_event = 
101            EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
102                .map_err(|e|
103                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
104                )?;
105
106        let cancel_event = 
107            EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
108                .map_err(|e|
109                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
110                )?;
111
112        let wakeup_event = 
113            EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
114                .map_err(|e|
115                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
116                )?;
117
118        epoll
119            .add(
120                &add_event, 
121                EpollEvent::new(EpollFlags::EPOLLIN,add_event.as_raw_fd() as u64)
122            )
123            .map_err(|e|
124                map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
125            )?;
126
127        epoll
128            .add(
129                &cancel_event, 
130                EpollEvent::new(EpollFlags::EPOLLIN,cancel_event.as_raw_fd() as u64)
131            )
132            .map_err(|e|
133                map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
134            )?;
135
136         epoll
137            .add(
138                &wakeup_event, 
139                EpollEvent::new(EpollFlags::EPOLLIN,wakeup_event.as_raw_fd() as u64)
140            )
141            .map_err(|e|
142                map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
143            )?;
144
145        return Ok( 
146            Self
147            {
148                epoll: epoll, 
149                wakeup_event: Arc::new(wakeup_event),
150                add_event: add_event, 
151                cancel_event: cancel_event,
152                polling_flag: AtomicBool::new(false),
153                epoll_fds: AtomicUsize::new(0)
154            } 
155        );
156    }
157
158    fn add<T: AsTimerFd>(&self, timer: &T) -> TimerResult<()>
159    {
160        // notify the poll (if running that it should exit)
161        if self.polling_flag.load(Ordering::Acquire) == true
162        {
163            self
164                .add_event
165                .write(timer.as_fd().as_raw_fd() as u64)
166                .map_err(|e|
167                    map_timer_err!(TimerErrorType::EPoll(e), "error while cancel_event for timer {} to epoll", timer)
168                )?;
169        }
170
171        self
172            .epoll
173            .add(
174                timer, 
175                EpollEvent::new(EpollFlags::EPOLLIN, timer.as_fd().as_raw_fd() as u64)
176            )
177            .map_err(|e|
178                map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to epoll", timer)
179            )?;
180
181        self.epoll_fds.fetch_add(1, Ordering::AcqRel);
182
183        return Ok(());
184    }
185    
186    fn delete<T: AsFd + fmt::Display>(&self, timer: &T) -> TimerResult<()>
187    { 
188        // notify the poll (if running that it should exit)
189        if self.polling_flag.load(Ordering::Acquire) == true
190        {
191            self
192                .cancel_event
193                .write(timer.as_fd().as_raw_fd() as u64)
194                .map_err(|e|
195                    map_timer_err!(TimerErrorType::EPoll(e), "error while cancel_event for timer {} to epoll", timer)
196                )?;
197        }
198
199        self
200            .epoll
201            .delete(timer)
202            .map_err(|e|
203                map_timer_err!(TimerErrorType::EPoll(e), "can not delete timer {} to epoll", timer)
204            )?;
205
206        self.epoll_fds.fetch_sub(1, Ordering::AcqRel);
207
208        return Ok(());
209    }
210
211    fn poll(&self, timeout: Option<i32>) -> TimerResult<PollResult>
212    {
213        if self.polling_flag.swap(true, Ordering::AcqRel) == true
214        {
215            timer_err!(TimerErrorType::EPollAlreadyPolling, 
216                "epoll fd: '{}' other thread already polling", self.epoll.0.as_raw_fd());
217        }
218
219        let mut events = vec![EpollEvent::empty(); self.epoll_fds.load(Ordering::Acquire) + 1];
220        
221        let poll_timeout =
222            timeout
223                .map_or(Ok(PollTimeout::NONE), |f| PollTimeout::try_from(f))
224                .map_err(|e|
225                    map_timer_err!(TimerErrorType::Conversion, "timeout value: '{:?}' is incorrect: '{}'", timeout, e)   
226                )?;
227
228        // reset previous events which happened before poll
229        //let _ = self.cancel_event.read();
230        
231        let evs_res = 
232            self
233                .epoll
234                .wait(events.as_mut_slice(), poll_timeout)
235                .map_err(|e|
236                    map_timer_err!(TimerErrorType::EPoll(e), "poll error")   
237                );
238
239        //let mut poll_res = PollResult::None;
240
241        self.polling_flag.store(false, Ordering::Release);
242        
243        let evs = evs_res?;
244        if evs == 0
245        {
246            return Ok(PollResult::new_none());
247        }
248        else
249        {
250            let mut poll_res = PollResult::new(evs);
251
252            for event in events[..evs].iter()
253            {
254                if event.data() as RawFd == self.cancel_event.as_raw_fd()
255                {
256                    let ev = 
257                        self
258                            .cancel_event
259                            .read()
260                            .map_or_else(
261                                |err| PollEventType::SubError(map_timer_err!(TimerErrorType::EPoll(err), "cancel_event read error")), 
262                                |timer_fd| PollEventType::TimerRemoved(timer_fd as RawFd)
263                            );
264
265                    poll_res.push(ev);
266                }
267                else if event.data() as RawFd == self.wakeup_event.as_raw_fd()
268                {
269                    let _ = self.wakeup_event.read();
270                    // no event
271                }
272                else if event.data() as RawFd == self.add_event.as_raw_fd()
273                {
274                    let _ = self.add_event.read();
275                    // no event
276                }
277                else
278                {
279                    poll_res.push(PollEventType::Some(event.data() as RawFd));
280                }
281            }
282           
283           return Ok(poll_res);
284        }
285    }
286    
287    fn get_count(&self) -> usize 
288    {
289        return self.epoll_fds.load(Ordering::Acquire);
290    }
291
292    fn get_poll_interruptor(&self) -> PollInterrupt
293    {
294        return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
295    }
296    
297    fn interrupt_poll(&self) -> bool 
298    {
299        return self.wakeup_event.write(1).is_ok();
300    }
301}
302
303impl TimerEventWatch
304{
305
306
307}
308
#[cfg(test)]
mod tests
{
    use std::{cmp::Ordering, os::fd::{AsFd, AsRawFd}, time::Instant};

    use crate::{deque_timeout::OrderdTimerDequeOnce, timer_portable::timer::AbsoluteTime, OrderedTimerDeque, TimerDequeueConsumer};

    use super::*;

    /// Adds a timer deque to the watch, arms it 2 seconds ahead and checks 
    /// that `poll` reports the FD of the deque within the same second the 
    /// timer was set to.
    #[test]
    fn test_epoll0()
    {
        let mut timer_queue = 
            OrderedTimerDeque
                ::<TimerDequeueConsumer<i32, OrderdTimerDequeOnce>>
                ::new("test_label".into(), 4, false).unwrap();

        let watch = TimerEventWatch::new().unwrap();
        watch.add(&timer_queue).unwrap();

        let started = Instant::now();

        let deadline = AbsoluteTime::now().add_sec(2);
        let item: i32 = 1;
        timer_queue.add_to_timer(item, deadline).unwrap();

        // the timeout is slightly longer than the timer itself
        let res = watch.poll(Some(2001)).unwrap();

        let elapsed = started.elapsed();
        let abs_now = AbsoluteTime::now();

        println!("elapsed: {:?}, now: {}, set: {}", elapsed, abs_now, deadline);
        println!("{:?}", res);

        assert!(!res.is_none());
        assert_eq!(res[0], PollEventType::Some(timer_queue.as_fd().as_raw_fd()));
        assert!(abs_now.seconds_cmp(&deadline) == Ordering::Equal);
    }
}