timer-deque-rs 0.8.0

An OS-based timer and timer queue that implements timeout queues of different types.
Documentation
use std::{cmp, fmt, sync::{Arc, atomic::{AtomicBool, Ordering}, mpsc}, time::Duration};

use crate::{AbsoluteTime, TimerDequeConsumer, TimerDequeTicketIssuer, TimerPoll, deque_timeout::{DequeOnce, OrderTimerDeque}, timer_portable::{PollEventType, PolledTimerFd}};

/// Small payload type used by the tests in this file as the item stored in
/// consumer-style timer deques (wrapped in an `Arc`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestItem
{
    /// Number identifying the item; printed by the `Display` implementation.
    n: usize,
}

/// Human-readable form of a test item: the literal `TestItem` followed by its number.
impl fmt::Display for TestItem
{
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let Self { n } = self;

        write!(out, "TestItem {}", n)
    }
}

impl TestItem
{
    fn new(n: usize) -> Self
    {
        return Self{n};
    }
}

/// Single-threaded smoke test: two timer deques served by one `TimerPoll`.
///
/// A ticket-issuing deque is armed twice and a consumer deque is armed once,
/// every time 2 seconds ahead. After each arming the test blocks in `poll`
/// (no timeout) and passes the returned events to the deque that was armed.
///
/// A `poll` call that yields `None` fails the test. Returning early in that
/// case would report the test as passed although nothing was verified.
#[test]
fn test_poll()
{
    let poll = TimerPoll::new().unwrap();

    let timer1 = 
        OrderTimerDeque
            ::<DequeOnce, TimerDequeTicketIssuer<_>>
            ::new("test_label1".into(), 4, false, true).unwrap();

    let mut time_list_1 = poll.add(timer1).unwrap();

    // The second deque is created up front but registered with the poll only
    // after the first timeout has been handled.
    let timer2 = 
        OrderTimerDeque
            ::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>
            ::new("test_label2".into(), 4, false, true).unwrap();

    // setting timer with offset 2 sec
    let tss_time = AbsoluteTime::now() + Duration::from_secs(2);
    let tss_set = DequeOnce::new(tss_time);

    // The ticket stays bound until the end of the test, as before.
    // NOTE(review): dropping a ticket may cancel its timer - confirm before
    // turning this into `let _ = ...`.
    let _ticket = time_list_1.get_inner_mut().add(tss_set).unwrap();

    let Some(items) = poll.poll(None).unwrap()
        else { panic!("1: poll returned no events") };
    
    let tss_time_rs = AbsoluteTime::now();

    // The wake-up must fall into the same second as the programmed deadline.
    assert_eq!(tss_time_rs.seconds_cmp(&tss_time), cmp::Ordering::Equal);

    for item in items
    {
        let _res = time_list_1.get_inner_mut().handle_timer_event(item).unwrap();
    }

    // adding new timer to the poll
    let mut time_list_2 = poll.add(timer2).unwrap();

    // Second round on the first deque, now with both deques registered.
    let tss_time = AbsoluteTime::now() + Duration::from_secs(2);
    let tss_set = DequeOnce::new(tss_time);

    let ticket = time_list_1.get_inner_mut().add(tss_set).unwrap();
    
    println!("2: ticket: {}", ticket);

    let Some(items) = poll.poll(None).unwrap()
        else { panic!("3: poll returned no events") };

    println!("3: {:?}", items);

    for item in items
    {    
        let _res = time_list_1.get_inner_mut().handle_timer_event(item).unwrap();
    }

    // Third round: arm the consumer deque with an item attached to the timeout.
    let tss_time = AbsoluteTime::now() + Duration::from_secs(2);
    let tss_set = DequeOnce::new(tss_time);
    let tss_r = Arc::new(TestItem::new(6));

    time_list_2.get_inner_mut().add(tss_r.clone(), tss_set).unwrap();
    
    let Some(items) = poll.poll(None).unwrap()
        else { panic!("4: poll returned no events") };

    println!("4: {:?}", items);

    for item in items
    {    
        let _res = time_list_2.get_inner_mut().handle_timer_event(item).unwrap();
    }
}

#[cfg(all(target_family = "unix", feature = "enable_mio_compat"))]
mod test_mio_compat
{
    use std::{cmp, sync::Arc, time::Duration};

    use mio::{Events, Interest, Poll};

    use crate::{AbsoluteTime, TimerDequeConsumer, TimerDequeTicketIssuer, TimerFdMioCompat, deque_timeout::{DequeOnce, OrderTimerDeque, tests::TestItem}};

    /// Registers two timer deques directly with a `mio::Poll` and checks that
    /// each armed timeout (2 seconds ahead) produces a readable event carrying
    /// the token of the deque that was armed, and that processing the event
    /// yields exactly one result.
    #[test]
    fn test_poll_mio()
    {
        let mut mio_poll = Poll::new().unwrap();
        let mut ready = Events::with_capacity(2);

        let mut ticket_deque = 
            OrderTimerDeque
                ::<DequeOnce, TimerDequeTicketIssuer<_>>
                ::new("test_label1".into(), 4, false, true).unwrap();

        let mut consumer_deque = 
            OrderTimerDeque
                ::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>
                ::new("test_label2".into(), 4, false, true).unwrap();

        // Only the ticket deque is registered at first.
        let ticket_token = ticket_deque.get_token();
        mio_poll
            .registry()
            .register(&mut ticket_deque, ticket_token, Interest::READABLE)
            .unwrap();

        // first timeout: 2 seconds from now
        let first_deadline = AbsoluteTime::now() + Duration::from_secs(2);

        // The ticket stays bound until the end of the test.
        let _first_ticket = ticket_deque.add(DequeOnce::new(first_deadline)).unwrap();

        mio_poll.poll(&mut ready, None).unwrap();

        let woke_at = AbsoluteTime::now();

        // The wake-up must fall into the same second as the deadline.
        assert_eq!(woke_at.seconds_cmp(&first_deadline), cmp::Ordering::Equal);

        assert!(!ready.is_empty());

        for ev in ready.iter()
        {
            assert_eq!(ev.token(), ticket_deque.get_token());

            let res = ticket_deque.wait_for_event_and_process().unwrap().unwrap();
            assert_eq!(res.len(), 1);

            println!("res: {:?}", res);
        }

        // now register the consumer deque with the same poll as well
        let consumer_token = consumer_deque.get_token();
        mio_poll
            .registry()
            .register(&mut consumer_deque, consumer_token, Interest::READABLE)
            .unwrap();

        // second timeout on the ticket deque, both deques registered
        let second_deadline = AbsoluteTime::now() + Duration::from_secs(2);
        let second_ticket = ticket_deque.add(DequeOnce::new(second_deadline)).unwrap();

        println!("2: ticket: {}", second_ticket);

        mio_poll.poll(&mut ready, None).unwrap();

        assert!(!ready.is_empty());

        for ev in ready.iter()
        {
            assert_eq!(ev.token(), ticket_deque.get_token());

            let res = ticket_deque.wait_for_event_and_process().unwrap().unwrap();
            assert_eq!(res.len(), 1);

            println!("res: {:?}", res);
        }

        // third timeout goes to the consumer deque and carries an item
        let third_deadline = AbsoluteTime::now() + Duration::from_secs(2);
        let payload = Arc::new(TestItem::new(6));

        consumer_deque.add(payload.clone(), DequeOnce::new(third_deadline)).unwrap();

        mio_poll.poll(&mut ready, None).unwrap();

        assert!(!ready.is_empty());

        for ev in ready.iter()
        {
            assert_eq!(ev.token(), consumer_deque.get_token());

            let res = consumer_deque.wait_for_event_and_process().unwrap().unwrap();
            assert_eq!(res.len(), 1);

            println!("res: {:?}", res);
        }
    }
}

/// Polls on a background thread while the test thread arms timers.
///
/// The spawned thread loops on `poll` until the exit flag is set and forwards
/// every batch of events through an mpsc channel. The test thread then checks,
/// in order:
///
/// 1. a timeout on the ticket deque (2 s ahead) arrives within 3 s and in the
///    expected second;
/// 2. a timeout on the consumer deque, registered after the poll thread was
///    started, arrives within 4 s;
/// 3. POSTPONE: a 3 s timeout does not fire within the first second and, after
///    being postponed by 2 s, fires in the second `deadline + 2 s`;
/// 4. RESCHED: same as above, but the deadline is replaced via `reschedule`.
#[test]
fn test_ticket_multithread()
{
    let exit_flag = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<Vec<PollEventType>>();

    let poll = TimerPoll::new().unwrap();

    let ticket_deque = 
        OrderTimerDeque
            ::<DequeOnce, TimerDequeTicketIssuer<_>>
            ::new("test_label1".into(), 4, false, true).unwrap();

    let mut time_list_1 = poll.add(ticket_deque).unwrap();

    let consumer_deque = 
        OrderTimerDeque
            ::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>
            ::new("test_label2".into(), 4, false, true).unwrap();

    let thread_poll = poll.clone();
    let thread_exit_flag = exit_flag.clone();

    // The handle is kept but never joined; the thread is asked to stop at the
    // end of the test through the exit flag and `interrupt_poll`.
    let _poll_thread = 
        std::thread::spawn(move || 
            {
                while !thread_exit_flag.load(Ordering::Relaxed)
                {
                    // `None` from poll: nothing to forward, check the flag again.
                    if let Some(items) = thread_poll.poll(None).unwrap()
                    {
                        println!("ev: {:?}", items);

                        tx.send(items).unwrap();
                    }
                }
            }
        );

    std::thread::sleep(Duration::from_micros(100));

    // adding new timer to the poll while the poll thread is already running
    let mut time_list_2 = poll.add(consumer_deque).unwrap();

    //poll.interrupt_poll();

    // ---- ticket deque: timer with offset 2 sec ----
    let deadline_1 = AbsoluteTime::now() + Duration::from_secs(2);

    let ticket_1 = time_list_1.get_inner_mut().add(DequeOnce::new(deadline_1)).unwrap();

    // receive timer timeout
    let events = rx.recv_timeout(Duration::from_secs(3)).unwrap();
    let fired_at = AbsoluteTime::now();

    assert_eq!(fired_at.seconds_cmp(&deadline_1), cmp::Ordering::Equal);
    println!("{}, {}, {} {} {:?}", deadline_1, fired_at, fired_at-deadline_1, ticket_1, events);

    for ev in events
    {
        let res = time_list_1.get_inner_mut().handle_timer_event(ev).unwrap();
        println!("res: {:?}", res);
    }

    // ---- consumer deque: timer with offset 2 sec and an attached item ----
    let deadline_2 = AbsoluteTime::now() + Duration::from_secs(2);
    let payload = Arc::new(TestItem::new(3));

    time_list_2.get_inner_mut().add(payload.clone(), DequeOnce::new(deadline_2)).unwrap();

    // receive timer timeout
    let events = rx.recv_timeout(Duration::from_secs(4)).unwrap();
    println!("{:?}", events);

    for ev in events
    {
        let res = time_list_2.get_inner_mut().handle_timer_event(ev).unwrap();
        println!("res: {:?}", res);
    }

    // ---- POSTPONE ----
    let mut postponed_deadline = AbsoluteTime::now() + Duration::from_secs(3);

    let ticket_2 = time_list_1.get_inner_mut().add(DequeOnce::new(postponed_deadline)).unwrap();

    // nothing may fire during the first second of a 3 second timeout
    if rx.recv_timeout(Duration::from_secs(1)).is_ok()
    {
        panic!("error");
    }

    time_list_1.get_inner_mut().postpone(&ticket_2.get_deque_id(), Duration::from_secs(2).into()).unwrap();

    // receive timer timeout
    let events = rx.recv_timeout(Duration::from_secs(6)).unwrap();

    println!("{:?}", events);

    let now = AbsoluteTime::now();

    // the expected firing time is the original deadline plus the postponement
    postponed_deadline += Duration::from_secs(2);

    assert_eq!(now.seconds_cmp(&postponed_deadline), cmp::Ordering::Equal);

    for ev in events
    {
        let res = time_list_1.get_inner_mut().handle_timer_event(ev).unwrap();
        println!("res: {:?}", res);
    }

    // ----RESCHED----
    let mut resched_deadline = AbsoluteTime::now() + Duration::from_secs(3);

    let ticket_3 = time_list_1.get_inner_mut().add(DequeOnce::new(resched_deadline)).unwrap();

    // nothing may fire during the first second of a 3 second timeout
    if rx.recv_timeout(Duration::from_secs(1)).is_ok()
    {
        panic!("error");
    }

    // replace the deadline with one that lies 2 seconds later
    resched_deadline += Duration::from_secs(2);

    time_list_1.get_inner_mut().reschedule(&ticket_3.get_deque_id(), DequeOnce::new(resched_deadline)).unwrap();

    // receive timer timeout
    let events = rx.recv_timeout(Duration::from_secs(6)).unwrap();

    println!("{:?}", events);

    let now = AbsoluteTime::now();

    assert_eq!(now.seconds_cmp(&resched_deadline), cmp::Ordering::Equal);

    for ev in events
    {
        let res = time_list_1.get_inner_mut().handle_timer_event(ev).unwrap();
        println!("res: {:?}", res);
    }

    // ---- shutdown: raise the flag, then interrupt the poll ----
    exit_flag.store(true, Ordering::Relaxed);

    poll.interrupt_poll();

    std::thread::sleep(Duration::from_millis(1));
}