timer_deque_rs/deque_timeout/
timer_tickets.rs

1/*-
2 * timer-deque-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov alex@nixd.org
6 *  4neko.org alex@4neko.org
7 * 
8 * The timer-rs crate can be redistributed and/or modified
9 * under the terms of either of the following licenses:
10 *
11 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
12 *                     
13 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
14 */
15
16use std::{fmt, sync::{Arc, Weak}};
17
18use crate::
19{
20    common::{self, TimerDequeueId}, 
21    deque_timeout::{OrderdTimerDequeOnce, OrderdTimerDequePeriodic, OrderedTimerDequeMode}, 
22    error::{TimerErrorType, TimerResult}, 
23    timer_err, 
24    timer_portable::timer::{AbsoluteTime, RelativeTime}, 
25    TimerReadRes
26};
27
28use super::{OrderedTimerDeque, OrderedTimerDequeIntrf};
29
30
31impl PartialEq<TimerDequeueTicket> for TimerDequeueId
32{
33    fn eq(&self, other: &TimerDequeueTicket) -> bool 
34    {
35        return self == other.status.as_ref();
36    }
37}
38
39/// A ticket which is issued to the route which added the instance
40/// to the queue. If this instance is dropped, the [Weak] reference to [Arc]
41/// will no longer be valid and the timer would ignore the instance and remove
42/// it from the queue independeltly from the deque operation mode.
43#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
44pub struct TimerDequeueTicket
45{
46    /// A single instance of arc which the presense of which indicates that
47    /// the instance is valid. It holds the uniq ID which can be used to
48    /// remove the instance from the queue manually.
49    status: Arc<TimerDequeueId>,
50}
51
52impl fmt::Display for TimerDequeueTicket
53{
54    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
55    {
56        write!(f, "{}", self.status)
57    }
58}
59
60impl PartialEq<TimerDequeueId> for TimerDequeueTicket
61{
62    fn eq(&self, other: &TimerDequeueId) -> bool 
63    {
64        return self.status.as_ref() == other;
65    }
66}
67
68impl<MODE: OrderedTimerDequeMode> PartialEq<TimerDequeueTicketIssuer<MODE>> for TimerDequeueTicket
69{
70    fn eq(&self, other: &TimerDequeueTicketIssuer<MODE>) -> bool 
71    {
72        let Some(up) = other.weak_status.upgrade()
73            else { return false };
74
75        return self.status.as_ref() == up.as_ref();
76    }
77}
78
79impl AsRef<TimerDequeueId> for TimerDequeueTicket
80{
81    fn as_ref(&self) -> &TimerDequeueId 
82    {
83        return &self.status;
84    }
85}
86
87impl TimerDequeueTicket
88{
89    fn new() -> Self
90    {
91        return Self{ status: Arc::new(TimerDequeueId::new() ) }
92    }
93
94    fn pair(&self) -> Weak<TimerDequeueId>
95    {
96        return Arc::downgrade(&self.status);
97    }
98
99    pub 
100    fn get_deque_id(&self) -> &TimerDequeueId
101    {
102        return self.status.as_ref();
103    }
104
105    pub 
106    fn is_queued(&self) -> bool
107    {
108        return Arc::weak_count(&self.status) > 1;
109    }
110}
111
112/// A type of deque which issues a `tickets` in form of [TimerDequeueTicket] 
113/// when the timer is set. This type of timer dequeue allows not to delete the
114/// item from the queue and just drop the `ticket` instance. The timer dequeue 
115/// would ignore the dropped instances.
116/// 
117/// # Generics
118/// 
119/// `MODE` - a [OrderedTimerDequeMode] which defines the deque behaviour. There are
120///     two types of the operation:
121/// 
122/// * [OrderdTimerDequeOnce] - after timeout the element is removed from the queue.
123/// 
124/// * [OrderdTimerDequePeriodic] - after timeout the element timeout is extended
125///     until the item is not removed from the queue manually.
126/// 
127/// # Examples
128/// 
129/// ```ignore
130/// let mut time_list = 
131///     OrderedTimerDeque
132///         ::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
133///         ::new("test_label".into(), 4, false).unwrap();
134/// ```
135/// or
136/// ```ignore
137/// let mut time_list = 
138///     OrderedTimerDeque
139///         ::<TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>>
140///         ::new("test_label".into(), 4, false).unwrap();
141/// ```
142#[derive(Debug)]
143pub struct TimerDequeueTicketIssuer<MODE: OrderedTimerDequeMode>
144{
145    /// A [Weak] reference to [Arc] which holds the queue identification.
146    weak_status: Weak<TimerDequeueId>,
147
148    /// An absolute timestamp in seconds (UTC time) sets the timer.
149    timeout_mode: MODE,
150}
151
152impl<MODE: OrderedTimerDequeMode> fmt::Display for TimerDequeueTicketIssuer<MODE>
153{  
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
155    { 
156        write!(f, "{} until: {}", self.weak_status.upgrade().map_or("dropped".into(), |f| f.to_string()), 
157            self.timeout_mode)
158    }
159}
160
161impl<MODE: OrderedTimerDequeMode> Eq for TimerDequeueTicketIssuer<MODE> {}
162
163impl<MODE: OrderedTimerDequeMode> PartialEq for TimerDequeueTicketIssuer<MODE>
164{
165    fn eq(&self, other: &Self) -> bool 
166    {
167        let s = self.weak_status.upgrade();
168        let o = other.weak_status.upgrade();
169
170        return s == o;
171    }
172}
173
174impl<MODE: OrderedTimerDequeMode> PartialEq<TimerDequeueTicket> for TimerDequeueTicketIssuer<MODE>
175{
176    fn eq(&self, other: &TimerDequeueTicket) -> bool 
177    {
178        let Some(s) = self.weak_status.upgrade()
179            else { return false };
180
181        return s.as_ref() == other.status.as_ref();
182    }
183}
184
185impl<MODE: OrderedTimerDequeMode> Ord for TimerDequeueTicketIssuer<MODE>
186{
187    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
188    {
189        return self.timeout_mode.cmp(&other.timeout_mode);
190    }
191}
192
193impl<MODE: OrderedTimerDequeMode> PartialOrd for TimerDequeueTicketIssuer<MODE>
194{
195    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
196    {
197        return Some(self.cmp(other));
198    }
199}
200
201
202impl TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>
203{ 
204    /// Initializes new instance.
205    fn new(rel_time: RelativeTime) -> TimerResult<(Self, TimerDequeueTicket)>
206    {        
207        let ext_ticket = 
208            TimerDequeueTicket::new();
209
210        let int_ticket = 
211            Self
212            {
213                weak_status: 
214                    ext_ticket.pair(),
215                timeout_mode: 
216                   OrderdTimerDequePeriodic::new(rel_time)
217            };
218
219
220        return Ok((int_ticket, ext_ticket));
221    }
222}
223
224impl TimerDequeueTicketIssuer<OrderdTimerDequeOnce>
225{ 
226    /// Initializes new instance.
227    fn new(abs_time: AbsoluteTime) -> TimerResult<(Self, TimerDequeueTicket)>
228    {
229
230        let cur = AbsoluteTime::now();
231
232        if cur > abs_time
233        {
234            timer_err!(TimerErrorType::Expired, "time already expired");
235        }
236        
237        let ext_ticket = 
238            TimerDequeueTicket::new();
239
240        let int_ticket = 
241            Self
242            {
243                weak_status: 
244                    ext_ticket.pair(),
245                timeout_mode: 
246                    OrderdTimerDequeOnce::new(abs_time)
247            };
248
249
250        return Ok((int_ticket, ext_ticket));
251    }
252}
253
254impl<MODE: OrderedTimerDequeMode> TimerDequeueTicketIssuer<MODE>
255{
256    /// Checks if the instance is still actual i.e was not dropeed.
257    fn is_valid(&self) -> bool
258    {
259        return self.weak_status.upgrade().is_some();
260    }
261
262    /// Obtains the [TimerDequeueId] only if the [Weak] pointer `weak_status`
263    /// to ID is still actual.
264    fn into_inner(&self) -> Option<TimerDequeueId>
265    {
266        return self.weak_status.upgrade().map(|f| *f.as_ref());
267    }
268}
269
270
271impl<MODE: OrderedTimerDequeMode> OrderedTimerDequeIntrf for TimerDequeueTicketIssuer<MODE>
272{
273    /// No target is requred.
274    type Target = common::NoTarget;
275
276    /// Return the ticket.
277    type Ticket = TimerDequeueTicket;
278
279    #[inline]
280    fn get_timeout_absolute(&self) -> AbsoluteTime 
281    {
282        return self.timeout_mode.get_absolut_timeout();
283    }
284}
285
286impl OrderedTimerDeque<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
287{
288    /// Adds the new entry to the timeout qeueu which after alarm
289    /// removes the entry from the queue. A `ticket` will be 
290    /// issued which can be used to either deactivate the record 
291    /// by droppping it or to remove it manually.
292    /// 
293    /// # Arguemnts
294    /// 
295    /// `abs_time` - a [AbsoluteTime] absolute time in future when the timeout
296    ///     should happen.
297    /// 
298    /// # Returns
299    /// 
300    /// A [Result] as alias [TimerResult] is returned with:
301    /// 
302    /// * [Result::Ok] with the [TimerDequeueTicket] ticket.
303    /// 
304    /// * [Result::Err] with error description.
305    #[inline]
306    pub   
307    fn add_to_timer(&mut self, abs_time: AbsoluteTime) -> TimerResult<TimerDequeueTicket>
308    {
309        let (inst, ticket) = 
310            TimerDequeueTicketIssuer::<OrderdTimerDequeOnce>::new(abs_time)?;
311
312        return self.add_to_timer_local(inst).map(|_| ticket);
313    }
314
315    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
316    /// or [Self::poll] can be used to obtain the event.
317    /// 
318    /// # Arguments
319    /// 
320    /// `res` - [TimerReadRes] an event from the timer to handle.
321    /// 
322    /// `timeout_items` - a vector of an enitities for which timeout have happened.
323    /// 
324    /// A [Result] as alias [TimerResult] is returned with:
325    /// 
326    /// * [Result::Ok] witout any innder data.
327    /// 
328    /// * [Result::Err] with error description.
329    pub 
330    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
331    {
332        // ignore wouldblock
333        if let TimerReadRes::WouldBlock = res
334        {
335            return Ok(());
336        }
337
338        let cur_timestamp = AbsoluteTime::now();
339
340        loop 
341        {
342            // get from front of the queue
343            let Some(front_entity) = 
344                self.deque_timeout_list.front() 
345                else 
346                { 
347                    break;
348                };
349
350            let time_until = front_entity.get_timeout_absolute();
351
352            if time_until <= cur_timestamp
353            {
354                if let Some(item) = self.deque_timeout_list.pop_front().unwrap().into_inner()
355                {
356                    timeout_items.push(item);
357                }
358
359                // else item has gone previously or while we was messing around with queue
360            }
361            else
362            {
363                break;
364            }
365        }
366
367        // call timer reschedule
368        self.reschedule_timer()?;
369
370        return Ok(());
371    }
372}
373
374impl OrderedTimerDeque<TimerDequeueTicketIssuer<OrderdTimerDequePeriodic>>
375{
376    /// Adds the new entry to the timeout queue which after alarm
377    /// extends the timeout and sets the instance in the queue again. 
378    /// A `ticket` will be issued which can be used to either deactivate 
379    /// the record by droppping it or to remove it manually.
380    /// 
381    /// # Arguemnts
382    /// 
383    /// `rel_time` - a [RelativeTime] which will be used to extend the further 
384    ///     timeouts. .
385    /// 
386    /// # Returns
387    /// 
388    /// A [Result] as alias [TimerResult] is returned with:
389    /// 
390    /// * [Result::Ok] with the [TimerDequeueTicket] ticket.
391    /// 
392    /// * [Result::Err] with error description.
393    #[inline]
394    pub   
395    fn add_to_timer(&mut self, rel_time: RelativeTime) -> TimerResult<TimerDequeueTicket>
396    {
397        let (inst, ticket) = 
398            TimerDequeueTicketIssuer::<OrderdTimerDequePeriodic>::new(rel_time)?;
399
400        return self.add_to_timer_local(inst).map(|_| ticket);
401    }
402
403    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
404    /// or [Self::poll] can be used to obtain the event.
405    /// 
406    /// # Arguments
407    /// 
408    /// `res` - [TimerReadRes] an event from the timer to handle.
409    /// 
410    /// `timeout_items` - a vector of an enitities for which timeout have happened.
411    /// 
412    /// A [Result] as alias [TimerResult] is returned with:
413    /// 
414    /// * [Result::Ok] witout any innder data.
415    /// 
416    /// * [Result::Err] with error description.
417    pub 
418    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
419    {
420        // ignore wouldblock
421        if let TimerReadRes::WouldBlock = res
422        {
423            return Ok(());
424        }
425
426        let cur_timestamp = AbsoluteTime::now();
427
428        loop 
429        {
430            // get from front of the queue
431            let Some(front_entity) = 
432                self.deque_timeout_list.front() 
433                else 
434                { 
435                    break;
436                };
437
438            let time_until = front_entity.get_timeout_absolute();
439
440            if time_until <= cur_timestamp
441            {
442                let mut deq = self.deque_timeout_list.pop_front().unwrap();
443
444                if let Some(item) = deq.into_inner()
445                {
446                    // store the ID of item which is dequed
447                    timeout_items.push(item);
448
449                    // advance the timeout
450                    deq.timeout_mode.advance_timeout();
451
452                    // return back to deque with new timeout
453                    self.add_to_timer_local(deq)?;
454                }
455
456                // else item has gone previously or while we was messing around with queue
457            }
458            else
459            {
460                break;
461            }
462        }
463
464        // call timer reschedule
465        self.reschedule_timer()?;
466
467        return Ok(());
468    }
469
470}
471
472impl<MODE: OrderedTimerDequeMode> OrderedTimerDeque<TimerDequeueTicketIssuer<MODE>>
473{
474    /// Removes the instance from the queue by the issued `ticket`.
475    /// 
476    /// # Arguments
477    /// 
478    /// `ticket` - [TimerDequeueTicket] an issued ticket.
479    /// 
480    /// # Returns
481    /// 
482    /// A [Result] as alias [TimerResult] is returned with:
483    /// 
484    /// * [Result::Ok] witout any innder data.
485    /// 
486    /// * [Result::Err] with error description.
487    pub 
488    fn remove_from_sched_queue(&mut self, ticket: TimerDequeueTicket) -> TimerResult<()>
489    {
490        if self.deque_timeout_list.len() == 0
491        {
492            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
493        }
494        else
495        {
496            // search for the item in list
497            if self.deque_timeout_list.len() == 1
498            {
499                // just pop the from the front
500                let _ = self.deque_timeout_list.pop_front();
501
502                // stop timer
503                self.stop_timer()?;
504
505                return Ok(());
506            }
507            else
508            {
509                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
510                // succesfully for temoved instance.
511
512                for (pos, q_item) 
513                in self.deque_timeout_list.iter().enumerate()
514                {
515                    if q_item == &ticket
516                    {
517                        // remove by the index
518                        let _ = 
519                            self.deque_timeout_list.remove(pos).unwrap().into_inner();
520
521                        // call timer reset if index is 0 (front)
522                        if pos == 0 
523                        {
524                            self.reschedule_timer()?;
525                        }
526
527                        return Ok(());
528                    }
529                }
530
531                return Ok(());
532            }
533        }
534    }
535}
536
537#[cfg(test)]
538mod tests
539{
540    use std::{cmp::Ordering, time::{Duration, Instant}};
541
542    use crate::{common, deque_timeout::OrderdTimerDequeOnce, timer_portable::timer::{AbsoluteTime, TimerReadRes}, OrderedTimerDeque, TimerDequeueId, TimerDequeueTicketIssuer};
543
544    #[test]
545    fn test_ticket_0()
546    {
547        let mut time_list = 
548            OrderedTimerDeque
549                ::<TimerDequeueTicketIssuer<OrderdTimerDequeOnce>>
550                ::new("test_label".into(), 4, false).unwrap();
551
552
553        let s = Instant::now();
554
555        let ts = common::get_current_timestamp();
556
557        let tss_set = 
558            AbsoluteTime
559                ::new_time(ts.timestamp()+2, ts.timestamp_subsec_nanos() as i64)
560                    .unwrap();
561        
562        let ticket = time_list.add_to_timer(tss_set).unwrap();
563
564        println!("ticket issued: {}", ticket);
565
566        for _ in 0..3
567        {
568            let event = time_list.wait_for_event().unwrap();
569            if let TimerReadRes::WouldBlock = event
570            {
571                std::thread::sleep(Duration::from_millis(1001));
572
573                continue;
574            }
575            
576            assert_eq!(event, TimerReadRes::Ok(1));
577
578            let e = s.elapsed();
579            let ts = AbsoluteTime::now();
580
581            assert_eq!(ts.seconds_cmp(&tss_set) == Ordering::Equal, true);
582            println!("ev: {}, instant: {:?} ts: {}, timerset: {}", event, e, ts, tss_set);
583
584            let mut tm_items: Vec<TimerDequeueId> = Vec::with_capacity(1);
585
586            time_list.timeout_event_handler(event, &mut tm_items).unwrap();
587
588            println!("tickets: {}", tm_items[0]);
589
590            assert_eq!(tm_items.len(), 1);
591            assert_eq!(tm_items.first().is_some(), true);
592            assert_eq!(tm_items.first().unwrap(), &ticket);
593
594            return;
595        }
596
597        panic!("timeout befor timer!");
598    }
599}