timer_deque_rs/
timer_tickets.rs

1/*-
2 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov
6 * 
7 * The timer-rs crate can be redistributed and/or modified
8 * under the terms of either of the following licenses:
9 *
10 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
11 *                     
12 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
13 */
14
15use std::{fmt, sync::{Arc, Weak}};
16
17use rand::RngCore;
18
19use crate::
20{
21    common, 
22    error::{TimerError, TimerErrorType, TimerResult}, 
23    timer::{OrderedTimerDeque, OrderedTimerDequeIntrf}, 
24    timer_err, 
25    timer_portable::timer::TimerExpMode, TimerReadRes
26};
27
/// A unique identifier which is generated for every ticket.
///
/// The identifier is 128 bits wide in total and is built from a timestamp
/// (seconds and nanoseconds) followed by a random number.
///
/// The comparison traits are derived, so two identifiers are compared
/// field by field in declaration order: seconds, nanoseconds, random number.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TimerDequeueId
{
    /// Seconds part of the timestamp.
    timestamp: i64,

    /// Nanoseconds part of the timestamp.
    timestamp_ns: u32,

    /// Random number.
    ran_id: u32,
}
42
43impl fmt::Display for TimerDequeueId
44{
45    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
46    {
47        write!(f, "{:X}/{:X}/{:X}", self.timestamp, self.timestamp_ns, self.ran_id)
48    }
49}
50
51impl PartialEq<TimerDequeueTicket> for TimerDequeueId
52{
53    fn eq(&self, other: &TimerDequeueTicket) -> bool 
54    {
55        return self == other.status.as_ref();
56    }
57}
58
59impl AsRef<TimerDequeueId> for TimerDequeueId
60{
61    fn as_ref(&self) -> &TimerDequeueId 
62    {
63        return self;
64    }
65}
66
67impl TimerDequeueId
68{
69    fn new() -> Self
70    {
71        let mut rng = rand::rng();
72
73        let ts = common::get_current_timestamp();
74
75        return 
76            Self
77            { 
78                timestamp: ts.timestamp(), 
79                timestamp_ns: ts.timestamp_subsec_nanos(), 
80                ran_id: rng.next_u32(), 
81            }; 
82    }
83}
84
85#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
86pub struct TimerDequeueTicket
87{
88   status: Arc<TimerDequeueId>,
89}
90
91impl fmt::Display for TimerDequeueTicket
92{
93    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
94    {
95        write!(f, "{}", self.status)
96    }
97}
98
99impl PartialEq<TimerDequeueId> for TimerDequeueTicket
100{
101    fn eq(&self, other: &TimerDequeueId) -> bool 
102    {
103        return self.status.as_ref() == other;
104    }
105}
106
107impl PartialEq<TimerDequeueTicketIssuer> for TimerDequeueTicket
108{
109    fn eq(&self, other: &TimerDequeueTicketIssuer) -> bool 
110    {
111        let Some(up) = other.weak_status.upgrade()
112            else { return false };
113
114        return self.status.as_ref() == up.as_ref();
115    }
116}
117
118impl AsRef<TimerDequeueId> for TimerDequeueTicket
119{
120    fn as_ref(&self) -> &TimerDequeueId 
121    {
122        return &self.status;
123    }
124}
125
126impl TimerDequeueTicket
127{
128    fn new() -> Self
129    {
130        return Self{ status: Arc::new(TimerDequeueId::new() ) }
131    }
132
133    fn pair(&self) -> Weak<TimerDequeueId>
134    {
135        return Arc::downgrade(&self.status);
136    }
137
138    pub 
139    fn get_deque_id(&self) -> &TimerDequeueId
140    {
141        return self.status.as_ref();
142    }
143
144    pub 
145    fn is_queued(&self) -> bool
146    {
147        return Arc::weak_count(&self.status) > 1;
148    }
149}
150
151/// A type of deque which issues a `tickets` in form of [TimerDequeueTicket] 
152/// when the timer is set. This type of timer dequeue allows not to delete the
153/// item from the queue and just drop the `ticket` instance. The timer dequeue 
154/// would ignore the dropped instances.
155/// 
156/// # Examples
157/// 
158/// ```ignore
159/// let mut time_list = 
160///     OrderedTimerDeque
161///         ::<TimerDequeueTicketIssuer>
162///         ::new("test_label".into(), 4, false).unwrap();
163/// ```
164#[derive(Debug)]
165pub struct TimerDequeueTicketIssuer
166{
167    /// A [Weak] reference to [Arc] which holds the queue identification.
168    weak_status: Weak<TimerDequeueId>,
169
170    /// An absolute timestamp in seconds (UTC time) sets the timer.
171    timeout_absolute: TimerExpMode,
172}
173
174impl fmt::Display for TimerDequeueTicketIssuer
175{  
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
177    { 
178        write!(f, "{} until: {}", self.weak_status.upgrade().map_or("dropped".into(), |f| f.to_string()), 
179            self.timeout_absolute)
180    }
181}
182
183impl Eq for TimerDequeueTicketIssuer {}
184
185impl PartialEq for TimerDequeueTicketIssuer
186{
187    fn eq(&self, other: &Self) -> bool 
188    {
189        let s = self.weak_status.upgrade();
190        let o = other.weak_status.upgrade();
191
192        return s == o;
193    }
194}
195
196impl PartialEq<TimerDequeueTicket> for TimerDequeueTicketIssuer
197{
198    fn eq(&self, other: &TimerDequeueTicket) -> bool 
199    {
200        let Some(s) = self.weak_status.upgrade()
201            else { return false };
202
203        return s.as_ref() == other.status.as_ref();
204    }
205}
206
207impl Ord for TimerDequeueTicketIssuer
208{
209    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
210    {
211        return self.timeout_absolute.cmp(&other.timeout_absolute);
212    }
213}
214
215impl PartialOrd for TimerDequeueTicketIssuer
216{
217    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
218    {
219        return Some(self.cmp(other));
220    }
221}
222
223
224impl TimerDequeueTicketIssuer
225{ 
226    /// Initializes new instance.
227    fn new(abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, TimerDequeueTicket)>
228    {
229        let abs_time_nsec = abs_time_nsec & 999_999_999;
230
231        let cur = common::get_current_timestamp();
232        let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
233
234        if TimerExpMode::from(cur) > req
235        {
236            timer_err!(TimerErrorType::Expired, "time already expired");
237        }
238
239        
240        let ext_ticket = 
241            TimerDequeueTicket::new();
242
243        let int_ticket = 
244            Self
245            {
246                weak_status: ext_ticket.pair(),
247                timeout_absolute: req
248            };
249
250
251        return Ok((int_ticket, ext_ticket));
252    }
253
254    /// Returns the absolute timeout of the current instance.
255    fn get_time_until(&self) -> TimerExpMode
256    {
257        return self.timeout_absolute;
258    }
259
260    /// Checks if the instance is still actual i.e was not dropeed.
261    fn is_valid(&self) -> bool
262    {
263        return self.weak_status.upgrade().is_some();
264    }
265
266    /// Obtains the [TimerDequeueId] only if the [Weak] pointer `weak_status`
267    /// to ID is still actual.
268    fn into_inner(&self) -> Option<TimerDequeueId>
269    {
270        return self.weak_status.upgrade().map(|f| *f.as_ref());
271    }
272}
273
274
275impl OrderedTimerDequeIntrf for TimerDequeueTicketIssuer
276{
277    /// No target is requred.
278    type Target = common::NoTarget;
279
280    /// Return the ticket.
281    type Ticket = TimerDequeueTicket;
282
283    #[inline]
284    fn wrap(_target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)> 
285    {
286        return Self::new(abs_time_sec, abs_time_nsec);
287    }
288
289    #[inline]
290    fn get_timeout_absolute(&self) -> TimerExpMode 
291    {
292        return self.timeout_absolute;
293    }
294}
295
296impl OrderedTimerDeque<TimerDequeueTicketIssuer>
297{
298    /// Adds the new absolute timeout to the timer dequeue.
299    /// 
300    /// # Arguemnts
301    /// 
302    /// `abs_time_sec` - a [i64] absolute time in seconds. Can not be 0.
303    /// 
304    /// `abs_time_nsec` - a [i64] asolute time in nanosecodns. Can be 0.
305    /// 
306    /// # Returns
307    /// 
308    /// A [Result] as alias [TimerResult] is returned with:
309    /// 
310    /// * [Result::Ok] with the [TimerDequeueTicket] ticket.
311    /// 
312    /// * [Result::Err] with error description.
313    #[inline]
314    pub   
315    fn add_to_timer(&mut self, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<TimerDequeueTicket>
316    {
317        return self.add_to_timer_local(common::NoTarget, abs_time_sec, abs_time_nsec);
318    }
319
320    /// Removes the instance from the queue by the issued `ticket`.
321    /// 
322    /// # Arguments
323    /// 
324    /// `ticket` - [TimerDequeueTicket] an issued ticket.
325    /// 
326    /// # Returns
327    /// 
328    /// A [Result] as alias [TimerResult] is returned with:
329    /// 
330    /// * [Result::Ok] witout any innder data.
331    /// 
332    /// * [Result::Err] with error description.
333    pub 
334    fn remove_from_sched_queue(&mut self, ticket: TimerDequeueTicket) -> TimerResult<()>
335    {
336        if self.deque_timeout_list.len() == 0
337        {
338            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
339        }
340        else
341        {
342            // search for the item in list
343            if self.deque_timeout_list.len() == 1
344            {
345                // just pop the from the front
346                let _ = self.deque_timeout_list.pop_front();
347
348                // stop timer
349                self.stop_timer()?;
350
351                return Ok(());
352            }
353            else
354            {
355                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
356                // succesfully for temoved instance.
357
358                for (pos, q_item) 
359                in self.deque_timeout_list.iter().enumerate()
360                {
361                    if q_item == &ticket
362                    {
363                        // remove by the index
364                        let _ = 
365                            self.deque_timeout_list.remove(pos).unwrap().into_inner();
366
367                        // call timer reset if index is 0 (front)
368                        if pos == 0 
369                        {
370                            self.reschedule_timer()?;
371                        }
372
373                        return Ok(());
374                    }
375                }
376
377                return Ok(());
378            }
379        }
380    }
381
382    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
383    /// or [Self::poll] can be used to obtain the event.
384    /// 
385    /// # Arguments
386    /// 
387    /// `res` - [TimerReadRes] an event from the timer to handle.
388    /// 
389    /// `timeout_items` - a vector of tickets which were not dropped and timeout.
390    /// 
391    /// A [Result] as alias [TimerResult] is returned with:
392    /// 
393    /// * [Result::Ok] witout any innder data.
394    /// 
395    /// * [Result::Err] with error description.
396    pub 
397    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>, timeout_items: &mut Vec<TimerDequeueId>) -> TimerResult<()>
398    {
399        // ignore wouldblock
400        if let TimerReadRes::WouldBlock = res
401        {
402            return Ok(());
403        }
404        
405        // check if the time reset is required
406        let force_timer_resched = 
407            TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
408
409            
410        let mut to_remove: Vec<TimerDequeueId> = Vec::new();
411        let mut error: Option<TimerError> = None;
412        let mut last_time_until: TimerExpMode = TimerExpMode::None;
413        let cur_timestamp = common::get_current_timestamp();
414
415        loop 
416        {
417            // get from front of the queue
418            let Some(front_entity) = 
419                self.deque_timeout_list.front() 
420                else 
421                { 
422                    // queue is empty, stop timer if needed.
423                    let Err(e) = self.stop_timer()
424                    else
425                    {
426                        break;
427                    };
428
429                    error = Some(e);
430
431                    break;
432                };
433
434            let time_until = front_entity.get_time_until();
435
436            if time_until <= TimerExpMode::from(cur_timestamp)
437            {
438                if let Some(item) = self.deque_timeout_list.pop_front().unwrap().into_inner()
439                {
440                    to_remove.push(item);
441                }
442
443                // else item has gone previously or while we was messing around with queue
444            }
445            else
446            {
447                // we need to rearm the timer if so
448                if last_time_until != TimerExpMode::None || force_timer_resched == true
449                {
450                    // call timer reshedule
451                    self.reschedule_timer()?;
452                }
453
454                break;
455            }
456
457            last_time_until = time_until;
458        }
459
460        *timeout_items = to_remove;
461
462        return error.map_or(Ok(()), |f| Err(f));
463    }
464
465}
466
#[cfg(test)]
mod tests
{
    use std::time::{Duration, Instant};

    use crate::{common, timer::OrderedTimerDeque, timer_portable::timer::TimerReadRes, timer_tickets::{TimerDequeueId, TimerDequeueTicketIssuer}};

    /// Sets a single timeout 2 seconds ahead and checks that the timer fires
    /// once, at the requested second, and that the handler reports the ID of 
    /// the issued ticket.
    #[test]
    fn test_ticket_0()
    {
        let mut time_list = 
            OrderedTimerDeque
                ::<TimerDequeueTicketIssuer>
                ::new("test_label".into(), 4, false).unwrap();


        let s = Instant::now();

        let tss_set = common::get_current_timestamp().timestamp()+2;
        
        let ticket = time_list.add_to_timer(tss_set, 0).unwrap();

        println!("ticket issued: {}", ticket);

        // up to 3 attempts, each `WouldBlock` is followed by a 1001 ms sleep
        for _ in 0..3
        {
            let event = time_list.wait_for_event().unwrap();
            if let TimerReadRes::WouldBlock = event
            {
                std::thread::sleep(Duration::from_millis(1001));

                continue;
            }
            
            assert_eq!(event, TimerReadRes::Ok(1));

            let e = s.elapsed();
            let ts = common::get_current_timestamp();

            assert_eq!(ts.timestamp(), tss_set);
            println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);

            let mut tm_items: Vec<TimerDequeueId> = Vec::with_capacity(1);

            time_list.timeout_event_handler(event, &mut tm_items).unwrap();

            // check the length before indexing, otherwise a failure would be
            // reported as an index out of bounds panic
            assert_eq!(tm_items.len(), 1);

            println!("tickets: {}", tm_items[0]);

            assert_eq!(tm_items.first().unwrap(), &ticket);

            return;
        }

        panic!("timeout befor timer!");
    }
}