timer_deque_rs/deque_timeout/mod.rs
1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 * functionality.
4 *
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 * 4neko.org alex@4neko.org
7 *
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *
13 * 2. The MIT License (MIT)
14 *
15 * 3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
18/// A `consumer` type of the timer which consumes the intance and returns it when
19/// timer triggers. The `consumed` instance normally whould be [Send] because it will
20/// be moved into the timer which may be processed in other thread.
21pub mod timer_consumer;
22
23/// A `ticket` issuer. Issues a ticket which should be assigned to the instance whcih was added
24/// to the timer's queue. The `ticket` can be used to remove the item from queue before the
25/// timeout event. If ticket is dropped i.e connection closed, the ticket will be
26/// in timer's queue until timeout where it will be ignored on timeout event.
27pub mod timer_tickets;
28
29#[cfg(test)]
30mod tests;
31
32pub use std::os::fd::{AsFd, AsRawFd};
33
34use std::
35{
36 borrow::Cow,
37 collections::VecDeque,
38 fmt,
39 marker::PhantomData,
40 os::fd::{BorrowedFd, RawFd},
41};
42
43
44use crate::
45{
46 TimerDequeConsumer,
47 TimerDequeTicket,
48 TimerDequeTicketIssuer,
49 error::{TimerErrorType, TimerResult},
50 map_timer_err,
51 timer_err,
52 timer_portable::
53 {
54 FdTimerMarker,
55 PollEventType,
56 TimerFd,
57 portable_error::TimerPortResult,
58 timer::
59 {
60 AbsoluteTime, FdTimerCom, FdTimerRead, RelativeTime, TimerExpMode,
61 TimerFlags, TimerReadRes, TimerType
62 }
63 }
64};
65
66
67/// A trait which is implemented by the structs which define the operation mode of the deque.
68///
69/// At the moment for: [DequeOnce] and [DequePeriodic].
70pub trait OrderedTimerDequeMode: fmt::Debug + fmt::Display + Ord + PartialOrd + Eq + PartialEq
71{
72 /// A type of operation. If the item deques once, then this value should be
73 /// `true`. Otherwise, it is periodic.
74 const IS_ONCE: bool;
75
76 /// Checks the current instance against the provided `cmp` [AbsoluteTime].
77 /// It is expected that the current instance's timeout value is actual and
78 /// haven't outlive the `cmp` already. if it does the error:
79 /// [TimerErrorType::Expired] should be returned. Or any other error
80 /// if it is required.
81 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>;
82
83 /// Returns the `absolute` timeout for the instance of the deque.
84 fn get_absolut_timeout(&self) -> AbsoluteTime;
85
86 /// Postpones the time by the relative offset `post_time`. May return error
87 /// (if required) if the absolut timeout value overflows.
88 ///
89 /// May return [TimerErrorType::TicketInstanceGone] if ticket was invalidated.
90 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>;
91
92 /// Updates the timeout when the deque works in periodic mode. By fedault
93 /// it does nothing.
94 fn advance_timeout(&mut self)
95 {
96 return;
97 }
98}
99
100/// This queue mode removes all entries from the queue that have timed out.
101///
102/// The further behaviour is defined by the type of the deque.
103#[derive(Debug, Clone, Copy)]
104pub struct DequeOnce
105{
106 /// A timeout for the item in the queue.
107 absolute_timeout: AbsoluteTime,
108}
109
110impl fmt::Display for DequeOnce
111{
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
113 {
114 write!(f, "{}", self.absolute_timeout)
115 }
116}
117
118impl Ord for DequeOnce
119{
120 fn cmp(&self, other: &Self) -> std::cmp::Ordering
121 {
122 return self.absolute_timeout.cmp(&other.absolute_timeout);
123 }
124}
125
126impl PartialOrd for DequeOnce
127{
128 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
129 {
130 return Some(self.cmp(other));
131 }
132}
133
134impl Eq for DequeOnce {}
135
136impl PartialEq for DequeOnce
137{
138 fn eq(&self, other: &Self) -> bool
139 {
140 return self.absolute_timeout == other.absolute_timeout;
141 }
142}
143
144
145impl OrderedTimerDequeMode for DequeOnce
146{
147 const IS_ONCE: bool = true;
148
149 fn get_absolut_timeout(&self) -> AbsoluteTime
150 {
151 return self.absolute_timeout;
152 }
153
154 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>
155 {
156 if cmp > self.absolute_timeout
157 {
158 timer_err!(TimerErrorType::Expired,
159 "deque once time already expired, now: {}, req: {}", cmp, self);
160 }
161
162 return Ok(());
163 }
164
165 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>
166 {
167 self.absolute_timeout = self.absolute_timeout + posp_time;
168
169 return Ok(());
170 }
171}
172
173
174impl DequeOnce
175{
176 /// Creates new instacne.
177 #[inline]
178 pub
179 fn new(absolute_timeout: AbsoluteTime) -> Self
180 {
181 return Self{ absolute_timeout };
182 }
183}
184
185/// This queue mode does not remove an element that has timed out (by `absolute_timeout`),
186/// but extends (by `relative_period`) the timeout and returns the element back to the queue.
187#[derive(Debug, Clone, Copy)]
188pub struct DequePeriodic
189{
190 /// Extends the timer until the next timeout. This is `relative`
191 /// not absolute.
192 relative_period: RelativeTime,
193
194 /// A timeout value.
195 absolute_timeout: AbsoluteTime,
196}
197
198impl fmt::Display for DequePeriodic
199{
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
201 {
202 write!(f, "{}, rel: {}", self.absolute_timeout, self.relative_period)
203 }
204}
205
206impl Ord for DequePeriodic
207{
208 fn cmp(&self, other: &Self) -> std::cmp::Ordering
209 {
210 return self.absolute_timeout.cmp(&other.absolute_timeout);
211 }
212}
213
214impl PartialOrd for DequePeriodic
215{
216 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
217 {
218 return Some(self.cmp(other));
219 }
220}
221
222impl Eq for DequePeriodic {}
223
224impl PartialEq for DequePeriodic
225{
226 fn eq(&self, other: &Self) -> bool
227 {
228 return self.absolute_timeout == other.absolute_timeout;
229 }
230}
231
232
233impl OrderedTimerDequeMode for DequePeriodic
234{
235 const IS_ONCE: bool = false;
236
237 fn get_absolut_timeout(&self) -> AbsoluteTime
238 {
239 return self.absolute_timeout;
240 }
241
242 fn advance_timeout(&mut self)
243 {
244 self.absolute_timeout += self.relative_period;
245 }
246
247 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>
248 {
249 if cmp > self.absolute_timeout
250 {
251 timer_err!(TimerErrorType::Expired,
252 "deque periodic absolute time already expired, now: {}, req: {}", cmp, self.absolute_timeout);
253 }
254 else if self.relative_period.is_zero() == false
255 {
256 timer_err!(TimerErrorType::ZeroRelativeTime,
257 "deque periodic relative time is 0, rel_time: {}", self.relative_period);
258 }
259
260 return Ok(());
261 }
262
263 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>
264 {
265 self.absolute_timeout = self.absolute_timeout - self.relative_period + posp_time;
266
267 self.relative_period = posp_time;
268
269 return Ok(());
270 }
271}
272
273impl DequePeriodic
274{
275 /// Creates new instance. The `relative_time` is not added to
276 /// `absolute_time` until timeout. The values are unchecked at this
277 /// point and will be checked during placement in queue.
278 pub
279 fn new_from_now(rel_time: RelativeTime) -> Self
280 {
281 let inst =
282 Self
283 {
284 relative_period:
285 rel_time,
286 absolute_timeout:
287 AbsoluteTime::now(),
288 };
289
290 return inst;
291 }
292
293 /// Creates new instance with the initial timeout `abs_time` and
294 /// period `rel_time` which is added to the `abs_time` each time
295 /// the timeout occures. The values are unchecked at this
296 /// point and will be checked during placement in queue.
297 pub
298 fn new(abs_time: AbsoluteTime, rel_time: RelativeTime) -> Self
299 {
300 let inst =
301 Self
302 {
303 relative_period:
304 rel_time,
305 absolute_timeout:
306 abs_time,
307 };
308
309 return inst;
310 }
311}
312
313/// A trait for the generalization of the collection type which is used to
314/// collect the timeout values i.e ticket IDs.
315pub trait TimerTimeoutCollection<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug>
316{
317 /// Initializes the instance.
318 fn new() -> Self;
319
320 /// Store the `ITEM` in the collection.
321 fn push(&mut self, item: ITEM);
322
323 /// Wraps the instance into [Option]. If collection is `empty` a
324 /// [Option::None] should be returned.
325 fn into_option(self) -> Option<Self> where Self: Sized;
326}
327
328/// An implementation for the Vec.
329impl<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM>
330for Vec<ITEM>
331{
332 fn new() -> Self
333 {
334 return Vec::new();
335 }
336
337 fn push(&mut self, item: ITEM)
338 {
339 self.push(item);
340 }
341
342 fn into_option(self) -> Option<Self>
343 {
344 if self.is_empty() == false
345 {
346 return Some(self);
347 }
348 else
349 {
350 return None;
351 }
352 }
353}
354
355/// A dummy implementation when nothing is returned. Unused at the moment.
356impl <ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM>
357for ()
358{
359 fn new() -> Self
360 {
361 return ();
362 }
363
364 fn push(&mut self, _item: ITEM)
365 {
366 return;
367 }
368
369 fn into_option(self) -> Option<Self>
370 {
371 return None;
372 }
373}
374
375/// A standart interface for each deque type. Every deque type must implement this
376/// trait.
377pub trait OrderedTimerDequeHandle<MODE: OrderedTimerDequeMode>
378 : Ord + PartialOrd + PartialEq + PartialEq<Self::TimerId> + Eq + fmt::Debug +
379 fmt::Display + OrderedTimerDequeInterf<MODE>
380{
381 /// A timer ID. Normally this is a uniq identificator of the item in the queue.
382 /// i.e `TimerDequeId`.
383 type TimerId: PartialEq + Eq + fmt::Display + fmt::Debug;
384
385 /// A collection which is used to collect the items which are removed from
386 /// the queue due to timeout.
387 type HandleRes: TimerTimeoutCollection<Self::TimerId>;
388
389 /// Postpones the timeout of the current instance. The `postp_time` is a [RelativeTime]
390 /// i.e inroduces the offset.
391 fn postpone(&mut self, postp_time: RelativeTime) -> TimerResult<()>;
392
393 /// Reschedules the current instance. This is different from the `postpone` as the
394 /// instance is assagned with a new time. The `MODE` cannot be changed, only time.
395 fn resched(&mut self, time: MODE) -> TimerResult<()>;
396
397 /// A spefic code which is called during timeout routine handling. Normally is should
398 /// store the result into the `collection` and reschedule the item if it is repeated.
399 fn handle(self, timer_self: &mut OrderTimerDeque<MODE, Self>,
400 timer_ids: &mut Self::HandleRes) -> TimerResult<()>
401 where Self: Sized;
402
403 /// Matches the current instance with provided `other` instances
404 /// [OrderedTimerDequeHandle::TimerId]. But, the `PartialEq<Self::TimerId>` is
405 /// implemented, so the `==` can be used to compare.
406 fn is_same(&self, other: &Self::TimerId) -> bool;
407
408 /// Attempts to acquire the [OrderedTimerDequeHandle::TimerId] by consuming the instance.
409 fn into_timer_id(self) -> Option<Self::TimerId>;
410}
411
412/// A trait which is used by the base [OrderTimerDeque].
413/// This is an interface which provides access to the timeout value of the instance.
414pub trait OrderedTimerDequeInterf<MODE: OrderedTimerDequeMode>
415{
416 /// Returns the absolute time and the timer mode.
417 fn get_timeout_absolute(&self) -> AbsoluteTime;
418}
419
420
421/// A [VecDeque] based queue which is sorted (in ascending order) by the timeout
422/// which is `absolute` time.
423///
424/// The queue automatically manages the `timer` i.e setting, unsetting.
425///
426/// Also for each type of the deque, a event procesing function is providided.
427///
428/// There are two types of queue:
429///
430/// * [OrderdTimerDequeOnce] - after timeout the element is removed from the queue.
431///
432/// * [OrderdTimerDequePeriodic] - after timeout the element timeout is extended
433/// until the item is not removed from the queue manually.
434///
435/// And there are 3 types of queue models:
436///
437/// * [`crate::TimerDequeConsumer`] - consumes the item which is stored in the
438/// timeout queue.
439///
440/// * [`crate::TimerDequeTicketIssuer`] - issues a ticket which is referenced
441/// to the item in the queue.
442///
443/// * [`crate::TimerDequeSignalTicket`] - send signal on timeout using `MPSC`
444///
445/// # Generics
446///
447/// * `DQI` - a deque type. There are three types are available:
448/// * - [crate::TimerDequeueTicketIssuer] issues a ticket for the instance for which the timer was set.
449/// ```ignore
450/// let mut time_list =
451/// OrderedTimerDeque
452/// ::<TimerDequeTicketIssuer<OrderdTimerDequeOnce>>
453/// ::new("test_label".into(), 4, false).unwrap();
454/// ```
455/// or
456/// ```ignore
457/// let mut time_list =
458/// OrderedTimerDeque
459/// ::<TimerDequeTicketIssuer<OrderdTimerDequePeriodic>>
460/// ::new("test_label".into(), 4, false).unwrap();
461/// ```
462/// * - [crate::TimerDequeueConsumer] consumes the instance for which the timer is set.
463/// ```ignore
464/// let mut time_list =
465/// OrderedTimerDeque
466/// ::<TimerDequeConsumer<TestItem, OrderdTimerDequeOnce>>
467/// ::new("test_label".into(), 4, false).unwrap();
468/// ```
469/// or
470/// ```ignore
471/// let mut time_list =
472/// OrderedTimerDeque
473/// ::<TimerDequeConsumer<TestItem, OrderdTimerDequePeriodic>>
474/// ::new("test_label".into(), 4, false).unwrap();
475/// ```
476/// * - [crate::TimerDequeueSignalTicket] sends a signal to destination.
477/// ```ignore
478/// let mut time_list =
479/// OrderedTimerDeque
480/// ::<TimerDequeSignalTicket<TestSigStruct, OrderdTimerDequeOnce>>
481/// ::new("test_label".into(), 4, false).unwrap();
482/// ```
483/// or
484/// ```ignore
485/// let mut time_list =
486/// OrderedTimerDeque
487/// ::<TimerDequeSignalTicket<TestSigStruct, OrderdTimerDequePeriodic>>
488/// ::new("test_label".into(), 4, false).unwrap();
489/// ```
490#[derive(Debug, Eq)]
491pub struct OrderTimerDeque<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>>
492{
493 /// A [VecDeque] list which is sorted by time in ascending order -
494 /// lower first, largest last
495 pub(crate) deque_timeout_list: VecDeque<INTF>,
496
497 /// An instance of the FFI (Kernel Supported Timer)
498 pub(crate) timer: TimerFd,
499
500 p: PhantomData<MODE>
501}
502
503impl<MODE, INTF> FdTimerRead for OrderTimerDeque<MODE, INTF>
504where
505 MODE: OrderedTimerDequeMode,
506 INTF: OrderedTimerDequeHandle<MODE>
507{
508 fn read(&self) -> TimerPortResult<TimerReadRes<u64>>
509 {
510 return self.timer.read();
511 }
512}
513
514impl<MODE, INTF> PartialEq<str> for OrderTimerDeque<MODE, INTF>
515where
516 MODE: OrderedTimerDequeMode,
517 INTF: OrderedTimerDequeHandle<MODE>
518{
519 fn eq(&self, other: &str) -> bool
520 {
521 return self.timer.as_ref() == other;
522 }
523}
524
525
526impl<MODE, INTF> FdTimerMarker for OrderTimerDeque<MODE, INTF>
527where
528 MODE: OrderedTimerDequeMode,
529 INTF: OrderedTimerDequeHandle<MODE>
530{
531 fn clone_timer(&self) -> TimerFd
532 {
533 return self.timer.clone_timer();
534 }
535
536 fn get_strong_count(&self) -> usize
537 {
538 return self.timer.get_strong_count();
539 }
540}
541
542impl<MODE, INTF> Drop for OrderTimerDeque<MODE, INTF>
543where
544 MODE: OrderedTimerDequeMode,
545 INTF: OrderedTimerDequeHandle<MODE>
546{
547 fn drop(&mut self)
548 {
549 let _ = self.clean_up_timer();
550 }
551}
552
553impl<MODE, INTF> AsRef<str> for OrderTimerDeque<MODE, INTF>
554where
555 MODE: OrderedTimerDequeMode,
556 INTF: OrderedTimerDequeHandle<MODE>
557{
558 fn as_ref(&self) -> &str
559 {
560 return self.timer.as_ref();
561 }
562}
563
564impl<MODE, INTF> AsFd for OrderTimerDeque<MODE, INTF>
565where
566 MODE: OrderedTimerDequeMode,
567 INTF: OrderedTimerDequeHandle<MODE>
568{
569 fn as_fd(&self) -> BorrowedFd<'_>
570 {
571 return self.timer.as_fd();
572 }
573}
574
575impl<MODE, INTF> AsRawFd for OrderTimerDeque<MODE, INTF>
576where
577 MODE: OrderedTimerDequeMode,
578 INTF: OrderedTimerDequeHandle<MODE>
579{
580 fn as_raw_fd(&self) -> RawFd
581 {
582 return self.timer.as_raw_fd();
583 }
584}
585
586impl<MODE, INTF> fmt::Display for OrderTimerDeque<MODE, INTF>
587where
588 MODE: OrderedTimerDequeMode,
589 INTF: OrderedTimerDequeHandle<MODE>
590{
591 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
592 {
593 write!(f, "timer: '{}', fd: '{}', queue_len: '{}'",
594 self.timer, self.timer.as_fd().as_raw_fd(), self.deque_timeout_list.len())
595 }
596}
597
598impl<MODE, INTF> PartialEq for OrderTimerDeque<MODE, INTF>
599where
600 MODE: OrderedTimerDequeMode,
601 INTF: OrderedTimerDequeHandle<MODE>
602{
603 fn eq(&self, other: &Self) -> bool
604 {
605 return self.timer == other.timer;
606 }
607}
608
609impl<MODE, INTF> PartialEq<RawFd> for OrderTimerDeque<MODE, INTF>
610where
611 MODE: OrderedTimerDequeMode,
612 INTF: OrderedTimerDequeHandle<MODE>
613{
614 fn eq(&self, other: &RawFd) -> bool
615 {
616 return self.timer == *other;
617 }
618}
619
620impl<MODE, R> OrderTimerDeque<MODE, TimerDequeConsumer<R, MODE>>
621where
622 MODE: OrderedTimerDequeMode,
623 R: PartialEq + Eq + fmt::Debug + fmt::Display + Send + Clone
624{
625 /// Adds the new absolute timeout to the timer deque instance.
626 ///
627 /// The `entity` is an item which should be stored in the deque
628 /// amd on timeout - returned.
629 ///
630 /// # Generics
631 ///
632 /// * `MODE` - is any type which implements the [OrderedTimerDequeMode].
633 /// There are two options: [DequeOnce] and [DequePeriodic].
634 ///
635 /// # Arguemnts
636 ///
637 /// * `item` - `R` which is an item to store in the deque.
638 ///
639 /// * `mode` - `MODE` a timeout value which also defines the
640 /// behaviour of the deque logic.
641 ///
642 /// # Returns
643 ///
644 /// A [Result] as alias [TimerResult] is returned with:
645 ///
646 /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
647 ///
648 /// * [Result::Err] with error description.
649 ///
650 /// Possible errors:
651 ///
652 /// * [TimerErrorType::Expired] - `mode` contains time which have alrealy expired.
653 ///
654 /// * [TimerErrorType::ZeroRelativeTime] - `mode` [DequePeriodic] relative time
655 /// should never be zero.
656 ///
657 /// * [TimerErrorType::TimerError] - a error with the OS timer. Contains subcode
658 /// `errno`.
659 pub
660 fn add(&mut self, item: R, mode: MODE) -> TimerResult<()>
661 {
662 let inst =
663 TimerDequeConsumer::<R, MODE>::new(item, mode)?;
664
665 //return self.queue_item(inst)
666 let res = self.queue_item(inst);
667 return res;
668 }
669}
670
671
672impl<MODE> OrderTimerDeque<MODE, TimerDequeTicketIssuer<MODE>>
673where
674 MODE: OrderedTimerDequeMode
675{
676 /// Adds the new absolute timeout to the timer deque instance.
677 ///
678 /// This deque assigns the `ticket` for each timeout which is
679 /// used for identification and invalidation.
680 ///
681 /// # Generics
682 ///
683 /// * `MODE` - is any type which implements the [OrderedTimerDequeMode].
684 /// There are two options: [DequeOnce] and [DequePeriodic].
685 ///
686 /// # Arguemnts
687 ///
688 /// * `mode` - `MODE` a timeout value which also defines the
689 /// behaviour of the deque logic.
690 ///
691 /// # Returns
692 ///
693 /// A [Result] as alias [TimerResult] is returned with:
694 ///
695 /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
696 ///
697 /// * [Result::Err] with error description.
698 ///
699 /// Possible errors:
700 ///
701 /// * [TimerErrorType::Expired] - `mode` contains time which have alrealy expired.
702 ///
703 /// * [TimerErrorType::ZeroRelativeTime] - `mode` [DequePeriodic] relative time
704 /// should never be zero.
705 ///
706 /// * [TimerErrorType::TimerError] - a error with the OS timer. Contains subcode
707 /// `errno`.
708 pub
709 fn add(&mut self, mode: MODE) -> TimerResult<TimerDequeTicket>
710 {
711 let (inst, ticket) =
712 TimerDequeTicketIssuer::<MODE>::new(mode)?;
713
714 self.queue_item(inst)?;
715
716 return Ok(ticket);
717 }
718}
719
720impl<MODE, INTF> OrderTimerDeque<MODE, INTF>
721where
722 MODE: OrderedTimerDequeMode,
723 INTF: OrderedTimerDequeHandle<MODE>
724{
725 /// Creates new deque instance with the provided parameters.
726 ///
727 /// # Argument
728 ///
729 /// * `timer_label` - a label which helps to identify the timer.
730 ///
731 /// * `deq_len` - a minimal, pre-allocated deque length.
732 ///
733 /// * `cloexec` - when set to `true` sets the `CLOEXEC` flag on FD.
734 ///
735 /// * `non_blocking` - when set to `true` sets the `TFD_NONBLOCK` flag on FD.
736 ///
737 /// # Returns
738 ///
739 /// A [Result] as alias [TimerResult] is returned with
740 ///
741 /// * [Result::Ok] with the instance.
742 ///
743 /// * [Result::Err] with the error description.
744 ///
745 /// The following errors may be returned:
746 ///
747 /// * [TimerErrorType::TimerError] - error in the OS timer. A `errno` subcode is provided.
748 pub
749 fn new(timer_label: Cow<'static, str>, deq_len: usize, cloexec: bool, non_blocking: bool) -> TimerResult<OrderTimerDeque<MODE, INTF>>
750 {
751 let deq_len =
752 if deq_len == 0
753 {
754 10
755 }
756 else
757 {
758 deq_len
759 };
760
761 let mut tf = TimerFlags::empty();
762
763 tf.set(TimerFlags::TFD_CLOEXEC, cloexec);
764 tf.set(TimerFlags::TFD_NONBLOCK, non_blocking);
765
766 // init timer
767 let timer =
768 TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, tf)
769 .map_err(|e|
770 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
771 )?;
772
773 return Ok(
774 Self
775 {
776 deque_timeout_list: VecDeque::with_capacity(deq_len),
777 timer: timer,
778 p: PhantomData,
779 }
780 );
781 }
782
783 /// internal function which is called from the `add()` functions.
784 pub(super)
785 fn queue_item(&mut self, inst: INTF) -> TimerResult<()>
786 {
787 if self.deque_timeout_list.len() == 0
788 {
789 let timer_stamp =
790 TimerExpMode::<AbsoluteTime>::new_oneshot(inst.get_timeout_absolute());
791
792 // setup timer
793 self
794 .timer
795 .get_timer()
796 .set_time(timer_stamp)
797 .map_err(|e|
798 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
799 )?;
800
801 self.deque_timeout_list.push_front(inst);
802 }
803 else
804 {
805 // list can not be empty from this point
806 let front_timeout =
807 self.deque_timeout_list.front().unwrap().get_timeout_absolute();
808
809 // intances timeout
810 let inst_timeout = inst.get_timeout_absolute();
811
812 if front_timeout >= inst_timeout
813 {
814 // push to front
815 self.deque_timeout_list.push_front(inst);
816
817 self.reschedule_timer()?;
818 }
819 else
820 {
821 let back_banuntil =
822 self
823 .deque_timeout_list
824 .back()
825 .unwrap()
826 .get_timeout_absolute();
827
828 if back_banuntil <= inst_timeout
829 {
830 // push to the back
831 self.deque_timeout_list.push_back(inst);
832 }
833 else
834 {
835 let pos =
836 self
837 .deque_timeout_list
838 .binary_search_by( |se|
839 se.get_timeout_absolute().cmp(&inst.get_timeout_absolute())
840 )
841 .map_or_else(|e| e, |r| r);
842
843 self.deque_timeout_list.insert(pos, inst);
844 }
845 }
846 }
847
848 return Ok(());
849 }
850
851 /// Removes the instance from the queue by the identification depending on the
852 /// deque type.
853 ///
854 /// # Arguments
855 ///
856 /// `item` - identification of the item which should be removed from the queue.
857 ///
858 /// # Returns
859 ///
860 /// A [Result] as alias [TimerResult] is returned with:
861 ///
862 /// * [Result::Ok] with the inner type [Option] where
863 /// * [Option::Some] is returned with the consumed `item`.
864 /// * [Option::None] is returned when item was not found.
865 ///
866 /// * [Result::Err] with error description.
867 ///
868 /// The following errors may be returned:
869 ///
870 /// * [TimerErrorType::TimerError] - error in the OS timer. A `errno` subcode is provided.
871 pub
872 fn remove_from_queue(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF::TimerId>>
873 {
874 return
875 self
876 .remove_from_queue_int(item)
877 .map(|opt_intf|
878 {
879 let Some(intf) = opt_intf
880 else { return None };
881
882 return intf.into_timer_id();
883 }
884 );
885
886 }
887
888 pub(super)
889 fn remove_from_queue_int(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF>>
890 {
891 if self.deque_timeout_list.len() == 0
892 {
893 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
894 }
895 else
896 {
897 // search for the item in list
898 if self.deque_timeout_list.len() == 1
899 {
900 // just pop the from the front
901 let ret_ent = self.deque_timeout_list.pop_front().unwrap();
902
903 if &ret_ent != item
904 {
905 self.deque_timeout_list.push_front(ret_ent);
906
907 return Ok(None);
908 }
909
910 // stop timer
911 self.stop_timer()?;
912
913 return Ok(Some(ret_ent));
914 }
915 else
916 {
917 // in theory the `ticket` is a reference to ARC, so the weak should be upgraded
918 // succesfully for temoved instance.
919
920 for (pos, q_item)
921 in self.deque_timeout_list.iter().enumerate()
922 {
923 if q_item == item
924 {
925 // remove by the index
926 let ret_ent =
927 self.deque_timeout_list.remove(pos).unwrap();
928
929 // call timer reset if index is 0 (front)
930 if pos == 0
931 {
932 self.reschedule_timer()?;
933 }
934
935 return Ok(Some(ret_ent));
936 }
937 }
938
939 return Ok(None);
940 }
941 }
942 }
943
944 /// Reads the timer's FD to retrive the event type and then calling
945 /// the event handler. Then the result is returned. The result may
946 /// contain the `items` which were removed from the queue or copied.
947 ///
948 /// This function behaves differently when the timer is initialized as
949 /// a `non-blocking` i.e with [TimerFlags::TFD_NONBLOCK].
950 ///
951 /// When [TimerFlags::TFD_NONBLOCK] is not set, this function will block reading the FD.
952 /// In case of 'EINTR', the read attempt will be repeated.
953 ///
954 /// When [TimerFlags::TFD_NONBLOCK] is set, the function will return with some result
955 /// immidiatly.
956 ///
957 /// # Return
958 ///
959 /// * In case of `EAGAIN`, the [TimerReadRes::WouldBlock] will be returned.
960 ///
961 /// * In case of `ECANCELLD`, the [TimerReadRes::Cancelled] will be returned.
962 ///
963 /// * In case of any other error the [Result::Err] is returned.
964 ///
965 /// When a timer fires an event the [Result::Ok] is returned with the amount of
966 /// timer overrun. Normally it is 1.
967 pub
968 fn wait_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>>
969 {
970 let res =
971 self
972 .timer
973 .read()
974 .map_err(|e|
975 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
976 )?;
977
978
979 // ignore wouldblock
980 if let TimerReadRes::WouldBlock = res
981 {
982 return Ok(None);
983 }
984
985 return Ok(self.internal_handle_timer_event()?);
986 }
987
988 /// Handles the single event received from the `poll` and
989 /// returns the result. The result may
990 /// contain the `items` which were removed from the queue or copied.
991 ///
992 /// # Arguments
993 ///
994 /// `pet` - [PollEventType] an event from the timer to handle.
995 ///
996 /// A [Result] as alias [TimerResult] is returned with:
997 ///
998 /// * [Result::Ok] witout any innder data.
999 ///
1000 /// * [Result::Err] with error description.
1001 pub
1002 fn handle_timer_event(&mut self, pet: PollEventType) -> TimerResult<Option<INTF::HandleRes>>
1003 {
1004 match pet
1005 {
1006 PollEventType::TimerRes(_, res) =>
1007 {
1008 // ignore wouldblock
1009 if let TimerReadRes::WouldBlock = res
1010 {
1011 return Ok(None);
1012 }
1013
1014 return self.internal_handle_timer_event();
1015 },
1016 PollEventType::SubError(_, err) =>
1017 {
1018 timer_err!(TimerErrorType::TimerError(err.get_errno()), "{}", err)
1019 }
1020 }
1021
1022 }
1023
1024 fn internal_handle_timer_event(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1025 {
1026 let cur_timestamp = AbsoluteTime::now();
1027 let mut timer_ids: INTF::HandleRes = INTF::HandleRes::new();
1028
1029 loop
1030 {
1031 // get from front of the queue
1032 let Some(front_entity) = self.deque_timeout_list.front()
1033 else { break };
1034
1035 let time_until = front_entity.get_timeout_absolute();
1036
1037 if time_until <= cur_timestamp
1038 {
1039 let deq = self.deque_timeout_list.pop_front().unwrap();
1040
1041 deq.handle(self, &mut timer_ids)?;
1042 }
1043 else
1044 {
1045 break;
1046 }
1047 }
1048
1049 // call timer reschedule
1050 self.reschedule_timer()?;
1051
1052 return Ok(timer_ids.into_option());
1053 }
1054
1055 /// Asynchronious polling. The timer's FD is set to nonblocking,
1056 /// so each time it will return `pending` and load CPU. If you are using
1057 /// `tokio` or `smol` or other crate, the corresponding helpers like
1058 /// tokio's `AsyncFd` can be used to wrap the instance. The timer is read-only
1059 /// so use `read-only interest` to avoid errors.
1060 ///
1061 /// If [TimerReadRes::WouldBlock] is received then returns immidiatly.
1062 pub async
1063 fn async_poll_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1064 {
1065 let res =
1066 self
1067 .timer
1068 .get_timer()
1069 .await
1070 .map_err(|e|
1071 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1072 )?;
1073
1074 // ignore wouldblock
1075 if let TimerReadRes::WouldBlock = res
1076 {
1077 return Ok(None);
1078 }
1079
1080 return Ok(self.internal_handle_timer_event()?);
1081 }
1082
1083 /// Returns the queue length.
1084 pub
1085 fn timer_queue_len(&self) -> usize
1086 {
1087 return self.deque_timeout_list.len();
1088 }
1089
1090 /// Postpones the instace `target` by the relative time `rel_time_off`.
1091 ///
1092 /// # Errors
1093 ///
1094 /// * [TimerErrorType::NotFound] - if instance `target` was not found.
1095 ///
1096 /// * [TimerErrorType::TicketInstanceGone] - if ticket was invalidated.
1097 ///
1098 /// * [TimerErrorType::TimerError] - with the suberror code.
1099 pub
1100 fn postpone(&mut self, target: &INTF::TimerId, rel_time_off: RelativeTime) -> TimerResult<()>
1101 {
1102 let mut item =
1103 self
1104 .remove_from_queue_int(target)?
1105 .ok_or(map_timer_err!(TimerErrorType::NotFound, "ticket: {} not found", target))?;
1106
1107
1108 item.postpone(rel_time_off)?;
1109
1110 self.queue_item(item)?;
1111
1112 return Ok(());
1113 }
1114
1115 /// Reschedules the instace `target` by assigning new time `time`
1116 /// which is of type `MODE`.
1117 ///
1118 /// # Errors
1119 ///
1120 /// * [TimerErrorType::NotFound] - if instance `target` was not found.
1121 ///
1122 /// * [TimerErrorType::TicketInstanceGone] - if ticket was invalidated.
1123 ///
1124 /// * [TimerErrorType::TimerError] - with the suberror code.
1125 ///
1126 /// * [TimerErrorType::Expired] - if provided `time` have passed.
1127 pub
1128 fn reschedule(&mut self, target: &INTF::TimerId, time: MODE) -> TimerResult<()>
1129 {
1130 let mut item =
1131 self.remove_from_queue_int(target)?
1132 .ok_or(map_timer_err!(TimerErrorType::NotFound, "{} not found", target))?;
1133
1134
1135 item.resched(time)?;
1136
1137 self.queue_item(item)?;
1138
1139 return Ok(());
1140 }
1141
1142 /// Starts the `timer` instance by setting timeout or stops the `timer` if the
1143 /// instnce's `queue` is empty.
1144 pub(super)
1145 fn reschedule_timer(&mut self) -> TimerResult<()>
1146 {
1147 if let Some(front_entity) = self.deque_timeout_list.front()
1148 {
1149 let timer_exp =
1150 TimerExpMode::<AbsoluteTime>::new_oneshot(front_entity.get_timeout_absolute());
1151
1152 return
1153 self
1154 .timer
1155 .get_timer()
1156 .set_time(timer_exp)
1157 .map_err(|e|
1158 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1159 );
1160 }
1161 else
1162 {
1163 // queue is empty, force timer to stop
1164
1165 return
1166 self
1167 .timer
1168 .get_timer()
1169 .unset_time()
1170 .map_err(|e|
1171 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1172 );
1173 }
1174 }
1175
1176 /// Unarms timer and clears the queue.
1177 pub
1178 fn clean_up_timer(&mut self) -> TimerResult<()>
1179 {
1180 self
1181 .timer
1182 .get_timer()
1183 .unset_time()
1184 .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;
1185
1186 self.deque_timeout_list.clear();
1187
1188 return Ok(());
1189 }
1190
1191 /// Unarms timer only.
1192 pub
1193 fn stop_timer(&mut self) -> TimerResult<()>
1194 {
1195 return
1196 self
1197 .timer
1198 .get_timer()
1199 .unset_time()
1200 .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e));
1201 }
1202
1203}