timer_deque_rs/deque_timeout/mod.rs
1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 * functionality.
4 *
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 * 4neko.org alex@4neko.org
7 *
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *
13 * 2. The MIT License (MIT)
14 *
15 * 3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
18/// A `consumer` type of the timer which consumes the intance and returns it when
19/// timer triggers. The `consumed` instance normally whould be [Send] because it will
20/// be moved into the timer which may be processed in other thread.
21pub mod timer_consumer;
22
23/// A `ticket` issuer. Issues a ticket which should be assigned to the instance whcih was added
24/// to the timer's queue. The `ticket` can be used to remove the item from queue before the
25/// timeout event. If ticket is dropped i.e connection closed, the ticket will be
26/// in timer's queue until timeout where it will be ignored on timeout event.
27pub mod timer_tickets;
28
29#[cfg(test)]
30mod tests;
31
32pub use std::os::fd::{AsFd, AsRawFd};
33
34use std::
35{
36 borrow::Cow,
37 collections::VecDeque,
38 fmt,
39 marker::PhantomData,
40 os::fd::{BorrowedFd, RawFd},
41};
42
43
44use crate::
45{
46 TimerDequeConsumer,
47 TimerDequeTicket,
48 TimerDequeTicketIssuer,
49 error::{TimerErrorType, TimerResult},
50 map_timer_err,
51 timer_err,
52 timer_portable::
53 {
54 FdTimerMarker,
55 PollEventType,
56 TimerFd,
57 portable_error::TimerPortResult,
58 timer::
59 {
60 AbsoluteTime, FdTimerCom, FdTimerRead, RelativeTime, TimerExpMode,
61 TimerFlags, TimerReadRes, TimerType
62 }
63 }
64};
65
66
67/// A trait which is implemented by the structs which define the operation mode of the deque.
68///
69/// At the moment for: [DequeOnce] and [DequePeriodic].
70pub trait OrderedTimerDequeMode: fmt::Debug + fmt::Display + Ord + PartialOrd + Eq + PartialEq
71{
72 /// A type of operation. If the item deques once, then this value should be
73 /// `true`. Otherwise, it is periodic.
74 const IS_ONCE: bool;
75
76 /// Checks the current instance against the provided `cmp` [AbsoluteTime].
77 /// It is expected that the current instance's timeout value is actual and
78 /// haven't outlive the `cmp` already. if it does the error:
79 /// [TimerErrorType::Expired] should be returned. Or any other error
80 /// if it is required.
81 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>;
82
83 /// Returns the `absolute` timeout for the instance of the deque.
84 fn get_absolut_timeout(&self) -> AbsoluteTime;
85
86 /// Postpones the time by the relative offset `post_time`. May return error
87 /// (if required) if the absolut timeout value overflows.
88 ///
89 /// May return [TimerErrorType::TicketInstanceGone] if ticket was invalidated.
90 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>;
91
92 /// Updates the timeout when the deque works in periodic mode. By fedault
93 /// it does nothing.
94 fn advance_timeout(&mut self)
95 {
96 return;
97 }
98}
99
100/// This queue mode removes all entries from the queue that have timed out.
101///
102/// The further behaviour is defined by the type of the deque.
103#[derive(Debug, Clone, Copy)]
104pub struct DequeOnce
105{
106 /// A timeout for the item in the queue.
107 absolute_timeout: AbsoluteTime,
108}
109
110impl fmt::Display for DequeOnce
111{
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
113 {
114 write!(f, "{}", self.absolute_timeout)
115 }
116}
117
118impl Ord for DequeOnce
119{
120 fn cmp(&self, other: &Self) -> std::cmp::Ordering
121 {
122 return self.absolute_timeout.cmp(&other.absolute_timeout);
123 }
124}
125
126impl PartialOrd for DequeOnce
127{
128 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
129 {
130 return Some(self.cmp(other));
131 }
132}
133
134impl Eq for DequeOnce {}
135
136impl PartialEq for DequeOnce
137{
138 fn eq(&self, other: &Self) -> bool
139 {
140 return self.absolute_timeout == other.absolute_timeout;
141 }
142}
143
144
145impl OrderedTimerDequeMode for DequeOnce
146{
147 const IS_ONCE: bool = true;
148
149 fn get_absolut_timeout(&self) -> AbsoluteTime
150 {
151 return self.absolute_timeout;
152 }
153
154 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>
155 {
156 if cmp > self.absolute_timeout
157 {
158 timer_err!(TimerErrorType::Expired,
159 "deque once time already expired, now: {}, req: {}", cmp, self);
160 }
161
162 return Ok(());
163 }
164
165 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>
166 {
167 self.absolute_timeout = self.absolute_timeout + posp_time;
168
169 return Ok(());
170 }
171}
172
173
174impl DequeOnce
175{
176 /// Creates new instacne.
177 #[inline]
178 pub
179 fn new(absolute_timeout: impl Into<AbsoluteTime>) -> Self
180 {
181 return Self{ absolute_timeout: absolute_timeout.into() };
182 }
183}
184
185/// This queue mode does not remove an element that has timed out (by `absolute_timeout`),
186/// but extends (by `relative_period`) the timeout and returns the element back to the queue.
187#[derive(Debug, Clone, Copy)]
188pub struct DequePeriodic
189{
190 /// Extends the timer until the next timeout. This is `relative`
191 /// not absolute.
192 relative_period: RelativeTime,
193
194 /// A timeout value.
195 absolute_timeout: AbsoluteTime,
196}
197
198impl fmt::Display for DequePeriodic
199{
200 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
201 {
202 write!(f, "{}, rel: {}", self.absolute_timeout, self.relative_period)
203 }
204}
205
206impl Ord for DequePeriodic
207{
208 fn cmp(&self, other: &Self) -> std::cmp::Ordering
209 {
210 return self.absolute_timeout.cmp(&other.absolute_timeout);
211 }
212}
213
214impl PartialOrd for DequePeriodic
215{
216 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
217 {
218 return Some(self.cmp(other));
219 }
220}
221
222impl Eq for DequePeriodic {}
223
224impl PartialEq for DequePeriodic
225{
226 fn eq(&self, other: &Self) -> bool
227 {
228 return self.absolute_timeout == other.absolute_timeout;
229 }
230}
231
232
233impl OrderedTimerDequeMode for DequePeriodic
234{
235 const IS_ONCE: bool = false;
236
237 fn get_absolut_timeout(&self) -> AbsoluteTime
238 {
239 return self.absolute_timeout;
240 }
241
242 fn advance_timeout(&mut self)
243 {
244 self.absolute_timeout += self.relative_period;
245 }
246
247 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>
248 {
249 if cmp > self.absolute_timeout
250 {
251 timer_err!(TimerErrorType::Expired,
252 "deque periodic absolute time already expired, now: {}, req: {}", cmp, self.absolute_timeout);
253 }
254 else if self.relative_period.is_zero() == false
255 {
256 timer_err!(TimerErrorType::ZeroRelativeTime,
257 "deque periodic relative time is 0, rel_time: {}", self.relative_period);
258 }
259
260 return Ok(());
261 }
262
263 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>
264 {
265 self.absolute_timeout = self.absolute_timeout - self.relative_period + posp_time;
266
267 self.relative_period = posp_time;
268
269 return Ok(());
270 }
271}
272
273impl DequePeriodic
274{
275 /// Creates new instance. The `relative_time` is not added to
276 /// `absolute_time` until timeout. The values are unchecked at this
277 /// point and will be checked during placement in queue.
278 pub
279 fn new_from_now(rel_time: impl Into<RelativeTime>) -> Self
280 {
281 let inst =
282 Self
283 {
284 relative_period:
285 rel_time.into(),
286 absolute_timeout:
287 AbsoluteTime::now(),
288 };
289
290 return inst;
291 }
292
293 /// Creates new instance with the initial timeout `abs_time` and
294 /// period `rel_time` which is added to the `abs_time` each time
295 /// the timeout occures. The values are unchecked at this
296 /// point and will be checked during placement in queue.
297 pub
298 fn new(abs_time: impl Into<AbsoluteTime>, rel_time: impl Into<RelativeTime>) -> Self
299 {
300 let inst =
301 Self
302 {
303 relative_period:
304 rel_time.into(),
305 absolute_timeout:
306 abs_time.into(),
307 };
308
309 return inst;
310 }
311}
312
313/// A trait for the generalization of the collection type which is used to
314/// collect the timeout values i.e ticket IDs.
315pub trait TimerTimeoutCollection<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug>
316{
317 /// Initializes the instance.
318 fn new() -> Self;
319
320 /// Store the `ITEM` in the collection.
321 fn push(&mut self, item: ITEM);
322
323 /// Wraps the instance into [Option]. If collection is `empty` a
324 /// [Option::None] should be returned.
325 fn into_option(self) -> Option<Self> where Self: Sized;
326}
327
328/// An implementation for the Vec.
329impl<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM>
330for Vec<ITEM>
331{
332 fn new() -> Self
333 {
334 return Vec::new();
335 }
336
337 fn push(&mut self, item: ITEM)
338 {
339 self.push(item);
340 }
341
342 fn into_option(self) -> Option<Self>
343 {
344 if self.is_empty() == false
345 {
346 return Some(self);
347 }
348 else
349 {
350 return None;
351 }
352 }
353}
354
355/// A dummy implementation when nothing is returned. Unused at the moment.
356impl <ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM>
357for ()
358{
359 fn new() -> Self
360 {
361 return ();
362 }
363
364 fn push(&mut self, _item: ITEM)
365 {
366 return;
367 }
368
369 fn into_option(self) -> Option<Self>
370 {
371 return None;
372 }
373}
374
375/// A standart interface for each deque type. Every deque type must implement this
376/// trait.
377pub trait OrderedTimerDequeHandle<MODE: OrderedTimerDequeMode>
378 : Ord + PartialOrd + PartialEq + PartialEq<Self::TimerId> + Eq + fmt::Debug +
379 fmt::Display + OrderedTimerDequeInterf<MODE>
380{
381 /// A timer ID. Normally this is a uniq identificator of the item in the queue.
382 /// i.e `TimerDequeId`.
383 type TimerId: PartialEq + Eq + fmt::Display + fmt::Debug;
384
385 /// A collection which is used to collect the items which are removed from
386 /// the queue due to timeout.
387 type HandleRes: TimerTimeoutCollection<Self::TimerId>;
388
389 /// Postpones the timeout of the current instance. The `postp_time` is a [RelativeTime]
390 /// i.e inroduces the offset.
391 fn postpone(&mut self, postp_time: RelativeTime) -> TimerResult<()>;
392
393 /// Reschedules the current instance. This is different from the `postpone` as the
394 /// instance is assagned with a new time. The `MODE` cannot be changed, only time.
395 fn resched(&mut self, time: MODE) -> TimerResult<()>;
396
397 /// A spefic code which is called during timeout routine handling. Normally is should
398 /// store the result into the `collection` and reschedule the item if it is repeated.
399 fn handle(self, timer_self: &mut OrderTimerDeque<MODE, Self>,
400 timer_ids: &mut Self::HandleRes) -> TimerResult<()>
401 where Self: Sized;
402
403 /// Matches the current instance with provided `other` instances
404 /// [OrderedTimerDequeHandle::TimerId]. But, the `PartialEq<Self::TimerId>` is
405 /// implemented, so the `==` can be used to compare.
406 fn is_same(&self, other: &Self::TimerId) -> bool;
407
408 /// Attempts to acquire the [OrderedTimerDequeHandle::TimerId] by consuming the instance.
409 fn into_timer_id(self) -> Option<Self::TimerId>;
410}
411
412/// A trait which is used by the base [OrderTimerDeque].
413/// This is an interface which provides access to the timeout value of the instance.
414pub trait OrderedTimerDequeInterf<MODE: OrderedTimerDequeMode>
415{
416 /// Returns the absolute time and the timer mode.
417 fn get_timeout_absolute(&self) -> AbsoluteTime;
418}
419
420
421/// A [VecDeque] based queue which is sorted (in ascending order) by the timeout
422/// which is `absolute` time.
423///
424/// The queue automatically manages the `timer` i.e setting, unsetting.
425///
426/// Also for each type of the deque, a event procesing function is providided.
427///
428/// There are two types of queue:
429///
430/// * [OrderdTimerDequeOnce] - after timeout the element is removed from the queue.
431///
432/// * [OrderdTimerDequePeriodic] - after timeout the element timeout is extended
433/// until the item is not removed from the queue manually.
434///
435/// And there are 3 types of queue models:
436///
437/// * [`crate::TimerDequeConsumer`] - consumes the item which is stored in the
438/// timeout queue.
439///
440/// * [`crate::TimerDequeTicketIssuer`] - issues a ticket which is referenced
441/// to the item in the queue.
442///
443/// * [`crate::TimerDequeSignalTicket`] - send signal on timeout using `MPSC`
444///
445/// # Implementations
446///
447/// If `feature = enable_mio_compat` is enabled a [mio::event::Source] is
448/// implemented on the current struct.
449///
450/// ## Multithread
451///
452/// Not MT-safe. The external mutex should be used to protect the instance.
453/// The internal `timer` [TimerFd] is MT-safe.
454///
455/// ## Poll
456///
457/// - A built-in [crate::TimerPoll] can be used.
458///
459/// - An external crate MIO [crate::TimerFdMioCompat] if `feature = enable_mio_compat` is
460/// enabled.
461///
462/// - User implemented poll. The FD can be aquired via [AsRawFd] [AsFd].
463///
464/// ## Async
465///
466/// A [Future] is implemented.
467/// # Generics
468///
469/// * `DQI` - a deque type. There are three types are available:
470/// * - [crate::TimerDequeueTicketIssuer] issues a ticket for the instance for which the timer was set.
471/// ```ignore
472/// let mut time_list =
473/// OrderedTimerDeque
474/// ::<TimerDequeTicketIssuer<OrderdTimerDequeOnce>>
475/// ::new("test_label".into(), 4, false).unwrap();
476/// ```
477/// or
478/// ```ignore
479/// let mut time_list =
480/// OrderedTimerDeque
481/// ::<TimerDequeTicketIssuer<OrderdTimerDequePeriodic>>
482/// ::new("test_label".into(), 4, false).unwrap();
483/// ```
484/// * - [crate::TimerDequeueConsumer] consumes the instance for which the timer is set.
485/// ```ignore
486/// let mut time_list =
487/// OrderedTimerDeque
488/// ::<TimerDequeConsumer<TestItem, OrderdTimerDequeOnce>>
489/// ::new("test_label".into(), 4, false).unwrap();
490/// ```
491/// or
492/// ```ignore
493/// let mut time_list =
494/// OrderedTimerDeque
495/// ::<TimerDequeConsumer<TestItem, OrderdTimerDequePeriodic>>
496/// ::new("test_label".into(), 4, false).unwrap();
497/// ```
498/// * - [crate::TimerDequeueSignalTicket] sends a signal to destination.
499/// ```ignore
500/// let mut time_list =
501/// OrderedTimerDeque
502/// ::<TimerDequeSignalTicket<TestSigStruct, OrderdTimerDequeOnce>>
503/// ::new("test_label".into(), 4, false).unwrap();
504/// ```
505/// or
506/// ```ignore
507/// let mut time_list =
508/// OrderedTimerDeque
509/// ::<TimerDequeSignalTicket<TestSigStruct, OrderdTimerDequePeriodic>>
510/// ::new("test_label".into(), 4, false).unwrap();
511/// ```
512#[derive(Debug, Eq)]
513pub struct OrderTimerDeque<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>>
514{
515 /// A [VecDeque] list which is sorted by time in ascending order -
516 /// lower first, largest last
517 pub(crate) deque_timeout_list: VecDeque<INTF>,
518
519 /// An instance of the FFI (Kernel Supported Timer)
520 pub(crate) timer: TimerFd,
521
522 p: PhantomData<MODE>
523}
524
525#[cfg(feature = "enable_mio_compat")]
526pub mod mio_compat
527{
528 use std::{io, os::fd::{AsRawFd, RawFd}};
529
530 use mio::{Token, unix::SourceFd};
531
532 use crate::{TimerFdMioCompat, deque_timeout::{OrderTimerDeque, OrderedTimerDequeHandle, OrderedTimerDequeMode}};
533
534 impl<MODE, INTF> mio::event::Source for OrderTimerDeque<MODE, INTF>
535 where
536 MODE: OrderedTimerDequeMode,
537 INTF: OrderedTimerDequeHandle<MODE>
538 {
539 fn register(
540 &mut self,
541 registry: &mio::Registry,
542 token: mio::Token,
543 interests: mio::Interest,
544 ) -> io::Result<()>
545 {
546 return
547 SourceFd(&self.as_raw_fd()).register(registry, token, interests);
548 }
549
550 fn reregister(
551 &mut self,
552 registry: &mio::Registry,
553 token: mio::Token,
554 interests: mio::Interest,
555 ) -> io::Result<()>
556 {
557 return
558 SourceFd(&self.as_raw_fd()).reregister(registry, token, interests);
559 }
560
561 fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()>
562 {
563 return
564 SourceFd(&self.as_raw_fd()).deregister(registry)
565 }
566 }
567
568 impl<MODE, INTF> PartialEq<Token> for OrderTimerDeque<MODE, INTF>
569 where
570 MODE: OrderedTimerDequeMode,
571 INTF: OrderedTimerDequeHandle<MODE>
572 {
573 fn eq(&self, other: &Token) -> bool
574 {
575 return self.as_raw_fd() == other.0 as RawFd;
576 }
577 }
578
579 impl<MODE, INTF> TimerFdMioCompat for OrderTimerDeque<MODE, INTF>
580 where
581 MODE: OrderedTimerDequeMode,
582 INTF: OrderedTimerDequeHandle<MODE>
583 {
584 fn get_token(&self) -> Token
585 {
586 return Token(self.as_raw_fd() as usize);
587 }
588 }
589}
590
591impl<MODE, INTF> FdTimerRead for OrderTimerDeque<MODE, INTF>
592where
593 MODE: OrderedTimerDequeMode,
594 INTF: OrderedTimerDequeHandle<MODE>
595{
596 fn read(&self) -> TimerPortResult<TimerReadRes<u64>>
597 {
598 return self.timer.read();
599 }
600}
601
602impl<MODE, INTF> PartialEq<str> for OrderTimerDeque<MODE, INTF>
603where
604 MODE: OrderedTimerDequeMode,
605 INTF: OrderedTimerDequeHandle<MODE>
606{
607 fn eq(&self, other: &str) -> bool
608 {
609 return self.timer.as_ref() == other;
610 }
611}
612
613
614impl<MODE, INTF> FdTimerMarker for OrderTimerDeque<MODE, INTF>
615where
616 MODE: OrderedTimerDequeMode,
617 INTF: OrderedTimerDequeHandle<MODE>
618{
619 fn clone_timer(&self) -> TimerFd
620 {
621 return self.timer.clone_timer();
622 }
623
624 fn get_strong_count(&self) -> usize
625 {
626 return self.timer.get_strong_count();
627 }
628}
629
630impl<MODE, INTF> Drop for OrderTimerDeque<MODE, INTF>
631where
632 MODE: OrderedTimerDequeMode,
633 INTF: OrderedTimerDequeHandle<MODE>
634{
635 fn drop(&mut self)
636 {
637 let _ = self.clean_up_timer();
638 }
639}
640
641impl<MODE, INTF> AsRef<str> for OrderTimerDeque<MODE, INTF>
642where
643 MODE: OrderedTimerDequeMode,
644 INTF: OrderedTimerDequeHandle<MODE>
645{
646 fn as_ref(&self) -> &str
647 {
648 return self.timer.as_ref();
649 }
650}
651
652impl<MODE, INTF> AsFd for OrderTimerDeque<MODE, INTF>
653where
654 MODE: OrderedTimerDequeMode,
655 INTF: OrderedTimerDequeHandle<MODE>
656{
657 fn as_fd(&self) -> BorrowedFd<'_>
658 {
659 return self.timer.as_fd();
660 }
661}
662
663impl<MODE, INTF> AsRawFd for OrderTimerDeque<MODE, INTF>
664where
665 MODE: OrderedTimerDequeMode,
666 INTF: OrderedTimerDequeHandle<MODE>
667{
668 fn as_raw_fd(&self) -> RawFd
669 {
670 return self.timer.as_raw_fd();
671 }
672}
673
674impl<MODE, INTF> fmt::Display for OrderTimerDeque<MODE, INTF>
675where
676 MODE: OrderedTimerDequeMode,
677 INTF: OrderedTimerDequeHandle<MODE>
678{
679 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
680 {
681 write!(f, "timer: '{}', fd: '{}', queue_len: '{}'",
682 self.timer, self.timer.as_fd().as_raw_fd(), self.deque_timeout_list.len())
683 }
684}
685
686impl<MODE, INTF> PartialEq for OrderTimerDeque<MODE, INTF>
687where
688 MODE: OrderedTimerDequeMode,
689 INTF: OrderedTimerDequeHandle<MODE>
690{
691 fn eq(&self, other: &Self) -> bool
692 {
693 return self.timer == other.timer;
694 }
695}
696
697impl<MODE, INTF> PartialEq<RawFd> for OrderTimerDeque<MODE, INTF>
698where
699 MODE: OrderedTimerDequeMode,
700 INTF: OrderedTimerDequeHandle<MODE>
701{
702 fn eq(&self, other: &RawFd) -> bool
703 {
704 return self.timer == *other;
705 }
706}
707
708impl<MODE, R> OrderTimerDeque<MODE, TimerDequeConsumer<R, MODE>>
709where
710 MODE: OrderedTimerDequeMode,
711 R: PartialEq + Eq + fmt::Debug + fmt::Display + Send + Clone
712{
713 /// Adds the new absolute timeout to the timer deque instance.
714 ///
715 /// The `entity` is an item which should be stored in the deque
716 /// amd on timeout - returned.
717 ///
718 /// # Generics
719 ///
720 /// * `MODE` - is any type which implements the [OrderedTimerDequeMode].
721 /// There are two options: [DequeOnce] and [DequePeriodic].
722 ///
723 /// # Arguemnts
724 ///
725 /// * `item` - `R` which is an item to store in the deque.
726 ///
727 /// * `mode` - `MODE` a timeout value which also defines the
728 /// behaviour of the deque logic.
729 ///
730 /// # Returns
731 ///
732 /// A [Result] as alias [TimerResult] is returned with:
733 ///
734 /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
735 ///
736 /// * [Result::Err] with error description.
737 ///
738 /// Possible errors:
739 ///
740 /// * [TimerErrorType::Expired] - `mode` contains time which have alrealy expired.
741 ///
742 /// * [TimerErrorType::ZeroRelativeTime] - `mode` [DequePeriodic] relative time
743 /// should never be zero.
744 ///
745 /// * [TimerErrorType::TimerError] - a error with the OS timer. Contains subcode
746 /// `errno`.
747 pub
748 fn add(&mut self, item: R, mode: MODE) -> TimerResult<()>
749 {
750 let inst =
751 TimerDequeConsumer::<R, MODE>::new(item, mode)?;
752
753 //return self.queue_item(inst)
754 let res = self.queue_item(inst);
755 return res;
756 }
757}
758
759
760impl<MODE> OrderTimerDeque<MODE, TimerDequeTicketIssuer<MODE>>
761where
762 MODE: OrderedTimerDequeMode
763{
764 /// Adds the new absolute timeout to the timer deque instance.
765 ///
766 /// This deque assigns the `ticket` for each timeout which is
767 /// used for identification and invalidation.
768 ///
769 /// # Generics
770 ///
771 /// * `MODE` - is any type which implements the [OrderedTimerDequeMode].
772 /// There are two options: [DequeOnce] and [DequePeriodic].
773 ///
774 /// # Arguemnts
775 ///
776 /// * `mode` - `MODE` a timeout value which also defines the
777 /// behaviour of the deque logic.
778 ///
779 /// # Returns
780 ///
781 /// A [Result] as alias [TimerResult] is returned with:
782 ///
783 /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
784 ///
785 /// * [Result::Err] with error description.
786 ///
787 /// Possible errors:
788 ///
789 /// * [TimerErrorType::Expired] - `mode` contains time which have alrealy expired.
790 ///
791 /// * [TimerErrorType::ZeroRelativeTime] - `mode` [DequePeriodic] relative time
792 /// should never be zero.
793 ///
794 /// * [TimerErrorType::TimerError] - a error with the OS timer. Contains subcode
795 /// `errno`.
796 pub
797 fn add(&mut self, mode: MODE) -> TimerResult<TimerDequeTicket>
798 {
799 let (inst, ticket) =
800 TimerDequeTicketIssuer::<MODE>::new(mode)?;
801
802 self.queue_item(inst)?;
803
804 return Ok(ticket);
805 }
806}
807
808impl<MODE, INTF> OrderTimerDeque<MODE, INTF>
809where
810 MODE: OrderedTimerDequeMode,
811 INTF: OrderedTimerDequeHandle<MODE>
812{
813 /// Creates new deque instance with the provided parameters.
814 ///
815 /// # Argument
816 ///
817 /// * `timer_label` - a label which helps to identify the timer.
818 ///
819 /// * `deq_len` - a minimal, pre-allocated deque length.
820 ///
821 /// * `cloexec` - when set to `true` sets the `CLOEXEC` flag on FD.
822 ///
823 /// * `non_blocking` - when set to `true` sets the `TFD_NONBLOCK` flag on FD.
824 ///
825 /// # Returns
826 ///
827 /// A [Result] as alias [TimerResult] is returned with
828 ///
829 /// * [Result::Ok] with the instance.
830 ///
831 /// * [Result::Err] with the error description.
832 ///
833 /// The following errors may be returned:
834 ///
835 /// * [TimerErrorType::TimerError] - error in the OS timer. A `errno` subcode is provided.
836 pub
837 fn new(timer_label: Cow<'static, str>, deq_len: usize, cloexec: bool, non_blocking: bool) -> TimerResult<OrderTimerDeque<MODE, INTF>>
838 {
839 let deq_len =
840 if deq_len == 0
841 {
842 10
843 }
844 else
845 {
846 deq_len
847 };
848
849 let mut tf = TimerFlags::empty();
850
851 tf.set(TimerFlags::TFD_CLOEXEC, cloexec);
852 tf.set(TimerFlags::TFD_NONBLOCK, non_blocking);
853
854 // init timer
855 let timer =
856 TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, tf)
857 .map_err(|e|
858 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
859 )?;
860
861 return Ok(
862 Self
863 {
864 deque_timeout_list: VecDeque::with_capacity(deq_len),
865 timer: timer,
866 p: PhantomData,
867 }
868 );
869 }
870
871 /// internal function which is called from the `add()` functions.
872 pub(super)
873 fn queue_item(&mut self, inst: INTF) -> TimerResult<()>
874 {
875 if self.deque_timeout_list.len() == 0
876 {
877 let timer_stamp =
878 TimerExpMode::<AbsoluteTime>::new_oneshot(inst.get_timeout_absolute());
879
880 // setup timer
881 self
882 .timer
883 .get_timer()
884 .set_time(timer_stamp)
885 .map_err(|e|
886 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
887 )?;
888
889 self.deque_timeout_list.push_front(inst);
890 }
891 else
892 {
893 // list can not be empty from this point
894 let front_timeout =
895 self.deque_timeout_list.front().unwrap().get_timeout_absolute();
896
897 // intances timeout
898 let inst_timeout = inst.get_timeout_absolute();
899
900 if front_timeout >= inst_timeout
901 {
902 // push to front
903 self.deque_timeout_list.push_front(inst);
904
905 self.reschedule_timer()?;
906 }
907 else
908 {
909 let back_banuntil =
910 self
911 .deque_timeout_list
912 .back()
913 .unwrap()
914 .get_timeout_absolute();
915
916 if back_banuntil <= inst_timeout
917 {
918 // push to the back
919 self.deque_timeout_list.push_back(inst);
920 }
921 else
922 {
923 let pos =
924 self
925 .deque_timeout_list
926 .binary_search_by( |se|
927 se.get_timeout_absolute().cmp(&inst.get_timeout_absolute())
928 )
929 .map_or_else(|e| e, |r| r);
930
931 self.deque_timeout_list.insert(pos, inst);
932 }
933 }
934 }
935
936 return Ok(());
937 }
938
939 /// Removes the instance from the queue by the identification depending on the
940 /// deque type.
941 ///
942 /// # Arguments
943 ///
944 /// `item` - identification of the item which should be removed from the queue.
945 ///
946 /// # Returns
947 ///
948 /// A [Result] as alias [TimerResult] is returned with:
949 ///
950 /// * [Result::Ok] with the inner type [Option] where
951 /// * [Option::Some] is returned with the consumed `item`.
952 /// * [Option::None] is returned when item was not found.
953 ///
954 /// * [Result::Err] with error description.
955 ///
956 /// The following errors may be returned:
957 ///
958 /// * [TimerErrorType::TimerError] - error in the OS timer. A `errno` subcode is provided.
959 pub
960 fn remove_from_queue(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF::TimerId>>
961 {
962 return
963 self
964 .remove_from_queue_int(item)
965 .map(|opt_intf|
966 {
967 let Some(intf) = opt_intf
968 else { return None };
969
970 return intf.into_timer_id();
971 }
972 );
973
974 }
975
976 pub(super)
977 fn remove_from_queue_int(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF>>
978 {
979 if self.deque_timeout_list.len() == 0
980 {
981 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
982 }
983 else
984 {
985 // search for the item in list
986 if self.deque_timeout_list.len() == 1
987 {
988 // just pop the from the front
989 let ret_ent = self.deque_timeout_list.pop_front().unwrap();
990
991 if &ret_ent != item
992 {
993 self.deque_timeout_list.push_front(ret_ent);
994
995 return Ok(None);
996 }
997
998 // stop timer
999 self.stop_timer()?;
1000
1001 return Ok(Some(ret_ent));
1002 }
1003 else
1004 {
1005 // in theory the `ticket` is a reference to ARC, so the weak should be upgraded
1006 // succesfully for temoved instance.
1007
1008 for (pos, q_item)
1009 in self.deque_timeout_list.iter().enumerate()
1010 {
1011 if q_item == item
1012 {
1013 // remove by the index
1014 let ret_ent =
1015 self.deque_timeout_list.remove(pos).unwrap();
1016
1017 // call timer reset if index is 0 (front)
1018 if pos == 0
1019 {
1020 self.reschedule_timer()?;
1021 }
1022
1023 return Ok(Some(ret_ent));
1024 }
1025 }
1026
1027 return Ok(None);
1028 }
1029 }
1030 }
1031
1032 /// Reads the timer's FD to retrive the event type and then calling
1033 /// the event handler. Then the result is returned. The result may
1034 /// contain the `items` which were removed from the queue or copied.
1035 ///
1036 /// This function behaves differently when the timer is initialized as
1037 /// a `non-blocking` i.e with [TimerFlags::TFD_NONBLOCK].
1038 ///
1039 /// When [TimerFlags::TFD_NONBLOCK] is not set, this function will block reading the FD.
1040 /// In case of 'EINTR', the read attempt will be repeated.
1041 ///
1042 /// When [TimerFlags::TFD_NONBLOCK] is set, the function will return with some result
1043 /// immidiatly.
1044 ///
1045 /// # Return
1046 ///
1047 /// * In case of `EAGAIN`, the [TimerReadRes::WouldBlock] will be returned.
1048 ///
1049 /// * In case of `ECANCELLD`, the [TimerReadRes::Cancelled] will be returned.
1050 ///
1051 /// * In case of any other error the [Result::Err] is returned.
1052 ///
1053 /// When a timer fires an event the [Result::Ok] is returned with the amount of
1054 /// timer overrun. Normally it is 1.
1055 pub
1056 fn wait_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1057 {
1058 let res =
1059 self
1060 .timer
1061 .read()
1062 .map_err(|e|
1063 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1064 )?;
1065
1066
1067 // ignore wouldblock
1068 if let TimerReadRes::WouldBlock = res
1069 {
1070 return Ok(None);
1071 }
1072
1073 return Ok(self.internal_handle_timer_event()?);
1074 }
1075
1076 /// Handles the single event received from the `poll` and
1077 /// returns the result. The result may
1078 /// contain the `items` which were removed from the queue or copied.
1079 ///
1080 /// # Arguments
1081 ///
1082 /// `pet` - [PollEventType] an event from the timer to handle.
1083 ///
1084 /// A [Result] as alias [TimerResult] is returned with:
1085 ///
1086 /// * [Result::Ok] witout any innder data.
1087 ///
1088 /// * [Result::Err] with error description.
1089 pub
1090 fn handle_timer_event(&mut self, pet: PollEventType) -> TimerResult<Option<INTF::HandleRes>>
1091 {
1092 match pet
1093 {
1094 PollEventType::TimerRes(_, res) =>
1095 {
1096 // ignore wouldblock
1097 if let TimerReadRes::WouldBlock = res
1098 {
1099 return Ok(None);
1100 }
1101
1102 return self.internal_handle_timer_event();
1103 },
1104 PollEventType::SubError(_, err) =>
1105 {
1106 timer_err!(TimerErrorType::TimerError(err.get_errno()), "{}", err)
1107 }
1108 }
1109
1110 }
1111
1112 fn internal_handle_timer_event(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1113 {
1114 let cur_timestamp = AbsoluteTime::now();
1115 let mut timer_ids: INTF::HandleRes = INTF::HandleRes::new();
1116
1117 loop
1118 {
1119 // get from front of the queue
1120 let Some(front_entity) = self.deque_timeout_list.front()
1121 else { break };
1122
1123 let time_until = front_entity.get_timeout_absolute();
1124
1125 if time_until <= cur_timestamp
1126 {
1127 let deq = self.deque_timeout_list.pop_front().unwrap();
1128
1129 deq.handle(self, &mut timer_ids)?;
1130 }
1131 else
1132 {
1133 break;
1134 }
1135 }
1136
1137 // call timer reschedule
1138 self.reschedule_timer()?;
1139
1140 return Ok(timer_ids.into_option());
1141 }
1142
1143 /// Asynchronious polling. The timer's FD is set to nonblocking,
1144 /// so each time it will return `pending` and load CPU. If you are using
1145 /// `tokio` or `smol` or other crate, the corresponding helpers like
1146 /// tokio's `AsyncFd` can be used to wrap the instance. The timer is read-only
1147 /// so use `read-only interest` to avoid errors.
1148 ///
1149 /// If [TimerReadRes::WouldBlock] is received then returns immidiatly.
1150 pub async
1151 fn async_poll_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1152 {
1153 let res =
1154 self
1155 .timer
1156 .get_timer()
1157 .await
1158 .map_err(|e|
1159 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1160 )?;
1161
1162 // ignore wouldblock
1163 if let TimerReadRes::WouldBlock = res
1164 {
1165 return Ok(None);
1166 }
1167
1168 return Ok(self.internal_handle_timer_event()?);
1169 }
1170
1171 /// Returns the queue length.
1172 pub
1173 fn timer_queue_len(&self) -> usize
1174 {
1175 return self.deque_timeout_list.len();
1176 }
1177
1178 /// Postpones the instace `target` by the relative time `rel_time_off`.
1179 ///
1180 /// # Errors
1181 ///
1182 /// * [TimerErrorType::NotFound] - if instance `target` was not found.
1183 ///
1184 /// * [TimerErrorType::TicketInstanceGone] - if ticket was invalidated.
1185 ///
1186 /// * [TimerErrorType::TimerError] - with the suberror code.
1187 pub
1188 fn postpone(&mut self, target: &INTF::TimerId, rel_time_off: RelativeTime) -> TimerResult<()>
1189 {
1190 let mut item =
1191 self
1192 .remove_from_queue_int(target)?
1193 .ok_or(map_timer_err!(TimerErrorType::NotFound, "ticket: {} not found", target))?;
1194
1195
1196 item.postpone(rel_time_off)?;
1197
1198 self.queue_item(item)?;
1199
1200 return Ok(());
1201 }
1202
1203 /// Reschedules the instace `target` by assigning new time `time`
1204 /// which is of type `MODE`.
1205 ///
1206 /// # Errors
1207 ///
1208 /// * [TimerErrorType::NotFound] - if instance `target` was not found.
1209 ///
1210 /// * [TimerErrorType::TicketInstanceGone] - if ticket was invalidated.
1211 ///
1212 /// * [TimerErrorType::TimerError] - with the suberror code.
1213 ///
1214 /// * [TimerErrorType::Expired] - if provided `time` have passed.
1215 pub
1216 fn reschedule(&mut self, target: &INTF::TimerId, time: MODE) -> TimerResult<()>
1217 {
1218 let mut item =
1219 self.remove_from_queue_int(target)?
1220 .ok_or(map_timer_err!(TimerErrorType::NotFound, "{} not found", target))?;
1221
1222
1223 item.resched(time)?;
1224
1225 self.queue_item(item)?;
1226
1227 return Ok(());
1228 }
1229
1230 /// Starts the `timer` instance by setting timeout or stops the `timer` if the
1231 /// instnce's `queue` is empty.
1232 pub(super)
1233 fn reschedule_timer(&mut self) -> TimerResult<()>
1234 {
1235 if let Some(front_entity) = self.deque_timeout_list.front()
1236 {
1237 let timer_exp =
1238 TimerExpMode::<AbsoluteTime>::new_oneshot(front_entity.get_timeout_absolute());
1239
1240 return
1241 self
1242 .timer
1243 .get_timer()
1244 .set_time(timer_exp)
1245 .map_err(|e|
1246 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1247 );
1248 }
1249 else
1250 {
1251 // queue is empty, force timer to stop
1252
1253 return
1254 self
1255 .timer
1256 .get_timer()
1257 .unset_time()
1258 .map_err(|e|
1259 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1260 );
1261 }
1262 }
1263
1264 /// Unarms timer and clears the queue.
1265 pub
1266 fn clean_up_timer(&mut self) -> TimerResult<()>
1267 {
1268 self
1269 .timer
1270 .get_timer()
1271 .unset_time()
1272 .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;
1273
1274 self.deque_timeout_list.clear();
1275
1276 return Ok(());
1277 }
1278
1279 /// Unarms timer only.
1280 pub
1281 fn stop_timer(&mut self) -> TimerResult<()>
1282 {
1283 return
1284 self
1285 .timer
1286 .get_timer()
1287 .unset_time()
1288 .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e));
1289 }
1290
1291}