timer_deque_rs/
timer_consumer.rs

1/*-
2 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov
6 * 
7 * The timer-rs crate can be redistributed and/or modified
8 * under the terms of either of the following licenses:
9 *
10 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
11 *                     
12 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
13 */
14
15use std::fmt;
16
17use crate::error::{TimerError, TimerErrorType, TimerResult};
18use crate::timer::{OrderedTimerDeque, OrderedTimerDequeIntrf};
19use crate::{common, timer_err, TimerReadRes};
20use crate::timer_portable::timer::TimerExpMode;
21
22
23/// Defines the type of the queue. This type of the queue consumes the 
24/// instance for which the timer is set. The item must be [Send] and can
25/// be used with, for example, [std::sync::Arc]. This is convinient when it
26/// is required to store the instance in the timer and retrive it when the 
27/// timeout happens. It would allow to avoid looking for the instance in the 
28/// lists.
29/// 
30/// # Generics
31/// 
32/// `R` - an instance which should be stored on the timer dequeue. The instance
33///     must implement [PartialEq], [Eq], [fmt::Debug], [fmt::Display], [Send].
34/// 
35/// # Example
36/// 
37/// ```ignore
38/// let mut time_list = 
39///         OrderedTimerDeque
40///             ::<TimerDequeueConsumer<TestItem>>
41///             ::new("test_label".into(), 4, false).unwrap();
42/// ```
43#[derive(Debug)]
44pub struct TimerDequeueConsumer<R: PartialEq + Eq + fmt::Debug + fmt::Display + Send>
45{
46    /// An internal data
47    target: R,
48
49    /// An absolute timestamp in seconds (UTC time) sets the timer.
50    timeout_absolute: TimerExpMode,
51}
52
53impl<R> fmt::Display 
54for TimerDequeueConsumer<R>
55where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
56{  
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
58    { 
59        write!(f, "{} until: {}", self.target, self.timeout_absolute)
60    }
61}
62
63
64impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> TimerDequeueConsumer<R>
65{ 
66    fn new(target: R, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<Self>
67    {
68        let abs_time_nsec = abs_time_nsec & 999_999_999;
69
70        let cur = common::get_current_timestamp();
71        let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
72
73        if TimerExpMode::from(cur) > req
74        {
75            timer_err!(TimerErrorType::Expired, "time already expired");
76        }
77
78        return Ok(
79            Self
80            {
81                target: 
82                    target,
83                timeout_absolute: 
84                    req
85            }
86        );
87    }
88
89    fn get_time_until(&self) -> TimerExpMode
90    {
91        return self.timeout_absolute;
92    }
93
94    fn into_inner(self) -> R
95    {
96        return self.target;
97    }
98}
99
100impl<R: PartialEq + Eq + fmt::Debug + fmt::Display + Send> Eq for TimerDequeueConsumer<R> {}
101
102impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialEq for TimerDequeueConsumer<R> 
103{
104    fn eq(&self, other: &Self) -> bool 
105    {
106        return self.target == other.target;
107    }
108}
109
110impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialEq<R> for TimerDequeueConsumer<R> 
111{
112    fn eq(&self, other: &R) -> bool 
113    {
114        return &self.target == other;
115    }
116}
117
118impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> Ord for TimerDequeueConsumer<R>
119{
120    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
121    {
122        return self.timeout_absolute.cmp(&other.timeout_absolute);
123    }
124}
125
126impl<R: PartialEq + Eq + fmt::Display + fmt::Debug + Send> PartialOrd for TimerDequeueConsumer<R>
127{
128    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
129    {
130        return Some(self.cmp(other));
131    }
132}
133
134impl<R> OrderedTimerDequeIntrf for TimerDequeueConsumer<R>
135where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
136{
137    /// Input item.
138    type Target = R;
139
140    /// Nothing is returned.
141    type Ticket = common::NoTicket;
142
143    #[inline]
144    fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)> 
145    {
146        return Ok((Self::new(target, abs_time_sec, abs_time_nsec)?, common::NoTicket));
147    }
148
149    #[inline]
150    fn get_timeout_absolute(&self) -> TimerExpMode 
151    {
152        return self.timeout_absolute;
153    }
154}
155
156impl<R> OrderedTimerDeque<TimerDequeueConsumer<R>>
157where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
158{
159    /// Adds the new absolute timeout to the timer dequeue instance.
160    /// 
161    /// The `entity` is an item which should be stored in the dequeue.
162    /// 
163    /// # Arguemnts
164    /// 
165    /// `entity` - `R` generic which is an item to store in the dequeu.
166    /// 
167    /// `abs_time_sec` - a [i64] absolute time in seconds. Can not be 0.
168    /// 
169    /// `abs_time_nsec` - a [i64] asolute time in nanosecodns. Can be 0.
170    /// 
171    /// # Returns
172    /// 
173    /// A [Result] as alias [TimerResult] is returned with:
174    /// 
175    /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
176    /// 
177    /// * [Result::Err] with error description.
178    pub   
179    fn add_to_timer(&mut self, entity: R, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<common::NoTicket>
180    {
181        return self.add_to_timer_local(entity, abs_time_sec, abs_time_nsec);
182    }
183
184    /// Removes the instance from the queue by the reference to entity.
185    /// 
186    /// # Arguments
187    /// 
188    /// `entity` - `R` reference to added item which will be used to identify the
189    ///     item ont he record.
190    /// 
191    /// # Returns
192    /// 
193    /// A [Result] as alias [TimerResult] is returned with:
194    /// 
195    /// * [Result::Ok] with the inner type [Option] where
196    ///     * [Option::Some] is returned with the consumed `entity`.
197    ///     * [Option::None] is returned when item was not found.
198    /// 
199    /// * [Result::Err] with error description.
200    pub 
201    fn remove_from_sched_queue(&mut self, entity: &R) -> TimerResult<Option<R>>
202    {
203        if self.deque_timeout_list.len() == 0
204        {
205            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
206        }
207        else
208        {
209            // search for the item in list
210            if self.deque_timeout_list.len() == 1
211            {
212                // just pop the from the front
213                let ret_ent = self.deque_timeout_list.pop_front().unwrap();
214
215                // stop timer
216                self.stop_timer()?;
217
218                return Ok(Some(ret_ent.into_inner()));
219            }
220            else
221            {
222                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
223                // succesfully for temoved instance.
224
225                for (pos, q_item) 
226                in self.deque_timeout_list.iter().enumerate()
227                {
228                    if q_item == entity
229                    {
230                        // remove by the index
231                        let ret_ent = 
232                            self.deque_timeout_list.remove(pos).unwrap().into_inner();
233
234                        // call timer reset if index is 0 (front)
235                        if pos == 0 
236                        {
237                            self.reschedule_timer()?;
238                        }
239
240                        return Ok(Some(ret_ent));
241                    }
242                }
243
244                return Ok(None);
245            }
246        }
247    }
248
249    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
250    /// or [Self::poll] can be used to obtain the event.
251    /// 
252    /// # Arguments
253    /// 
254    /// `res` - [TimerReadRes] an event from the timer to handle.
255    /// 
256    /// `timeout_items` - a vector of an enitities for which timeout have happened.
257    /// 
258    /// A [Result] as alias [TimerResult] is returned with:
259    /// 
260    /// * [Result::Ok] witout any innder data.
261    /// 
262    /// * [Result::Err] with error description.
263    pub 
264    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<R>) -> TimerResult<()>
265    {
266        // ignore wouldblock
267        if let TimerReadRes::WouldBlock = res
268        {
269            return Ok(());
270        }
271        
272        // check if the time reset is required
273        let force_timer_resched = 
274            TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
275
276        let mut timeouts: Vec<R> = Vec::new();
277        let mut error: Option<TimerError> = None;
278
279        let cur_timestamp = common::get_current_timestamp();
280
281        loop 
282        {
283            // get from front of the queue
284            let Some(front_entity) = 
285                self.deque_timeout_list.front() 
286                else 
287                { 
288                    let Err(e) = self.stop_timer()
289                    else
290                    {
291                        break;
292                    };
293
294                    error = Some(e);
295
296                    break;
297                };
298
299            let time_until = front_entity.get_time_until();
300
301            if time_until <= TimerExpMode::from(cur_timestamp)
302            {
303                timeouts.push(self.deque_timeout_list.pop_front().unwrap().into_inner());
304            }
305            else
306            {
307                if timeouts.is_empty() == false || force_timer_resched == true
308                {
309                    // call timer reshedule
310                    self.reschedule_timer()?;
311                }
312
313                break;
314            }
315        }
316
317        *timeout_items = timeouts;
318
319        return error.map_or(Ok(()), |f| Err(f));
320    }
321
322}
323
324#[cfg(test)]
325mod tests
326{
327    use std::{fmt, time::{Duration, Instant}};
328
329    use crate::{common, timer::OrderedTimerDeque, timer_consumer::TimerDequeueConsumer, timer_portable::timer::TimerReadRes};
330
331    #[test]
332    fn test_0()
333    {
334        #[derive(Debug, PartialEq, Eq, Clone)]
335        struct TestItem(u64);
336
337        impl fmt::Display for TestItem
338        {
339            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
340            {
341                write!(f, "0 = {}", self.0)
342            }
343        }
344
345        let mut time_list = 
346            OrderedTimerDeque
347                ::<TimerDequeueConsumer<TestItem>>
348                ::new("test_label".into(), 4, false).unwrap();
349
350        let s = Instant::now();
351
352        let tss_set = common::get_current_timestamp().timestamp()+2;
353        let ent1 = TestItem(1);
354        time_list.add_to_timer(ent1, tss_set, 0).unwrap();
355
356
357        for _ in 0..3
358        {
359            let event = time_list.wait_for_event().unwrap();
360            if let TimerReadRes::WouldBlock = event
361            {
362                std::thread::sleep(Duration::from_millis(1001));
363
364                continue;
365            }
366
367            assert_eq!(event, TimerReadRes::Ok(1));
368
369            let e = s.elapsed();
370            let ts = common::get_current_timestamp();
371
372            assert_eq!(ts.timestamp(), tss_set);
373            println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);
374
375            let mut tm_items: Vec<TestItem> = Vec::with_capacity(1);
376
377            time_list.timeout_event_handler(event, &mut tm_items,).unwrap();
378
379            assert_eq!(tm_items.len(), 1);
380            assert_eq!(tm_items.first().is_some(), true);
381            assert_eq!(tm_items.first().unwrap(), &TestItem(1));
382
383            return;
384        }
385
386        panic!("timeout befor timer!");
387    }
388}