timer_deque_rs/deque_timeout/
timer_tickets.rs

1/*-
2 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov
6 * 
7 * The timer-rs crate can be redistributed and/or modified
8 * under the terms of either of the following licenses:
9 *
10 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
11 *                     
12 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
13 */
14
15use std::{fmt, sync::{Arc, Weak}};
16
17use crate::
18{
19    common::{self, TimerDequeueId}, deque_timeout::{OrderdTimerDequeOnce, OrderdTimerDequePeriodic, OrderedTimerDequeMode}, error::{TimerError, TimerErrorType, TimerResult}, timer_err, timer_portable::timer::{AbsoluteTime, RelativeTime, TimerExpMode}, TimerReadRes
20};
21
22use super::{OrderedTimerDeque, OrderedTimerDequeIntrf};
23
24
25impl PartialEq<TimerDequeueTicket> for TimerDequeueId
26{
27    fn eq(&self, other: &TimerDequeueTicket) -> bool 
28    {
29        return self == other.status.as_ref();
30    }
31}
32
33/// A ticket which is issued to the route which added the instance
34/// to the queue. If this instance is dropped, the [Weak] reference to [Arc]
35/// will no longer be valid and the timer would ignore the instance and remove
36/// it from the queue independeltly from the deque operation mode.
37#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
38pub struct TimerDequeueTicket
39{
40    /// A single instance of arc which the presense of which indicates that
41    /// the instance is valid. It holds the uniq ID which can be used to
42    /// remove the instance from the queue manually.
43    status: Arc<TimerDequeueId>,
44}
45
46impl fmt::Display for TimerDequeueTicket
47{
48    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
49    {
50        write!(f, "{}", self.status)
51    }
52}
53
54impl PartialEq<TimerDequeueId> for TimerDequeueTicket
55{
56    fn eq(&self, other: &TimerDequeueId) -> bool 
57    {
58        return self.status.as_ref() == other;
59    }
60}
61
62impl<MODE: OrderedTimerDequeMode> PartialEq<TimerDequeueTicketIssuer<MODE>> for TimerDequeueTicket
63{
64    fn eq(&self, other: &TimerDequeueTicketIssuer<MODE>) -> bool 
65    {
66        let Some(up) = other.weak_status.upgrade()
67            else { return false };
68
69        return self.status.as_ref() == up.as_ref();
70    }
71}
72
73impl AsRef<TimerDequeueId> for TimerDequeueTicket
74{
75    fn as_ref(&self) -> &TimerDequeueId 
76    {
77        return &self.status;
78    }
79}
80
81impl TimerDequeueTicket
82{
83    fn new() -> Self
84    {
85        return Self{ status: Arc::new(TimerDequeueId::new() ) }
86    }
87
88    fn pair(&self) -> Weak<TimerDequeueId>
89    {
90        return Arc::downgrade(&self.status);
91    }
92
93    pub 
94    fn get_deque_id(&self) -> &TimerDequeueId
95    {
96        return self.status.as_ref();
97    }
98
99    pub 
100    fn is_queued(&self) -> bool
101    {
102        return Arc::weak_count(&self.status) > 1;
103    }
104}
105
106/// A type of deque which issues a `tickets` in form of [TimerDequeueTicket] 
107/// when the timer is set. This type of timer dequeue allows not to delete the
108/// item from the queue and just drop the `ticket` instance. The timer dequeue 
109/// would ignore the dropped instances.
110/// 
111/// # Generics
112/// 
113/// `MODE` - a [OrderedTimerDequeMode] which defines the deque behaviour. There are
114///     two types of the operation:
115/// 
116/// * [OrderdTimerDequeOnce] - after timeout the element is removed from the queue.
117/// 
118/// * [OrderdTimerDequePeriodic] - after timeout the element timeout is extended
119///     until the item is not removed from the queue manually.
120/// 
121/// # Examples
122/// 
123/// ```ignore
124/// let mut time_list = 
125///     OrderedTimerDeque
126///         ::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
127///         ::new("test_label".into(), 4, false).unwrap();
128/// ```
129/// or
130/// ```ignore
131/// let mut time_list = 
132///     OrderedTimerDeque
133///         ::<TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>>
134///         ::new("test_label".into(), 4, false).unwrap();
135/// ```
136#[derive(Debug)]
137pub struct TimerDequeueTicketIssuer<MODE: OrderedTimerDequeMode>
138{
139    /// A [Weak] reference to [Arc] which holds the queue identification.
140    weak_status: Weak<TimerDequeueId>,
141
142    /// An absolute timestamp in seconds (UTC time) sets the timer.
143    timeout_mode: MODE,
144}
145
146impl<MODE: OrderedTimerDequeMode> fmt::Display for TimerDequeueTicketIssuer<MODE>
147{  
148    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
149    { 
150        write!(f, "{} until: {}", self.weak_status.upgrade().map_or("dropped".into(), |f| f.to_string()), 
151            self.timeout_mode)
152    }
153}
154
155impl<MODE: OrderedTimerDequeMode> Eq for TimerDequeueTicketIssuer<MODE> {}
156
157impl<MODE: OrderedTimerDequeMode> PartialEq for TimerDequeueTicketIssuer<MODE>
158{
159    fn eq(&self, other: &Self) -> bool 
160    {
161        let s = self.weak_status.upgrade();
162        let o = other.weak_status.upgrade();
163
164        return s == o;
165    }
166}
167
168impl<MODE: OrderedTimerDequeMode> PartialEq<TimerDequeueTicket> for TimerDequeueTicketIssuer<MODE>
169{
170    fn eq(&self, other: &TimerDequeueTicket) -> bool 
171    {
172        let Some(s) = self.weak_status.upgrade()
173            else { return false };
174
175        return s.as_ref() == other.status.as_ref();
176    }
177}
178
179impl<MODE: OrderedTimerDequeMode> Ord for TimerDequeueTicketIssuer<MODE>
180{
181    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
182    {
183        return self.timeout_mode.cmp(&other.timeout_mode);
184    }
185}
186
187impl<MODE: OrderedTimerDequeMode> PartialOrd for TimerDequeueTicketIssuer<MODE>
188{
189    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
190    {
191        return Some(self.cmp(other));
192    }
193}
194
195
196impl TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>
197{ 
198    /// Initializes new instance.
199    fn new(rel_time: RelativeTime) -> TimerResult<(Self, TimerDequeueTicket)>
200    {        
201        let ext_ticket = 
202            TimerDequeueTicket::new();
203
204        let int_ticket = 
205            Self
206            {
207                weak_status: 
208                    ext_ticket.pair(),
209                timeout_mode: 
210                   OrderdTimerDequePeriodic::new(rel_time)
211            };
212
213
214        return Ok((int_ticket, ext_ticket));
215    }
216}
217
218impl TimerDequeueTicketIssuer<OrderdTimerDequeOnce>
219{ 
220    /// Initializes new instance.
221    fn new(abs_time: AbsoluteTime) -> TimerResult<(Self, TimerDequeueTicket)>
222    {
223
224        let cur = AbsoluteTime::now();
225
226        if cur > abs_time
227        {
228            timer_err!(TimerErrorType::Expired, "time already expired");
229        }
230        
231        let ext_ticket = 
232            TimerDequeueTicket::new();
233
234        let int_ticket = 
235            Self
236            {
237                weak_status: 
238                    ext_ticket.pair(),
239                timeout_mode: 
240                    OrderdTimerDequeOnce::new(abs_time)
241            };
242
243
244        return Ok((int_ticket, ext_ticket));
245    }
246}
247
248impl<MODE: OrderedTimerDequeMode> TimerDequeueTicketIssuer<MODE>
249{
250    /// Checks if the instance is still actual i.e was not dropeed.
251    fn is_valid(&self) -> bool
252    {
253        return self.weak_status.upgrade().is_some();
254    }
255
256    /// Obtains the [TimerDequeueId] only if the [Weak] pointer `weak_status`
257    /// to ID is still actual.
258    fn into_inner(&self) -> Option<TimerDequeueId>
259    {
260        return self.weak_status.upgrade().map(|f| *f.as_ref());
261    }
262}
263
264
265impl<MODE: OrderedTimerDequeMode> OrderedTimerDequeIntrf for TimerDequeueTicketIssuer<MODE>
266{
267    /// No target is requred.
268    type Target = common::NoTarget;
269
270    /// Return the ticket.
271    type Ticket = TimerDequeueTicket;
272
273    #[inline]
274    fn get_timeout_absolute(&self) -> AbsoluteTime 
275    {
276        return self.timeout_mode.get_absolut_timeout();
277    }
278}
279
280impl OrderedTimerDeque<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
281{
282    /// Adds the new entry to the timeout qeueu which after alarm
283    /// removes the entry from the queue. A `ticket` will be 
284    /// issued which can be used to either deactivate the record 
285    /// by droppping it or to remove it manually.
286    /// 
287    /// # Arguemnts
288    /// 
289    /// `abs_time` - a [AbsoluteTime] absolute time in future when the timeout
290    ///     should happen.
291    /// 
292    /// # Returns
293    /// 
294    /// A [Result] as alias [TimerResult] is returned with:
295    /// 
296    /// * [Result::Ok] with the [TimerDequeueTicket] ticket.
297    /// 
298    /// * [Result::Err] with error description.
299    #[inline]
300    pub   
301    fn add_to_timer(&mut self, abs_time: AbsoluteTime) -> TimerResult<TimerDequeueTicket>
302    {
303        let (inst, ticket) = 
304            TimerDequeueTicketIssuer::<OrderdTimerDequeOnce>::new(abs_time)?;
305
306        return self.add_to_timer_local(inst).map(|_| ticket);
307    }
308
309    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
310    /// or [Self::poll] can be used to obtain the event.
311    /// 
312    /// # Arguments
313    /// 
314    /// `res` - [TimerReadRes] an event from the timer to handle.
315    /// 
316    /// `timeout_items` - a vector of an enitities for which timeout have happened.
317    /// 
318    /// A [Result] as alias [TimerResult] is returned with:
319    /// 
320    /// * [Result::Ok] witout any innder data.
321    /// 
322    /// * [Result::Err] with error description.
323    pub 
324    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
325    {
326        // ignore wouldblock
327        if let TimerReadRes::WouldBlock = res
328        {
329            return Ok(());
330        }
331
332        let cur_timestamp = AbsoluteTime::now();
333
334        loop 
335        {
336            // get from front of the queue
337            let Some(front_entity) = 
338                self.deque_timeout_list.front() 
339                else 
340                { 
341                    break;
342                };
343
344            let time_until = front_entity.get_timeout_absolute();
345
346            if time_until <= cur_timestamp
347            {
348                if let Some(item) = self.deque_timeout_list.pop_front().unwrap().into_inner()
349                {
350                    timeout_items.push(item);
351                }
352
353                // else item has gone previously or while we was messing around with queue
354            }
355            else
356            {
357                break;
358            }
359        }
360
361        // call timer reschedule
362        self.reschedule_timer()?;
363
364        return Ok(());
365    }
366}
367
368impl OrderedTimerDeque<TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>>
369{
370    /// Adds the new entry to the timeout queue which after alarm
371    /// extends the timeout and sets the instance in the queue again. 
372    /// A `ticket` will be issued which can be used to either deactivate 
373    /// the record by droppping it or to remove it manually.
374    /// 
375    /// # Arguemnts
376    /// 
377    /// `rel_time` - a [RelativeTime] which will be used to extend the further 
378    ///     timeouts. .
379    /// 
380    /// # Returns
381    /// 
382    /// A [Result] as alias [TimerResult] is returned with:
383    /// 
384    /// * [Result::Ok] with the [TimerDequeueTicket] ticket.
385    /// 
386    /// * [Result::Err] with error description.
387    #[inline]
388    pub   
389    fn add_to_timer(&mut self, rel_time: RelativeTime) -> TimerResult<TimerDequeueTicket>
390    {
391        let (inst, ticket) = 
392            TimerDequeueTicketIssuer::<OrderdTimerDequePeriodic>::new(rel_time)?;
393
394        return self.add_to_timer_local(inst).map(|_| ticket);
395    }
396
397    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
398    /// or [Self::poll] can be used to obtain the event.
399    /// 
400    /// # Arguments
401    /// 
402    /// `res` - [TimerReadRes] an event from the timer to handle.
403    /// 
404    /// `timeout_items` - a vector of an enitities for which timeout have happened.
405    /// 
406    /// A [Result] as alias [TimerResult] is returned with:
407    /// 
408    /// * [Result::Ok] witout any innder data.
409    /// 
410    /// * [Result::Err] with error description.
411    pub 
412    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
413    {
414        // ignore wouldblock
415        if let TimerReadRes::WouldBlock = res
416        {
417            return Ok(());
418        }
419
420        let cur_timestamp = AbsoluteTime::now();
421
422        loop 
423        {
424            // get from front of the queue
425            let Some(front_entity) = 
426                self.deque_timeout_list.front() 
427                else 
428                { 
429                    break;
430                };
431
432            let time_until = front_entity.get_timeout_absolute();
433
434            if time_until <= cur_timestamp
435            {
436                let mut deq = self.deque_timeout_list.pop_front().unwrap();
437
438                if let Some(item) = deq.into_inner()
439                {
440                    // store the ID of item which is dequed
441                    timeout_items.push(item);
442
443                    // advance the timeout
444                    deq.timeout_mode.advance_timeout();
445
446                    // return back to deque with new timeout
447                    self.add_to_timer_local(deq)?;
448                }
449
450                // else item has gone previously or while we was messing around with queue
451            }
452            else
453            {
454                break;
455            }
456        }
457
458        // call timer reschedule
459        self.reschedule_timer()?;
460
461        return Ok(());
462    }
463
464}
465
466impl<MODE: OrderedTimerDequeMode> OrderedTimerDeque<TimerDequeueTicketIssuer<MODE>>
467{
468    /// Removes the instance from the queue by the issued `ticket`.
469    /// 
470    /// # Arguments
471    /// 
472    /// `ticket` - [TimerDequeueTicket] an issued ticket.
473    /// 
474    /// # Returns
475    /// 
476    /// A [Result] as alias [TimerResult] is returned with:
477    /// 
478    /// * [Result::Ok] witout any innder data.
479    /// 
480    /// * [Result::Err] with error description.
481    pub 
482    fn remove_from_sched_queue(&mut self, ticket: TimerDequeueTicket) -> TimerResult<()>
483    {
484        if self.deque_timeout_list.len() == 0
485        {
486            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
487        }
488        else
489        {
490            // search for the item in list
491            if self.deque_timeout_list.len() == 1
492            {
493                // just pop the from the front
494                let _ = self.deque_timeout_list.pop_front();
495
496                // stop timer
497                self.stop_timer()?;
498
499                return Ok(());
500            }
501            else
502            {
503                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
504                // succesfully for temoved instance.
505
506                for (pos, q_item) 
507                in self.deque_timeout_list.iter().enumerate()
508                {
509                    if q_item == &ticket
510                    {
511                        // remove by the index
512                        let _ = 
513                            self.deque_timeout_list.remove(pos).unwrap().into_inner();
514
515                        // call timer reset if index is 0 (front)
516                        if pos == 0 
517                        {
518                            self.reschedule_timer()?;
519                        }
520
521                        return Ok(());
522                    }
523                }
524
525                return Ok(());
526            }
527        }
528    }
529}
530
531#[cfg(test)]
532mod tests
533{
534    use std::{cmp::Ordering, time::{Duration, Instant}};
535
536    use crate::{common, deque_timeout::OrderdTimerDequeOnce, timer_portable::timer::{AbsoluteTime, TimerReadRes}, OrderedTimerDeque, TimerDequeueId, TimerDequeueTicketIssuer};
537
538    #[test]
539    fn test_ticket_0()
540    {
541        let mut time_list = 
542            OrderedTimerDeque
543                ::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
544                ::new("test_label".into(), 4, false).unwrap();
545
546
547        let s = Instant::now();
548
549        let ts = common::get_current_timestamp();
550
551        let tss_set = AbsoluteTime::new_time(ts.timestamp()+2, ts.timestamp_subsec_nanos() as i64);
552        
553        let ticket = time_list.add_to_timer(tss_set).unwrap();
554
555        println!("ticket issued: {}", ticket);
556
557        for _ in 0..3
558        {
559            let event = time_list.wait_for_event().unwrap();
560            if let TimerReadRes::WouldBlock = event
561            {
562                std::thread::sleep(Duration::from_millis(1001));
563
564                continue;
565            }
566            
567            assert_eq!(event, TimerReadRes::Ok(1));
568
569            let e = s.elapsed();
570            let ts = AbsoluteTime::now();
571
572            assert_eq!(ts.seconds_cmp(&tss_set) == Ordering::Equal, true);
573            println!("ev: {}, instant: {:?} ts: {}, timerset: {}", event, e, ts, tss_set);
574
575            let mut tm_items: Vec<TimerDequeueId> = Vec::with_capacity(1);
576
577            time_list.timeout_event_handler(event, &mut tm_items).unwrap();
578
579            println!("tickets: {}", tm_items[0]);
580
581            assert_eq!(tm_items.len(), 1);
582            assert_eq!(tm_items.first().is_some(), true);
583            assert_eq!(tm_items.first().unwrap(), &ticket);
584
585            return;
586        }
587
588        panic!("timeout befor timer!");
589    }
590}