timer_deque_rs/deque_timeout/
timer_consumer.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16use std::fmt;
17
18use crate::deque_timeout::{OrderdTimerDequeOnce, OrderdTimerDequePeriodic, OrderedTimerDequeMode};
19use crate::error::{TimerErrorType, TimerResult};
20use crate::{common, timer_err, TimerReadRes};
21use crate::timer_portable::timer::{AbsoluteTime, RelativeTime};
22
23use super::{OrderedTimerDeque, OrderedTimerDequeIntrf};
24
25/// Defines the type of the queue. This type of the queue consumes the 
26/// instance for which the timer is set. The item must be [Send] and can
27/// be used with, for example, [std::sync::Arc]. This is convinient when it
28/// is required to store the instance in the timer and retrive it when the 
29/// timeout happens. It would allow to avoid looking for the instance in the 
30/// lists.
31/// 
32/// # Generics
33/// 
34/// `R` - an instance which should be stored on the timer dequeue. The instance
35///     must implement [PartialEq], [Eq], [fmt::Debug], [fmt::Display], [Send].
36/// 
37/// `MODE` - a [OrderedTimerDequeMode] which defines the deque behaviour. There are
38///     two types of the operation:
39/// 
40/// * [OrderdTimerDequeOnce] - after timeout the element is removed from the queue.
41/// 
42/// * [OrderdTimerDequePeriodic] - after timeout the element timeout is extended
43///     until the item is not removed from the queue manually.
44/// 
45/// # Example
46/// 
47/// ```ignore
48/// let mut time_list = 
49///         OrderedTimerDeque
50///             ::<TimerDequeueConsumer<TestItem, OrderdTimerDequeOnce>>
51///             ::new("test_label".into(), 4, false).unwrap();
52/// ```
53/// or
54/// ```ignore
55/// let mut time_list = 
56///         OrderedTimerDeque
57///             ::<TimerDequeueConsumer<TestItem, OrderdTimerDequePeriodic>>
58///             ::new("test_label".into(), 4, false).unwrap();
59/// ```
60#[derive(Debug)]
61pub struct TimerDequeueConsumer<R, MODE>
62where 
63    R: PartialEq + Eq + fmt::Debug + fmt::Display + Send,
64    MODE: OrderedTimerDequeMode
65{
66    /// An internal data
67    target: R,
68
69    /// An absolute timestamp in seconds (UTC time) sets the timer.
70    timeout_mode: MODE,
71}
72
73impl<R, MODE> fmt::Display 
74for TimerDequeueConsumer<R, MODE>
75where 
76    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send, 
77    MODE: OrderedTimerDequeMode
78{  
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
80    { 
81        write!(f, "{} until: {}", self.target, self.timeout_mode)
82    }
83}
84
85impl<R, MODE> TimerDequeueConsumer<R, MODE>
86where 
87    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
88    MODE: OrderedTimerDequeMode
89{ 
90    fn into_inner(self) -> R
91    {
92        return self.target;
93    }
94}
95
96
97impl<R> TimerDequeueConsumer<R, OrderdTimerDequeOnce>
98where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
99{ 
100    fn new(target: R, abs_time: AbsoluteTime) -> TimerResult<Self>
101    {
102        let cur = AbsoluteTime::now();
103
104        if cur > abs_time
105        {
106            timer_err!(TimerErrorType::Expired, 
107                "time already expired now: {}, req: {}", cur, abs_time);
108        }
109
110        return Ok(
111            Self
112            {
113                target: 
114                    target,
115                timeout_mode: 
116                    OrderdTimerDequeOnce::new(abs_time),
117            }
118        );
119    }
120}
121
122impl<R> TimerDequeueConsumer<R, OrderdTimerDequePeriodic>
123where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send + Clone
124{ 
125    fn new(target: R, rel_time: RelativeTime) -> TimerResult<Self>
126    {
127        return Ok(
128            Self
129            {
130                target: 
131                    target,
132                timeout_mode: 
133                    OrderdTimerDequePeriodic::new(rel_time),
134            }
135        );
136    }
137
138    fn clone_inner(&self) -> R
139    {
140        return self.target.clone();
141    }
142}
143
144impl<R, MODE> Eq for TimerDequeueConsumer<R, MODE>
145where 
146    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send, 
147    MODE: OrderedTimerDequeMode
148{
149
150}
151
152impl<R, MODE> PartialEq for TimerDequeueConsumer<R, MODE> 
153where 
154    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send, 
155    MODE: OrderedTimerDequeMode
156{
157    fn eq(&self, other: &Self) -> bool 
158    {
159        return self.target == other.target;
160    }
161}
162
163impl<R, MODE> PartialEq<R> for TimerDequeueConsumer<R, MODE>
164where 
165    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send, 
166    MODE: OrderedTimerDequeMode 
167{
168    fn eq(&self, other: &R) -> bool 
169    {
170        return &self.target == other;
171    }
172}
173
174impl<R, MODE> Ord for TimerDequeueConsumer<R, MODE>
175where 
176    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send, 
177    MODE: OrderedTimerDequeMode 
178{
179    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
180    {
181        return self.timeout_mode.cmp(&other.timeout_mode);
182    }
183}
184
185impl<R, MODE> PartialOrd for TimerDequeueConsumer<R, MODE>
186where 
187    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send, 
188    MODE: OrderedTimerDequeMode 
189{
190    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
191    {
192        return Some(self.cmp(other));
193    }
194}
195
196impl<R, MODE> OrderedTimerDequeIntrf for TimerDequeueConsumer<R, MODE>
197where 
198    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send,
199    MODE: OrderedTimerDequeMode
200{
201    /// Input item.
202    type Target = R;
203
204    /// Nothing is returned.
205    type Ticket = common::NoTicket;
206
207    #[inline]
208    fn get_timeout_absolute(&self) -> AbsoluteTime 
209    {
210        return self.timeout_mode.get_absolut_timeout();
211    }
212}
213
214impl<R> OrderedTimerDeque<TimerDequeueConsumer<R, OrderdTimerDequeOnce>>
215where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send
216{
217    /// Adds the new absolute timeout to the timer dequeue instance.
218    /// 
219    /// The `entity` is an item which should be stored in the dequeue.
220    /// 
221    /// # Arguemnts
222    /// 
223    /// `entity` - `R` generic which is an item to store in the dequeu.
224    /// 
225    /// `abs_time` - a [AbsoluteTime] absolute time in future when the timeout
226    ///     should happen.
227    /// 
228    /// # Returns
229    /// 
230    /// A [Result] as alias [TimerResult] is returned with:
231    /// 
232    /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
233    /// 
234    /// * [Result::Err] with error description.
235    pub   
236    fn add_to_timer(&mut self, entity: R, abs_time: AbsoluteTime) -> TimerResult<common::NoTicket>
237    {
238        let inst = 
239            TimerDequeueConsumer::<R, OrderdTimerDequeOnce>::new(entity, abs_time)?;
240
241        return self.add_to_timer_local(inst).map(|_| common::NoTicket);
242    }
243
244    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
245    /// or [Self::poll] can be used to obtain the event.
246    /// 
247    /// # Arguments
248    /// 
249    /// `res` - [TimerReadRes] an event from the timer to handle.
250    /// 
251    /// `timeout_items` - a vector of an enitities for which timeout have happened.
252    /// 
253    /// A [Result] as alias [TimerResult] is returned with:
254    /// 
255    /// * [Result::Ok] witout any innder data.
256    /// 
257    /// * [Result::Err] with error description.
258    pub 
259    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<R>) -> TimerResult<()>
260    {
261        // ignore wouldblock
262        if let TimerReadRes::WouldBlock = res
263        {
264            return Ok(());
265        }
266
267        let cur_timestamp = AbsoluteTime::now();
268
269        loop 
270        {
271            // get from front of the queue
272            let Some(front_entity) = 
273                self.deque_timeout_list.front() 
274                else 
275                { 
276                    break;
277                };
278
279            let time_until = front_entity.get_timeout_absolute();
280
281            if time_until <= cur_timestamp
282            {
283                timeout_items.push(self.deque_timeout_list.pop_front().unwrap().into_inner());
284            }
285            else
286            {
287                break;
288            }
289        }
290
291        // call timer reschedule
292        self.reschedule_timer()?;
293
294        return Ok(());
295    }
296
297}
298
299impl<R> OrderedTimerDeque<TimerDequeueConsumer<R, OrderdTimerDequePeriodic>>
300where R: PartialEq + Eq + fmt::Display + fmt::Debug + Send + Clone
301{
302    /// Adds the new entry to the timout queue which after alarm
303    /// extends the timeout and sets the instance in the queue again.
304    /// 
305    /// The `entity` is an item which should be stored in the dequeue and
306    /// impl [Clone].
307    /// 
308    /// # Arguemnts
309    /// 
310    /// `entity` - `R` generic which is an item to store in the dequeu.
311    /// 
312    /// `rel_time` - a [RelativeTime] which will be used to extend the further 
313    ///     timeouts. 
314    /// 
315    /// # Returns
316    /// 
317    /// A [Result] as alias [TimerResult] is returned with:
318    /// 
319    /// * [Result::Ok] with the [common::NoTicket] ticket which is dummy value.
320    /// 
321    /// * [Result::Err] with error description.
322    pub   
323    fn add_to_timer(&mut self, entity: R, rel_time: RelativeTime) -> TimerResult<common::NoTicket>
324    {
325        let inst = 
326            TimerDequeueConsumer::<R, OrderdTimerDequePeriodic>::new(entity, rel_time)?;
327
328        return self.add_to_timer_local(inst).map(|_| common::NoTicket);
329    }
330
331    
332
333    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
334    /// or [Self::poll] can be used to obtain the event.
335    /// 
336    /// # Arguments
337    /// 
338    /// `res` - [TimerReadRes] an event from the timer to handle.
339    /// 
340    /// `timeout_items` - a vector of an enitities for which timeout have happened.
341    /// 
342    /// A [Result] as alias [TimerResult] is returned with:
343    /// 
344    /// * [Result::Ok] witout any innder data.
345    /// 
346    /// * [Result::Err] with error description.
347    pub 
348    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<R>) -> TimerResult<()>
349    {
350        // ignore wouldblock
351        if let TimerReadRes::WouldBlock = res
352        {
353            return Ok(());
354        }
355
356        let cur_timestamp = AbsoluteTime::now();
357
358        loop 
359        {
360            // get from front of the queue
361            let Some(front_entity) = 
362                self.deque_timeout_list.front() 
363                else 
364                { 
365                    break;
366                };
367
368            let time_until = front_entity.get_timeout_absolute();
369
370            if time_until <= cur_timestamp
371            {
372                let mut deq = self.deque_timeout_list.pop_front().unwrap();
373                
374                timeout_items.push(deq.clone_inner());
375
376                // advance the timeout
377                deq.timeout_mode.advance_timeout();
378                
379                // return back to deque with new timeout
380                self.add_to_timer_local(deq)?;
381            }
382            else
383            {
384                break;
385            }
386        }
387
388        // call timer reschedule
389        self.reschedule_timer()?;
390
391        return Ok(());
392    }
393
394}
395
396impl<R, MODE> OrderedTimerDeque<TimerDequeueConsumer<R, MODE>>
397where 
398    R: PartialEq + Eq + fmt::Display + fmt::Debug + Send, 
399    MODE: OrderedTimerDequeMode 
400{
401    /// Removes the instance from the queue by the reference to entity.
402    /// 
403    /// # Arguments
404    /// 
405    /// `entity` - `R` reference to added item which will be used to identify the
406    ///     item ont he record.
407    /// 
408    /// # Returns
409    /// 
410    /// A [Result] as alias [TimerResult] is returned with:
411    /// 
412    /// * [Result::Ok] with the inner type [Option] where
413    ///     * [Option::Some] is returned with the consumed `entity`.
414    ///     * [Option::None] is returned when item was not found.
415    /// 
416    /// * [Result::Err] with error description.
417    pub 
418    fn remove_from_sched_queue(&mut self, entity: &R) -> TimerResult<Option<R>>
419    {
420        if self.deque_timeout_list.len() == 0
421        {
422            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
423        }
424        else
425        {
426            // search for the item in list
427            if self.deque_timeout_list.len() == 1
428            {
429                // just pop the from the front
430                let ret_ent = self.deque_timeout_list.pop_front().unwrap();
431
432                // stop timer
433                self.stop_timer()?;
434
435                return Ok(Some(ret_ent.into_inner()));
436            }
437            else
438            {
439                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
440                // succesfully for temoved instance.
441
442                for (pos, q_item) 
443                in self.deque_timeout_list.iter().enumerate()
444                {
445                    if q_item == entity
446                    {
447                        // remove by the index
448                        let ret_ent = 
449                            self.deque_timeout_list.remove(pos).unwrap().into_inner();
450
451                        // call timer reset if index is 0 (front)
452                        if pos == 0 
453                        {
454                            self.reschedule_timer()?;
455                        }
456
457                        return Ok(Some(ret_ent));
458                    }
459                }
460
461                return Ok(None);
462            }
463        }
464    }
465}
466
467#[cfg(test)]
468mod tests
469{
470    use std::{cmp::Ordering, fmt, time::{Duration, Instant}};
471
472    use crate::{common, deque_timeout::OrderdTimerDequeOnce, timer_portable::timer::{AbsoluteTime, TimerReadRes}, OrderedTimerDeque, TimerDequeueConsumer};
473
474    #[test]
475    fn test_0()
476    {
477        #[derive(Debug, PartialEq, Eq, Clone)]
478        struct TestItem(u64);
479
480        impl fmt::Display for TestItem
481        {
482            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
483            {
484                write!(f, "0 = {}", self.0)
485            }
486        }
487
488        let mut time_list = 
489            OrderedTimerDeque
490                ::<TimerDequeueConsumer<TestItem, OrderdTimerDequeOnce>>
491                ::new("test_label".into(), 4, false).unwrap();
492
493        let s = Instant::now();
494
495        let ts = common::get_current_timestamp();
496
497        let tss_set = 
498            AbsoluteTime
499                ::new_time(ts.timestamp()+2, ts.timestamp_subsec_nanos() as i64)
500                    .unwrap();
501        
502        let ent1 = TestItem(1);
503        time_list.add_to_timer(ent1, tss_set).unwrap();
504
505
506        for _ in 0..3
507        {
508            let event = time_list.wait_for_event().unwrap();
509            if let TimerReadRes::WouldBlock = event
510            {
511                std::thread::sleep(Duration::from_millis(1001));
512
513                continue;
514            }
515
516            assert_eq!(event, TimerReadRes::Ok(1));
517
518            let e = s.elapsed();
519            let ts = AbsoluteTime::now();
520
521            assert_eq!(ts.seconds_cmp(&tss_set) == Ordering::Equal, true);
522
523            println!("ev: {}, instant: {:?} ts: {}, timerset: {}", event, e, ts, tss_set);
524
525            let mut tm_items: Vec<TestItem> = Vec::with_capacity(1);
526
527            time_list.timeout_event_handler(event, &mut tm_items).unwrap();
528
529            assert_eq!(tm_items.len(), 1);
530            assert_eq!(tm_items.first().is_some(), true);
531            assert_eq!(tm_items.first().unwrap(), &TestItem(1));
532
533            return;
534        }
535
536        panic!("timeout befor timer!");
537    }
538}