timer_deque_rs/
timer_tickets.rs

1/*-
2 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov
6 * 
7 * The timer-rs crate can be redistributed and/or modified
8 * under the terms of either of the following licenses:
9 *
10 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
11 *                     
12 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
13 */
14
15use std::{fmt, sync::{Arc, Weak}};
16
17use rand::RngCore;
18
19use crate::
20{
21    common, 
22    error::{TimerError, TimerErrorType, TimerResult}, 
23    timer::{OrderedTimerDeque, OrderedTimerDequeIntrf}, 
24    timer_err, 
25    timer_portable::timer::TimerExpMode, TimerReadRes
26};
27
28/// A uniq ID which is generated for every ticket. It is a 128bit
29/// value which consists from timestamp sec and ns and random number.
30#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
31pub struct TimerDequeueId
32{
33    /// seconds
34    timestamp: i64,
35
36    /// nanoseconds
37    timestamp_ns: u32,
38
39    /// random number
40    ran_id: u32,
41}
42
43impl fmt::Display for TimerDequeueId
44{
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
46    {
47        write!(f, "{:X}/{:X}/{:X}", self.timestamp, self.timestamp_ns, self.ran_id)
48    }
49}
50
51impl PartialEq<TimerDequeueTicket> for TimerDequeueId
52{
53    fn eq(&self, other: &TimerDequeueTicket) -> bool 
54    {
55        return self == other.status.as_ref();
56    }
57}
58
59impl AsRef<TimerDequeueId> for TimerDequeueId
60{
61    fn as_ref(&self) -> &TimerDequeueId 
62    {
63        return self;
64    }
65}
66
67impl TimerDequeueId
68{
69    fn new() -> Self
70    {
71        let mut rng = rand::rng();
72
73        let ts = common::get_current_timestamp();
74
75        return 
76            Self
77            { 
78                timestamp: ts.timestamp(), 
79                timestamp_ns: ts.timestamp_subsec_nanos(), 
80                ran_id: rng.next_u32(), 
81            }; 
82    }
83}
84
85#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
86pub struct TimerDequeueTicket
87{
88   status: Arc<TimerDequeueId>,
89}
90
91impl fmt::Display for TimerDequeueTicket
92{
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
94    {
95        write!(f, "{}", self.status)
96    }
97}
98
99impl PartialEq<TimerDequeueId> for TimerDequeueTicket
100{
101    fn eq(&self, other: &TimerDequeueId) -> bool 
102    {
103        return self.status.as_ref() == other;
104    }
105}
106
107impl PartialEq<TimerDequeueTicketIssuer> for TimerDequeueTicket
108{
109    fn eq(&self, other: &TimerDequeueTicketIssuer) -> bool 
110    {
111        let Some(up) = other.weak_status.upgrade()
112            else { return false };
113
114        return self.status.as_ref() == up.as_ref();
115    }
116}
117
118impl AsRef<TimerDequeueId> for TimerDequeueTicket
119{
120    fn as_ref(&self) -> &TimerDequeueId 
121    {
122        return &self.status;
123    }
124}
125
126impl TimerDequeueTicket
127{
128    fn new() -> Self
129    {
130        return Self{ status: Arc::new(TimerDequeueId::new() ) }
131    }
132
133    fn pair(&self) -> Weak<TimerDequeueId>
134    {
135        return Arc::downgrade(&self.status);
136    }
137
138    pub 
139    fn get_deque_id(&self) -> &TimerDequeueId
140    {
141        return self.status.as_ref();
142    }
143
144    pub 
145    fn is_queued(&self) -> bool
146    {
147        return Arc::weak_count(&self.status) > 1;
148    }
149}
150
151/// A type of deque which issues a `tickets` in form of [TimerDequeueTicket] 
152/// when the timer is set. This type of timer dequeue allows not to delete the
153/// item from the queue and just drop the `ticket` instance. The timer dequeue 
154/// would ignore the dropped instances.
155#[derive(Debug)]
156pub struct TimerDequeueTicketIssuer
157{
158    /// A [Weak] reference to [Arc] which holds the queue identification.
159    weak_status: Weak<TimerDequeueId>,
160
161    /// An absolute timestamp in seconds (UTC time) sets the timer.
162    timeout_absolute: TimerExpMode,
163}
164
165impl fmt::Display for TimerDequeueTicketIssuer
166{  
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
168    { 
169        write!(f, "{} until: {}", self.weak_status.upgrade().map_or("dropped".into(), |f| f.to_string()), 
170            self.timeout_absolute)
171    }
172}
173
174impl Eq for TimerDequeueTicketIssuer {}
175
176impl PartialEq for TimerDequeueTicketIssuer
177{
178    fn eq(&self, other: &Self) -> bool 
179    {
180        let s = self.weak_status.upgrade();
181        let o = other.weak_status.upgrade();
182
183        return s == o;
184    }
185}
186
187impl PartialEq<TimerDequeueTicket> for TimerDequeueTicketIssuer
188{
189    fn eq(&self, other: &TimerDequeueTicket) -> bool 
190    {
191        let Some(s) = self.weak_status.upgrade()
192            else { return false };
193
194        return s.as_ref() == other.status.as_ref();
195    }
196}
197
198impl Ord for TimerDequeueTicketIssuer
199{
200    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
201    {
202        return self.timeout_absolute.cmp(&other.timeout_absolute);
203    }
204}
205
206impl PartialOrd for TimerDequeueTicketIssuer
207{
208    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
209    {
210        return Some(self.cmp(other));
211    }
212}
213
214
215impl TimerDequeueTicketIssuer
216{ 
217    /// Initializes new instance.
218    fn new(abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, TimerDequeueTicket)>
219    {
220        let abs_time_nsec = abs_time_nsec & 999_999_999;
221
222        let cur = common::get_current_timestamp();
223        let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
224
225        if TimerExpMode::from(cur) > req
226        {
227            timer_err!(TimerErrorType::Expired, "time already expired");
228        }
229
230        
231        let ext_ticket = 
232            TimerDequeueTicket::new();
233
234        let int_ticket = 
235            Self
236            {
237                weak_status: ext_ticket.pair(),
238                timeout_absolute: req
239            };
240
241
242        return Ok((int_ticket, ext_ticket));
243    }
244
245    /// Returns the absolute timeout of the current instance.
246    fn get_time_until(&self) -> TimerExpMode
247    {
248        return self.timeout_absolute;
249    }
250
251    /// Checks if the instance is still actual i.e was not dropeed.
252    fn is_valid(&self) -> bool
253    {
254        return self.weak_status.upgrade().is_some();
255    }
256
257    /// Obtains the [TimerDequeueId] only if the [Weak] pointer `weak_status`
258    /// to ID is still actual.
259    fn into_inner(&self) -> Option<TimerDequeueId>
260    {
261        return self.weak_status.upgrade().map(|f| *f.as_ref());
262    }
263}
264
265
266impl OrderedTimerDequeIntrf for TimerDequeueTicketIssuer
267{
268    /// No target is requred.
269    type Target = common::NoTarget;
270
271    /// Return the ticket.
272    type Ticket = TimerDequeueTicket;
273
274    #[inline]
275    fn wrap(_target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)> 
276    {
277        return Self::new(abs_time_sec, abs_time_nsec);
278    }
279
280    #[inline]
281    fn get_timeout_absolute(&self) -> TimerExpMode 
282    {
283        return self.timeout_absolute;
284    }
285}
286
287impl OrderedTimerDeque<TimerDequeueTicketIssuer>
288{
289    /// Adds the new absolute timeout to the timer dequeue.
290    /// 
291    /// # Arguemnts
292    /// 
293    /// `abs_time_sec` - a [i64] absolute time in seconds. Can not be 0.
294    /// 
295    /// `abs_time_nsec` - a [i64] asolute time in nanosecodns. Can be 0.
296    /// 
297    /// # Returns
298    /// 
299    /// A [Result] as alias [TimerResult] is returned with:
300    /// 
301    /// * [Result::Ok] with the [TimerDequeueTicket] ticket.
302    /// 
303    /// * [Result::Err] with error description.
304    #[inline]
305    pub   
306    fn add_to_timer(&mut self, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<TimerDequeueTicket>
307    {
308        return self.add_to_timer_local(common::NoTarget, abs_time_sec, abs_time_nsec);
309    }
310
311    /// Removes the instance from the queue by the issued `ticket`.
312    /// 
313    /// # Arguments
314    /// 
315    /// `ticket` - [TimerDequeueTicket] an issued ticket.
316    /// 
317    /// # Returns
318    /// 
319    /// A [Result] as alias [TimerResult] is returned with:
320    /// 
321    /// * [Result::Ok] witout any innder data.
322    /// 
323    /// * [Result::Err] with error description.
324    pub 
325    fn remove_from_sched_queue(&mut self, ticket: TimerDequeueTicket) -> TimerResult<()>
326    {
327        if self.deque_timeout_list.len() == 0
328        {
329            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
330        }
331        else
332        {
333            // search for the item in list
334            if self.deque_timeout_list.len() == 1
335            {
336                // just pop the from the front
337                let _ = self.deque_timeout_list.pop_front();
338
339                // stop timer
340                self.stop_timer()?;
341
342                return Ok(());
343            }
344            else
345            {
346                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
347                // succesfully for temoved instance.
348
349                for (pos, q_item) 
350                in self.deque_timeout_list.iter().enumerate()
351                {
352                    if q_item == &ticket
353                    {
354                        // remove by the index
355                        let _ = 
356                            self.deque_timeout_list.remove(pos).unwrap().into_inner();
357
358                        // call timer reset if index is 0 (front)
359                        if pos == 0 
360                        {
361                            self.reschedule_timer()?;
362                        }
363
364                        return Ok(());
365                    }
366                }
367
368                return Ok(());
369            }
370        }
371    }
372
373    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
374    /// or [Self::poll] can be used to obtain the event.
375    /// 
376    /// # Arguments
377    /// 
378    /// `res` - [TimerReadRes] an event from the timer to handle.
379    /// 
380    /// `timeout_items` - a vector of tickets which were not dropped and timeout.
381    /// 
382    /// A [Result] as alias [TimerResult] is returned with:
383    /// 
384    /// * [Result::Ok] witout any innder data.
385    /// 
386    /// * [Result::Err] with error description.
387    pub 
388    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
389    {
390        // ignore wouldblock
391        if let TimerReadRes::WouldBlock = res
392        {
393            return Ok(());
394        }
395        
396        // check if the time reset is required
397        let force_timer_resched = 
398            TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
399
400            
401        let mut to_remove: Vec<TimerDequeueId> = Vec::new();
402        let mut error: Option<TimerError> = None;
403        let mut last_time_until: TimerExpMode = TimerExpMode::None;
404        let cur_timestamp = common::get_current_timestamp();
405
406        loop 
407        {
408            // get from front of the queue
409            let Some(front_entity) = 
410                self.deque_timeout_list.front() 
411                else 
412                { 
413                    // queue is empty, stop timer if needed.
414                    let Err(e) = self.stop_timer()
415                    else
416                    {
417                        break;
418                    };
419
420                    error = Some(e);
421
422                    break;
423                };
424
425            let time_until = front_entity.get_time_until();
426
427            if time_until <= TimerExpMode::from(cur_timestamp)
428            {
429                if let Some(item) = self.deque_timeout_list.pop_front().unwrap().into_inner()
430                {
431                    to_remove.push(item);
432                }
433
434                // else item has gone previously or while we was messing around with queue
435            }
436            else
437            {
438                // we need to rearm the timer if so
439                if last_time_until != TimerExpMode::None || force_timer_resched == true
440                {
441                    // call timer reshedule
442                    self.reschedule_timer()?;
443                }
444
445                break;
446            }
447
448            last_time_until = time_until;
449        }
450
451        *timeout_items = to_remove;
452
453        return error.map_or(Ok(()), |f| Err(f));
454    }
455
456}
457
458#[cfg(test)]
459mod tests
460{
461    use std::time::{Duration, Instant};
462
463    use crate::{common, timer::OrderedTimerDeque, timer_portable::timer::TimerReadRes, timer_tickets::{TimerDequeueId, TimerDequeueTicketIssuer}};
464
465    #[test]
466    fn test_ticket_0()
467    {
468        let mut time_list = 
469            OrderedTimerDeque
470                ::<TimerDequeueTicketIssuer>
471                ::new("test_label".into(), 4, false).unwrap();
472
473
474        let s = Instant::now();
475
476        let tss_set = common::get_current_timestamp().timestamp()+2;
477        
478        let ticket = time_list.add_to_timer(tss_set, 0).unwrap();
479
480        println!("ticket issued: {}", ticket);
481
482        for _ in 0..3
483        {
484            let event = time_list.wait_for_event().unwrap();
485            if let TimerReadRes::WouldBlock = event
486            {
487                std::thread::sleep(Duration::from_millis(1001));
488
489                continue;
490            }
491            
492            assert_eq!(event, TimerReadRes::Ok(1));
493
494            let e = s.elapsed();
495            let ts = common::get_current_timestamp();
496
497            assert_eq!(ts.timestamp(), tss_set);
498            println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);
499
500            let mut tm_items: Vec<TimerDequeueId> = Vec::with_capacity(1);
501
502            time_list.timeout_event_handler(event, &mut tm_items).unwrap();
503
504            println!("tickets: {}", tm_items[0]);
505
506            assert_eq!(tm_items.len(), 1);
507            assert_eq!(tm_items.first().is_some(), true);
508            assert_eq!(tm_items.first().unwrap(), &ticket);
509
510            return;
511        }
512
513        panic!("timeout befor timer!");
514    }
515}