timer_deque_rs/timer.rs
1/*-
2 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
3 * functionality.
4 *
5 * Copyright (C) 2025 Aleksandr Morozov
6 *
7 * The timer-rs crate can be redistributed and/or modified
8 * under the terms of either of the following licenses:
9 *
10 * 1. the Mozilla Public License Version 2.0 (the “MPL”) OR
11 *
12 * 2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
13 */
14
15pub use std::os::fd::{AsFd, AsRawFd};
16use std::{borrow::Cow, collections::VecDeque, fmt, os::fd::{BorrowedFd, RawFd}};
17
18use crate::
19{
20 error::{TimerErrorType, TimerResult},
21 map_timer_err,
22 timer_portable::timer::
23 {
24 FdTimerCom, TimerExpMode, TimerFd, TimerFlags, TimerReadRes, TimerSetTimeFlags, TimerType
25 }
26};
27
28/// A trait which is implemented by the structs which defines the behaviour
29/// of the timer queue.
30pub trait OrderedTimerDequeIntrf
31{
32 /// A timer item for the queue which is passed as argument. If noting is
33 /// provided the `NoTarget` can be used.
34 type Target: PartialEq + Eq + fmt::Display + fmt::Debug;
35
36 /// A timer queue identification in the queue which may be retuened.
37 /// If nothing is retuned the `NoTicket` can be returned.
38 type Ticket: PartialEq + Eq + fmt::Display + fmt::Debug;
39
40 /// Wraps the input and initializes the timer time.
41 ///
42 /// # Returns
43 ///
44 /// Should return the `Self` instance and optionaly a `Self::Ticket` type.
45 fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)>
46 where Self: Sized;
47
48 /// Should return the absolute time and the timer mode.
49 fn get_timeout_absolute(&self) -> TimerExpMode;
50}
51
52/// A [VecDeque] based queue which is sorted (in ascending order) by the timeout
53/// which is `absolute` time.
54///
55/// The queue automatically manages the `timer` i.e setting, unsetting.
56///
57/// Also for each type of the deque, a event procesing function is providided.
58///
59/// # Generics
60///
61/// * `DQI` - a deque type. There are three types are available:
62/// * - [crate::TimerDequeueTicketIssuer] which issues a ticket for the instance for which
63/// the timer was set.
64/// * - [crate::TimerDequeueConsumer] which consumes the instance for which the timer is set.
65/// * - [crate::TimerDequeueSignalTicket] which sends a signal to destination.
66#[derive(Debug)]
67pub struct OrderedTimerDeque<DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display>
68{
69 /// A [VecDeque] list which is sorted by time in ascending order -
70 /// lower first, largest last
71 pub(crate) deque_timeout_list: VecDeque<DQI>,
72
73 /// An instance of the FFI (Kernel Supported Timer)
74 pub(crate) timer: TimerFd,
75}
76
77impl<DQI> Drop for OrderedTimerDeque<DQI>
78where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
79{
80 fn drop(&mut self)
81 {
82 let _ = self.clean_up_timer();
83 }
84}
85
86impl<DQI> AsFd for OrderedTimerDeque<DQI>
87where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
88{
89 fn as_fd(&self) -> BorrowedFd<'_>
90 {
91 return self.timer.as_fd();
92 }
93}
94
95impl<DQI> AsRawFd for OrderedTimerDeque<DQI>
96where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
97{
98 fn as_raw_fd(&self) -> RawFd
99 {
100 return self.timer.as_raw_fd();
101 }
102}
103
104impl<DQI> fmt::Display for OrderedTimerDeque<DQI>
105where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
106{
107 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
108 {
109 write!(f, "timer: '{}', fd: '{}', queue_len: '{}'",
110 self.timer, self.timer.as_fd().as_raw_fd(), self.deque_timeout_list.len())
111 }
112}
113
114impl<DQI> OrderedTimerDeque<DQI>
115where DQI: OrderedTimerDequeIntrf + Ord + PartialOrd + PartialEq + Eq + fmt::Debug + fmt::Display
116{
117 /// Creates new deque instance.
118 ///
119 /// # Argument
120 ///
121 /// `timer_label` - a label which helps to identify the timer.
122 ///
123 /// `deq_len` - a minimal, pre-allocated deque length.
124 ///
125 /// `cloexec` - when set to `true` sets the `CLOEXEC` flag to FD.
126 ///
127 /// # Returns
128 ///
129 /// A [Result] as alias [TimerResult] is returned with
130 ///
131 /// * [Result::Ok] with the instance.
132 ///
133 /// * [Result::Err] with the error description.
134 pub
135 fn new(timer_label: Cow<'static, str>, deq_len: usize, cloexec: bool) -> TimerResult<OrderedTimerDeque<DQI>>
136 {
137 let deq_len =
138 if deq_len == 0
139 {
140 10
141 }
142 else
143 {
144 deq_len
145 };
146
147 let tf =
148 if cloexec == true
149 {
150 TimerFlags::TFD_CLOEXEC
151 }
152 else
153 {
154 TimerFlags::empty()
155 };
156
157 // init timer
158 let timer =
159 TimerFd::new(timer_label, TimerType::CLOCK_REALTIME, TimerFlags::TFD_NONBLOCK | tf)
160 .map_err(|e|
161 map_timer_err!(TimerErrorType::TimerError, "{}", e)
162 )?;
163
164 return Ok(
165 Self
166 {
167 deque_timeout_list: VecDeque::with_capacity(deq_len),
168 timer: timer
169 }
170 );
171 }
172
173 /// Reads the FD to retrive the event type.
174 ///
175 /// This function behaves differently when the timer is set with [TimerFlags::TFD_NONBLOCK].
176 ///
177 /// Normally, all timer_* creates nonblocking timer, so this function behaves like when
178 /// the [TimerFlags::TFD_NONBLOCK] is set.
179 ///
180 /// When [TimerFlags::TFD_NONBLOCK] is not set, this function will block reading the FD.
181 /// In case of 'EINTR', the read attempt will be repeated. It will be reapeated in both cases.
182 ///
183 /// When [TimerFlags::TFD_NONBLOCK] is set, the function will return with some result.
184 ///
185 /// # Return
186 ///
187 /// * In case of `EAGAIN`, the [TimerReadRes::WouldBlock] will be returned.
188 ///
189 /// * In case of `ECANCELLD`, the [TimerReadRes::WouldBlock] will be returned.
190 ///
191 /// * In case of any other error the [Result::Err] is returned.
192 ///
193 /// When a timer fires an event the [Result::Ok] is returned with the amount of
194 /// timer overrun. Normally it is 1.
195 pub
196 fn wait_for_event(&self) -> TimerResult<TimerReadRes<u64>>
197 {
198 let res =
199 self
200 .timer
201 .read()
202 .map_err(|e|
203 map_timer_err!(TimerErrorType::TimerError, "{}", e)
204 );
205
206 return res;
207 }
208
209 /// Asynchronious polling. The timer's FD is set to nonblocking,
210 /// so each time it will return `pending` and load CPU. If you are using
211 /// `tokio` or `smol` suing corresponding helpers like tokio's `AsyncFd`.
212 pub async
213 fn poll(&self) -> TimerResult<TimerReadRes<u64>>
214 {
215 return
216 (&self.timer)
217 .await
218 .map_err(|e|
219 map_timer_err!(TimerErrorType::TimerError, "{}", e)
220 );
221 }
222
223 /// Returns the queue length.
224 pub
225 fn timer_queue_len(&self) -> usize
226 {
227 return self.deque_timeout_list.len();
228 }
229
230
231 /// Setting the `timer` instance to the new values or unsets the timer if
232 /// `queue` becomes empty.
233 pub
234 fn reschedule_timer(&mut self) -> TimerResult<()>
235 {
236 if let Some(front_entity) = self.deque_timeout_list.front()
237 {
238 return
239 self
240 .timer
241 .set_time(
242 TimerSetTimeFlags::TFD_TIMER_ABSTIME | TimerSetTimeFlags::TFD_TIMER_CANCEL_ON_SET,
243 front_entity.get_timeout_absolute()
244 )
245 .map_err(|e|
246 map_timer_err!(TimerErrorType::TimerError, "{}", e)
247 );
248 }
249 else
250 {
251 // queue is empty, force timer to stop
252
253 return
254 self
255 .timer
256 .unset_time()
257 .map_err(|e|
258 map_timer_err!(TimerErrorType::TimerError, "{}", e)
259 );
260 }
261 }
262
263 /// Unarms timer and clears the queue.
264 pub
265 fn clean_up_timer(&mut self) -> TimerResult<()>
266 {
267 self
268 .timer
269 .unset_time()
270 .map_err(|e| map_timer_err!(TimerErrorType::TimerError, "{}", e))?;
271
272 self.deque_timeout_list.clear();
273
274 return Ok(());
275 }
276
277 /// Unarms timer only.
278 pub
279 fn stop_timer(&mut self) -> TimerResult<()>
280 {
281 return
282 self
283 .timer
284 .unset_time()
285 .map_err(|e| map_timer_err!(TimerErrorType::TimerError, "{}", e));
286 }
287
288 /// An internal function to recude code duplication. Each deque item type has its own `add` function
289 /// realization.
290 pub(crate)
291 fn add_to_timer_local(&mut self, entity: DQI::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<DQI::Ticket>
292 {
293 let (wrapped, ext) =
294 DQI::wrap(entity, abs_time_sec, abs_time_nsec)?;
295
296 let flags = TimerSetTimeFlags::TFD_TIMER_ABSTIME | TimerSetTimeFlags::TFD_TIMER_CANCEL_ON_SET;
297
298 if self.deque_timeout_list.len() == 0
299 {
300 let timer_stamp = wrapped.get_timeout_absolute();
301
302 // setup timer
303 self
304 .timer
305 .set_time(flags, timer_stamp)
306 .map_err(|e|
307 map_timer_err!(TimerErrorType::TimerError, "{}", e)
308 )?;
309
310 self.deque_timeout_list.push_front(wrapped);
311 }
312 else
313 {
314 //list can not be empty from this point
315 let front_banuntil =
316 self.deque_timeout_list.front().unwrap().get_timeout_absolute();
317
318 let timer_stamp = wrapped.get_timeout_absolute();
319
320 if front_banuntil >= timer_stamp
321 {
322 self
323 .timer
324 .set_time(flags, timer_stamp)
325 .map_err(|e|
326 map_timer_err!(TimerErrorType::TimerError, "{}", e)
327 )?;
328
329 // push to front
330 self.deque_timeout_list.push_front(wrapped);
331 }
332 else
333 {
334 let back_banuntil =
335 self
336 .deque_timeout_list
337 .back()
338 .unwrap()
339 .get_timeout_absolute();
340
341 if back_banuntil <= timer_stamp
342 {
343 // push to the back
344 self.deque_timeout_list.push_back(wrapped);
345 }
346 else
347 {
348 let pos =
349 self
350 .deque_timeout_list
351 .binary_search_by( |se|
352 se.get_timeout_absolute().cmp(&wrapped.get_timeout_absolute())
353 )
354 .map_or_else(|e| e, |r| r);
355
356 self.deque_timeout_list.insert(pos, wrapped);
357 }
358 }
359 }
360
361 return Ok(ext);
362 }
363}