timer_deque_rs/
timer_consumer.rs

/*-
 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
 *  functionality.
 * 
 * Copyright (C) 2025 Aleksandr Morozov
 * 
 * The timer-rs crate can be redistributed and/or modified
 * under the terms of either of the following licenses:
 *
 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
 *                     
 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
 */
14
15use std::fmt;
16
17use crate::error::{TimerError, TimerErrorType, TimerResult};
18use crate::timer::{OrderedTimerDeque, OrderedTimerDequeIntrf};
19use crate::{common, timer_err, TimerReadRes};
20use crate::timer_portable::timer::TimerExpMode;
21
22
23/// Defines the type of the queue. This type of the queue consumes the 
24/// instance for which the timer is set. The item must be [Send] and can
25/// be used with, for example, [std::sync::Arc]. This is convinient when it
26/// is required to store the instance in the timer and retrive it when the 
27/// timeout happens. It would allow to avoid looking for the instance in the 
28/// lists.
29/// 
30/// # Generics
31/// 
32/// `R` - an instance which should be stored on the timer dequeue. The instance
33///     must implement [PartialEq], [Eq], [fmt::Debug], [fmt::Display], [Send].
34#[derive(Debug)]
35pub struct TimerDequeueConsumer<R: PartialEq + Eq + fmt::Debug + fmt::Display + Send>
36{
37    /// An internal data
38    target: R,
39
40    /// An absolute timestamp in seconds (UTC time) sets the timer.
41    timeout_absolute: TimerExpMode,
42}
43
44impl<R> fmt::Display 
45for TimerDequeueConsumer<R>
46where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
47{  
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
49    { 
50        write!(f, "{} until: {}", self.target, self.timeout_absolute)
51    }
52}
53
54
55impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> TimerDequeueConsumer<R>
56{ 
57    fn new(target: R, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<Self>
58    {
59        let abs_time_nsec = abs_time_nsec & 999_999_999;
60
61        let cur = common::get_current_timestamp();
62        let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
63
64        if TimerExpMode::from(cur) > req
65        {
66            timer_err!(TimerErrorType::Expired, "time already expired");
67        }
68
69        return Ok(
70            Self
71            {
72                target: 
73                    target,
74                timeout_absolute: 
75                    req
76            }
77        );
78    }
79
80    fn get_time_until(&self) -> TimerExpMode
81    {
82        return self.timeout_absolute;
83    }
84
85    fn into_inner(self) -> R
86    {
87        return self.target;
88    }
89}
90
91impl<R: PartialEq + Eq + fmt::Debug + fmt::Display + Send> Eq for TimerDequeueConsumer<R> {}
92
93impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialEq for TimerDequeueConsumer<R> 
94{
95    fn eq(&self, other: &Self) -> bool 
96    {
97        return self.target == other.target;
98    }
99}
100
101impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialEq<R> for TimerDequeueConsumer<R> 
102{
103    fn eq(&self, other: &R) -> bool 
104    {
105        return &self.target == other;
106    }
107}
108
109impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> Ord for TimerDequeueConsumer<R>
110{
111    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
112    {
113        return self.timeout_absolute.cmp(&other.timeout_absolute);
114    }
115}
116
117impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialOrd for TimerDequeueConsumer<R>
118{
119    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
120    {
121        return Some(self.cmp(other));
122    }
123}
124
125impl<R> OrderedTimerDequeIntrf for TimerDequeueConsumer<R>
126where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
127{
128    /// Input item.
129    type Target = R;
130
131    /// Nothing is returned.
132    type Ticket = common::NoTicket;
133
134    #[inline]
135    fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)> 
136    {
137        return Ok((Self::new(target, abs_time_sec, abs_time_nsec)?, common::NoTicket));
138    }
139
140    #[inline]
141    fn get_timeout_absolute(&self) -> TimerExpMode 
142    {
143        return self.timeout_absolute;
144    }
145}
146
147impl<R> OrderedTimerDeque<TimerDequeueConsumer<R>>
148where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
149{
150    /// Adds the new absolute timeout to the timer dequeue instance.
151    /// 
152    /// The `entity` is an item which should be stored in the dequeue.
153    /// 
154    /// # Arguemnts
155    /// 
156    /// `entity` - `R` generic which is an item to store in the dequeu.
157    /// 
158    /// `abs_time_sec` - a [i64] absolute time in seconds. Can not be 0.
159    /// 
160    /// `abs_time_nsec` - a [i64] asolute time in nanosecodns. Can be 0.
161    /// 
162    /// # Returns
163    /// 
164    /// A [Result] as alias [TimerResult] is returned with:
165    /// 
166    /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
167    /// 
168    /// * [Result::Err] with error description.
169    pub   
170    fn add_to_timer(&mut self, entity: R, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<common::NoTicket>
171    {
172        return self.add_to_timer_local(entity, abs_time_sec, abs_time_nsec);
173    }
174
175    /// Removes the instance from the queue by the reference to entity.
176    /// 
177    /// # Arguments
178    /// 
179    /// `entity` - `R` reference to added item which will be used to identify the
180    ///     item ont he record.
181    /// 
182    /// # Returns
183    /// 
184    /// A [Result] as alias [TimerResult] is returned with:
185    /// 
186    /// * [Result::Ok] with the inner type [Option] where
187    ///     * [Option::Some] is returned with the consumed `entity`.
188    ///     * [Option::None] is returned when item was not found.
189    /// 
190    /// * [Result::Err] with error description.
191    pub 
192    fn remove_from_sched_queue(&mut self, entity: &R) -> TimerResult<Option<R>>
193    {
194        if self.deque_timeout_list.len() == 0
195        {
196            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
197        }
198        else
199        {
200            // search for the item in list
201            if self.deque_timeout_list.len() == 1
202            {
203                // just pop the from the front
204                let ret_ent = self.deque_timeout_list.pop_front().unwrap();
205
206                // stop timer
207                self.stop_timer()?;
208
209                return Ok(Some(ret_ent.into_inner()));
210            }
211            else
212            {
213                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
214                // succesfully for temoved instance.
215
216                for (pos, q_item) 
217                in self.deque_timeout_list.iter().enumerate()
218                {
219                    if q_item == entity
220                    {
221                        // remove by the index
222                        let ret_ent = 
223                            self.deque_timeout_list.remove(pos).unwrap().into_inner();
224
225                        // call timer reset if index is 0 (front)
226                        if pos == 0 
227                        {
228                            self.reschedule_timer()?;
229                        }
230
231                        return Ok(Some(ret_ent));
232                    }
233                }
234
235                return Ok(None);
236            }
237        }
238    }
239
240    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
241    /// or [Self::poll] can be used to obtain the event.
242    /// 
243    /// # Arguments
244    /// 
245    /// `res` - [TimerReadRes] an event from the timer to handle.
246    /// 
247    /// `timeout_items` - a vector of an enitities for which timeout have happened.
248    /// 
249    /// A [Result] as alias [TimerResult] is returned with:
250    /// 
251    /// * [Result::Ok] witout any innder data.
252    /// 
253    /// * [Result::Err] with error description.
254    pub 
255    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<R>) -> TimerResult<()>
256    {
257        // ignore wouldblock
258        if let TimerReadRes::WouldBlock = res
259        {
260            return Ok(());
261        }
262        
263        // check if the time reset is required
264        let force_timer_resched = 
265            TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
266
267        let mut timeouts: Vec<R> = Vec::new();
268        let mut error: Option<TimerError> = None;
269
270        let cur_timestamp = common::get_current_timestamp();
271
272        loop 
273        {
274            // get from front of the queue
275            let Some(front_entity) = 
276                self.deque_timeout_list.front() 
277                else 
278                { 
279                    let Err(e) = self.stop_timer()
280                    else
281                    {
282                        break;
283                    };
284
285                    error = Some(e);
286
287                    break;
288                };
289
290            let time_until = front_entity.get_time_until();
291
292            if time_until <= TimerExpMode::from(cur_timestamp)
293            {
294                timeouts.push(self.deque_timeout_list.pop_front().unwrap().into_inner());
295            }
296            else
297            {
298                if timeouts.is_empty() == false || force_timer_resched == true
299                {
300                    // call timer reshedule
301                    self.reschedule_timer()?;
302                }
303
304                break;
305            }
306        }
307
308        *timeout_items = timeouts;
309
310        return error.map_or(Ok(()), |f| Err(f));
311    }
312
313}
314
#[cfg(test)]
mod tests
{
    use std::{fmt, time::{Duration, Instant}};

    use crate::{common, timer::OrderedTimerDeque, timer_consumer::TimerDequeueConsumer, timer_portable::timer::TimerReadRes};

    /// Schedules a single item two seconds ahead and checks that the timer
    /// fires in the expected second and hands the very same item back.
    #[test]
    fn test_0()
    {
        #[derive(Debug, PartialEq, Eq, Clone)]
        struct TestItem(u64);

        impl fmt::Display for TestItem
        {
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
            {
                write!(f, "0 = {}", self.0)
            }
        }

        let mut time_list = 
            OrderedTimerDeque::<TimerDequeueConsumer<TestItem>>::new("test_label".into(), 4, false)
                .unwrap();

        let started = Instant::now();

        let tss_set = common::get_current_timestamp().timestamp() + 2;
        time_list.add_to_timer(TestItem(1), tss_set, 0).unwrap();

        // up to three attempts to read the event, with a pause of a bit more 
        // than a second after each attempt which would block
        let mut fired = None;

        for _ in 0..3
        {
            let event = time_list.wait_for_event().unwrap();

            if matches!(event, TimerReadRes::WouldBlock)
            {
                std::thread::sleep(Duration::from_millis(1001));

                continue;
            }

            fired = Some(event);

            break;
        }

        let Some(event) = fired 
            else 
            { 
                panic!("timeout befor timer!") 
            };

        assert_eq!(event, TimerReadRes::Ok(1));

        let elapsed = started.elapsed();
        let ts = common::get_current_timestamp();

        assert_eq!(ts.timestamp(), tss_set);
        println!("instant: {:?} ts: {}, timerset: {}", elapsed, ts.timestamp(), tss_set);

        let mut tm_items: Vec<TestItem> = Vec::with_capacity(1);

        time_list.timeout_event_handler(event, &mut tm_items).unwrap();

        assert_eq!(tm_items.len(), 1);
        assert!(tm_items.first().is_some());
        assert_eq!(tm_items.first(), Some(&TestItem(1)));
    }
}