timer_deque_rs/timer_portable/linux/
timer_poll.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. The MIT License (MIT)
14 *                     
15 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
18use std::
19{
20    collections::BTreeMap, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::{Arc, RwLock, atomic::{AtomicBool, Ordering}}
21};
22
23use nix::
24{
25    errno::Errno, poll::PollTimeout, sys::
26    {
27        epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags}, 
28        eventfd::{EfdFlags, EventFd}
29    }
30};
31
32use crate::
33{
34    error::{TimerErrorType, TimerResult}, 
35    map_timer_err, 
36    timer_err, 
37    timer_portable::
38    {
39        TimerFd, 
40        poll::{ PollEventType, PollInterrupt, TimerPollOps}, 
41        timer::FdTimerRead
42    }
43};
44
45
46
47/// A [Epoll] based event monitor.
48#[derive(Debug)]
49pub struct TimerEventWatch
50{
51    /// A event notification facility.
52    epoll: Epoll, 
53
54    /// A waker which interrupts the `poll`.
55    wakeup_event: Arc<EventFd>,
56
57    /// A poll guard which prevents the multiple call to poll.
58    polling_flag: AtomicBool,
59
60    /// All regestered timers.
61    timers: RwLock<BTreeMap<RawFd, TimerFd>>,
62}
63
64impl fmt::Display for TimerEventWatch
65{
66    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
67    {
68        write!(f, "fd:{}, cnt:{}", self.epoll.0.as_fd().as_raw_fd(), 
69            self.timers.try_read().map_or("locked".to_string(), |f| f.len().to_string())
70        ) //self.epoll_fds.load(Ordering::Acquire))
71    }
72}
73
74impl Eq for TimerEventWatch {}
75
76impl PartialEq for TimerEventWatch
77{
78    fn eq(&self, other: &Self) -> bool 
79    {
80        return self.epoll.0.as_raw_fd() == other.epoll.0.as_raw_fd();
81    }
82}
83
84impl TimerPollOps for TimerEventWatch
85{
86    fn new() -> TimerResult<Self>
87    {
88        let epoll = 
89            Epoll::new(EpollCreateFlags::empty())
90                .map_err(|e|
91                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
92                )?;
93
94        let wakeup_event = 
95            EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
96                .map_err(|e|
97                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
98                )?;
99
100         epoll
101            .add(
102                &wakeup_event, 
103                EpollEvent::new(EpollFlags::EPOLLIN,wakeup_event.as_raw_fd() as u64)
104            )
105            .map_err(|e|
106                map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
107            )?;
108
109        return Ok( 
110            Self
111            {
112                epoll: epoll, 
113                wakeup_event: Arc::new(wakeup_event),
114                polling_flag: AtomicBool::new(false),
115                timers: RwLock::new(BTreeMap::new())
116            } 
117        );
118    }
119
120    fn add(&self, timer: TimerFd) -> TimerResult<()>
121    {
122        // lock shared lock for exclusive access
123        let mut timers_lock = 
124            self
125                .timers
126                .write()
127                .map_or_else(|e| e.into_inner(), |v| v);
128
129        // check if timer was added previosly
130        let false = 
131            timers_lock.contains_key(&timer.as_raw_fd())
132        else
133        {
134            timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate", 
135                timer)
136        };
137
138        // add to epoll
139        self
140            .epoll
141            .add(
142                &timer, 
143                EpollEvent::new(EpollFlags::EPOLLIN, timer.as_fd().as_raw_fd() as u64)
144            )
145            .map_err(|e|
146                map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to epoll", timer)
147            )?;
148
149        // store on the list
150        timers_lock.insert(timer.as_fd().as_raw_fd(), timer);
151
152        return Ok(());
153    }
154    
155    fn delete<FD: AsFd>(&self, timer: FD) -> TimerResult<()>
156    { 
157        let mut timers_lock = 
158            self
159                .timers
160                .write()
161                .map_or_else(|e| e.into_inner(), |v| v);
162
163        // check if timer was added previosly
164        let true = 
165            timers_lock.contains_key(&timer.as_fd().as_raw_fd())
166        else
167        {
168            timer_err!(TimerErrorType::Duplicate, "can not remove timer {} to epoll, reason duplicate", 
169                timer.as_fd().as_raw_fd())
170        };
171
172        if let Err(ern) = 
173            self
174                .epoll
175                .delete(&timer)
176        {
177            if ern != Errno::ENOENT
178            {
179                timer_err!(TimerErrorType::EPoll(ern), "can not delete timer {} from epoll", 
180                    timer.as_fd().as_raw_fd());
181            }
182        }
183
184        let Some(_) = timers_lock.remove(&timer.as_fd().as_raw_fd())
185        else
186        {
187            timer_err!(TimerErrorType::NotFound, "timer {} not found in the list", timer.as_fd().as_raw_fd())
188        };
189
190        return Ok(());
191    }
192
193    fn poll(&self, timeout: Option<i32>) ->  TimerResult<Option<Vec<PollEventType>>>
194    {
195        if self.polling_flag.swap(true, Ordering::SeqCst) == true
196        {
197            timer_err!(TimerErrorType::EPollAlreadyPolling, 
198                "epoll fd: '{}' other thread already polling", self.epoll.0.as_raw_fd());
199        }
200
201        let mut events = vec![EpollEvent::empty(); self.get_count() + 1];
202        
203        let poll_timeout =
204            timeout
205                .map_or(Ok(PollTimeout::NONE), |f| PollTimeout::try_from(f))
206                .map_err(|e|
207                    map_timer_err!(TimerErrorType::Conversion, "timeout value: '{:?}' is incorrect: '{}'", timeout, e)   
208                )?;
209        
210        let evs_res = 
211            self
212                .epoll
213                .wait(events.as_mut_slice(), poll_timeout)
214                .map_err(|e|
215                    map_timer_err!(TimerErrorType::EPoll(e), "poll error")   
216                );
217
218        self.polling_flag.store(false, Ordering::SeqCst);
219        
220        // ignore EINTR
221        if let Err(err) = evs_res.as_ref() && err.get_error_type() == TimerErrorType::EPoll(Errno::EINTR)
222        {
223            return Ok(None);
224        }
225
226        let evs = evs_res?;
227        if evs == 0
228        {
229            return Ok(None);
230        }
231        else
232        {
233            // lock timers in read mode
234            let timers_read = 
235                self
236                    .timers
237                    .read()
238                    .map_or_else(|e| e.into_inner(), |v| v);
239
240            let mut poll_res: Vec<PollEventType> = Vec::with_capacity(evs);
241
242            for event 
243            in events[..evs].iter().filter(|ev| ev.events().intersects(EpollFlags::EPOLLIN))
244            {
245                if event.data() as RawFd == self.wakeup_event.as_raw_fd()
246                {
247                    let _ = self.wakeup_event.read();
248                }
249                else
250                {
251                    let Some(timer) = 
252                        timers_read
253                            .get(&(event.data() as RawFd)).map(|c| c)
254                        else
255                        {
256                            // probably removed, just ignore
257                            continue;
258                        };
259                    
260                    match timer.read()
261                    {
262                        Ok(res) => 
263                            poll_res.push(PollEventType::TimerRes(timer.as_raw_fd(), res)),
264                        Err(e) => 
265                            poll_res.push(PollEventType::SubError(timer.as_raw_fd(), e)),
266                    }
267                }
268            }
269           
270            if poll_res.is_empty() == false
271            {
272                return Ok(Some(poll_res));
273            }
274            else
275            {
276                return Ok(None);
277            }
278        }
279    }
280    
281    fn get_count(&self) -> usize 
282    {
283        //return self.epoll_fds.load(Ordering::SeqCst);
284        return 
285            self
286                .timers
287                .read()
288                .map_or_else(|e| e.into_inner(), |v| v)
289                .len();
290    }
291
292    fn get_poll_interruptor(&self) -> PollInterrupt
293    {
294        return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
295    }
296    
297    /// Interrupts poll genrally.
298    fn interrupt_poll(&self) -> bool 
299    {
300        return self.wakeup_event.write(self.wakeup_event.as_raw_fd() as u64).is_ok();
301    }
302}
303