timer_deque_rs/deque_timeout/mod.rs
1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 * functionality.
4 *
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 * 4neko.org alex@4neko.org
7 *
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *
13 * 2. The MIT License (MIT)
14 *
15 * 3. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
16 */
17
/// A `consumer` type of the timer which consumes the instance and returns it when
/// the timer triggers. The `consumed` instance normally should be [Send] because it will
/// be moved into the timer which may be processed in another thread.
21pub mod timer_consumer;
22
/// A `ticket` issuer. Issues a ticket which should be assigned to the instance which was added
/// to the timer's queue. The `ticket` can be used to remove the item from the queue before the
/// timeout event. If the ticket is dropped, i.e. the connection is closed, the ticket will
/// remain in the timer's queue until the timeout, where it will be ignored by the timeout event.
27pub mod timer_tickets;
28
29#[cfg(test)]
30mod tests;
31
32#[cfg(target_family = "unix")]
33pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd};
34
35use std::
36{
37 borrow::Cow,
38 collections::VecDeque,
39 fmt,
40 marker::PhantomData,
41};
42
43
44use crate::
45{
46 TimerDequeConsumer,
47 TimerDequeTicket,
48 TimerDequeTicketIssuer,
49 error::{TimerErrorType, TimerResult},
50 map_timer_err,
51 timer_err,
52 timer_portable::
53 {
54 AsTimerId, FdTimerMarker, PollEventType, TimerFd, TimerFlags, TimerId, TimerType, UnixFd, portable_error::TimerPortResult, timer::
55 {
56 AbsoluteTime, FdTimerCom, FdTimerRead, RelativeTime, TimerExpMode,
57 TimerReadRes
58 }
59 }
60};
61
62
63/// A trait which is implemented by the structs which define the operation mode of the deque.
64///
65/// At the moment for: [DequeOnce] and [DequePeriodic].
66pub trait OrderedTimerDequeMode: fmt::Debug + fmt::Display + Ord + PartialOrd + Eq + PartialEq
67{
68 /// A type of operation. If the item deques once, then this value should be
69 /// `true`. Otherwise, it is periodic.
70 const IS_ONCE: bool;
71
72 /// Checks the current instance against the provided `cmp` [AbsoluteTime].
73 /// It is expected that the current instance's timeout value is actual and
74 /// haven't outlive the `cmp` already. if it does the error:
75 /// [TimerErrorType::Expired] should be returned. Or any other error
76 /// if it is required.
77 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>;
78
79 /// Returns the `absolute` timeout for the instance of the deque.
80 fn get_absolut_timeout(&self) -> AbsoluteTime;
81
82 /// Postpones the time by the relative offset `post_time`. May return error
83 /// (if required) if the absolut timeout value overflows.
84 ///
85 /// May return [TimerErrorType::TicketInstanceGone] if ticket was invalidated.
86 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>;
87
88 /// Updates the timeout when the deque works in periodic mode. By fedault
89 /// it does nothing.
90 fn advance_timeout(&mut self)
91 {
92 return;
93 }
94}
95
/// A queue mode in which every entry that has timed out is removed from the queue.
///
/// The further behaviour is defined by the type of the deque.
#[derive(Debug, Clone, Copy)]
pub struct DequeOnce
{
    /// The absolute timeout of the item in the queue.
    absolute_timeout: AbsoluteTime,
}
105
106impl fmt::Display for DequeOnce
107{
108 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
109 {
110 write!(f, "{}", self.absolute_timeout)
111 }
112}
113
114impl Ord for DequeOnce
115{
116 fn cmp(&self, other: &Self) -> std::cmp::Ordering
117 {
118 return self.absolute_timeout.cmp(&other.absolute_timeout);
119 }
120}
121
122impl PartialOrd for DequeOnce
123{
124 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
125 {
126 return Some(self.cmp(other));
127 }
128}
129
130impl Eq for DequeOnce {}
131
132impl PartialEq for DequeOnce
133{
134 fn eq(&self, other: &Self) -> bool
135 {
136 return self.absolute_timeout == other.absolute_timeout;
137 }
138}
139
140
141impl OrderedTimerDequeMode for DequeOnce
142{
143 const IS_ONCE: bool = true;
144
145 fn get_absolut_timeout(&self) -> AbsoluteTime
146 {
147 return self.absolute_timeout;
148 }
149
150 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>
151 {
152 if cmp > self.absolute_timeout
153 {
154 timer_err!(TimerErrorType::Expired,
155 "deque once time already expired, now: {}, req: {}", cmp, self);
156 }
157
158 return Ok(());
159 }
160
161 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>
162 {
163 self.absolute_timeout = self.absolute_timeout + posp_time;
164
165 return Ok(());
166 }
167}
168
169
170impl DequeOnce
171{
172 /// Creates new instacne.
173 #[inline]
174 pub
175 fn new(absolute_timeout: impl Into<AbsoluteTime>) -> Self
176 {
177 return Self{ absolute_timeout: absolute_timeout.into() };
178 }
179}
180
/// A queue mode which does not remove an element that has timed out (by `absolute_timeout`),
/// but extends the timeout (by `relative_period`) and returns the element back to the queue.
#[derive(Debug, Clone, Copy)]
pub struct DequePeriodic
{
    /// The period by which the timeout is extended until the next timeout.
    /// This is a `relative` time, not an absolute one.
    relative_period: RelativeTime,

    /// The absolute timeout value.
    absolute_timeout: AbsoluteTime,
}
193
194impl fmt::Display for DequePeriodic
195{
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
197 {
198 write!(f, "{}, rel: {}", self.absolute_timeout, self.relative_period)
199 }
200}
201
202impl Ord for DequePeriodic
203{
204 fn cmp(&self, other: &Self) -> std::cmp::Ordering
205 {
206 return self.absolute_timeout.cmp(&other.absolute_timeout);
207 }
208}
209
210impl PartialOrd for DequePeriodic
211{
212 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
213 {
214 return Some(self.cmp(other));
215 }
216}
217
218impl Eq for DequePeriodic {}
219
220impl PartialEq for DequePeriodic
221{
222 fn eq(&self, other: &Self) -> bool
223 {
224 return self.absolute_timeout == other.absolute_timeout;
225 }
226}
227
228
229impl OrderedTimerDequeMode for DequePeriodic
230{
231 const IS_ONCE: bool = false;
232
233 fn get_absolut_timeout(&self) -> AbsoluteTime
234 {
235 return self.absolute_timeout;
236 }
237
238 fn advance_timeout(&mut self)
239 {
240 self.absolute_timeout += self.relative_period;
241 }
242
243 fn validate_time(&self, cmp: AbsoluteTime) -> TimerResult<()>
244 {
245 if cmp > self.absolute_timeout
246 {
247 timer_err!(TimerErrorType::Expired,
248 "deque periodic absolute time already expired, now: {}, req: {}", cmp, self.absolute_timeout);
249 }
250 else if self.relative_period.is_zero() == false
251 {
252 timer_err!(TimerErrorType::ZeroRelativeTime,
253 "deque periodic relative time is 0, rel_time: {}", self.relative_period);
254 }
255
256 return Ok(());
257 }
258
259 fn postpone(&mut self, posp_time: RelativeTime) -> TimerResult<()>
260 {
261 self.absolute_timeout = self.absolute_timeout - self.relative_period + posp_time;
262
263 self.relative_period = posp_time;
264
265 return Ok(());
266 }
267}
268
269impl DequePeriodic
270{
271 /// Creates new instance. The `relative_time` is not added to
272 /// `absolute_time` until timeout. The values are unchecked at this
273 /// point and will be checked during placement in queue.
274 pub
275 fn new_from_now(rel_time: impl Into<RelativeTime>) -> Self
276 {
277 let inst =
278 Self
279 {
280 relative_period:
281 rel_time.into(),
282 absolute_timeout:
283 AbsoluteTime::now(),
284 };
285
286 return inst;
287 }
288
289 /// Creates new instance with the initial timeout `abs_time` and
290 /// period `rel_time` which is added to the `abs_time` each time
291 /// the timeout occures. The values are unchecked at this
292 /// point and will be checked during placement in queue.
293 pub
294 fn new(abs_time: impl Into<AbsoluteTime>, rel_time: impl Into<RelativeTime>) -> Self
295 {
296 let inst =
297 Self
298 {
299 relative_period:
300 rel_time.into(),
301 absolute_timeout:
302 abs_time.into(),
303 };
304
305 return inst;
306 }
307}
308
/// Generalizes the collection type which gathers the values produced on
/// timeout, i.e. ticket IDs.
pub trait TimerTimeoutCollection<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug>
{
    /// Creates an empty collection.
    fn new() -> Self;

    /// Stores the `item` in the collection.
    fn push(&mut self, item: ITEM);

    /// Wraps the collection into [Option]. An `empty` collection should be
    /// turned into [Option::None].
    fn into_option(self) -> Option<Self>
    where
        Self: Sized;
}
323
324/// An implementation for the Vec.
325impl<ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM>
326for Vec<ITEM>
327{
328 fn new() -> Self
329 {
330 return Vec::new();
331 }
332
333 fn push(&mut self, item: ITEM)
334 {
335 self.push(item);
336 }
337
338 fn into_option(self) -> Option<Self>
339 {
340 if self.is_empty() == false
341 {
342 return Some(self);
343 }
344 else
345 {
346 return None;
347 }
348 }
349}
350
351/// A dummy implementation when nothing is returned. Unused at the moment.
352impl <ITEM: PartialEq + Eq + fmt::Display + fmt::Debug> TimerTimeoutCollection<ITEM>
353for ()
354{
355 fn new() -> Self
356 {
357 return ();
358 }
359
360 fn push(&mut self, _item: ITEM)
361 {
362 return;
363 }
364
365 fn into_option(self) -> Option<Self>
366 {
367 return None;
368 }
369}
370
/// A standard interface for each deque type. Every deque type must implement this
/// trait.
pub trait OrderedTimerDequeHandle<MODE: OrderedTimerDequeMode>
    : Ord + PartialOrd + PartialEq + PartialEq<Self::TimerId> + Eq + fmt::Debug +
        fmt::Display + OrderedTimerDequeInterf<MODE>
{
    /// A timer ID. Normally this is a unique identifier of the item in the queue,
    /// i.e. `TimerDequeId`.
    type TimerId: PartialEq + Eq + fmt::Display + fmt::Debug;

    /// A collection which is used to collect the items which are removed from
    /// the queue due to timeout.
    type HandleRes: TimerTimeoutCollection<Self::TimerId>;

    /// Postpones the timeout of the current instance. The `postp_time` is a [RelativeTime],
    /// i.e. it introduces an offset.
    fn postpone(&mut self, postp_time: RelativeTime) -> TimerResult<()>;

    /// Reschedules the current instance. This is different from `postpone` as the
    /// instance is assigned a new time. The `MODE` cannot be changed, only the time.
    fn resched(&mut self, time: MODE) -> TimerResult<()>;

    /// A specific code which is called during the timeout handling routine. Normally it should
    /// store the result into the collection `timer_ids` and reschedule the item
    /// (via `timer_self`) if it is repeated.
    fn handle(self, timer_self: &mut OrderTimerDeque<MODE, Self>,
        timer_ids: &mut Self::HandleRes) -> TimerResult<()>
    where Self: Sized;

    /// Matches the current instance against the provided `other`
    /// [OrderedTimerDequeHandle::TimerId]. As `PartialEq<Self::TimerId>` is
    /// also required, the `==` operator can be used to compare as well.
    fn is_same(&self, other: &Self::TimerId) -> bool;

    /// Attempts to acquire the [OrderedTimerDequeHandle::TimerId] by consuming the instance.
    fn into_timer_id(self) -> Option<Self::TimerId>;
}
407
/// A trait which is used by the base [OrderTimerDeque].
/// This is an interface which provides access to the timeout value of the instance.
pub trait OrderedTimerDequeInterf<MODE: OrderedTimerDequeMode>
{
    /// Returns the absolute timeout of the instance.
    fn get_timeout_absolute(&self) -> AbsoluteTime;
}
415
416
/// A [VecDeque] based queue which is sorted (in ascending order) by the timeout
/// which is an `absolute` time.
///
/// The queue automatically manages the `timer`, i.e. setting and unsetting it.
///
/// Also, for each type of the deque an event processing function is provided.
///
/// There are two modes of the queue:
///
/// * [DequeOnce] - after timeout the element is removed from the queue.
///
/// * [DequePeriodic] - after timeout the element's timeout is extended
///     until the item is removed from the queue manually.
///
/// And there are 3 types of queue models:
///
/// * [`crate::TimerDequeConsumer`] - consumes the item which is stored in the
///     timeout queue.
///
/// * [`crate::TimerDequeTicketIssuer`] - issues a ticket which is referenced
///     to the item in the queue.
///
/// * [`crate::TimerDequeSignalTicket`] - sends a signal on timeout using `MPSC`.
///
/// # Implementations
///
/// If `feature = enable_mio_compat` is enabled a [mio::event::Source] is
/// implemented on the current struct.
///
/// ## Multithread
///
/// Not MT-safe. An external mutex should be used to protect the instance.
/// The internal `timer` [TimerFd] is MT-safe.
///
/// ## Poll
///
/// - A built-in [crate::TimerPoll] can be used.
///
/// - An external crate MIO [crate::TimerFdMioCompat] if `feature = enable_mio_compat` is
///     enabled.
///
/// - User implemented poll. The FD can be acquired via [AsRawFd] or [AsFd].
///
/// ## Async
///
/// A [Future] is implemented.
///
/// # Generics
///
/// * `MODE` - a mode of the deque, any type which implements [OrderedTimerDequeMode],
///     i.e. [DequeOnce] or [DequePeriodic].
///
/// * `INTF` - a deque type, any type which implements [OrderedTimerDequeHandle].
///     There are three types available:
///
/// * - [crate::TimerDequeTicketIssuer] issues a ticket for the instance for which the timer was set.
/// ```ignore
/// let mut time_list =
///     OrderTimerDeque
///         ::<DequeOnce, TimerDequeTicketIssuer<DequeOnce>>
///         ::new("test_label".into(), 4, false, false).unwrap();
/// ```
/// or
/// ```ignore
/// let mut time_list =
///     OrderTimerDeque
///         ::<DequePeriodic, TimerDequeTicketIssuer<DequePeriodic>>
///         ::new("test_label".into(), 4, false, false).unwrap();
/// ```
/// * - [crate::TimerDequeConsumer] consumes the instance for which the timer is set.
/// ```ignore
/// let mut time_list =
///     OrderTimerDeque
///         ::<DequeOnce, TimerDequeConsumer<TestItem, DequeOnce>>
///         ::new("test_label".into(), 4, false, false).unwrap();
/// ```
/// or
/// ```ignore
/// let mut time_list =
///     OrderTimerDeque
///         ::<DequePeriodic, TimerDequeConsumer<TestItem, DequePeriodic>>
///         ::new("test_label".into(), 4, false, false).unwrap();
/// ```
/// * - [crate::TimerDequeSignalTicket] sends a signal to destination.
///     NOTE(review): the generic parameters of `TimerDequeSignalTicket` are not
///     visible from this module - confirm the examples below against its definition.
/// ```ignore
/// let mut time_list =
///     OrderTimerDeque
///         ::<DequeOnce, TimerDequeSignalTicket<TestSigStruct, DequeOnce>>
///         ::new("test_label".into(), 4, false, false).unwrap();
/// ```
/// or
/// ```ignore
/// let mut time_list =
///     OrderTimerDeque
///         ::<DequePeriodic, TimerDequeSignalTicket<TestSigStruct, DequePeriodic>>
///         ::new("test_label".into(), 4, false, false).unwrap();
/// ```
#[derive(Debug, Eq)]
pub struct OrderTimerDeque<MODE: OrderedTimerDequeMode, INTF: OrderedTimerDequeHandle<MODE>>
{
    /// A [VecDeque] list which is sorted by time in ascending order -
    /// lowest first, largest last.
    pub(crate) deque_timeout_list: VecDeque<INTF>,

    /// An instance of the FFI (Kernel Supported Timer).
    pub(crate) timer: TimerFd,

    /// Ties the otherwise unused `MODE` generic to the struct.
    p: PhantomData<MODE>
}
520
/// MIO compatibility layer. Compiled only on `unix` targets with the
/// `enable_mio_compat` feature enabled.
#[cfg(all(target_family = "unix", feature = "enable_mio_compat"))]
pub mod mio_compat
{
    use std::{io, os::fd::{AsRawFd, RawFd}};

    use mio::{Token, unix::SourceFd};

    use crate::{TimerFdMioCompat, deque_timeout::{OrderTimerDeque, OrderedTimerDequeHandle, OrderedTimerDequeMode}};

    /// Every operation is forwarded to a [SourceFd] built from the raw FD of
    /// the deque.
    impl<MODE, INTF> mio::event::Source for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        fn register(
            &mut self,
            registry: &mio::Registry,
            token: mio::Token,
            interests: mio::Interest,
        ) -> io::Result<()>
        {
            let raw_fd = self.as_raw_fd();

            SourceFd(&raw_fd).register(registry, token, interests)
        }

        fn reregister(
            &mut self,
            registry: &mio::Registry,
            token: mio::Token,
            interests: mio::Interest,
        ) -> io::Result<()>
        {
            let raw_fd = self.as_raw_fd();

            SourceFd(&raw_fd).reregister(registry, token, interests)
        }

        fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()>
        {
            let raw_fd = self.as_raw_fd();

            SourceFd(&raw_fd).deregister(registry)
        }
    }

    /// A deque equals a [Token] when its raw FD equals the token's value
    /// cast to [RawFd].
    impl<MODE, INTF> PartialEq<Token> for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        fn eq(&self, other: &Token) -> bool
        {
            let own_fd = self.as_raw_fd();

            own_fd == other.0 as RawFd
        }
    }

    impl<MODE, INTF> TimerFdMioCompat for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        /// Builds a [Token] from the raw FD of the deque cast to `usize`.
        fn get_token(&self) -> Token
        {
            let fd_as_index = self.as_raw_fd() as usize;

            Token(fd_as_index)
        }
    }
}
586
/// Implements [UnixFd] for the deque. The implementation body is empty,
/// so nothing is defined or overridden here.
impl<MODE, INTF> UnixFd for OrderTimerDeque<MODE, INTF>
where
    MODE: OrderedTimerDequeMode,
    INTF: OrderedTimerDequeHandle<MODE>
{

}
594
595impl<MODE, INTF> AsTimerId for OrderTimerDeque<MODE, INTF>
596where
597 MODE: OrderedTimerDequeMode,
598 INTF: OrderedTimerDequeHandle<MODE>
599{
600 fn as_timer_id(&self) -> TimerId
601 {
602 return self.timer.as_timer_id();
603 }
604}
605
606impl<MODE, INTF> FdTimerRead for OrderTimerDeque<MODE, INTF>
607where
608 MODE: OrderedTimerDequeMode,
609 INTF: OrderedTimerDequeHandle<MODE>
610{
611 fn read(&self) -> TimerPortResult<TimerReadRes<u64>>
612 {
613 return self.timer.read();
614 }
615}
616
617impl<MODE, INTF> PartialEq<str> for OrderTimerDeque<MODE, INTF>
618where
619 MODE: OrderedTimerDequeMode,
620 INTF: OrderedTimerDequeHandle<MODE>
621{
622 fn eq(&self, other: &str) -> bool
623 {
624 return self.timer.as_ref() == other;
625 }
626}
627
628
629impl<MODE, INTF> FdTimerMarker for OrderTimerDeque<MODE, INTF>
630where
631 MODE: OrderedTimerDequeMode,
632 INTF: OrderedTimerDequeHandle<MODE>
633{
634 fn clone_timer(&self) -> TimerFd
635 {
636 return self.timer.clone_timer();
637 }
638
639 fn get_strong_count(&self) -> usize
640 {
641 return self.timer.get_strong_count();
642 }
643}
644
/// When the deque is dropped, `clean_up_timer()` is called on it.
impl<MODE, INTF> Drop for OrderTimerDeque<MODE, INTF>
where
    MODE: OrderedTimerDequeMode,
    INTF: OrderedTimerDequeHandle<MODE>
{
    fn drop(&mut self)
    {
        // `drop` has no way to report a failure, so the return value of the
        // clean up is deliberately discarded
        let _ = self.clean_up_timer();
    }
}
655
656impl<MODE, INTF> AsRef<str> for OrderTimerDeque<MODE, INTF>
657where
658 MODE: OrderedTimerDequeMode,
659 INTF: OrderedTimerDequeHandle<MODE>
660{
661 fn as_ref(&self) -> &str
662 {
663 return self.timer.as_ref();
664 }
665}
666
667#[cfg(target_family = "unix")]
668pub mod deque_timeout_os
669{
670 use super::*;
671
672 impl<MODE, INTF> AsFd for OrderTimerDeque<MODE, INTF>
673 where
674 MODE: OrderedTimerDequeMode,
675 INTF: OrderedTimerDequeHandle<MODE>
676 {
677 fn as_fd(&self) -> BorrowedFd<'_>
678 {
679 return self.timer.as_fd();
680 }
681 }
682
683 impl<MODE, INTF> AsRawFd for OrderTimerDeque<MODE, INTF>
684 where
685 MODE: OrderedTimerDequeMode,
686 INTF: OrderedTimerDequeHandle<MODE>
687 {
688 fn as_raw_fd(&self) -> RawFd
689 {
690 return self.timer.as_raw_fd();
691 }
692 }
693
694 impl<MODE, INTF> PartialEq<RawFd> for OrderTimerDeque<MODE, INTF>
695 where
696 MODE: OrderedTimerDequeMode,
697 INTF: OrderedTimerDequeHandle<MODE>
698 {
699 fn eq(&self, other: &RawFd) -> bool
700 {
701 return self.timer == *other;
702 }
703 }
704}
705
/// Handle related implementations for the `windows` targets. Everything is
/// forwarded to the internal timer.
#[cfg(target_family = "windows")]
pub mod deque_timeout_os
{
    use std::os::windows::io::{AsHandle, AsRawHandle, BorrowedHandle, RawHandle};

    use super::*;

    impl<MODE, INTF> AsHandle for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        /// Borrows the handle of the internal timer.
        fn as_handle(&self) -> BorrowedHandle<'_>
        {
            self.timer.as_handle()
        }
    }

    impl<MODE, INTF> AsRawHandle for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        /// Returns the raw handle of the internal timer.
        fn as_raw_handle(&self) -> RawHandle
        {
            self.timer.as_raw_handle()
        }
    }

    /// Compares the internal timer with the provided raw handle.
    impl<MODE, INTF> PartialEq<RawHandle> for OrderTimerDeque<MODE, INTF>
    where
        MODE: OrderedTimerDequeMode,
        INTF: OrderedTimerDequeHandle<MODE>
    {
        fn eq(&self, other: &RawHandle) -> bool
        {
            let raw_handle: RawHandle = *other;

            self.timer == raw_handle
        }
    }
}
746
747impl<MODE, INTF> fmt::Display for OrderTimerDeque<MODE, INTF>
748where
749 MODE: OrderedTimerDequeMode,
750 INTF: OrderedTimerDequeHandle<MODE>
751{
752 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
753 {
754 write!(f, "timer: '{}', fd: '{}', queue_len: '{}'",
755 self.timer, self.timer.as_timer_id(), self.deque_timeout_list.len())
756 }
757}
758
759impl<MODE, INTF> PartialEq for OrderTimerDeque<MODE, INTF>
760where
761 MODE: OrderedTimerDequeMode,
762 INTF: OrderedTimerDequeHandle<MODE>
763{
764 fn eq(&self, other: &Self) -> bool
765 {
766 return self.timer == other.timer;
767 }
768}
769
770
771
772impl<MODE, R> OrderTimerDeque<MODE, TimerDequeConsumer<R, MODE>>
773where
774 MODE: OrderedTimerDequeMode,
775 R: PartialEq + Eq + fmt::Debug + fmt::Display + Send + Clone
776{
777 /// Adds the new absolute timeout to the timer deque instance.
778 ///
779 /// The `entity` is an item which should be stored in the deque
780 /// amd on timeout - returned.
781 ///
782 /// # Generics
783 ///
784 /// * `MODE` - is any type which implements the [OrderedTimerDequeMode].
785 /// There are two options: [DequeOnce] and [DequePeriodic].
786 ///
787 /// # Arguemnts
788 ///
789 /// * `item` - `R` which is an item to store in the deque.
790 ///
791 /// * `mode` - `MODE` a timeout value which also defines the
792 /// behaviour of the deque logic.
793 ///
794 /// # Returns
795 ///
796 /// A [Result] as alias [TimerResult] is returned with:
797 ///
798 /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
799 ///
800 /// * [Result::Err] with error description.
801 ///
802 /// Possible errors:
803 ///
804 /// * [TimerErrorType::Expired] - `mode` contains time which have alrealy expired.
805 ///
806 /// * [TimerErrorType::ZeroRelativeTime] - `mode` [DequePeriodic] relative time
807 /// should never be zero.
808 ///
809 /// * [TimerErrorType::TimerError] - a error with the OS timer. Contains subcode
810 /// `errno`.
811 pub
812 fn add(&mut self, item: R, mode: MODE) -> TimerResult<()>
813 {
814 let inst =
815 TimerDequeConsumer::<R, MODE>::new(item, mode)?;
816
817 //return self.queue_item(inst)
818 let res = self.queue_item(inst);
819 return res;
820 }
821}
822
823
824impl<MODE> OrderTimerDeque<MODE, TimerDequeTicketIssuer<MODE>>
825where
826 MODE: OrderedTimerDequeMode
827{
828 /// Adds the new absolute timeout to the timer deque instance.
829 ///
830 /// This deque assigns the `ticket` for each timeout which is
831 /// used for identification and invalidation.
832 ///
833 /// # Generics
834 ///
835 /// * `MODE` - is any type which implements the [OrderedTimerDequeMode].
836 /// There are two options: [DequeOnce] and [DequePeriodic].
837 ///
838 /// # Arguemnts
839 ///
840 /// * `mode` - `MODE` a timeout value which also defines the
841 /// behaviour of the deque logic.
842 ///
843 /// # Returns
844 ///
845 /// A [Result] as alias [TimerResult] is returned with:
846 ///
847 /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
848 ///
849 /// * [Result::Err] with error description.
850 ///
851 /// Possible errors:
852 ///
853 /// * [TimerErrorType::Expired] - `mode` contains time which have alrealy expired.
854 ///
855 /// * [TimerErrorType::ZeroRelativeTime] - `mode` [DequePeriodic] relative time
856 /// should never be zero.
857 ///
858 /// * [TimerErrorType::TimerError] - a error with the OS timer. Contains subcode
859 /// `errno`.
860 pub
861 fn add(&mut self, mode: MODE) -> TimerResult<TimerDequeTicket>
862 {
863 let (inst, ticket) =
864 TimerDequeTicketIssuer::<MODE>::new(mode)?;
865
866 self.queue_item(inst)?;
867
868 return Ok(ticket);
869 }
870}
871
872impl<MODE, INTF> OrderTimerDeque<MODE, INTF>
873where
874 MODE: OrderedTimerDequeMode,
875 INTF: OrderedTimerDequeHandle<MODE>
876{
877 /// Creates new deque instance with the provided parameters.
878 ///
879 /// # Argument
880 ///
881 /// * `timer_label` - a label which helps to identify the timer.
882 ///
883 /// * `deq_len` - a minimal, pre-allocated deque length.
884 ///
885 /// * `cloexec` - when set to `true` sets the `CLOEXEC` flag on FD.
886 ///
887 /// * `non_blocking` - when set to `true` sets the `TFD_NONBLOCK` flag on FD.
888 ///
889 /// # Returns
890 ///
891 /// A [Result] as alias [TimerResult] is returned with
892 ///
893 /// * [Result::Ok] with the instance.
894 ///
895 /// * [Result::Err] with the error description.
896 ///
897 /// The following errors may be returned:
898 ///
899 /// * [TimerErrorType::TimerError] - error in the OS timer. A `errno` subcode is provided.
900 pub
901 fn new(timer_label: Cow<'static, str>, deq_len: usize, cloexec: bool, non_blocking: bool) -> TimerResult<OrderTimerDeque<MODE, INTF>>
902 {
903 let deq_len =
904 if deq_len == 0
905 {
906 10
907 }
908 else
909 {
910 deq_len
911 };
912
913 let mut tf = TimerFlags::empty();
914
915 tf.set(TimerFlags::TFD_CLOEXEC, cloexec);
916 tf.set(TimerFlags::TFD_NONBLOCK, non_blocking);
917
918 // init timer
919 let timer =
920 TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, tf)
921 .map_err(|e|
922 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
923 )?;
924
925 return Ok(
926 Self
927 {
928 deque_timeout_list: VecDeque::with_capacity(deq_len),
929 timer: timer,
930 p: PhantomData,
931 }
932 );
933 }
934
935 /// internal function which is called from the `add()` functions.
936 pub(super)
937 fn queue_item(&mut self, inst: INTF) -> TimerResult<()>
938 {
939 if self.deque_timeout_list.len() == 0
940 {
941 let timer_stamp =
942 TimerExpMode::<AbsoluteTime>::new_oneshot(inst.get_timeout_absolute());
943
944 // setup timer
945 self
946 .timer
947 .get_timer()
948 .set_time(timer_stamp)
949 .map_err(|e|
950 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
951 )?;
952
953 self.deque_timeout_list.push_front(inst);
954 }
955 else
956 {
957 // list can not be empty from this point
958 let front_timeout =
959 self.deque_timeout_list.front().unwrap().get_timeout_absolute();
960
961 // intances timeout
962 let inst_timeout = inst.get_timeout_absolute();
963
964 if front_timeout >= inst_timeout
965 {
966 // push to front
967 self.deque_timeout_list.push_front(inst);
968
969 self.reschedule_timer()?;
970 }
971 else
972 {
973 let back_banuntil =
974 self
975 .deque_timeout_list
976 .back()
977 .unwrap()
978 .get_timeout_absolute();
979
980 if back_banuntil <= inst_timeout
981 {
982 // push to the back
983 self.deque_timeout_list.push_back(inst);
984 }
985 else
986 {
987 let pos =
988 self
989 .deque_timeout_list
990 .binary_search_by( |se|
991 se.get_timeout_absolute().cmp(&inst.get_timeout_absolute())
992 )
993 .map_or_else(|e| e, |r| r);
994
995 self.deque_timeout_list.insert(pos, inst);
996 }
997 }
998 }
999
1000 return Ok(());
1001 }
1002
1003 /// Removes the instance from the queue by the identification depending on the
1004 /// deque type.
1005 ///
1006 /// # Arguments
1007 ///
1008 /// `item` - identification of the item which should be removed from the queue.
1009 ///
1010 /// # Returns
1011 ///
1012 /// A [Result] as alias [TimerResult] is returned with:
1013 ///
1014 /// * [Result::Ok] with the inner type [Option] where
1015 /// * [Option::Some] is returned with the consumed `item`.
1016 /// * [Option::None] is returned when item was not found.
1017 ///
1018 /// * [Result::Err] with error description.
1019 ///
1020 /// The following errors may be returned:
1021 ///
1022 /// * [TimerErrorType::TimerError] - error in the OS timer. A `errno` subcode is provided.
1023 pub
1024 fn remove_from_queue(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF::TimerId>>
1025 {
1026 return
1027 self
1028 .remove_from_queue_int(item)
1029 .map(|opt_intf|
1030 {
1031 let Some(intf) = opt_intf
1032 else { return None };
1033
1034 return intf.into_timer_id();
1035 }
1036 );
1037
1038 }
1039
1040 pub(super)
1041 fn remove_from_queue_int(&mut self, item: &INTF::TimerId) -> TimerResult<Option<INTF>>
1042 {
1043 if self.deque_timeout_list.len() == 0
1044 {
1045 timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
1046 }
1047 else
1048 {
1049 // search for the item in list
1050 if self.deque_timeout_list.len() == 1
1051 {
1052 // just pop the from the front
1053 let ret_ent = self.deque_timeout_list.pop_front().unwrap();
1054
1055 if &ret_ent != item
1056 {
1057 self.deque_timeout_list.push_front(ret_ent);
1058
1059 return Ok(None);
1060 }
1061
1062 // stop timer
1063 self.stop_timer()?;
1064
1065 return Ok(Some(ret_ent));
1066 }
1067 else
1068 {
1069 // in theory the `ticket` is a reference to ARC, so the weak should be upgraded
1070 // succesfully for temoved instance.
1071
1072 for (pos, q_item)
1073 in self.deque_timeout_list.iter().enumerate()
1074 {
1075 if q_item == item
1076 {
1077 // remove by the index
1078 let ret_ent =
1079 self.deque_timeout_list.remove(pos).unwrap();
1080
1081 // call timer reset if index is 0 (front)
1082 if pos == 0
1083 {
1084 self.reschedule_timer()?;
1085 }
1086
1087 return Ok(Some(ret_ent));
1088 }
1089 }
1090
1091 return Ok(None);
1092 }
1093 }
1094 }
1095
1096 /// Reads the timer's FD to retrive the event type and then calling
1097 /// the event handler. Then the result is returned. The result may
1098 /// contain the `items` which were removed from the queue or copied.
1099 ///
1100 /// This function behaves differently when the timer is initialized as
1101 /// a `non-blocking` i.e with [TimerFlags::TFD_NONBLOCK].
1102 ///
1103 /// When [TimerFlags::TFD_NONBLOCK] is not set, this function will block reading the FD.
1104 /// In case of 'EINTR', the read attempt will be repeated.
1105 ///
1106 /// When [TimerFlags::TFD_NONBLOCK] is set, the function will return with some result
1107 /// immidiatly.
1108 ///
1109 /// # Return
1110 ///
1111 /// * In case of `EAGAIN`, the [TimerReadRes::WouldBlock] will be returned.
1112 ///
1113 /// * In case of `ECANCELLD`, the [TimerReadRes::Cancelled] will be returned.
1114 ///
1115 /// * In case of any other error the [Result::Err] is returned.
1116 ///
1117 /// When a timer fires an event the [Result::Ok] is returned with the amount of
1118 /// timer overrun. Normally it is 1.
1119 pub
1120 fn wait_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1121 {
1122 let res =
1123 self
1124 .timer
1125 .read()
1126 .map_err(|e|
1127 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1128 )?;
1129
1130
1131 // ignore wouldblock
1132 if let TimerReadRes::WouldBlock = res
1133 {
1134 return Ok(None);
1135 }
1136
1137 return Ok(self.internal_handle_timer_event()?);
1138 }
1139
1140 /// Handles the single event received from the `poll` and
1141 /// returns the result. The result may
1142 /// contain the `items` which were removed from the queue or copied.
1143 ///
1144 /// # Arguments
1145 ///
1146 /// `pet` - [PollEventType] an event from the timer to handle.
1147 ///
1148 /// A [Result] as alias [TimerResult] is returned with:
1149 ///
1150 /// * [Result::Ok] witout any innder data.
1151 ///
1152 /// * [Result::Err] with error description.
1153 pub
1154 fn handle_timer_event(&mut self, pet: PollEventType) -> TimerResult<Option<INTF::HandleRes>>
1155 {
1156 match pet
1157 {
1158 PollEventType::TimerRes(_, res) =>
1159 {
1160 // ignore wouldblock
1161 if let TimerReadRes::WouldBlock = res
1162 {
1163 return Ok(None);
1164 }
1165
1166 return self.internal_handle_timer_event();
1167 },
1168 PollEventType::SubError(_, err) =>
1169 {
1170 timer_err!(TimerErrorType::TimerError(err.get_errno()), "{}", err)
1171 }
1172 }
1173
1174 }
1175
1176 fn internal_handle_timer_event(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1177 {
1178 let cur_timestamp = AbsoluteTime::now();
1179 let mut timer_ids: INTF::HandleRes = INTF::HandleRes::new();
1180
1181 loop
1182 {
1183 // get from front of the queue
1184 let Some(front_entity) = self.deque_timeout_list.front()
1185 else { break };
1186
1187 let time_until = front_entity.get_timeout_absolute();
1188
1189 if time_until <= cur_timestamp
1190 {
1191 let deq = self.deque_timeout_list.pop_front().unwrap();
1192
1193 deq.handle(self, &mut timer_ids)?;
1194 }
1195 else
1196 {
1197 break;
1198 }
1199 }
1200
1201 // call timer reschedule
1202 self.reschedule_timer()?;
1203
1204 return Ok(timer_ids.into_option());
1205 }
1206
1207 /// Asynchronious polling. The timer's FD is set to nonblocking,
1208 /// so each time it will return `pending` and load CPU. If you are using
1209 /// `tokio` or `smol` or other crate, the corresponding helpers like
1210 /// tokio's `AsyncFd` can be used to wrap the instance. The timer is read-only
1211 /// so use `read-only interest` to avoid errors.
1212 ///
1213 /// If [TimerReadRes::WouldBlock] is received then returns immidiatly.
1214 pub async
1215 fn async_poll_for_event_and_process(&mut self) -> TimerResult<Option<INTF::HandleRes>>
1216 {
1217 let res =
1218 self
1219 .timer
1220 .get_timer()
1221 .await
1222 .map_err(|e|
1223 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1224 )?;
1225
1226 // ignore wouldblock
1227 if let TimerReadRes::WouldBlock = res
1228 {
1229 return Ok(None);
1230 }
1231
1232 return Ok(self.internal_handle_timer_event()?);
1233 }
1234
1235 /// Returns the queue length.
1236 pub
1237 fn timer_queue_len(&self) -> usize
1238 {
1239 return self.deque_timeout_list.len();
1240 }
1241
1242 /// Postpones the instace `target` by the relative time `rel_time_off`.
1243 ///
1244 /// # Errors
1245 ///
1246 /// * [TimerErrorType::NotFound] - if instance `target` was not found.
1247 ///
1248 /// * [TimerErrorType::TicketInstanceGone] - if ticket was invalidated.
1249 ///
1250 /// * [TimerErrorType::TimerError] - with the suberror code.
1251 pub
1252 fn postpone(&mut self, target: &INTF::TimerId, rel_time_off: RelativeTime) -> TimerResult<()>
1253 {
1254 let mut item =
1255 self
1256 .remove_from_queue_int(target)?
1257 .ok_or(map_timer_err!(TimerErrorType::NotFound, "ticket: {} not found", target))?;
1258
1259
1260 item.postpone(rel_time_off)?;
1261
1262 self.queue_item(item)?;
1263
1264 return Ok(());
1265 }
1266
1267 /// Reschedules the instace `target` by assigning new time `time`
1268 /// which is of type `MODE`.
1269 ///
1270 /// # Errors
1271 ///
1272 /// * [TimerErrorType::NotFound] - if instance `target` was not found.
1273 ///
1274 /// * [TimerErrorType::TicketInstanceGone] - if ticket was invalidated.
1275 ///
1276 /// * [TimerErrorType::TimerError] - with the suberror code.
1277 ///
1278 /// * [TimerErrorType::Expired] - if provided `time` have passed.
1279 pub
1280 fn reschedule(&mut self, target: &INTF::TimerId, time: MODE) -> TimerResult<()>
1281 {
1282 let mut item =
1283 self.remove_from_queue_int(target)?
1284 .ok_or(map_timer_err!(TimerErrorType::NotFound, "{} not found", target))?;
1285
1286
1287 item.resched(time)?;
1288
1289 self.queue_item(item)?;
1290
1291 return Ok(());
1292 }
1293
1294 /// Starts the `timer` instance by setting timeout or stops the `timer` if the
1295 /// instnce's `queue` is empty.
1296 pub(super)
1297 fn reschedule_timer(&mut self) -> TimerResult<()>
1298 {
1299 if let Some(front_entity) = self.deque_timeout_list.front()
1300 {
1301 let timer_exp =
1302 TimerExpMode::<AbsoluteTime>::new_oneshot(front_entity.get_timeout_absolute());
1303
1304 return
1305 self
1306 .timer
1307 .get_timer()
1308 .set_time(timer_exp)
1309 .map_err(|e|
1310 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1311 );
1312 }
1313 else
1314 {
1315 // queue is empty, force timer to stop
1316
1317 return
1318 self
1319 .timer
1320 .get_timer()
1321 .unset_time()
1322 .map_err(|e|
1323 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
1324 );
1325 }
1326 }
1327
1328 /// Unarms timer and clears the queue.
1329 pub
1330 fn clean_up_timer(&mut self) -> TimerResult<()>
1331 {
1332 self
1333 .timer
1334 .get_timer()
1335 .unset_time()
1336 .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;
1337
1338 self.deque_timeout_list.clear();
1339
1340 return Ok(());
1341 }
1342
1343 /// Unarms timer only.
1344 pub
1345 fn stop_timer(&mut self) -> TimerResult<()>
1346 {
1347 return
1348 self
1349 .timer
1350 .get_timer()
1351 .unset_time()
1352 .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e));
1353 }
1354
1355}