timer_deque_rs/
timer_signal.rs

1/*-
2 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov
6 * 
7 * The timer-rs crate can be redistributed and/or modified
8 * under the terms of either of the following licenses:
9 *
10 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
11 *                     
12 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
13 */
14
15use std::fmt;
16
17use crate::
18{
19    common, 
20    error::{TimerErrorType, TimerResult}, 
21    map_timer_err, 
22    timer::{OrderedTimerDeque, OrderedTimerDequeIntrf}, 
23    timer_err, 
24    timer_portable::timer::TimerExpMode, TimerReadRes
25};
26
27/// A trait which should be implemented on the struct which will be provided
28/// to timer dequeue. The trait implements the uniqid and signalling. There 
29/// are two realizations: `sig_timeout` for sync and `a_sig_timeout` for
30/// async. Both functions must use any methods to send signal but it must
31/// never block the thread!
32pub trait TimerDequeueSignal
33{
34    /// A uniq id type. It is used to identify the instance.
35    type TimerQueueID: Eq + PartialEq + fmt::Display + fmt::Debug;
36
37    /// Error type returned by the signal functions.
38    type TimeoutErr: fmt::Display + fmt::Debug;
39
40    /// This function should return the uniq id of the instance.
41    fn get_id(&self) -> Self::TimerQueueID;
42    
43    /// This function should notify i.e send signal or whatever. The
44    /// important condition is not to block the thread. 
45    fn sig_timeout(self) -> Result<(), Self::TimeoutErr>
46    where Self: Sized
47    {
48        return Ok(());
49    }
50
51    /// This function should notify i.e send signal or whatever. The
52    /// important condition is not to block the thread. 
53    async fn a_sig_timeout(self) -> Result<(), Self::TimeoutErr>
54    where Self: Sized
55    {
56        return Ok(());
57    }
58}
59
60/// Defines the type of the dequeue. This type of the dequeue is sending notification when
61/// the timeout event happens. The signal will be sent using `SIG`.
62/// 
63/// # Arguments
64/// 
65/// `SIG` - is an instance which provides the signaling and ID. It must implement:
66///     [TimerDequeueSignal], [fmt::Debug], [fmt::Display], [Eq], [PartialEq].
67#[derive(Debug)]
68pub struct TimerDequeueSignalTicket<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq>
69{
70    /// A notification and id instance.
71    signal: SIG,
72    
73    /// An absolute timestamp in seconds (UTC time) sets the timer.
74    timeout_absolute: TimerExpMode,
75}
76
77impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> TimerDequeueSignalTicket<SIG>
78{
79    pub 
80    fn new(sig_hnd: SIG, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<Self>
81    {
82        let abs_time_nsec = abs_time_nsec & 999_999_999;
83
84        let cur = common::get_current_timestamp();
85        let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
86
87        if TimerExpMode::from(cur) > req
88        {
89            timer_err!(TimerErrorType::Expired, "time already expired");
90        }
91
92        
93        return Ok(
94            Self
95            {
96                signal: 
97                    sig_hnd,
98                timeout_absolute: 
99                    req
100            }
101        );
102    }
103
104    fn get_time_until(&self) -> TimerExpMode
105    {
106        return self.timeout_absolute;
107    }
108
109    fn send_sig_timeout(self) -> TimerResult<()>
110    {
111        return 
112            self
113                .signal
114                .sig_timeout()
115                .map_err(|e|
116                    map_timer_err!(TimerErrorType::ExternalError, "cannot send signal, error: {}", e)
117                );
118    }
119
120    async 
121    fn async_send_sig_timeout(self) -> TimerResult<()>
122    {
123        return 
124            self
125                .signal
126                .a_sig_timeout()
127                .await
128                .map_err(|e|
129                    map_timer_err!(TimerErrorType::ExternalError, "cannot send signal, error: {}", e)
130                );
131    }
132}
133
134impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> fmt::Display for TimerDequeueSignalTicket<SIG>
135{  
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
137    { 
138        write!(f, "{} until: {}", self.signal.get_id(), self.timeout_absolute)
139    }
140}
141
142impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> Eq for TimerDequeueSignalTicket<SIG> {}
143
144impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq>  PartialEq for TimerDequeueSignalTicket<SIG>
145{
146    fn eq(&self, other: &Self) -> bool 
147    {
148        return self.signal == other.signal;
149    }
150}
151
152
153impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> Ord for TimerDequeueSignalTicket<SIG>
154{
155    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
156    {
157        return self.timeout_absolute.cmp(&other.timeout_absolute);
158    }
159}
160
161impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> PartialOrd for TimerDequeueSignalTicket<SIG>
162{
163    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
164    {
165        return Some(self.cmp(other));
166    }
167}
168
169impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> OrderedTimerDequeIntrf for TimerDequeueSignalTicket<SIG>
170{
171    type Target = SIG;
172    type Ticket = common::NoTicket;
173
174    #[inline]
175    fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)> 
176    {
177        return Ok((Self::new(target, abs_time_sec, abs_time_nsec)?, common::NoTicket));
178    }
179
180    #[inline]
181    fn get_timeout_absolute(&self) -> TimerExpMode 
182    {
183        return self.timeout_absolute;
184    }
185}
186
187impl<SIG> OrderedTimerDeque<TimerDequeueSignalTicket<SIG>>
188where SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq
189{
190    /// Adds the new absolute timeout to the timer dequeue.
191    /// 
192    /// # Arguemnts
193    /// 
194    /// `sig_hnd` - a `SIG` generic instance which would identify the instance and
195    ///     provide identification.
196    /// 
197    /// `abs_time_sec` - a [i64] absolute time in seconds. Can not be 0.
198    /// 
199    /// `abs_time_nsec` - a [i64] asolute time in nanosecodns. Can be 0.
200    /// 
201    /// # Returns
202    /// 
203    /// A [Result] as alias [TimerResult] is returned with:
204    /// 
205    /// * [Result::Ok] with the [TimerDequeueTicket] ticket.
206    /// 
207    /// * [Result::Err] with error description.
208    #[inline]
209    pub   
210    fn add_to_timer(&mut self, sig_hnd: SIG, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<common::NoTicket>
211    {
212        return self.add_to_timer_local(sig_hnd, abs_time_sec, abs_time_nsec);
213    }
214
215    /// Removes the instance from the queue by the `SIG::TimerQueueID` ID.
216    /// 
217    /// # Arguments
218    /// 
219    /// `arg_uniq_id` - a reference to instance's ID `SIG::TimerQueueID`.
220    /// 
221    /// # Returns
222    /// 
223    /// A [Result] as alias [TimerResult] is returned with:
224    /// 
225    /// * [Result::Ok] witout any innder data.
226    /// 
227    /// * [Result::Err] with error description.
228    pub 
229    fn remove_from_sched_queue(&mut self, arg_uniq_id: &SIG::TimerQueueID) -> TimerResult<Option<()>>
230    {
231        if self.deque_timeout_list.len() == 0
232        {
233            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
234        }
235        else
236        {
237            // search for the item in list
238            if self.deque_timeout_list.len() == 1
239            {
240                // just pop the from the front
241                let _ = self.deque_timeout_list.pop_front().unwrap();
242
243                // stop timer
244                self.stop_timer()?;
245
246                return Ok(Some(()));
247            }
248            else
249            {
250                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
251                // succesfully for temoved instance.
252
253                for (pos, q_item) 
254                in self.deque_timeout_list.iter().enumerate()
255                {
256                    if &q_item.signal.get_id() == arg_uniq_id
257                    {
258                        // remove by the index
259                        let _ = 
260                            self.deque_timeout_list.remove(pos);
261
262                        // call timer reset if index is 0 (front)
263                        if pos == 0 
264                        {
265                            self.reschedule_timer()?;
266                        }
267
268                        return Ok(Some(()));
269                    }
270                }
271
272                return Ok(None);
273            }
274        }
275    }
276
277    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
278    /// or [Self::poll] can be used to obtain the event.
279    /// 
280    /// # Arguments
281    /// 
282    /// `res` - [TimerReadRes] an event from the timer to handle.
283    /// 
284    /// `timeout_items` - a vector of tickets which were not dropped and timeout.
285    /// 
286    /// A [Result] as alias [TimerResult] is returned with:
287    /// 
288    /// * [Result::Ok] witout any innder data.
289    /// 
290    /// * [Result::Err] with error description.
291    pub 
292    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>) -> TimerResult<()>
293    {
294        // ignore wouldblock
295        if let TimerReadRes::WouldBlock = res
296        {
297            return Ok(());
298        }
299        
300        // check if the time reset is required
301        let force_timer_resched = 
302            TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
303
304        let mut error: TimerResult<()> = Ok(());
305        let mut last_time_until: TimerExpMode = TimerExpMode::None;
306
307        let cur_timestamp = common::get_current_timestamp();
308
309        loop 
310        {
311            // get from front of the queue
312            let Some(front_entity) = 
313                self.deque_timeout_list.front() 
314                else 
315                { 
316                    self.stop_timer()?;
317
318                    return Ok(());
319                };
320
321            let time_until = front_entity.get_time_until();
322
323            if time_until <= TimerExpMode::from(cur_timestamp)
324            {
325                let front_entity = self.deque_timeout_list.pop_front().unwrap();
326
327                // send signal
328                error = front_entity.send_sig_timeout();
329            }
330            else
331            {
332                // we need to rearm the timer if so
333                if last_time_until != TimerExpMode::None || force_timer_resched == true
334                {
335                    // call timer reshedule
336                    self.reschedule_timer()?;
337                }
338
339                break;
340            }
341
342            last_time_until = time_until;
343        } // loop
344
345
346        return error;
347    }
348
349    /// An async version of the event handler.
350    /// 
351    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
352    /// or [Self::poll] can be used to obtain the event.
353    /// 
354    /// # Arguments
355    /// 
356    /// `res` - [TimerReadRes] an event from the timer to handle.
357    /// 
358    /// `timeout_items` - a vector of tickets which were not dropped and timeout.
359    /// 
360    /// A [Result] as alias [TimerResult] is returned with:
361    /// 
362    /// * [Result::Ok] witout any innder data.
363    /// 
364    /// * [Result::Err] with error description.
365    pub async
366    fn async_timeout_event_handler(&mut self, res: TimerReadRes<u64>) -> TimerResult<()>
367    {
368        // ignore wouldblock
369        if let TimerReadRes::WouldBlock = res
370        {
371            return Ok(());
372        }
373        
374        // check if the time reset is required
375        let force_timer_resched = 
376            TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
377
378        let mut error: TimerResult<()> = Ok(());
379        let mut last_time_until: TimerExpMode = TimerExpMode::None;
380
381        let cur_timestamp = common::get_current_timestamp();
382
383        loop 
384        {
385            // get from front of the queue
386            let Some(front_entity) = 
387                self.deque_timeout_list.front() 
388                else 
389                { 
390                    self.stop_timer()?;
391
392                    return Ok(());
393                };
394
395            let time_until = front_entity.get_time_until();
396
397            if time_until <= TimerExpMode::from(cur_timestamp)
398            {
399                let front_entity = self.deque_timeout_list.pop_front().unwrap();
400
401                // send signal
402                error = front_entity.async_send_sig_timeout().await;
403            }
404            else
405            {
406                // we need to rearm the timer if so
407                if last_time_until != TimerExpMode::None || force_timer_resched == true
408                {
409                    // call timer reshedule
410                    self.reschedule_timer()?;
411                }
412
413                break;
414            }
415
416            last_time_until = time_until;
417        } // loop
418
419
420        return error;
421    }
422
423}
424
425
426#[cfg(test)]
427mod tests
428{
429    use std::{collections::VecDeque, fmt, sync::mpsc::{self, SendError, Sender}, time::{Duration, Instant}};
430
431    use crate::{common, timer::OrderedTimerDeque, timer_portable::timer::TimerReadRes, timer_signal::{TimerDequeueSignal, TimerDequeueSignalTicket}};
432
433    #[test]
434    fn test_timer_test()
435    {
436        #[derive(Debug)]
437        struct TestSigStruct
438        {
439            uniq_id: u64,
440            mpsc_sender: Sender<u64>,
441        }
442
443        impl Eq for TestSigStruct{}
444
445        impl PartialEq for TestSigStruct
446        {
447            fn eq(&self, other: &Self) -> bool 
448            {
449                return self.uniq_id == other.uniq_id;
450            }
451        }
452
453        impl fmt::Display for TestSigStruct
454        {
455            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
456            {
457                write!(f, "sig handler timer id: {}", self.uniq_id)
458            }
459        }
460
461        impl TimerDequeueSignal for TestSigStruct
462        {
463            type TimerQueueID = u64;
464        
465            type TimeoutErr = SendError<Self::TimerQueueID>;
466        
467            fn get_id(&self) -> Self::TimerQueueID 
468            {
469                return self.uniq_id;
470            }
471        
472            fn sig_timeout(self) -> Result<(), Self::TimeoutErr> 
473            {
474                return self.mpsc_sender.send(self.uniq_id);
475            }
476        }
477
478         let mut time_list = 
479            OrderedTimerDeque
480                ::<TimerDequeueSignalTicket<TestSigStruct>>
481                ::new("test_label".into(), 4, false).unwrap();
482
483
484        let s = Instant::now();
485
486        let tss_set = common::get_current_timestamp().timestamp()+2;
487
488        let (snd, rcv) = mpsc::channel::<u64>();
489
490        let sig_tm = TestSigStruct{ uniq_id: 456, mpsc_sender: snd };
491
492        println!("signal handler: {}", sig_tm);
493        
494        let _ = time_list.add_to_timer(sig_tm, tss_set, 0).unwrap();
495
496        for _ in 0..3
497        {
498            let event = time_list.wait_for_event().unwrap();
499            if let TimerReadRes::WouldBlock = event
500            {
501                std::thread::sleep(Duration::from_millis(1001));
502
503                continue;
504            }
505
506            assert_eq!(event, TimerReadRes::Ok(1));
507
508            let e = s.elapsed();
509            let ts = common::get_current_timestamp();
510
511            assert_eq!(ts.timestamp(), tss_set);
512            println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);
513
514            time_list.timeout_event_handler(event).unwrap();
515
516            let timer_id = rcv.recv_timeout(Duration::from_secs(1)).unwrap();
517
518            assert_eq!(timer_id, 456);
519
520            return;
521        }
522
523        panic!("timeout befor timer!");
524    }
525
526    #[test]
527    fn test_pop()
528    {
529        let mut ve = VecDeque::<[u8; 32]>::with_capacity(5);
530
531        ve.push_back([0_u8; 32]);
532        ve.push_back([1_u8; 32]);
533        ve.push_back([2_u8; 32]);
534        ve.push_back([3_u8; 32]);
535
536        let mut small: Option<Duration> = None;
537        let mut large: Option<Duration> = None;
538        let mut some_cnt: u32 = 0;
539        let mut mean: u128 = 0;
540        
541        for _ in 0..5000
542        {
543            let s = Instant::now();
544
545            let v = ve.pop_front().unwrap();
546
547            if v.len() == 32
548            {
549                some_cnt += 1;
550            }
551
552            ve.push_front(v);
553
554            let e = s.elapsed();
555
556            if small.is_none() == true
557            {
558                small = Some(e);
559            }
560            else if small.as_ref().unwrap() > &e
561            {
562                small = Some(e);
563            }
564
565            if large.is_none() == true
566            {
567                large = Some(e);
568            }
569            else if large.as_ref().unwrap() < &e
570            {
571                large = Some(e);
572            }
573
574            mean += e.as_nanos();
575        }
576
577        println!("s: {:?}, e: {:?}, mean: {} some_cnt: {}", small.unwrap(), large.unwrap(), mean/100, some_cnt);
578    }
579
580    #[test]
581    fn test_pop2()
582    {
583        let mut ve = VecDeque::<[u8; 32]>::with_capacity(5);
584
585        ve.push_back([0_u8; 32]);
586        ve.push_back([1_u8; 32]);
587        ve.push_back([2_u8; 32]);
588        ve.push_back([3_u8; 32]);
589
590        let mut small: Option<Duration> = None;
591        let mut large: Option<Duration> = None;
592        let mut some_cnt: u32 = 0;
593        let mut mean: u128 = 0;
594        
595        for _ in 0..5000
596        {
597            let s = Instant::now();
598
599            let v = ve.front().unwrap();
600
601            if v.len() == 32
602            {
603                some_cnt += 1;
604            }
605
606            let v = ve.pop_front().unwrap();
607            
608
609            let e = s.elapsed();
610            ve.push_front(v);
611
612            if small.is_none() == true
613            {
614                small = Some(e);
615            }
616            else if small.as_ref().unwrap() > &e
617            {
618                small = Some(e);
619            }
620
621            if large.is_none() == true
622            {
623                large = Some(e);
624            }
625            else if large.as_ref().unwrap() < &e
626            {
627                large = Some(e);
628            }
629
630            mean += e.as_nanos();
631        }
632
633        println!("s: {:?}, e: {:?}, mean: {} some_cnt: {}", small.unwrap(), large.unwrap(), mean/100, some_cnt);
634    }
635}