timer_deque_rs/deque_timeout/mod.rs
1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 * functionality.
4 *
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 * 4neko.org alex@4neko.org
7 *
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *
13 * 2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16/// A `consumer` type of the timer which consumes the intance and returns it when
17/// timer triggers. The `consumed` instance normally whould be [Send] because it will
18/// be moved into the timer.
19pub mod timer_consumer;
20
21/// A `ticket` issuer. Issues a ticket which should be assigned to the instance whcih was added
22/// to the timer's queue. The `ticket` can be used to remove the item from queue before the
23/// timeout event. If ticket is dropped i.e connection closed, the ticket will be
24/// in timer's queue until timeout where it will be ignored on timeout event.
25pub mod timer_tickets;
26
27/// A `signal` sender. Calls the specified callback which must never block the executing thread.
28pub mod timer_signal;
29
30pub use std::os::fd::{AsFd, AsRawFd};
31use std::{borrow::Cow, collections::VecDeque, fmt, os::fd::{BorrowedFd, RawFd}};
32
33use crate::
34{
35 error::{TimerErrorType, TimerResult},
36 map_timer_err,
37 timer_portable::
38 {
39 poll::AsTimerFd,
40 timer::
41 {
42 AbsoluteTime,
43 FdTimerCom,
44 RelativeTime,
45 TimerExpMode,
46 TimerFd,
47 TimerFlags,
48 TimerReadRes,
49 TimerType
50 }
51 }
52};
53
54/// A trait which is implemented by the structs which defines the behaviour
55/// of the timer queue.
56pub trait OrderedTimerDequeIntrf: Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
57{
58 /// A timer item for the queue which is passed as argument. If noting is
59 /// provided the `NoTarget` can be used.
60 type Target: PartialEq + Eq + fmt::Display + fmt::Debug;
61
62 /// A timer queue identification in the queue which may be retuened.
63 /// If nothing is retuned the `NoTicket` can be returned.
64 type Ticket: PartialEq + Eq + fmt::Display + fmt::Debug;
65
66
67 /// Should return the absolute time and the timer mode.
68 fn get_timeout_absolute(&self) -> AbsoluteTime;
69}
70
71/// A trait which is implemented by the struct which defines the operation mode of the deque.
72pub trait OrderedTimerDequeMode: fmt::Debug + fmt::Display + Ord + PartialOrd + Eq + PartialEq
73{
74 /// Returns the `absolute` timeout for the instance of the deque.
75 fn get_absolut_timeout(&self) -> AbsoluteTime;
76
77 /// Updates the timeout when the deque works in periodic mode.
78 fn advance_timeout(&mut self)
79 {
80 return;
81 }
82}
83
84/// This queue mode removes all entries from the queue that have timed out.
85///
86/// The further behaviour is defined by the type of the deque.
87#[derive(Debug)]
88pub struct OrderdTimerDequeOnce
89{
90 /// A timeout for the item in the queue.
91 absolute_timeout: AbsoluteTime,
92}
93
94impl fmt::Display for OrderdTimerDequeOnce
95{
96 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
97 {
98 write!(f, "{}", self.absolute_timeout)
99 }
100}
101
102impl Ord for OrderdTimerDequeOnce
103{
104 fn cmp(&self, other: &Self) -> std::cmp::Ordering
105 {
106 return self.absolute_timeout.cmp(&other.absolute_timeout);
107 }
108}
109
110impl PartialOrd for OrderdTimerDequeOnce
111{
112 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
113 {
114 return Some(self.cmp(other));
115 }
116}
117
118impl Eq for OrderdTimerDequeOnce {}
119
120impl PartialEq for OrderdTimerDequeOnce
121{
122 fn eq(&self, other: &Self) -> bool
123 {
124 return self.absolute_timeout == other.absolute_timeout;
125 }
126}
127
128
129impl OrderedTimerDequeMode for OrderdTimerDequeOnce
130{
131 fn get_absolut_timeout(&self) -> AbsoluteTime
132 {
133 return self.absolute_timeout;
134 }
135}
136
137impl OrderdTimerDequeOnce
138{
139 /// Creates new instacne.
140 pub(crate)
141 fn new(absolute_timeout: AbsoluteTime) -> Self
142 {
143 return
144 Self
145 {
146 absolute_timeout:
147 absolute_timeout
148 };
149 }
150}
151
152/// This queue mode does not remove an element that has timed out (by `absolute_timeout`),
153/// but extends (by `relative_period`) the timeout and returns the element back to the queue.
154///
155/// The further behaviour is defined by the type of the deque.
156#[derive(Debug)]
157pub struct OrderdTimerDequePeriodic
158{
159 /// Extends the timer until the next timeout. This is `relative`
160 /// not absolute.
161 relative_period: RelativeTime,
162
163 /// A timeout value.
164 absolute_timeout: AbsoluteTime,
165}
166
167impl fmt::Display for OrderdTimerDequePeriodic
168{
169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
170 {
171 write!(f, "{}, rel: {}", self.absolute_timeout, self.relative_period)
172 }
173}
174
175impl Ord for OrderdTimerDequePeriodic
176{
177 fn cmp(&self, other: &Self) -> std::cmp::Ordering
178 {
179 return self.absolute_timeout.cmp(&other.absolute_timeout);
180 }
181}
182
183impl PartialOrd for OrderdTimerDequePeriodic
184{
185 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering>
186 {
187 return Some(self.cmp(other));
188 }
189}
190
191impl Eq for OrderdTimerDequePeriodic {}
192
193impl PartialEq for OrderdTimerDequePeriodic
194{
195 fn eq(&self, other: &Self) -> bool
196 {
197 return self.absolute_timeout == other.absolute_timeout;
198 }
199}
200
201
202impl OrderedTimerDequeMode for OrderdTimerDequePeriodic
203{
204 fn get_absolut_timeout(&self) -> AbsoluteTime
205 {
206 return self.absolute_timeout;
207 }
208
209 fn advance_timeout(&mut self)
210 {
211 self.absolute_timeout += self.relative_period;
212 }
213}
214
215impl OrderdTimerDequePeriodic
216{
217 /// Creates new instance.
218 pub(crate)
219 fn new(rel_time: RelativeTime) -> Self
220 {
221 let mut inst =
222 Self
223 {
224 relative_period:
225 rel_time,
226 absolute_timeout:
227 AbsoluteTime::now(),
228 };
229
230 inst.advance_timeout();
231
232 return inst;
233 }
234}
235
236/// A [VecDeque] based queue which is sorted (in ascending order) by the timeout
237/// which is `absolute` time.
238///
239/// The queue automatically manages the `timer` i.e setting, unsetting.
240///
241/// Also for each type of the deque, a event procesing function is providided.
242///
243/// There are two types of queue:
244///
245/// * [OrderdTimerDequeOnce] - after timeout the element is removed from the queue.
246///
247/// * [OrderdTimerDequePeriodic] - after timeout the element timeout is extended
248/// until the item is not removed from the queue manually.
249///
250/// # Generics
251///
252/// * `DQI` - a deque type. There are three types are available:
253/// * - [crate::TimerDequeueTicketIssuer] issues a ticket for the instance for which the timer was set.
254/// ```ignore
255/// let mut time_list =
256/// OrderedTimerDeque
257/// ::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
258/// ::new("test_label".into(), 4, false).unwrap();
259/// ```
260/// or
261/// ```ignore
262/// let mut time_list =
263/// OrderedTimerDeque
264/// ::<TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>>
265/// ::new("test_label".into(), 4, false).unwrap();
266/// ```
267/// * - [crate::TimerDequeueConsumer] consumes the instance for which the timer is set.
268/// ```ignore
269/// let mut time_list =
270/// OrderedTimerDeque
271/// ::<TimerDequeueConsumer<TestItem, OrderdTimerDequeOnce>>
272/// ::new("test_label".into(), 4, false).unwrap();
273/// ```
274/// or
275/// ```ignore
276/// let mut time_list =
277/// OrderedTimerDeque
278/// ::<TimerDequeueConsumer<TestItem, OrderdTimerDequePeriodic>>
279/// ::new("test_label".into(), 4, false).unwrap();
280/// ```
281/// * - [crate::TimerDequeueSignalTicket] sends a signal to destination.
282/// ```ignore
283/// let mut time_list =
284/// OrderedTimerDeque
285/// ::<TimerDequeueSignalTicket<TestSigStruct, OrderdTimerDequeOnce>>
286/// ::new("test_label".into(), 4, false).unwrap();
287/// ```
288/// or
289/// ```ignore
290/// let mut time_list =
291/// OrderedTimerDeque
292/// ::<TimerDequeueSignalTicket<TestSigStruct, OrderdTimerDequePeriodic>>
293/// ::new("test_label".into(), 4, false).unwrap();
294/// ```
295#[derive(Debug)]
296pub struct OrderedTimerDeque<DQI: OrderedTimerDequeIntrf>
297{
298 /// A [VecDeque] list which is sorted by time in ascending order -
299 /// lower first, largest last
300 pub(crate) deque_timeout_list: VecDeque<DQI>,
301
302 /// An instance of the FFI (Kernel Supported Timer)
303 pub(crate) timer: TimerFd,
304}
305
306impl<DQI> AsTimerFd for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
307{
308 #[inline]
309 fn get_bind(&self) -> Option<std::sync::Arc<crate::timer_portable::DefaultEventWatch>>
310 {
311 return self.timer.get_bind();
312 }
313
314 #[inline]
315 fn bind_poll(&self, timer_weak_ref: std::sync::Weak<crate::timer_portable::DefaultEventWatch>)
316 {
317 return self.timer.bind_poll(timer_weak_ref);
318 }
319
320 fn unbind_poll(&self)
321 {
322 return self.timer.unbind_poll();
323 }
324}
325
326impl<DQI> Drop for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
327{
328 fn drop(&mut self)
329 {
330 let _ = self.clean_up_timer();
331 }
332}
333
334impl<DQI> AsFd for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
335{
336 fn as_fd(&self) -> BorrowedFd<'_>
337 {
338 return self.timer.as_fd();
339 }
340}
341
342impl<DQI> AsRawFd for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
343{
344 fn as_raw_fd(&self) -> RawFd
345 {
346 return self.timer.as_raw_fd();
347 }
348}
349
350impl<DQI> fmt::Display for OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
351{
352 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
353 {
354 write!(f, "timer: '{}', fd: '{}', queue_len: '{}'",
355 self.timer, self.timer.as_fd().as_raw_fd(), self.deque_timeout_list.len())
356 }
357}
358
359impl<DQI> OrderedTimerDeque<DQI> where DQI: OrderedTimerDequeIntrf
360{
361 /// Creates new deque instance.
362 ///
363 /// # Argument
364 ///
365 /// `timer_label` - a label which helps to identify the timer.
366 ///
367 /// `deq_len` - a minimal, pre-allocated deque length.
368 ///
369 /// `cloexec` - when set to `true` sets the `CLOEXEC` flag to FD.
370 ///
371 /// # Returns
372 ///
373 /// A [Result] as alias [TimerResult] is returned with
374 ///
375 /// * [Result::Ok] with the instance.
376 ///
377 /// * [Result::Err] with the error description.
378 pub
379 fn new(timer_label: Cow<'static, str>, deq_len: usize, cloexec: bool) -> TimerResult<OrderedTimerDeque<DQI>>
380 {
381 let deq_len =
382 if deq_len == 0
383 {
384 10
385 }
386 else
387 {
388 deq_len
389 };
390
391 let tf =
392 if cloexec == true
393 {
394 TimerFlags::TFD_CLOEXEC
395 }
396 else
397 {
398 TimerFlags::empty()
399 };
400
401 // init timer
402 let timer =
403 TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, TimerFlags::TFD_NONBLOCK | tf)
404 .map_err(|e|
405 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
406 )?;
407
408 return Ok(
409 Self
410 {
411 deque_timeout_list: VecDeque::with_capacity(deq_len),
412 timer: timer
413 }
414 );
415 }
416
417 /// Reads the FD to retrive the event type.
418 ///
419 /// This function behaves differently when the timer is set with [TimerFlags::TFD_NONBLOCK].
420 ///
421 /// Normally, all timer_* creates nonblocking timer, so this function behaves like when
422 /// the [TimerFlags::TFD_NONBLOCK] is set.
423 ///
424 /// When [TimerFlags::TFD_NONBLOCK] is not set, this function will block reading the FD.
425 /// In case of 'EINTR', the read attempt will be repeated. It will be reapeated in both cases.
426 ///
427 /// When [TimerFlags::TFD_NONBLOCK] is set, the function will return with some result.
428 ///
429 /// # Return
430 ///
431 /// * In case of `EAGAIN`, the [TimerReadRes::WouldBlock] will be returned.
432 ///
433 /// * In case of `ECANCELLD`, the [TimerReadRes::WouldBlock] will be returned.
434 ///
435 /// * In case of any other error the [Result::Err] is returned.
436 ///
437 /// When a timer fires an event the [Result::Ok] is returned with the amount of
438 /// timer overrun. Normally it is 1.
439 pub
440 fn wait_for_event(&self) -> TimerResult<TimerReadRes<u64>>
441 {
442 let res =
443 self
444 .timer
445 .read()
446 .map_err(|e|
447 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
448 );
449
450 return res;
451 }
452
453 /// Asynchronious polling. The timer's FD is set to nonblocking,
454 /// so each time it will return `pending` and load CPU. If you are using
455 /// `tokio` or `smol` suing corresponding helpers like tokio's `AsyncFd`.
456 pub async
457 fn poll(&self) -> TimerResult<TimerReadRes<u64>>
458 {
459 return
460 (&self.timer)
461 .await
462 .map_err(|e|
463 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
464 );
465 }
466
467 /// Returns the queue length.
468 pub
469 fn timer_queue_len(&self) -> usize
470 {
471 return self.deque_timeout_list.len();
472 }
473
474
475 /// Setting the `timer` instance to the new values or unsets the timer if
476 /// `queue` becomes empty.
477 pub
478 fn reschedule_timer(&mut self) -> TimerResult<()>
479 {
480 if let Some(front_entity) = self.deque_timeout_list.front()
481 {
482 let timer_exp =
483 TimerExpMode::<AbsoluteTime>::new_oneshot(front_entity.get_timeout_absolute());
484
485 return
486 self
487 .timer
488 .set_time(timer_exp)
489 .map_err(|e|
490 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
491 );
492 }
493 else
494 {
495 // queue is empty, force timer to stop
496
497 return
498 self
499 .timer
500 .unset_time()
501 .map_err(|e|
502 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
503 );
504 }
505 }
506
507 /// Unarms timer and clears the queue.
508 pub
509 fn clean_up_timer(&mut self) -> TimerResult<()>
510 {
511 self
512 .timer
513 .unset_time()
514 .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e))?;
515
516 self.deque_timeout_list.clear();
517
518 return Ok(());
519 }
520
521 /// Unarms timer only.
522 pub
523 fn stop_timer(&mut self) -> TimerResult<()>
524 {
525 return
526 self
527 .timer
528 .unset_time()
529 .map_err(|e| map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e));
530 }
531
532 /// An internal function to recude code duplication. Each deque item type has its own `add` function
533 /// realization.
534 pub(crate)
535 fn add_to_timer_local(&mut self, inst: DQI) -> TimerResult<()>
536 {
537 if self.deque_timeout_list.len() == 0
538 {
539 let timer_stamp =
540 TimerExpMode::<AbsoluteTime>::new_oneshot(inst.get_timeout_absolute());
541
542 // setup timer
543 self
544 .timer
545 .set_time(timer_stamp)
546 .map_err(|e|
547 map_timer_err!(TimerErrorType::TimerError(e.get_errno()), "{}", e)
548 )?;
549
550 self.deque_timeout_list.push_front(inst);
551 }
552 else
553 {
554 // list can not be empty from this point
555 let front_timeout =
556 self.deque_timeout_list.front().unwrap().get_timeout_absolute();
557
558 // intances timeout
559 let inst_timeout = inst.get_timeout_absolute();
560
561 if front_timeout >= inst_timeout
562 {
563 // push to front
564 self.deque_timeout_list.push_front(inst);
565
566 self.reschedule_timer()?;
567 }
568 else
569 {
570 let back_banuntil =
571 self
572 .deque_timeout_list
573 .back()
574 .unwrap()
575 .get_timeout_absolute();
576
577 if back_banuntil <= inst_timeout
578 {
579 // push to the back
580 self.deque_timeout_list.push_back(inst);
581 }
582 else
583 {
584 let pos =
585 self
586 .deque_timeout_list
587 .binary_search_by( |se|
588 se.get_timeout_absolute().cmp(&inst.get_timeout_absolute())
589 )
590 .map_or_else(|e| e, |r| r);
591
592 self.deque_timeout_list.insert(pos, inst);
593 }
594 }
595 }
596
597 return Ok(());
598 }
599}