timer_deque_rs/
timer_signal.rs

1/*-
2 * timer-rs - a Rust crate which provides timer and timer queues based on target OS
3 *  functionality.
4 * 
5 * Copyright (C) 2025 Aleksandr Morozov
6 * 
7 * The timer-rs crate can be redistributed and/or modified
8 * under the terms of either of the following licenses:
9 *
10 *   1. the Mozilla Public License Version 2.0 (the “MPL”) OR
11 *                     
12 *   2. EUROPEAN UNION PUBLIC LICENCE v. 1.2 EUPL © the European Union 2007, 2016
13 */
14
15use std::fmt;
16
17use crate::
18{
19    common, 
20    error::{TimerErrorType, TimerResult}, 
21    map_timer_err, 
22    timer::{OrderedTimerDeque, OrderedTimerDequeIntrf}, 
23    timer_err, 
24    timer_portable::timer::TimerExpMode, TimerReadRes
25};
26
27/// A trait which should be implemented on the struct which will be provided
28/// to timer dequeue. The trait implements the uniqid and signalling. There 
29/// are two realizations: `sig_timeout` for sync and `a_sig_timeout` for
30/// async. Both functions must use any methods to send signal but it must
31/// never block the thread!
32pub trait TimerDequeueSignal
33{
34    /// A uniq id type. It is used to identify the instance.
35    type TimerQueueID: Eq + PartialEq + fmt::Display + fmt::Debug;
36
37    /// Error type returned by the signal functions.
38    type TimeoutErr: fmt::Display + fmt::Debug;
39
40    /// This function should return the uniq id of the instance.
41    fn get_id(&self) -> Self::TimerQueueID;
42    
43    /// This function should notify i.e send signal or whatever. The
44    /// important condition is not to block the thread. 
45    fn sig_timeout(self) -> Result<(), Self::TimeoutErr>
46    where Self: Sized
47    {
48        return Ok(());
49    }
50
51    /// This function should notify i.e send signal or whatever. The
52    /// important condition is not to block the thread. 
53    async fn a_sig_timeout(self) -> Result<(), Self::TimeoutErr>
54    where Self: Sized
55    {
56        return Ok(());
57    }
58}
59
60/// Defines the type of the dequeue. This type of the dequeue is sending notification when
61/// the timeout event happens. The signal will be sent using `SIG`.
62/// 
63/// # Arguments
64/// 
65/// `SIG` - is an instance which provides the signaling and ID. It must implement:
66///     [TimerDequeueSignal], [fmt::Debug], [fmt::Display], [Eq], [PartialEq].
67/// 
68/// # Example
69/// 
70/// ```ignore
71/// let mut time_list = 
72///         OrderedTimerDeque
73///             ::<TimerDequeueSignalTicket<TestSigStruct>>
74///             ::new("test_label".into(), 4, false).unwrap();
75/// ```
76#[derive(Debug)]
77pub struct TimerDequeueSignalTicket<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq>
78{
79    /// A notification and id instance.
80    signal: SIG,
81    
82    /// An absolute timestamp in seconds (UTC time) sets the timer.
83    timeout_absolute: TimerExpMode,
84}
85
86impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> TimerDequeueSignalTicket<SIG>
87{
88    pub 
89    fn new(sig_hnd: SIG, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<Self>
90    {
91        let abs_time_nsec = abs_time_nsec & 999_999_999;
92
93        let cur = common::get_current_timestamp();
94        let req = TimerExpMode::OneShot { sec: abs_time_sec, nsec: abs_time_nsec };
95
96        if TimerExpMode::from(cur) > req
97        {
98            timer_err!(TimerErrorType::Expired, "time already expired");
99        }
100
101        
102        return Ok(
103            Self
104            {
105                signal: 
106                    sig_hnd,
107                timeout_absolute: 
108                    req
109            }
110        );
111    }
112
113    fn get_time_until(&self) -> TimerExpMode
114    {
115        return self.timeout_absolute;
116    }
117
118    fn send_sig_timeout(self) -> TimerResult<()>
119    {
120        return 
121            self
122                .signal
123                .sig_timeout()
124                .map_err(|e|
125                    map_timer_err!(TimerErrorType::ExternalError, "cannot send signal, error: {}", e)
126                );
127    }
128
129    async 
130    fn async_send_sig_timeout(self) -> TimerResult<()>
131    {
132        return 
133            self
134                .signal
135                .a_sig_timeout()
136                .await
137                .map_err(|e|
138                    map_timer_err!(TimerErrorType::ExternalError, "cannot send signal, error: {}", e)
139                );
140    }
141}
142
143impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> fmt::Display for TimerDequeueSignalTicket<SIG>
144{  
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> 
146    { 
147        write!(f, "{} until: {}", self.signal.get_id(), self.timeout_absolute)
148    }
149}
150
151impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> Eq for TimerDequeueSignalTicket<SIG> {}
152
153impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq>  PartialEq for TimerDequeueSignalTicket<SIG>
154{
155    fn eq(&self, other: &Self) -> bool 
156    {
157        return self.signal == other.signal;
158    }
159}
160
161
162impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> Ord for TimerDequeueSignalTicket<SIG>
163{
164    fn cmp(&self, other: &Self) -> std::cmp::Ordering 
165    {
166        return self.timeout_absolute.cmp(&other.timeout_absolute);
167    }
168}
169
170impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> PartialOrd for TimerDequeueSignalTicket<SIG>
171{
172    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> 
173    {
174        return Some(self.cmp(other));
175    }
176}
177
178impl<SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq> OrderedTimerDequeIntrf for TimerDequeueSignalTicket<SIG>
179{
180    type Target = SIG;
181    type Ticket = common::NoTicket;
182
183    #[inline]
184    fn wrap(target: Self::Target, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<(Self, Self::Ticket)> 
185    {
186        return Ok((Self::new(target, abs_time_sec, abs_time_nsec)?, common::NoTicket));
187    }
188
189    #[inline]
190    fn get_timeout_absolute(&self) -> TimerExpMode 
191    {
192        return self.timeout_absolute;
193    }
194}
195
196impl<SIG> OrderedTimerDeque<TimerDequeueSignalTicket<SIG>>
197where SIG: TimerDequeueSignal + fmt::Debug + fmt::Display + Eq + PartialEq
198{
199    /// Adds the new absolute timeout to the timer dequeue.
200    /// 
201    /// # Arguemnts
202    /// 
203    /// `sig_hnd` - a `SIG` generic instance which would identify the instance and
204    ///     provide identification.
205    /// 
206    /// `abs_time_sec` - a [i64] absolute time in seconds. Can not be 0.
207    /// 
208    /// `abs_time_nsec` - a [i64] asolute time in nanosecodns. Can be 0.
209    /// 
210    /// # Returns
211    /// 
212    /// A [Result] as alias [TimerResult] is returned with:
213    /// 
214    /// * [Result::Ok] with the [TimerDequeueTicket] ticket.
215    /// 
216    /// * [Result::Err] with error description.
217    #[inline]
218    pub   
219    fn add_to_timer(&mut self, sig_hnd: SIG, abs_time_sec: i64, abs_time_nsec: i64) -> TimerResult<common::NoTicket>
220    {
221        return self.add_to_timer_local(sig_hnd, abs_time_sec, abs_time_nsec);
222    }
223
224    /// Removes the instance from the queue by the `SIG::TimerQueueID` ID.
225    /// 
226    /// # Arguments
227    /// 
228    /// `arg_uniq_id` - a reference to instance's ID `SIG::TimerQueueID`.
229    /// 
230    /// # Returns
231    /// 
232    /// A [Result] as alias [TimerResult] is returned with:
233    /// 
234    /// * [Result::Ok] witout any innder data.
235    /// 
236    /// * [Result::Err] with error description.
237    pub 
238    fn remove_from_sched_queue(&mut self, arg_uniq_id: &SIG::TimerQueueID) -> TimerResult<Option<()>>
239    {
240        if self.deque_timeout_list.len() == 0
241        {
242            timer_err!(TimerErrorType::QueueEmpty, "queue list is empty!");
243        }
244        else
245        {
246            // search for the item in list
247            if self.deque_timeout_list.len() == 1
248            {
249                // just pop the from the front
250                let _ = self.deque_timeout_list.pop_front().unwrap();
251
252                // stop timer
253                self.stop_timer()?;
254
255                return Ok(Some(()));
256            }
257            else
258            {
259                // in theory the `ticket` is a reference to ARC, so the weak should be upgraded 
260                // succesfully for temoved instance.
261
262                for (pos, q_item) 
263                in self.deque_timeout_list.iter().enumerate()
264                {
265                    if &q_item.signal.get_id() == arg_uniq_id
266                    {
267                        // remove by the index
268                        let _ = 
269                            self.deque_timeout_list.remove(pos);
270
271                        // call timer reset if index is 0 (front)
272                        if pos == 0 
273                        {
274                            self.reschedule_timer()?;
275                        }
276
277                        return Ok(Some(()));
278                    }
279                }
280
281                return Ok(None);
282            }
283        }
284    }
285
286    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
287    /// or [Self::poll] can be used to obtain the event.
288    /// 
289    /// # Arguments
290    /// 
291    /// `res` - [TimerReadRes] an event from the timer to handle.
292    /// 
293    /// `timeout_items` - a vector of tickets which were not dropped and timeout.
294    /// 
295    /// A [Result] as alias [TimerResult] is returned with:
296    /// 
297    /// * [Result::Ok] witout any innder data.
298    /// 
299    /// * [Result::Err] with error description.
300    pub 
301    fn timeout_event_handler(&mut self, res: TimerReadRes<u64>) -> TimerResult<()>
302    {
303        // ignore wouldblock
304        if let TimerReadRes::WouldBlock = res
305        {
306            return Ok(());
307        }
308        
309        // check if the time reset is required
310        let force_timer_resched = 
311            TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
312
313        let mut error: TimerResult<()> = Ok(());
314        let mut last_time_until: TimerExpMode = TimerExpMode::None;
315
316        let cur_timestamp = common::get_current_timestamp();
317
318        loop 
319        {
320            // get from front of the queue
321            let Some(front_entity) = 
322                self.deque_timeout_list.front() 
323                else 
324                { 
325                    self.stop_timer()?;
326
327                    return Ok(());
328                };
329
330            let time_until = front_entity.get_time_until();
331
332            if time_until <= TimerExpMode::from(cur_timestamp)
333            {
334                let front_entity = self.deque_timeout_list.pop_front().unwrap();
335
336                // send signal
337                error = front_entity.send_sig_timeout();
338            }
339            else
340            {
341                // we need to rearm the timer if so
342                if last_time_until != TimerExpMode::None || force_timer_resched == true
343                {
344                    // call timer reshedule
345                    self.reschedule_timer()?;
346                }
347
348                break;
349            }
350
351            last_time_until = time_until;
352        } // loop
353
354
355        return error;
356    }
357
358    /// An async version of the event handler.
359    /// 
360    /// Handles the event which was `read` from the timer. The functions [Self::wait_for_event]
361    /// or [Self::poll] can be used to obtain the event.
362    /// 
363    /// # Arguments
364    /// 
365    /// `res` - [TimerReadRes] an event from the timer to handle.
366    /// 
367    /// `timeout_items` - a vector of tickets which were not dropped and timeout.
368    /// 
369    /// A [Result] as alias [TimerResult] is returned with:
370    /// 
371    /// * [Result::Ok] witout any innder data.
372    /// 
373    /// * [Result::Err] with error description.
374    pub async
375    fn async_timeout_event_handler(&mut self, res: TimerReadRes<u64>) -> TimerResult<()>
376    {
377        // ignore wouldblock
378        if let TimerReadRes::WouldBlock = res
379        {
380            return Ok(());
381        }
382        
383        // check if the time reset is required
384        let force_timer_resched = 
385            TimerReadRes::Cancelled == res || TimerReadRes::Ok(0) == res;
386
387        let mut error: TimerResult<()> = Ok(());
388        let mut last_time_until: TimerExpMode = TimerExpMode::None;
389
390        let cur_timestamp = common::get_current_timestamp();
391
392        loop 
393        {
394            // get from front of the queue
395            let Some(front_entity) = 
396                self.deque_timeout_list.front() 
397                else 
398                { 
399                    self.stop_timer()?;
400
401                    return Ok(());
402                };
403
404            let time_until = front_entity.get_time_until();
405
406            if time_until <= TimerExpMode::from(cur_timestamp)
407            {
408                let front_entity = self.deque_timeout_list.pop_front().unwrap();
409
410                // send signal
411                error = front_entity.async_send_sig_timeout().await;
412            }
413            else
414            {
415                // we need to rearm the timer if so
416                if last_time_until != TimerExpMode::None || force_timer_resched == true
417                {
418                    // call timer reshedule
419                    self.reschedule_timer()?;
420                }
421
422                break;
423            }
424
425            last_time_until = time_until;
426        } // loop
427
428
429        return error;
430    }
431
432}
433
434
435#[cfg(test)]
436mod tests
437{
438    use std::{collections::VecDeque, fmt, sync::mpsc::{self, SendError, Sender}, time::{Duration, Instant}};
439
440    use crate::{common, timer::OrderedTimerDeque, timer_portable::timer::TimerReadRes, timer_signal::{TimerDequeueSignal, TimerDequeueSignalTicket}};
441
442    #[test]
443    fn test_timer_test()
444    {
445        #[derive(Debug)]
446        struct TestSigStruct
447        {
448            uniq_id: u64,
449            mpsc_sender: Sender<u64>,
450        }
451
452        impl Eq for TestSigStruct{}
453
454        impl PartialEq for TestSigStruct
455        {
456            fn eq(&self, other: &Self) -> bool 
457            {
458                return self.uniq_id == other.uniq_id;
459            }
460        }
461
462        impl fmt::Display for TestSigStruct
463        {
464            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
465            {
466                write!(f, "sig handler timer id: {}", self.uniq_id)
467            }
468        }
469
470        impl TimerDequeueSignal for TestSigStruct
471        {
472            type TimerQueueID = u64;
473        
474            type TimeoutErr = SendError<Self::TimerQueueID>;
475        
476            fn get_id(&self) -> Self::TimerQueueID 
477            {
478                return self.uniq_id;
479            }
480        
481            fn sig_timeout(self) -> Result<(), Self::TimeoutErr> 
482            {
483                return self.mpsc_sender.send(self.uniq_id);
484            }
485        }
486
487         let mut time_list = 
488            OrderedTimerDeque
489                ::<TimerDequeueSignalTicket<TestSigStruct>>
490                ::new("test_label".into(), 4, false).unwrap();
491
492
493        let s = Instant::now();
494
495        let tss_set = common::get_current_timestamp().timestamp()+2;
496
497        let (snd, rcv) = mpsc::channel::<u64>();
498
499        let sig_tm = TestSigStruct{ uniq_id: 456, mpsc_sender: snd };
500
501        println!("signal handler: {}", sig_tm);
502        
503        let _ = time_list.add_to_timer(sig_tm, tss_set, 0).unwrap();
504
505        for _ in 0..3
506        {
507            let event = time_list.wait_for_event().unwrap();
508            if let TimerReadRes::WouldBlock = event
509            {
510                std::thread::sleep(Duration::from_millis(1001));
511
512                continue;
513            }
514
515            assert_eq!(event, TimerReadRes::Ok(1));
516
517            let e = s.elapsed();
518            let ts = common::get_current_timestamp();
519
520            assert_eq!(ts.timestamp(), tss_set);
521            println!("instant: {:?} ts: {}, timerset: {}", e, ts.timestamp(), tss_set);
522
523            time_list.timeout_event_handler(event).unwrap();
524
525            let timer_id = rcv.recv_timeout(Duration::from_secs(1)).unwrap();
526
527            assert_eq!(timer_id, 456);
528
529            return;
530        }
531
532        panic!("timeout befor timer!");
533    }
534
535    #[test]
536    fn test_pop()
537    {
538        let mut ve = VecDeque::<[u8; 32]>::with_capacity(5);
539
540        ve.push_back([0_u8; 32]);
541        ve.push_back([1_u8; 32]);
542        ve.push_back([2_u8; 32]);
543        ve.push_back([3_u8; 32]);
544
545        let mut small: Option<Duration> = None;
546        let mut large: Option<Duration> = None;
547        let mut some_cnt: u32 = 0;
548        let mut mean: u128 = 0;
549        
550        for _ in 0..5000
551        {
552            let s = Instant::now();
553
554            let v = ve.pop_front().unwrap();
555
556            if v.len() == 32
557            {
558                some_cnt += 1;
559            }
560
561            ve.push_front(v);
562
563            let e = s.elapsed();
564
565            if small.is_none() == true
566            {
567                small = Some(e);
568            }
569            else if small.as_ref().unwrap() > &e
570            {
571                small = Some(e);
572            }
573
574            if large.is_none() == true
575            {
576                large = Some(e);
577            }
578            else if large.as_ref().unwrap() < &e
579            {
580                large = Some(e);
581            }
582
583            mean += e.as_nanos();
584        }
585
586        println!("s: {:?}, e: {:?}, mean: {} some_cnt: {}", small.unwrap(), large.unwrap(), mean/100, some_cnt);
587    }
588
589    #[test]
590    fn test_pop2()
591    {
592        let mut ve = VecDeque::<[u8; 32]>::with_capacity(5);
593
594        ve.push_back([0_u8; 32]);
595        ve.push_back([1_u8; 32]);
596        ve.push_back([2_u8; 32]);
597        ve.push_back([3_u8; 32]);
598
599        let mut small: Option<Duration> = None;
600        let mut large: Option<Duration> = None;
601        let mut some_cnt: u32 = 0;
602        let mut mean: u128 = 0;
603        
604        for _ in 0..5000
605        {
606            let s = Instant::now();
607
608            let v = ve.front().unwrap();
609
610            if v.len() == 32
611            {
612                some_cnt += 1;
613            }
614
615            let v = ve.pop_front().unwrap();
616            
617
618            let e = s.elapsed();
619            ve.push_front(v);
620
621            if small.is_none() == true
622            {
623                small = Some(e);
624            }
625            else if small.as_ref().unwrap() > &e
626            {
627                small = Some(e);
628            }
629
630            if large.is_none() == true
631            {
632                large = Some(e);
633            }
634            else if large.as_ref().unwrap() < &e
635            {
636                large = Some(e);
637            }
638
639            mean += e.as_nanos();
640        }
641
642        println!("s: {:?}, e: {:?}, mean: {} some_cnt: {}", small.unwrap(), large.unwrap(), mean/100, some_cnt);
643    }
644}