timer_deque_rs/deque_timeout/
mod.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. The MIT License (MIT)
14 *                     
15 *   3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
/// A `consumer` type of the timer which consumes the instance and returns it when
/// the timer triggers. The `consumed` instance normally should be [Send] because it will
/// be moved into the timer which may be processed in another thread.
21pub mod timer_consumer;
22
/// A `ticket` issuer. Issues a ticket which should be assigned to the instance which was added
/// to the timer's queue. The `ticket` can be used to remove the item from the queue before the
/// timeout event. If the ticket is dropped, i.e. the connection is closed, the item will stay
/// in the timer's queue until the timeout, where it will be ignored on the timeout event.
27pub mod timer_tickets;
28
29#[cfg(test)]
30mod tests;
31
32pub use std::os::fd::{AsFd, AsRawFd};
33
34use std::
35{
36    borrow::Cow, 
37    collections::VecDeque, 
38    fmt, 
39    marker::PhantomData, 
40    os::fd::{BorrowedFd, RawFd}, 
41};
42
43
44use crate::
45{
46    TimerDequeConsumer, 
47    TimerDequeTicket, 
48    TimerDequeTicketIssuer, 
49    error::{TimerErrorType, TimerResult}, 
50    map_timer_err, 
51    timer_err, 
52    timer_portable::
53    {
54        FdTimerMarker, 
55        PollEventType, 
56        TimerFd, 
57        portable_error::TimerPortResult, 
58        timer::
59        {
60            AbsoluteTime, FdTimerCom, FdTimerRead, RelativeTime, TimerExpMode, 
61            TimerFlags, TimerReadRes, TimerType
62        }
63    }
64};
65
66
67/// A trait which is implemented by the structs which define the operation mode of the deque.
68/// 
69/// At the moment for: [DequeOnce] and [DequePeriodic].
70pub trait OrderedTimerDequeMode: fmt::Debug + fmt::Display + Ord + PartialOrd + Eq + PartialEq
71{
72    /// A type of operation. If the item deques once, then this value should be
73    /// `true`. Otherwise, it is periodic.
74    const IS_ONCE: bool;
75
76    /// Checks the current instance against the provided `cmp` [AbsoluteTime].
77    /// It is expected that the current instance's timeout value is actual and 
78    /// haven't outlive the `cmp` already. if it does the error:
79    /// [TimerErrorType::Expired] should be returned. Or any other error
80    /// if it is required.
81    fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>;
82    
83    /// Returns the `absolute` timeout for the instance of the deque.
84    fn get_absolut_timeout(&self) -> AbsoluteTime;
85
86    /// Postpones the time by the relative offset `post_time`. May return error
87    /// (if required) if the absolut timeout value overflows.
88    /// 
89    /// May return [TimerErrorType::TicketInstanceGone] if ticket was invalidated.
90    fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>;
91
92    /// Updates the timeout when the deque works in periodic mode. By fedault
93    /// it does nothing.
94    fn advance_timeout(&mut self)
95    {
96        return;
97    }
98}
99
100/// This queue mode removes all entries from the queue that have timed out.
101/// 
102/// The further behaviour is defined by the type of the deque.
103#[derive(Debug, Clone, Copy)]
104pub struct DequeOnce 
105{
106    /// A timeout for the item in the queue.
107    absolute_timeout: AbsoluteTime,
108}
109
110impl fmt::Display for DequeOnce
111{
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
113    {
114        write!(f, "{}", self.absolute_timeout)
115    }
116}
117
118impl Ord for DequeOnce
119{
120    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
121    {
122        return self.absolute_timeout.cmp(&other.absolute_timeout);
123    }
124}
125
126impl PartialOrd for DequeOnce
127{
128    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
129    {
130        return Some(self.cmp(other));
131    }
132}
133
134impl Eq for DequeOnce {}
135
136impl PartialEq for DequeOnce
137{
138    fn eq(&self, other: &Self) -> bool 
139    {
140        return self.absolute_timeout == other.absolute_timeout;
141    }
142}
143
144
145impl OrderedTimerDequeMode for DequeOnce
146{
147    const IS_ONCE: bool = true;
148
149    fn get_absolut_timeout(&self) -> AbsoluteTime
150    {
151        return self.absolute_timeout;
152    }
153    
154    fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()> 
155    {
156        if cmp > self.absolute_timeout
157        {
158            timer_err!(TimerErrorType::Expired, 
159                "deque once time already expired, now: {}, req: {}", cmp, self);
160        }
161
162        return Ok(());
163    }
164    
165    fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()> 
166    {
167        self.absolute_timeout = self.absolute_timeout + posp_time;
168
169        return Ok(());
170    }
171}
172
173
174impl DequeOnce
175{
176    /// Creates new instacne.
177    #[inline]
178    pub
179    fn new(absolute_timeout: impl Into<AbsoluteTime>) -> Self
180    {
181        return Self{ absolute_timeout: absolute_timeout.into() };
182    }
183}
184
185/// This queue mode does not remove an element that has timed out (by `absolute_timeout`), 
186/// but extends (by `relative_period`) the timeout and returns the element back to the queue.
187#[derive(Debug, Clone, Copy)]
188pub struct DequePeriodic 
189{
190    /// Extends the timer until the next timeout. This is `relative`
191    /// not absolute.
192    relative_period: RelativeTime,
193
194    /// A timeout value.
195    absolute_timeout: AbsoluteTime,
196}
197
198impl fmt::Display for DequePeriodic
199{
200    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
201    {
202        write!(f, "{}, rel: {}", self.absolute_timeout, self.relative_period)
203    }
204}
205
206impl Ord for DequePeriodic
207{
208    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
209    {
210        return self.absolute_timeout.cmp(&other.absolute_timeout);
211    }
212}
213
214impl PartialOrd for DequePeriodic
215{
216    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
217    {
218        return Some(self.cmp(other));
219    }
220}
221
222impl Eq for DequePeriodic {}
223
224impl PartialEq for DequePeriodic
225{
226    fn eq(&self, other: &Self) -> bool 
227    {
228        return self.absolute_timeout == other.absolute_timeout;
229    }
230}
231
232
233impl OrderedTimerDequeMode for DequePeriodic
234{
235    const IS_ONCE: bool = false;
236
237    fn get_absolut_timeout(&self) -> AbsoluteTime
238    {
239        return self.absolute_timeout;
240    }
241
242    fn advance_timeout(&mut self)
243    {
244        self.absolute_timeout += self.relative_period;
245    }
246
247    fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()> 
248    {
249        if cmp > self.absolute_timeout
250        {
251            timer_err!(TimerErrorType::Expired, 
252                "deque periodic absolute time already expired, now: {}, req: {}", cmp, self.absolute_timeout);
253        }
254        else if self.relative_period.is_zero() == false
255        {
256            timer_err!(TimerErrorType::ZeroRelativeTime, 
257                "deque periodic relative time is 0, rel_time: {}", self.relative_period);
258        }
259
260        return Ok(());
261    }
262    
263    fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()> 
264    {
265        self.absolute_timeout = self.absolute_timeout - self.relative_period + posp_time;
266
267        self.relative_period = posp_time;
268
269        return Ok(());
270    }
271}
272
273impl DequePeriodic
274{
275    /// Creates new instance. The `relative_time` is not added to
276    /// `absolute_time` until timeout. The values are unchecked at this
277    /// point and will be checked during placement in queue.
278    pub
279    fn new_from_now(rel_time: impl Into<RelativeTime>) -> Self
280    {
281        let inst = 
282            Self
283            {
284                relative_period:
285                    rel_time.into(),
286                absolute_timeout: 
287                    AbsoluteTime::now(),
288            };
289
290        return inst;
291    }
292
293    /// Creates new instance with the initial timeout `abs_time` and
294    /// period `rel_time` which is added to the `abs_time` each time
295    /// the timeout occures. The values are unchecked at this
296    /// point and will be checked during placement in queue.
297    pub
298    fn new(abs_time: impl Into<AbsoluteTime>, rel_time: impl Into<RelativeTime>) -> Self
299    {
300        let inst = 
301            Self
302            {
303                relative_period:
304                    rel_time.into(),
305                absolute_timeout: 
306                    abs_time.into(),
307            };
308
309        return inst;
310    }
311}
312
/// Generalizes the collection type which gathers the timeout values, 
/// i.e. ticket IDs.
pub trait TimerTimeoutCollection<ITEM>
where
    ITEM: PartialEq + Eq + fmt::Display + fmt::Debug,
{
    /// Creates an instance of the collection.
    fn new() -> Self;

    /// Stores the `item` in the collection.
    fn push(&mut self, item: ITEM);

    /// Wraps the instance into [Option]. For an `empty` collection
    /// [Option::None] should be returned.
    fn into_option(self) -> Option<Self>
    where
        Self: Sized;
}
327
328/// An implementation for the Vec.
329impl<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM> 
330for Vec<ITEM>
331{
332    fn new() -> Self
333    {
334        return Vec::new();
335    }
336
337    fn push(&mut self, item: ITEM) 
338    {
339        self.push(item);
340    }
341
342    fn into_option(self) -> Option<Self>
343    {
344        if self.is_empty() == false
345        {
346            return Some(self);
347        }
348        else
349        {
350            return None;
351        }
352    }
353}
354
355/// A dummy implementation when nothing is returned. Unused at the moment.
356impl <ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM> 
357for ()
358{
359    fn new() -> Self
360    {
361        return ();
362    }
363
364    fn push(&mut self, _item: ITEM) 
365    {
366        return;    
367    }
368
369    fn into_option(self) -> Option<Self>
370    {
371        return None;
372    }
373}
374
375/// A standart interface for each deque type. Every deque type must implement this 
376/// trait.
377pub trait OrderedTimerDequeHandle<MODE: OrderedTimerDequeMode>
378    : Ord + PartialOrd + PartialEq + PartialEq<Self::TimerId> + Eq + fmt::Debug + 
379        fmt::Display + OrderedTimerDequeInterf<MODE>
380{
381    /// A timer ID. Normally this is a uniq identificator of the item in the queue. 
382    /// i.e `TimerDequeId`.
383    type TimerId: PartialEq + Eq + fmt::Display + fmt::Debug;
384
385    /// A collection which is used to collect the items which are removed from
386    /// the queue due to timeout.
387    type HandleRes: TimerTimeoutCollection<Self::TimerId>;
388
389    /// Postpones the timeout of the current instance. The `postp_time` is a [RelativeTime] 
390    /// i.e inroduces the offset.
391    fn postpone(&mut self, postp_time: RelativeTime) -> TimerResult<()>;
392
393    /// Reschedules the current instance. This is different from the `postpone` as the
394    /// instance is assagned with a new time. The `MODE` cannot be changed, only time.
395    fn resched(&mut self, time: MODE) -> TimerResult<()>;
396
397    /// A spefic code which is called during timeout routine handling. Normally is should 
398    /// store the result into the `collection` and reschedule the item if it is repeated.
399    fn handle(self, timer_self: &mut OrderTimerDeque<MODE, Self>, 
400        timer_ids: &mut Self::HandleRes) -> TimerResult<()>
401    where Self: Sized;
402
403    /// Matches the current instance with provided `other` instances 
404    /// [OrderedTimerDequeHandle::TimerId]. But, the `PartialEq<Self::TimerId>` is 
405    /// implemented, so the `==` can be used to compare.
406    fn is_same(&self, other: &Self::TimerId) -> bool;
407
408    /// Attempts to acquire the [OrderedTimerDequeHandle::TimerId] by consuming the instance.
409    fn into_timer_id(self) -> Option<Self::TimerId>;
410}
411
412/// A trait which is used by the base [OrderTimerDeque]. 
413/// This is an interface which provides access to the timeout value of the instance.
414pub trait OrderedTimerDequeInterf<MODE: OrderedTimerDequeMode>
415{
416    /// Returns the absolute time and the timer mode.
417    fn get_timeout_absolute(&self) -> AbsoluteTime;
418}
419
420
421/// A [VecDeque] based queue which is sorted (in ascending order) by the timeout 
422/// which is `absolute` time.
423/// 
424/// The queue automatically manages the `timer` i.e setting, unsetting.
425/// 
426/// Also for each type of the deque, a event procesing function is providided.
427/// 
428/// There are two types of queue:
429/// 
430/// * [OrderdTimerDequeOnce] - after timeout the element is removed from the queue.
431/// 
432/// * [OrderdTimerDequePeriodic] - after timeout the element timeout is extended
433///     until the item is not removed from the queue manually.
434/// 
435/// And there are 3 types of queue models:
436/// 
437/// * [`crate::TimerDequeConsumer`] - consumes the item which is stored in the 
438///     timeout queue.
439/// 
440/// * [`crate::TimerDequeTicketIssuer`] - issues a ticket which is referenced 
441///     to the item in the queue.
442/// 
443/// * [`crate::TimerDequeSignalTicket`] - send signal on timeout using `MPSC`
444/// 
445/// # Implementations
446/// 
447/// If `feature = enable_mio_compat` is enabled a [mio::event::Source] is 
448/// implemented on the current struct.
449/// 
450/// ## Multithread
451/// 
452/// Not MT-safe. The external mutex should be used to protect the instance. 
453/// The internal `timer` [TimerFd] is MT-safe.
454/// 
455/// ## Poll
456/// 
457/// - A built-in [crate::TimerPoll] can be used.
458/// 
459/// - An external crate MIO [crate::TimerFdMioCompat] if `feature = enable_mio_compat` is
460/// enabled.
461/// 
462/// - User implemented poll. The FD can be aquired via [AsRawFd] [AsFd].
463/// 
464/// ## Async
465/// 
466/// A [Future] is implemented.
467/// # Generics
468/// 
469/// * `DQI` - a deque type. There are three types are available:
470///     * - [crate::TimerDequeueTicketIssuer] issues a ticket for the instance for which the timer was set.
471///     ```ignore
472///         let mut time_list = 
473///             OrderedTimerDeque
474///                 ::<TimerDequeTicketIssuer<OrderdTimerDequeOnce>>
475///                 ::new("test_label".into(), 4, false).unwrap();
476///     ```  
477///     or
478///     ```ignore
479///         let mut time_list = 
480///             OrderedTimerDeque
481///                 ::<TimerDequeTicketIssuer<OrderdTimerDequePeriodic>>
482///                 ::new("test_label".into(), 4, false).unwrap();
483///     ```     
484///     * - [crate::TimerDequeueConsumer] consumes the instance for which the timer is set.
485///     ```ignore
486///         let mut time_list = 
487///             OrderedTimerDeque
488///                 ::<TimerDequeConsumer<TestItem, OrderdTimerDequeOnce>>
489///                 ::new("test_label".into(), 4, false).unwrap();
490///     ```  
491///     or
492///     ```ignore
493///         let mut time_list = 
494///             OrderedTimerDeque
495///                 ::<TimerDequeConsumer<TestItem, OrderdTimerDequePeriodic>>
496///                 ::new("test_label".into(), 4, false).unwrap();
497///     ``` 
498///     * - [crate::TimerDequeueSignalTicket] sends a signal to destination.
499///     ```ignore
500///         let mut time_list = 
501///             OrderedTimerDeque
502///                 ::<TimerDequeSignalTicket<TestSigStruct, OrderdTimerDequeOnce>>
503///                 ::new("test_label".into(), 4, false).unwrap();
504///     ```
505///     or
506///     ```ignore
507///         let mut time_list = 
508///             OrderedTimerDeque
509///                 ::<TimerDequeSignalTicket<TestSigStruct, OrderdTimerDequePeriodic>>
510///                 ::new("test_label".into(), 4, false).unwrap();
511///     ```
512#[derive(Debug, Eq)]
513pub struct OrderTimerDeque<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>>
514{
515    /// A [VecDeque] list which is sorted by time in ascending order -
516    /// lower first, largest last
517    pub(crate) deque_timeout_list: VecDeque<INTF>,
518
519    /// An instance of the FFI (Kernel Supported Timer)
520    pub(crate) timer: TimerFd,
521
522    p: PhantomData<MODE>
523}
524
/// A compatibility layer for the `mio` crate. It is compiled only when
/// `feature = enable_mio_compat` is enabled.
#[cfg(feature = "enable_mio_compat")]
pub mod mio_compat {
    use std::{io, os::fd::AsRawFd};

    use mio::{Token, unix::SourceFd};

    use crate::{TimerFdMioCompat, deque_timeout::{OrderTimerDeque, OrderedTimerDequeHandle, OrderedTimerDequeMode}};

    /// Registers the raw FD of the deque (see [AsRawFd]) in the `mio` 
    /// registry by wrapping it into the [SourceFd].
    impl<MODE, INTF> mio::event::Source for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>,
    {
        fn register(
            &mut self,
            registry: &mio::Registry,
            token: mio::Token,
            interests: mio::Interest,
        ) -> io::Result<()> {
            SourceFd(&self.as_raw_fd()).register(registry, token, interests)
        }

        fn reregister(
            &mut self,
            registry: &mio::Registry,
            token: mio::Token,
            interests: mio::Interest,
        ) -> io::Result<()> {
            SourceFd(&self.as_raw_fd()).reregister(registry, token, interests)
        }

        fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
            SourceFd(&self.as_raw_fd()).deregister(registry)
        }
    }

    /// The deque is equal to the [Token] which [TimerFdMioCompat::get_token]
    /// produces for it.
    impl<MODE, INTF> PartialEq<Token> for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>,
    {
        fn eq(&self, other: &Token) -> bool {
            // Compare in the `usize` domain, exactly as `get_token()` builds
            // the token. Casting the token down to the raw FD type would
            // truncate it, so tokens which differ only in the upper bits
            // would compare equal to this deque.
            other.0 == self.as_raw_fd() as usize
        }
    }

    impl<MODE, INTF> TimerFdMioCompat for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>,
    {
        /// Builds the [Token] from the raw FD of the deque.
        fn get_token(&self) -> Token {
            Token(self.as_raw_fd() as usize)
        }
    }
}
590
591impl<MODE, INTF> FdTimerRead for OrderTimerDeque<MODE, INTF> 
592where 
593    MODE: OrderedTimerDequeMode, 
594    INTF: OrderedTimerDequeHandle<MODE>
595{
596    fn read(&self) -> TimerPortResult<TimerReadRes<u64>> 
597    {
598        return self.timer.read();
599    }
600}
601
602impl<MODE, INTF> PartialEq<str> for OrderTimerDeque<MODE, INTF> 
603where 
604    MODE: OrderedTimerDequeMode, 
605    INTF: OrderedTimerDequeHandle<MODE>
606{
607    fn eq(&self, other: &str) -> bool 
608    {
609        return self.timer.as_ref() == other;
610    }
611}
612
613
614impl<MODE, INTF> FdTimerMarker for OrderTimerDeque<MODE, INTF> 
615where 
616    MODE: OrderedTimerDequeMode, 
617    INTF: OrderedTimerDequeHandle<MODE>
618{
619    fn clone_timer(&self) -> TimerFd 
620    {
621        return self.timer.clone_timer();
622    }
623
624    fn get_strong_count(&self) -> usize 
625    {
626        return self.timer.get_strong_count();
627    }
628}
629
630impl<MODE, INTF> Drop for OrderTimerDeque<MODE, INTF> 
631where 
632    MODE: OrderedTimerDequeMode, 
633    INTF: OrderedTimerDequeHandle<MODE>
634{
635    fn drop(&mut self) 
636    {
637        let _ = self.clean_up_timer();
638    }
639}
640
641impl<MODE, INTF> AsRef<str> for OrderTimerDeque<MODE, INTF>
642where 
643    MODE: OrderedTimerDequeMode, 
644    INTF: OrderedTimerDequeHandle<MODE>
645{
646    fn as_ref(&self) -> &str 
647    {
648        return self.timer.as_ref();
649    }
650}
651
652impl<MODE, INTF> AsFd for OrderTimerDeque<MODE, INTF> 
653where 
654    MODE: OrderedTimerDequeMode, 
655    INTF: OrderedTimerDequeHandle<MODE>
656{
657    fn as_fd(&self) -> BorrowedFd<'_> 
658    {
659        return self.timer.as_fd();
660    }
661}
662
663impl<MODE, INTF> AsRawFd for OrderTimerDeque<MODE, INTF> 
664where 
665    MODE: OrderedTimerDequeMode, 
666    INTF: OrderedTimerDequeHandle<MODE>
667{
668    fn as_raw_fd(&self) -> RawFd 
669    {
670        return self.timer.as_raw_fd();
671    }
672}
673
674impl<MODE, INTF> fmt::Display for OrderTimerDeque<MODE, INTF> 
675where 
676    MODE: OrderedTimerDequeMode, 
677    INTF: OrderedTimerDequeHandle<MODE>
678{
679    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
680    {
681        write!(f, "timer: '{}', fd: '{}', queue_len: '{}'", 
682            self.timer, self.timer.as_fd().as_raw_fd(), self.deque_timeout_list.len())
683    }
684}
685
686impl<MODE, INTF> PartialEq for OrderTimerDeque<MODE, INTF> 
687where 
688    MODE: OrderedTimerDequeMode, 
689    INTF: OrderedTimerDequeHandle<MODE>
690{
691    fn eq(&self, other: &Self) -> bool 
692    {
693        return self.timer == other.timer;
694    }
695}
696
697impl<MODE, INTF> PartialEq<RawFd> for OrderTimerDeque<MODE, INTF> 
698where 
699    MODE: OrderedTimerDequeMode, 
700    INTF: OrderedTimerDequeHandle<MODE>
701{
702    fn eq(&self, other: &RawFd) -> bool 
703    {
704        return self.timer == *other;
705    }
706}
707
708impl<MODE, R> OrderTimerDeque<MODE, TimerDequeConsumer<R, MODE>> 
709where 
710    MODE: OrderedTimerDequeMode,
711    R: PartialEq + Eq + fmt::Debug + fmt::Display + Send + Clone
712{
713    /// Adds the new absolute timeout to the timer deque instance.
714    /// 
715    /// The `entity` is an item which should be stored in the deque
716    /// amd on timeout - returned.
717    /// 
718    /// # Generics
719    /// 
720    /// * `MODE` - is any type which implements the [OrderedTimerDequeMode].
721    ///   There are two options: [DequeOnce] and [DequePeriodic].
722    /// 
723    /// # Arguemnts
724    /// 
725    /// * `item` - `R` which is an item to store in the deque.
726    /// 
727    /// * `mode` - `MODE` a timeout value which also defines the
728    ///  behaviour of the deque logic. 
729    /// 
730    /// # Returns
731    /// 
732    /// A [Result] as alias [TimerResult] is returned with:
733    /// 
734    /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
735    /// 
736    /// * [Result::Err] with error description.
737    /// 
738    /// Possible errors: 
739    ///     
740    /// * [TimerErrorType::Expired] - `mode` contains time which have alrealy expired.
741    /// 
742    /// * [TimerErrorType::ZeroRelativeTime] - `mode` [DequePeriodic] relative time 
743    ///     should never be zero.
744    /// 
745    /// * [TimerErrorType::TimerError] - a error with the OS timer. Contains subcode
746    ///     `errno`.
747    pub 
748    fn add(&mut self, item: R, mode: MODE) -> TimerResult<()>
749    {
750        let inst = 
751            TimerDequeConsumer::<R, MODE>::new(item, mode)?;
752
753        //return self.queue_item(inst)
754        let res = self.queue_item(inst);
755        return res;
756    }
757}
758
759
760impl<MODE> OrderTimerDeque<MODE, TimerDequeTicketIssuer<MODE>> 
761where 
762    MODE: OrderedTimerDequeMode
763{
764    /// Adds the new absolute timeout to the timer deque instance.
765    /// 
766    /// This deque assigns the `ticket` for each timeout which is
767    /// used for identification and invalidation.
768    /// 
769    /// # Generics
770    /// 
771    /// * `MODE` - is any type which implements the [OrderedTimerDequeMode].
772    ///   There are two options: [DequeOnce] and [DequePeriodic].
773    /// 
774    /// # Arguemnts
775    /// 
776    /// * `mode` - `MODE` a timeout value which also defines the
777    ///  behaviour of the deque logic. 
778    /// 
779    /// # Returns
780    /// 
781    /// A [Result] as alias [TimerResult] is returned with:
782    /// 
783    /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
784    /// 
785    /// * [Result::Err] with error description.
786    /// 
787    /// Possible errors: 
788    ///     
789    /// * [TimerErrorType::Expired] - `mode` contains time which have alrealy expired.
790    /// 
791    /// * [TimerErrorType::ZeroRelativeTime] - `mode` [DequePeriodic] relative time 
792    ///     should never be zero.
793    /// 
794    /// * [TimerErrorType::TimerError] - a error with the OS timer. Contains subcode
795    ///     `errno`.
796    pub 
797    fn add(&mut self, mode: MODE) -> TimerResult<TimerDequeTicket>
798    {
799        let (inst, ticket) = 
800            TimerDequeTicketIssuer::<MODE>::new(mode)?;
801
802        self.queue_item(inst)?;
803
804        return Ok(ticket);
805    }
806}
807
808impl<MODE, INTF> OrderTimerDeque<MODE, INTF>  
809where 
810    MODE: OrderedTimerDequeMode, 
811    INTF: OrderedTimerDequeHandle<MODE>
812{
813    /// Creates new deque instance with the provided parameters.
814    /// 
815    /// # Argument
816    /// 
817    /// * `timer_label` - a label which helps to identify the timer.
818    /// 
819    /// * `deq_len` - a minimal, pre-allocated deque length.
820    /// 
821    /// * `cloexec` - when set to `true` sets the `CLOEXEC` flag on FD.
822    /// 
823    /// * `non_blocking` - when set to `true` sets the `TFD_NONBLOCK` flag on FD.
824    /// 
825    /// # Returns
826    /// 
827    /// A [Result] as alias [TimerResult] is returned with
828    /// 
829    /// * [Result::Ok] with the instance.
830    /// 
831    /// * [Result::Err] with the error description.
832    /// 
833    /// The following errors may be returned:
834    ///     
835    /// * [TimerErrorType::TimerError] - error in the OS timer. A `errno` subcode is provided.
836    pub 
837    fn new(timer_label: Cow<'static, str>, deq_len: usize, cloexec: bool, non_blocking: bool) -> TimerResult<OrderTimerDeque<MODE, INTF>>
838    {
839        let deq_len = 
840            if deq_len == 0
841            {
842                10
843            }
844            else
845            {
846                deq_len
847            };
848
849        let mut tf = TimerFlags::empty();
850
851        tf.set(TimerFlags::TFD_CLOEXEC, cloexec);
852        tf.set(TimerFlags::TFD_NONBLOCK, non_blocking);
853
854        // init timer
855        let timer = 
856            TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, tf)
857                .map_err(|e|
858                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
859                )?;
860
861        return Ok( 
862            Self
863            { 
864                deque_timeout_list: VecDeque::with_capacity(deq_len), 
865                timer: timer,
866                p: PhantomData,
867            } 
868        );
869    }
870
871    /// internal function which is called from the `add()` functions.
872    pub(super)  
873    fn queue_item(&mut self, inst: INTF) -> TimerResult<()>
874    {
875        if self.deque_timeout_list.len() == 0
876        {
877            let timer_stamp = 
878                TimerExpMode::<AbsoluteTime>::new_oneshot(inst.get_timeout_absolute());
879
880            // setup timer
881            self
882                .timer
883                .get_timer()
884                .set_time(timer_stamp)
885                .map_err(|e|
886                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
887                )?;
888
889            self.deque_timeout_list.push_front(inst);      
890        }
891        else
892        {
893            // list can not be empty from this point
894            let front_timeout = 
895                self.deque_timeout_list.front().unwrap().get_timeout_absolute();
896
897            // intances timeout
898            let inst_timeout = inst.get_timeout_absolute();
899
900            if front_timeout >= inst_timeout
901            {
902                // push to front
903                self.deque_timeout_list.push_front(inst);
904
905                self.reschedule_timer()?;
906            }
907            else 
908            {
909                let back_banuntil = 
910                    self
911                        .deque_timeout_list
912                        .back()
913                        .unwrap()
914                        .get_timeout_absolute();
915
916                if back_banuntil <= inst_timeout
917                {
918                    // push to the back
919                    self.deque_timeout_list.push_back(inst);
920                }
921                else
922                {
923                    let pos = 
924                        self
925                            .deque_timeout_list
926                            .binary_search_by( |se| 
927                                se.get_timeout_absolute().cmp(&inst.get_timeout_absolute())
928                            )
929                            .map_or_else(|e| e, |r| r);
930
931                    self.deque_timeout_list.insert(pos, inst);
932                }
933            }
934        }
935
936        return Ok(());
937    }
938
939    /// Removes the instance from the queue by the identification depending on the
940    /// deque type.
941    /// 
942    /// # Arguments
943    /// 
944    /// `item` - identification of the item which should be removed from the queue.
945    /// 
946    /// # Returns
947    /// 
948    /// A [Result] as alias [TimerResult] is returned with:
949    /// 
950    /// * [Result::Ok] with the inner type [Option] where
951    ///     * [Option::Some] is returned with the consumed `item`.
952    ///     * [Option::None] is returned when item was not found.
953    /// 
954    /// * [Result::Err] with error description.
955    /// 
956    /// The following errors may be returned:
957    ///     
958    /// * [TimerErrorType::TimerError] - error in the OS timer. A `errno` subcode is provided.
959    pub 
960    fn remove_from_queue(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF::TimerId>>
961    {
962        return 
963            self
964                .remove_from_queue_int(item)
965                .map(|opt_intf|
966                    {
967                        let Some(intf) = opt_intf
968                        else { return None };
969
970                        return intf.into_timer_id();
971                    }
972                );
973                
974    }
975    
976    pub(super)  
977    fn remove_from_queue_int(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF>>
978    {
979        if self.deque_timeout_list.len() == 0
980        {
981            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
982        }
983        else
984        {
985            // search for the item in list
986            if self.deque_timeout_list.len() == 1
987            {
988                // just pop the from the front
989                let ret_ent = self.deque_timeout_list.pop_front().unwrap();
990
991                if &ret_ent != item
992                {
993                    self.deque_timeout_list.push_front(ret_ent);
994
995                    return Ok(None);
996                }
997
998                // stop timer
999                self.stop_timer()?;
1000
1001                return Ok(Some(ret_ent));
1002            }
1003            else
1004            {
1005                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
1006                // succesfully for temoved instance.
1007
1008                for (pos, q_item) 
1009                in self.deque_timeout_list.iter().enumerate()
1010                {
1011                    if q_item == item
1012                    {
1013                        // remove by the index
1014                        let ret_ent = 
1015                            self.deque_timeout_list.remove(pos).unwrap();
1016
1017                        // call timer reset if index is 0 (front)
1018                        if pos == 0 
1019                        {
1020                            self.reschedule_timer()?;
1021                        }
1022
1023                        return Ok(Some(ret_ent));
1024                    }
1025                }
1026
1027                return Ok(None);
1028            }
1029        }
1030    }
1031
1032    /// Reads the timer's FD to retrive the event type and then calling 
1033    /// the event handler. Then the result is returned. The result may 
1034    /// contain the `items` which were removed from the queue or copied.
1035    /// 
1036    /// This function behaves differently when the timer is initialized as
1037    /// a `non-blocking` i.e with [TimerFlags::TFD_NONBLOCK].
1038    /// 
1039    /// When [TimerFlags::TFD_NONBLOCK] is not set, this function will block reading the FD.
1040    /// In case of 'EINTR', the read attempt will be repeated. 
1041    /// 
1042    /// When [TimerFlags::TFD_NONBLOCK] is set, the function will return with some result 
1043    /// immidiatly.
1044    /// 
1045    /// # Return
1046    /// 
1047    /// * In case of `EAGAIN`, the [TimerReadRes::WouldBlock] will be returned. 
1048    /// 
1049    /// * In case of `ECANCELLD`, the [TimerReadRes::Cancelled] will be returned.
1050    /// 
1051    /// * In case of any other error the [Result::Err] is returned.
1052    /// 
1053    /// When a timer fires an event the [Result::Ok] is returned with the amount of
1054    /// timer overrun. Normally it is 1.
1055    pub 
1056    fn wait_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1057    {
1058        let res = 
1059            self
1060                .timer
1061                .read()
1062                .map_err(|e|
1063                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1064                )?;
1065
1066
1067        // ignore wouldblock
1068        if let TimerReadRes::WouldBlock = res
1069        {
1070            return Ok(None);
1071        }
1072
1073        return Ok(self.internal_handle_timer_event()?);
1074    }
1075
1076    /// Handles the single event received from the `poll` and
1077    /// returns the result. The result may 
1078    /// contain the `items` which were removed from the queue or copied.  
1079    /// 
1080    /// # Arguments
1081    /// 
1082    /// `pet` - [PollEventType] an event from the timer to handle.
1083    /// 
1084    /// A [Result] as alias [TimerResult] is returned with:
1085    /// 
1086    /// * [Result::Ok] witout any innder data.
1087    /// 
1088    /// * [Result::Err] with error description.
1089    pub 
1090    fn handle_timer_event(&mut self, pet: PollEventType) -> TimerResult<Option<INTF::HandleRes>>
1091    {
1092        match pet
1093        {
1094            PollEventType::TimerRes(_, res) => 
1095            {
1096                // ignore wouldblock
1097                if let TimerReadRes::WouldBlock = res
1098                {
1099                    return Ok(None);
1100                }
1101
1102                return self.internal_handle_timer_event();
1103            },
1104            PollEventType::SubError(_, err) => 
1105            {
1106                timer_err!(TimerErrorType::TimerError(err.get_errno()), "{}", err)
1107            }
1108        }
1109        
1110    }
1111
1112    fn internal_handle_timer_event(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1113    {
1114        let cur_timestamp = AbsoluteTime::now();
1115        let mut timer_ids: INTF::HandleRes = INTF::HandleRes::new();
1116
1117        loop 
1118        {
1119            // get from front of the queue
1120            let Some(front_entity) = self.deque_timeout_list.front() 
1121                else { break };
1122
1123            let time_until = front_entity.get_timeout_absolute();
1124
1125            if time_until <= cur_timestamp
1126            {
1127                let deq = self.deque_timeout_list.pop_front().unwrap();
1128
1129                deq.handle(self, &mut timer_ids)?;
1130            }
1131            else
1132            {
1133                break;
1134            }
1135        }
1136
1137        // call timer reschedule
1138        self.reschedule_timer()?;
1139
1140        return Ok(timer_ids.into_option());
1141    }
1142
1143    /// Asynchronious polling. The timer's FD is set to nonblocking,
1144    /// so each time it will return `pending` and load CPU. If you are using
1145    /// `tokio` or `smol` or other crate, the corresponding helpers like 
1146    /// tokio's `AsyncFd` can be used to wrap the instance. The timer is read-only
1147    /// so use `read-only interest` to avoid errors.
1148    /// 
1149    /// If [TimerReadRes::WouldBlock] is received then returns immidiatly.
1150    pub async
1151    fn async_poll_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1152    {
1153        let res =  
1154            self
1155                .timer
1156                .get_timer()
1157                .await
1158                .map_err(|e|
1159                    map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1160                )?;
1161
1162        // ignore wouldblock
1163        if let TimerReadRes::WouldBlock = res
1164        {
1165            return Ok(None);
1166        }
1167
1168        return Ok(self.internal_handle_timer_event()?);
1169    }
1170
1171    /// Returns the queue length.
1172    pub 
1173    fn timer_queue_len(&self) -> usize
1174    {
1175        return self.deque_timeout_list.len();
1176    }
1177    
1178    /// Postpones the instace `target` by the relative time `rel_time_off`.
1179    /// 
1180    /// # Errors
1181    /// 
1182    /// * [TimerErrorType::NotFound] - if instance `target` was not found.
1183    /// 
1184    /// * [TimerErrorType::TicketInstanceGone] - if ticket was invalidated.
1185    /// 
1186    /// * [TimerErrorType::TimerError] - with the suberror code.
1187    pub 
1188    fn postpone(&mut self, target: &INTF::TimerId, rel_time_off: RelativeTime) -> TimerResult<()>
1189    {
1190        let mut item = 
1191            self
1192                .remove_from_queue_int(target)?
1193                .ok_or(map_timer_err!(TimerErrorType::NotFound, "ticket: {} not found", target))?;
1194
1195        
1196        item.postpone(rel_time_off)?;
1197
1198        self.queue_item(item)?;
1199
1200        return Ok(()); 
1201    }
1202
1203    /// Reschedules the instace `target` by assigning new time `time` 
1204    /// which is of type `MODE`.
1205    /// 
1206    /// # Errors
1207    /// 
1208    /// * [TimerErrorType::NotFound] - if instance `target` was not found.
1209    /// 
1210    /// * [TimerErrorType::TicketInstanceGone] - if ticket was invalidated.
1211    /// 
1212    /// * [TimerErrorType::TimerError] - with the suberror code.
1213    /// 
1214    /// * [TimerErrorType::Expired] - if provided `time` have passed.
1215    pub 
1216    fn reschedule(&mut self, target: &INTF::TimerId, time: MODE) -> TimerResult<()>
1217    {
1218        let mut item = 
1219            self.remove_from_queue_int(target)?
1220                .ok_or(map_timer_err!(TimerErrorType::NotFound, "{} not found", target))?;
1221
1222        
1223        item.resched(time)?;
1224
1225        self.queue_item(item)?;
1226
1227        return Ok(()); 
1228    }
1229
1230    /// Starts the `timer` instance by setting timeout or stops the `timer` if the
1231    /// instnce's `queue` is empty.
1232    pub(super)   
1233    fn reschedule_timer(&mut self) -> TimerResult<()>
1234    {
1235        if let Some(front_entity) = self.deque_timeout_list.front()
1236        {
1237            let timer_exp = 
1238                TimerExpMode::<AbsoluteTime>::new_oneshot(front_entity.get_timeout_absolute());
1239
1240            return
1241                self
1242                    .timer
1243                    .get_timer()
1244                    .set_time(timer_exp)
1245                    .map_err(|e|
1246                        map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1247                    );
1248        }
1249        else
1250        {
1251            // queue is empty, force timer to stop
1252
1253            return 
1254                self
1255                    .timer
1256                    .get_timer()
1257                    .unset_time()
1258                    .map_err(|e|
1259                        map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1260                    );
1261        }
1262    }
1263
1264    /// Unarms timer and clears the queue.
1265    pub 
1266    fn clean_up_timer(&mut self) -> TimerResult<()>
1267    {
1268        self
1269            .timer
1270            .get_timer()
1271            .unset_time()
1272            .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;
1273
1274        self.deque_timeout_list.clear();
1275
1276        return Ok(());
1277    }
1278
1279    /// Unarms timer only.
1280    pub 
1281    fn stop_timer(&mut self) -> TimerResult<()>
1282    {
1283        return 
1284            self
1285                .timer
1286                .get_timer()
1287                .unset_time()
1288                .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e));
1289    }
1290
1291}