timer_deque_rs/deque_timeout/
mod.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16/// A `consumer` type of the timer which consumes the intance and returns it when
17/// timer triggers. The `consumed` instance normally whould be [Send] because it will
18/// be moved into the timer.
19pub mod timer_consumer;
20
21/// A `ticket` issuer. Issues a ticket which should be assigned to the instance whcih was added
22/// to the timer's queue. The `ticket` can be used to remove the item from queue before the 
23/// timeout event. If ticket is dropped i.e connection closed, the ticket will be
24/// in timer's queue until timeout where it will be ignored on timeout event.
25pub mod timer_tickets;
26
27/// A `signal` sender. Calls the specified callback which must never block the executing thread.
28pub mod timer_signal;
29
30pub use std::os::fd::{AsFd, AsRawFd};
31use std::{borrow::Cow, collections::VecDeque, fmt, os::fd::{BorrowedFd, RawFd}};
32
33use crate::
34{
35    error::{TimerErrorType, TimerResult}, 
36    map_timer_err, 
37    timer_portable::
38    {
39        poll::AsTimerFd, 
40        timer::
41        {
42            AbsoluteTime, 
43            FdTimerCom, 
44            RelativeTime, 
45            TimerExpMode, 
46            TimerFd, 
47            TimerFlags, 
48            TimerReadRes, 
49            TimerType
50        }
51    }
52};
53
54/// A trait which is implemented by the structs which defines the behaviour 
55/// of the timer queue.
56pub trait OrderedTimerDequeIntrf: Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
57{
58    /// A timer item for the queue which is passed as argument. If noting is 
59    /// provided the `NoTarget` can be used.
60    type Target: PartialEq + Eq + fmt::Display + fmt::Debug;
61
62    /// A timer queue identification in the queue which may be retuened.
63    /// If nothing is retuned the `NoTicket` can be returned.
64    type Ticket: PartialEq + Eq + fmt::Display + fmt::Debug;
65
66
67    /// Should return the absolute time and the timer mode.
68    fn get_timeout_absolute(&self) -> AbsoluteTime;
69}
70
71/// A trait which is implemented by the struct which defines the operation mode of the deque.
72pub trait OrderedTimerDequeMode: fmt::Debug + fmt::Display + Ord + PartialOrd + Eq + PartialEq
73{
74    /// Returns the `absolute` timeout for the instance of the deque.
75    fn get_absolut_timeout(&self) -> AbsoluteTime;
76
77    /// Updates the timeout when the deque works in periodic mode.
78    fn advance_timeout(&mut self)
79    {
80        return;
81    }
82}
83
84/// This queue mode removes all entries from the queue that have timed out.
85/// 
86/// The further behaviour is defined by the type of the deque.
87#[derive(Debug)]
88pub struct OrderdTimerDequeOnce 
89{
90    /// A timeout for the item in the queue.
91    absolute_timeout: AbsoluteTime,
92}
93
94impl fmt::Display for OrderdTimerDequeOnce
95{
96    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
97    {
98        write!(f, "{}", self.absolute_timeout)
99    }
100}
101
102impl Ord for OrderdTimerDequeOnce
103{
104    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
105    {
106        return self.absolute_timeout.cmp(&other.absolute_timeout);
107    }
108}
109
110impl PartialOrd for OrderdTimerDequeOnce
111{
112    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
113    {
114        return Some(self.cmp(other));
115    }
116}
117
118impl Eq for OrderdTimerDequeOnce {}
119
120impl PartialEq for OrderdTimerDequeOnce
121{
122    fn eq(&self, other: &Self) -> bool 
123    {
124        return self.absolute_timeout == other.absolute_timeout;
125    }
126}
127
128
129impl OrderedTimerDequeMode for OrderdTimerDequeOnce
130{
131    fn get_absolut_timeout(&self) -> AbsoluteTime
132    {
133        return self.absolute_timeout;
134    }
135}
136
137impl OrderdTimerDequeOnce
138{
139    /// Creates new instacne.
140    pub(crate)
141    fn new(absolute_timeout: AbsoluteTime) -> Self
142    {
143        return 
144            Self
145            {
146                absolute_timeout: 
147                    absolute_timeout
148            };
149    }
150}
151
152/// This queue mode does not remove an element that has timed out (by `absolute_timeout`), 
153/// but extends (by `relative_period`) the timeout and returns the element back to the queue.
154/// 
155/// The further behaviour is defined by the type of the deque.
156#[derive(Debug)]
157pub struct OrderdTimerDequePeriodic 
158{
159    /// Extends the timer until the next timeout. This is `relative`
160    /// not absolute.
161    relative_period: RelativeTime,
162
163    /// A timeout value.
164    absolute_timeout: AbsoluteTime,
165}
166
167impl fmt::Display for OrderdTimerDequePeriodic
168{
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
170    {
171        write!(f, "{}, rel: {}", self.absolute_timeout, self.relative_period)
172    }
173}
174
175impl Ord for OrderdTimerDequePeriodic
176{
177    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
178    {
179        return self.absolute_timeout.cmp(&other.absolute_timeout);
180    }
181}
182
183impl PartialOrd for OrderdTimerDequePeriodic
184{
185    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
186    {
187        return Some(self.cmp(other));
188    }
189}
190
191impl Eq for OrderdTimerDequePeriodic {}
192
193impl PartialEq for OrderdTimerDequePeriodic
194{
195    fn eq(&self, other: &Self) -> bool 
196    {
197        return self.absolute_timeout == other.absolute_timeout;
198    }
199}
200
201
202impl OrderedTimerDequeMode for OrderdTimerDequePeriodic
203{
204    fn get_absolut_timeout(&self) -> AbsoluteTime
205    {
206        return self.absolute_timeout;
207    }
208
209    fn advance_timeout(&mut self)
210    {
211        self.absolute_timeout += self.relative_period;
212    }
213}
214
215impl OrderdTimerDequePeriodic
216{
217    /// Creates new instance.
218    pub(crate)
219    fn new(rel_time: RelativeTime) -> Self
220    {
221        let mut inst = 
222            Self
223            {
224                relative_period:
225                    rel_time,
226                absolute_timeout: 
227                    AbsoluteTime::now(),
228            };
229
230        inst.advance_timeout();
231
232        return inst;
233    }
234}
235
236/// A [VecDeque] based queue which is sorted (in ascending order) by the timeout 
237/// which is `absolute` time.
238/// 
239/// The queue automatically manages the `timer` i.e setting, unsetting.
240/// 
241/// Also for each type of the deque, a event procesing function is providided.
242/// 
243/// There are two types of queue:
244/// 
245/// * [OrderdTimerDequeOnce] - after timeout the element is removed from the queue.
246/// 
247/// * [OrderdTimerDequePeriodic] - after timeout the element timeout is extended
248///     until the item is not removed from the queue manually.
249/// 
250/// # Generics
251/// 
252/// * `DQI` - a deque type. There are three types are available:
253///     * - [crate::TimerDequeueTicketIssuer] issues a ticket for the instance for which the timer was set.
254///     ```ignore
255///         let mut time_list = 
256///             OrderedTimerDeque
257///                 ::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
258///                 ::new("test_label".into(), 4, false).unwrap();
259///     ```  
260///     or
261///     ```ignore
262///         let mut time_list = 
263///             OrderedTimerDeque
264///                 ::<TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>>
265///                 ::new("test_label".into(), 4, false).unwrap();
266///     ```     
267///     * - [crate::TimerDequeueConsumer] consumes the instance for which the timer is set.
268///     ```ignore
269///         let mut time_list = 
270///             OrderedTimerDeque
271///                 ::<TimerDequeueConsumer<TestItem, OrderdTimerDequeOnce>>
272///                 ::new("test_label".into(), 4, false).unwrap();
273///     ```  
274///     or
275///     ```ignore
276///         let mut time_list = 
277///             OrderedTimerDeque
278///                 ::<TimerDequeueConsumer<TestItem, OrderdTimerDequePeriodic>>
279///                 ::new("test_label".into(), 4, false).unwrap();
280///     ``` 
281///     * - [crate::TimerDequeueSignalTicket] sends a signal to destination.
282///     ```ignore
283///         let mut time_list = 
284///             OrderedTimerDeque
285///                 ::<TimerDequeueSignalTicket<TestSigStruct, OrderdTimerDequeOnce>>
286///                 ::new("test_label".into(), 4, false).unwrap();
287///     ```
288///     or
289///     ```ignore
290///         let mut time_list = 
291///             OrderedTimerDeque
292///                 ::<TimerDequeueSignalTicket<TestSigStruct, OrderdTimerDequePeriodic>>
293///                 ::new("test_label".into(), 4, false).unwrap();
294///     ```
295#[derive(Debug)]
296pub struct OrderedTimerDeque<DQI: OrderedTimerDequeIntrf>
297{
298    /// A [VecDeque] list which is sorted by time in ascending order -
299    /// lower first, largest last
300    pub(crate) deque_timeout_list: VecDeque<DQI>,
301
302    /// An instance of the FFI (Kernel Supported Timer)
303    pub(crate) timer: TimerFd,
304}
305
306impl<DQI> AsTimerFd for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
307{
308    #[inline]
309    fn get_bind(&self) -> Option<std::sync::Arc<crate::timer_portable::DefaultEventWatch>> 
310    {
311        return self.timer.get_bind();
312    }
313
314    #[inline]
315    fn bind_poll(&self, timer_weak_ref: std::sync::Weak<crate::timer_portable::DefaultEventWatch>) 
316    {
317        return self.timer.bind_poll(timer_weak_ref);
318    }
319
320    fn unbind_poll(&self) 
321    {
322        return self.timer.unbind_poll();
323    }
324}   
325
326impl<DQI> Drop for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
327{
328    fn drop(&mut self) 
329    {
330        let _ = self.clean_up_timer();
331    }
332}
333
334impl<DQI> AsFd for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf 
335{
336    fn as_fd(&self) -> BorrowedFd<'_> 
337    {
338        return self.timer.as_fd();
339    }
340}
341
342impl<DQI> AsRawFd for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
343{
344    fn as_raw_fd(&self) -> RawFd 
345    {
346        return self.timer.as_raw_fd();
347    }
348}
349
350impl<DQI> fmt::Display for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf 
351{
352    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
353    {
354        write!(f, "timer: '{}', fd: '{}', queue_len: '{}'", 
355            self.timer, self.timer.as_fd().as_raw_fd(), self.deque_timeout_list.len())
356    }
357}
358
359impl<DQI> OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf 
360{
361    /// Creates new deque instance.
362    /// 
363    /// # Argument
364    /// 
365    /// `timer_label` - a label which helps to identify the timer.
366    /// 
367    /// `deq_len` - a minimal, pre-allocated deque length.
368    /// 
369    /// `cloexec` - when set to `true` sets the `CLOEXEC` flag to FD.
370    /// 
371    /// # Returns
372    /// 
373    /// A [Result] as alias [TimerResult] is returned with
374    /// 
375    /// * [Result::Ok] with the instance.
376    /// 
377    /// * [Result::Err] with the error description.
378    pub 
379    fn new(timer_label: Cow<'static, str>, deq_len: usize, cloexec: bool) -> TimerResult<OrderedTimerDeque<DQI>>
380    {
381        let deq_len = 
382            if deq_len == 0
383            {
384                10
385            }
386            else
387            {
388                deq_len
389            };
390
391        let tf = 
392            if cloexec == true
393            {
394                TimerFlags::TFD_CLOEXEC
395            }
396            else
397            {
398                TimerFlags::empty()
399            };
400
401        // init timer
402        let timer = 
403            TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, TimerFlags::TFD_NONBLOCK | tf)
404                .map_err(|e|
405                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
406                )?;
407
408        return Ok( 
409            Self
410            { 
411                deque_timeout_list: VecDeque::with_capacity(deq_len), 
412                timer: timer
413            } 
414        );
415    }
416
417    /// Reads the FD to retrive the event type.
418    /// 
419    /// This function behaves differently when the timer is set with [TimerFlags::TFD_NONBLOCK].
420    /// 
421    /// Normally, all timer_* creates nonblocking timer, so this function behaves like when
422    /// the [TimerFlags::TFD_NONBLOCK] is set.
423    /// 
424    /// When [TimerFlags::TFD_NONBLOCK] is not set, this function will block reading the FD.
425    /// In case of 'EINTR', the read attempt will be repeated. It will be reapeated in both cases.
426    /// 
427    /// When [TimerFlags::TFD_NONBLOCK] is set, the function will return with some result.
428    /// 
429    /// # Return
430    /// 
431    /// * In case of `EAGAIN`, the [TimerReadRes::WouldBlock] will be returned. 
432    /// 
433    /// * In case of `ECANCELLD`, the [TimerReadRes::WouldBlock] will be returned.
434    /// 
435    /// * In case of any other error the [Result::Err] is returned.
436    /// 
437    /// When a timer fires an event the [Result::Ok] is returned with the amount of
438    /// timer overrun. Normally it is 1.
439    pub 
440    fn wait_for_event(&self) -> TimerResult<TimerReadRes<u64>>
441    {
442        let res = 
443            self
444                .timer
445                .read()
446                .map_err(|e|
447                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
448                );
449
450        return res;
451    }
452
453    /// Asynchronious polling. The timer's FD is set to nonblocking,
454    /// so each time it will return `pending` and load CPU. If you are using
455    /// `tokio` or `smol` suing corresponding helpers like tokio's `AsyncFd`.
456    pub async
457    fn poll(&self) -> TimerResult<TimerReadRes<u64>>
458    {
459        return 
460            (&self.timer)
461                .await
462                .map_err(|e|
463                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
464                );
465    }
466
467    /// Returns the queue length.
468    pub 
469    fn timer_queue_len(&self) -> usize
470    {
471        return self.deque_timeout_list.len();
472    }
473    
474
475    /// Setting the `timer` instance to the new values or unsets the timer if
476    /// `queue` becomes empty.
477    pub  
478    fn reschedule_timer(&mut self) -> TimerResult<()>
479    {
480        if let Some(front_entity) = self.deque_timeout_list.front()
481        {
482            let timer_exp = 
483                TimerExpMode::<AbsoluteTime>::new_oneshot(front_entity.get_timeout_absolute());
484
485            return
486                self
487                    .timer
488                    .set_time(timer_exp)
489                    .map_err(|e|
490                        map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
491                    );
492        }
493        else
494        {
495            // queue is empty, force timer to stop
496
497            return 
498                self
499                    .timer
500                    .unset_time()
501                    .map_err(|e|
502                        map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
503                    );
504        }
505    }
506
507    /// Unarms timer and clears the queue.
508    pub 
509    fn clean_up_timer(&mut self) -> TimerResult<()>
510    {
511        self
512            .timer
513            .unset_time()
514            .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;
515
516        self.deque_timeout_list.clear();
517
518        return Ok(());
519    }
520
521    /// Unarms timer only.
522    pub 
523    fn stop_timer(&mut self) -> TimerResult<()>
524    {
525        return 
526            self
527                .timer
528                .unset_time()
529                .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e));
530    }
531
532    /// An internal function to recude code duplication. Each deque item type has its own `add` function
533    /// realization.
534    pub(crate)   
535    fn add_to_timer_local(&mut self, inst: DQI) -> TimerResult<()>
536    {
537        if self.deque_timeout_list.len() == 0
538        {
539            let timer_stamp = 
540                TimerExpMode::<AbsoluteTime>::new_oneshot(inst.get_timeout_absolute());
541
542            // setup timer
543            self
544                .timer
545                .set_time(timer_stamp)
546                .map_err(|e|
547                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
548                )?;
549
550            self.deque_timeout_list.push_front(inst);      
551        }
552        else
553        {
554            // list can not be empty from this point
555            let front_timeout = 
556                self.deque_timeout_list.front().unwrap().get_timeout_absolute();
557
558            // intances timeout
559            let inst_timeout = inst.get_timeout_absolute();
560
561            if front_timeout >= inst_timeout
562            {
563                // push to front
564                self.deque_timeout_list.push_front(inst);
565
566                self.reschedule_timer()?;
567            }
568            else 
569            {
570                let back_banuntil = 
571                    self
572                        .deque_timeout_list
573                        .back()
574                        .unwrap()
575                        .get_timeout_absolute();
576
577                if back_banuntil <= inst_timeout
578                {
579                    // push to the back
580                    self.deque_timeout_list.push_back(inst);
581                }
582                else
583                {
584                    let pos = 
585                        self
586                            .deque_timeout_list
587                            .binary_search_by( |se| 
588                                se.get_timeout_absolute().cmp(&inst.get_timeout_absolute())
589                            )
590                            .map_or_else(|e| e, |r| r);
591
592                    self.deque_timeout_list.insert(pos, inst);
593                }
594            }
595        }
596
597        return Ok(());
598    }
599}