timer_deque_rs/
timer.rs

1/*-
2 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov
6 * 
7 * The timer-rs crate can be redistributed and/or modified
8 * under the terms of either of the following licenses:
9 *
10 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
11 *                     
12 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
13 */
14
15pub use std::os::fd::{AsFd, AsRawFd};
16use std::{borrow::Cow, collections::VecDeque, fmt, os::fd::{BorrowedFd, RawFd}};
17
18use crate::
19{
20    error::{TimerErrorType, TimerResult}, 
21    map_timer_err, 
22    timer_portable::timer::
23    {
24        FdTimerCom, TimerExpMode, TimerFd, TimerFlags, TimerReadRes, TimerSetTimeFlags, TimerType
25    }
26};
27
28/// A trait which is implemented by the structs which defines the behaviour 
29/// of the timer queue.
30pub trait OrderedTimerDequeIntrf
31{
32    /// A timer item for the queue which is passed as argument. If noting is 
33    /// provided the `NoTarget` can be used.
34    type Target: PartialEq + Eq + fmt::Display + fmt::Debug;
35
36    /// A timer queue identification in the queue which may be retuened.
37    /// If nothing is retuned the `NoTicket` can be returned.
38    type Ticket: PartialEq + Eq + fmt::Display + fmt::Debug;
39
40    /// Wraps the input and initializes the timer time.
41    /// 
42    /// # Returns
43    /// 
44    /// Should return the `Self` instance and optionaly a `Self::Ticket` type.
45    fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)> 
46        where Self: Sized;
47
48    /// Should return the absolute time and the timer mode.
49    fn get_timeout_absolute(&self) -> TimerExpMode;
50}
51
52/// A [VecDeque] based queue which is sorted (in ascending order) by the timeout 
53/// which is `absolute` time.
54/// 
55/// The queue automatically manages the `timer` i.e setting, unsetting.
56/// 
57/// Also for each type of the deque, a event procesing function is providided.
58/// 
59/// # Generics
60/// 
61/// * `DQI` - a deque type. There are three types are available:
62///     * - [crate::TimerDequeueTicketIssuer] which issues a ticket for the instance for which
63///     the timer was set.
64///     * - [crate::TimerDequeueConsumer] which consumes the instance for which the timer is set.
65///     * - [crate::TimerDequeueSignalTicket] which sends a signal to destination.
66#[derive(Debug)]
67pub struct OrderedTimerDeque<DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display>
68{
69    /// A [VecDeque] list which is sorted by time in ascending order -
70    /// lower first, largest last
71    pub(crate) deque_timeout_list: VecDeque<DQI>,
72
73    /// An instance of the FFI (Kernel Supported Timer)
74    pub(crate) timer: TimerFd,
75}
76
77impl<DQI> Drop for OrderedTimerDeque<DQI>
78where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
79{
80    fn drop(&mut self) 
81    {
82        let _ = self.clean_up_timer();
83    }
84}
85
86impl<DQI> AsFd for OrderedTimerDeque<DQI>
87where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
88{
89    fn as_fd(&self) -> BorrowedFd<'_> 
90    {
91        return self.timer.as_fd();
92    }
93}
94
95impl<DQI> AsRawFd for OrderedTimerDeque<DQI>
96where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
97{
98    fn as_raw_fd(&self) -> RawFd 
99    {
100        return self.timer.as_raw_fd();
101    }
102}
103
104impl<DQI> fmt::Display for OrderedTimerDeque<DQI>
105where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
106{
107    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
108    {
109        write!(f, "timer: '{}', fd: '{}', queue_len: '{}'", 
110            self.timer, self.timer.as_fd().as_raw_fd(), self.deque_timeout_list.len())
111    }
112}
113
114impl<DQI> OrderedTimerDeque<DQI>
115where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
116{
117    /// Creates new deque instance.
118    /// 
119    /// # Argument
120    /// 
121    /// `timer_label` - a label which helps to identify the timer.
122    /// 
123    /// `deq_len` - a minimal, pre-allocated deque length.
124    /// 
125    /// `cloexec` - when set to `true` sets the `CLOEXEC` flag to FD.
126    /// 
127    /// # Returns
128    /// 
129    /// A [Result] as alias [TimerResult] is returned with
130    /// 
131    /// * [Result::Ok] with the instance.
132    /// 
133    /// * [Result::Err] with the error description.
134    pub 
135    fn new(timer_label: Cow<'static, str>, deq_len: usize, cloexec: bool) -> TimerResult<OrderedTimerDeque<DQI>>
136    {
137        let deq_len = 
138            if deq_len == 0
139            {
140                10
141            }
142            else
143            {
144                deq_len
145            };
146
147        let tf = 
148            if cloexec == true
149            {
150                TimerFlags::TFD_CLOEXEC
151            }
152            else
153            {
154                TimerFlags::empty()
155            };
156
157        // init timer
158        let timer = 
159            TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, TimerFlags::TFD_NONBLOCK | tf)
160                .map_err(|e|
161                    map_timer_err!(TimerErrorType::TimerError, "{}", e)
162                )?;
163
164        return Ok( 
165            Self
166            { 
167                deque_timeout_list: VecDeque::with_capacity(deq_len), 
168                timer: timer
169            } 
170        );
171    }
172
173    /// Reads the FD to retrive the event type.
174    /// 
175    /// This function behaves differently when the timer is set with [TimerFlags::TFD_NONBLOCK].
176    /// 
177    /// Normally, all timer_* creates nonblocking timer, so this function behaves like when
178    /// the [TimerFlags::TFD_NONBLOCK] is set.
179    /// 
180    /// When [TimerFlags::TFD_NONBLOCK] is not set, this function will block reading the FD.
181    /// In case of 'EINTR', the read attempt will be repeated. It will be reapeated in both cases.
182    /// 
183    /// When [TimerFlags::TFD_NONBLOCK] is set, the function will return with some result.
184    /// 
185    /// # Return
186    /// 
187    /// * In case of `EAGAIN`, the [TimerReadRes::WouldBlock] will be returned. 
188    /// 
189    /// * In case of `ECANCELLD`, the [TimerReadRes::WouldBlock] will be returned.
190    /// 
191    /// * In case of any other error the [Result::Err] is returned.
192    /// 
193    /// When a timer fires an event the [Result::Ok] is returned with the amount of
194    /// timer overrun. Normally it is 1.
195    pub 
196    fn wait_for_event(&self) -> TimerResult<TimerReadRes<u64>>
197    {
198        let res = 
199            self
200                .timer
201                .read()
202                .map_err(|e|
203                    map_timer_err!(TimerErrorType::TimerError, "{}", e)
204                );
205
206        return res;
207    }
208
209    /// Asynchronious polling. The timer's FD is set to nonblocking,
210    /// so each time it will return `pending` and load CPU. If you are using
211    /// `tokio` or `smol` suing corresponding helpers like tokio's `AsyncFd`.
212    pub async
213    fn poll(&self) -> TimerResult<TimerReadRes<u64>>
214    {
215        return 
216            (&self.timer)
217                .await
218                .map_err(|e|
219                    map_timer_err!(TimerErrorType::TimerError, "{}", e)
220                );
221    }
222
223    /// Returns the queue length.
224    pub 
225    fn timer_queue_len(&self) -> usize
226    {
227        return self.deque_timeout_list.len();
228    }
229    
230
231    /// Setting the `timer` instance to the new values or unsets the timer if
232    /// `queue` becomes empty.
233    pub  
234    fn reschedule_timer(&mut self) -> TimerResult<()>
235    {
236        if let Some(front_entity) = self.deque_timeout_list.front()
237        {
238            return
239                self
240                    .timer
241                    .set_time(
242                    TimerSetTimeFlags::TFD_TIMER_ABSTIME | TimerSetTimeFlags::TFD_TIMER_CANCEL_ON_SET, 
243                    front_entity.get_timeout_absolute()
244                    )
245                    .map_err(|e|
246                        map_timer_err!(TimerErrorType::TimerError, "{}", e)
247                    );
248        }
249        else
250        {
251            // queue is empty, force timer to stop
252
253            return 
254                self
255                    .timer
256                    .unset_time()
257                    .map_err(|e|
258                        map_timer_err!(TimerErrorType::TimerError, "{}", e)
259                    );
260        }
261    }
262
263    /// Unarms timer and clears the queue.
264    pub 
265    fn clean_up_timer(&mut self) -> TimerResult<()>
266    {
267        self
268            .timer
269            .unset_time()
270            .map_err(|e| map_timer_err!(TimerErrorType::TimerError, "{}", e))?;
271
272        self.deque_timeout_list.clear();
273
274        return Ok(());
275    }
276
277    /// Unarms timer only.
278    pub 
279    fn stop_timer(&mut self) -> TimerResult<()>
280    {
281        return 
282            self
283                .timer
284                .unset_time()
285                .map_err(|e| map_timer_err!(TimerErrorType::TimerError, "{}", e));
286    }
287
288    /// An internal function to recude code duplication. Each deque item type has its own `add` function
289    /// realization.
290    pub(crate)   
291    fn add_to_timer_local(&mut self, entity: DQI::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<DQI::Ticket>
292    {
293        let (wrapped, ext) = 
294            DQI::wrap(entity, abs_time_sec, abs_time_nsec)?;
295
296        let flags = TimerSetTimeFlags::TFD_TIMER_ABSTIME | TimerSetTimeFlags::TFD_TIMER_CANCEL_ON_SET;
297
298        if self.deque_timeout_list.len() == 0
299        {
300            let timer_stamp = wrapped.get_timeout_absolute();
301
302            // setup timer
303            self
304                .timer
305                .set_time(flags, timer_stamp)
306                .map_err(|e|
307                    map_timer_err!(TimerErrorType::TimerError, "{}", e)
308                )?;
309
310            self.deque_timeout_list.push_front(wrapped);      
311        }
312        else
313        {
314            //list can not be empty from this point
315            let front_banuntil = 
316                self.deque_timeout_list.front().unwrap().get_timeout_absolute();
317
318            let timer_stamp = wrapped.get_timeout_absolute();
319
320            if front_banuntil >= timer_stamp
321            {
322                self
323                    .timer
324                    .set_time(flags, timer_stamp)
325                    .map_err(|e|
326                        map_timer_err!(TimerErrorType::TimerError, "{}", e)
327                    )?;
328
329                // push to front
330                self.deque_timeout_list.push_front(wrapped);
331            }
332            else 
333            {
334                let back_banuntil = 
335                    self
336                        .deque_timeout_list
337                        .back()
338                        .unwrap()
339                        .get_timeout_absolute();
340
341                if back_banuntil <= timer_stamp
342                {
343                    // push to the back
344                    self.deque_timeout_list.push_back(wrapped);
345                }
346                else
347                {
348                    let pos = 
349                        self
350                            .deque_timeout_list
351                            .binary_search_by( |se| 
352                                se.get_timeout_absolute().cmp(&wrapped.get_timeout_absolute())
353                            )
354                            .map_or_else(|e| e, |r| r);
355
356                    self.deque_timeout_list.insert(pos, wrapped);
357                }
358            }
359        }
360
361        return Ok(ext);
362    }
363}