timer_deque_rs/timer_portable/unix/linux/
timer_poll.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. The MIT License (MIT)
14 *                     
15 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
18use std::
19{
20    collections::BTreeMap, fmt, os::fd::{AsFd, AsRawFd, RawFd}, sync::{Arc, RwLock, atomic::{AtomicBool, Ordering}}
21};
22
23use nix::
24{
25    errno::Errno, poll::PollTimeout, sys::
26    {
27        epoll::{Epoll, EpollCreateFlags, EpollEvent, EpollFlags}, 
28        eventfd::{EfdFlags, EventFd}
29    }
30};
31
32use crate::
33{
34    error::{TimerError, TimerErrorType, TimerResult}, 
35    map_timer_err, 
36    timer_err, 
37    timer_portable::
38    {
39        AsTimerId, FdTimerMarker, TimerFd, TimerId, poll::{ PollEventType, PollInterrupt, TimerPollOps}, timer::FdTimerRead
40    }
41};
42
43
44
45/// A [Epoll] based event monitor.
46#[derive(Debug)]
47pub struct TimerEventWatch
48{
49    /// A event notification facility.
50    epoll: Epoll, 
51
52    /// A waker which interrupts the `poll`.
53    wakeup_event: Arc<EventFd>,
54
55    /// A poll guard which prevents the multiple call to poll.
56    polling_flag: AtomicBool,
57
58    /// All regestered timers.
59    timers: RwLock<BTreeMap<TimerId, TimerFd>>,
60}
61
62impl fmt::Display for TimerEventWatch
63{
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
65    {
66        write!(f, "fd:{}, cnt:{}", self.epoll.0.as_fd().as_raw_fd(), 
67            self.timers.try_read().map_or("locked".to_string(), |f| f.len().to_string())
68        ) //self.epoll_fds.load(Ordering::Acquire))
69    }
70}
71
72impl Eq for TimerEventWatch {}
73
74impl PartialEq for TimerEventWatch
75{
76    fn eq(&self, other: &Self) -> bool 
77    {
78        return self.epoll.0.as_raw_fd() == other.epoll.0.as_raw_fd();
79    }
80}
81
82impl TimerPollOps for TimerEventWatch
83{
84    fn new() -> TimerResult<Self>
85    {
86        let epoll = 
87            Epoll::new(EpollCreateFlags::empty())
88                .map_err(|e|
89                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
90                )?;
91
92        let wakeup_event = 
93            EventFd::from_flags(EfdFlags::EFD_CLOEXEC | EfdFlags::EFD_NONBLOCK)
94                .map_err(|e|
95                    map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
96                )?;
97
98         epoll
99            .add(
100                &wakeup_event, 
101                EpollEvent::new(EpollFlags::EPOLLIN,wakeup_event.as_raw_fd() as u64)
102            )
103            .map_err(|e|
104                map_timer_err!(TimerErrorType::EPoll(e), "epoll new() error")
105            )?;
106
107        return Ok( 
108            Self
109            {
110                epoll: epoll, 
111                wakeup_event: Arc::new(wakeup_event),
112                polling_flag: AtomicBool::new(false),
113                timers: RwLock::new(BTreeMap::new())
114            } 
115        );
116    }
117
118    fn add(&self, timer: TimerFd) -> TimerResult<()>
119    {
120        // lock shared lock for exclusive access
121        let mut timers_lock = 
122            self
123                .timers
124                .write()
125                .map_or_else(|e| e.into_inner(), |v| v);
126
127        // check if timer was added previosly
128        let false = 
129            timers_lock.contains_key(&timer.as_timer_id())
130        else
131        {
132            timer_err!(TimerErrorType::Duplicate, "can not add timer {} to epoll, reason duplicate", 
133                timer)
134        };
135
136        // add to epoll
137        self
138            .epoll
139            .add(
140                &timer, 
141                EpollEvent::new(EpollFlags::EPOLLIN, timer.as_timer_id().0 as u64)
142            )
143            .map_err(|e|
144                map_timer_err!(TimerErrorType::EPoll(e), "can not add timer {} to epoll", timer)
145            )?;
146
147        // store on the list
148        timers_lock.insert(timer.as_timer_id(), timer);
149
150        return Ok(());
151    }
152    
153    fn delete<FD: FdTimerMarker>(&self, timer: &FD) -> TimerResult<()>
154    { 
155        let mut timers_lock = 
156            self
157                .timers
158                .write()
159                .map_or_else(|e| e.into_inner(), |v| v);
160
161        // check if timer was added previosly
162        let true = 
163            timers_lock.contains_key(&timer.as_timer_id())
164        else
165        {
166            timer_err!(TimerErrorType::Duplicate, "can not remove timer {} to epoll, reason duplicate", 
167                timer.as_fd().as_raw_fd())
168        };
169
170        if let Err(ern) = 
171            self
172                .epoll
173                .delete(&timer)
174        {
175            if ern != Errno::ENOENT
176            {
177                timer_err!(TimerErrorType::EPoll(ern), "can not delete timer {} from epoll", 
178                    timer.as_timer_id());
179            }
180        }
181
182        let Some(_) = timers_lock.remove(&timer.as_timer_id())
183        else
184        {
185            timer_err!(TimerErrorType::NotFound, "timer {} not found in the list", timer.as_fd().as_raw_fd())
186        };
187
188        return Ok(());
189    }
190
191    fn poll(&self, timeout: Option<i32>) ->  TimerResult<Option<Vec<PollEventType>>>
192    {
193        if self.polling_flag.swap(true, Ordering::SeqCst) == true
194        {
195            timer_err!(TimerErrorType::EPollAlreadyPolling, 
196                "epoll fd: '{}' other thread already polling", self.epoll.0.as_raw_fd());
197        }
198
199        let mut events = vec![EpollEvent::empty(); self.get_count() + 1];
200
201        let mut poll_funct = || 
202            {               
203                let poll_timeout =
204                    timeout
205                        .map_or(Ok(PollTimeout::NONE), |f| PollTimeout::try_from(f))
206                        .map_err(|e|
207                            map_timer_err!(TimerErrorType::Conversion, "timeout value: '{:?}' is incorrect: '{}'", timeout, e)   
208                        )?;
209                
210                let evs_res = 
211                    self
212                        .epoll
213                        .wait(events.as_mut_slice(), poll_timeout)
214                        .map_err(|e|
215                            map_timer_err!(TimerErrorType::EPoll(e), "poll error")   
216                        )?;
217
218                return Ok(evs_res);
219            };
220
221        let poll_res: Result<usize, TimerError> = poll_funct();
222
223        self.polling_flag.store(false, Ordering::SeqCst);
224        
225        // ignore EINTR
226        if let Err(err) = poll_res.as_ref() && err.get_error_type() == TimerErrorType::EPoll(Errno::EINTR)
227        {
228            return Ok(None);
229        }
230
231        let evs = poll_res?;
232        if evs == 0
233        {
234            return Ok(None);
235        }
236        else
237        {
238            // lock timers in read mode
239            let timers_read = 
240                self
241                    .timers
242                    .read()
243                    .map_or_else(|e| e.into_inner(), |v| v);
244
245            let mut poll_res: Vec<PollEventType> = Vec::with_capacity(evs);
246
247            for event 
248            in events[..evs].iter().filter(|ev| ev.events().intersects(EpollFlags::EPOLLIN))
249            {
250                if event.data() as RawFd == self.wakeup_event.as_raw_fd()
251                {
252                    let _ = self.wakeup_event.read();
253                }
254                else
255                {
256                    let Some(timer) = 
257                        timers_read
258                            .get(&(TimerId::from(event.data()))).map(|c| c)
259                        else
260                        {
261                            // probably removed, just ignore
262                            continue;
263                        };
264                    
265                    match timer.read()
266                    {
267                        Ok(res) => 
268                            poll_res.push(PollEventType::TimerRes(timer.as_timer_id(), res)),
269                        Err(e) => 
270                            poll_res.push(PollEventType::SubError(timer.as_timer_id(), e)),
271                    }
272                }
273            }
274           
275            if poll_res.is_empty() == false
276            {
277                return Ok(Some(poll_res));
278            }
279            else
280            {
281                return Ok(None);
282            }
283        }
284    }
285    
286    fn get_count(&self) -> usize 
287    {
288        //return self.epoll_fds.load(Ordering::SeqCst);
289        return 
290            self
291                .timers
292                .read()
293                .map_or_else(|e| e.into_inner(), |v| v)
294                .len();
295    }
296
297    fn get_poll_interruptor(&self) -> PollInterrupt
298    {
299        return PollInterrupt::new(Arc::downgrade(&self.wakeup_event));
300    }
301    
302    /// Interrupts poll genrally.
303    fn interrupt_poll(&self) -> bool 
304    {
305        return self.wakeup_event.write(self.wakeup_event.as_raw_fd() as u64).is_ok();
306    }
307}
308