timer-deque-rs 0.6.0

An OS-based timer and timer queue that implements timeout queues of different types.
Documentation
use std::{cmp, fmt, sync::{Arc, atomic::{AtomicBool, Ordering}, mpsc}, time::Duration};

use crate::{AbsoluteTime, TimerDequeConsumer, TimerDequeTicketIssuer, TimerPoll, deque_timeout::{DequeOnce, OrderTimerDeque}, timer_portable::{PollEventType, PolledTimerFd}};

/// Small payload type for the tests in this file.
///
/// It carries a single `usize` and is handed to the consumer deque wrapped
/// in an `Arc`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestItem
{
    /// The number this item was created with.
    n: usize,
}

/// Human-readable rendering: the literal text `TestItem ` followed by the
/// stored number.
impl fmt::Display for TestItem
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let Self { n } = self;

        write!(f, "TestItem {}", n)
    }
}

impl TestItem
{
    fn new(n: usize) -> Self
    {
        return Self{n};
    }
}

/// Single-threaded walk through `TimerPoll`: a ticket-issuing deque and a
/// consumer deque are registered with one poll instance, a timeout two seconds
/// ahead is armed three times in a row, and every batch of events returned by
/// `poll` is handed back to the deque it belongs to.
///
/// NOTE(review): each `let Some(..) = poll.poll(None).unwrap() else { return }`
/// ends the test *successfully* when `poll` yields `None`, so none of the
/// later checks run in that case. Confirm whether `None` can legitimately
/// occur here; if not, failing the test would be the stricter choice.
#[test]
fn test_poll()
{
    let poll = TimerPoll::new().unwrap();

    // Ticket-issuing deque: registered with the poll immediately.
    let ticket_deque =
        OrderTimerDeque
            ::<DequeOnce, TimerDequeTicketIssuer<_>>
            ::new("test_label1".into(), 4, false, true).unwrap();

    let mut ticket_list = poll.add(ticket_deque).unwrap();

    // Consumer deque: created now, registered with the poll only after the
    // first timeout has been processed.
    let consumer_deque =
        OrderTimerDeque
            ::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>
            ::new("test_label2".into(), 4, false, true).unwrap();

    // ---- round 1: ticket deque, deadline = now + 2s ----
    let first_deadline = AbsoluteTime::now() + Duration::from_secs(2);
    let first_timeout = DequeOnce::new(first_deadline);

    // Bound to a named variable so the ticket stays alive until the end of the
    // test (what dropping a ticket early would do is not visible from here).
    let _first_ticket = ticket_list.get_inner_mut().add(first_timeout).unwrap();

    let Some(first_events) = poll.poll(None).unwrap()
        else { return };

    // The wake-up is expected in the same second as the requested deadline.
    let woke_at = AbsoluteTime::now();

    assert_eq!(woke_at.seconds_cmp(&first_deadline), cmp::Ordering::Equal);

    for event in first_events
    {
        let _ = ticket_list.get_inner_mut().handle_timer_event(event).unwrap();
    }

    // ---- round 2: register the consumer deque, arm the ticket deque again ----
    let mut consumer_list = poll.add(consumer_deque).unwrap();

    let second_deadline = AbsoluteTime::now() + Duration::from_secs(2);
    let second_timeout = DequeOnce::new(second_deadline);

    let ticket = ticket_list.get_inner_mut().add(second_timeout).unwrap();

    println!("2: ticket: {}", ticket);

    let Some(second_events) = poll.poll(None).unwrap()
        else { return };

    println!("3: {:?}", second_events);

    for event in second_events
    {
        let _ = ticket_list.get_inner_mut().handle_timer_event(event).unwrap();
    }

    // ---- round 3: consumer deque with an `Arc<TestItem>` payload ----
    let third_deadline = AbsoluteTime::now() + Duration::from_secs(2);
    let third_timeout = DequeOnce::new(third_deadline);
    let payload = Arc::new(TestItem::new(6));

    consumer_list.get_inner_mut().add(Arc::clone(&payload), third_timeout).unwrap();

    let Some(third_events) = poll.poll(None).unwrap()
        else { return };

    println!("4: {:?}", third_events);

    for event in third_events
    {
        let _ = consumer_list.get_inner_mut().handle_timer_event(event).unwrap();
    }
}


/// Multi-threaded scenario: a background thread blocks in `TimerPoll::poll`
/// and forwards every batch of events over an mpsc channel, while the test
/// body arms timers and checks four cases in order:
///
/// 1. a ticket-deque timeout armed 2s ahead fires in the expected second;
/// 2. a consumer-deque timeout armed 2s ahead is delivered;
/// 3. a timeout armed 3s ahead and then postponed by 2s fires at +5s;
/// 4. a timeout armed 3s ahead and then rescheduled to +5s fires at +5s.
///
/// Finally the exit flag is raised and the poll is interrupted so that the
/// background thread can leave its loop.
#[test]
fn test_ticket_multithread()
{
    // Raised by the test body to ask the polling thread to stop.
    let exit_flag = Arc::new(AtomicBool::new(false));
    // Transports each batch of poll events from the polling thread to the test body.
    let (tx, rx) = mpsc::channel::<Vec<PollEventType>>();

    let poll = TimerPoll::new().unwrap();

    let timer1 =
        OrderTimerDeque
            ::<DequeOnce, TimerDequeTicketIssuer<_>>
            ::new("test_label1".into(), 4, false, true).unwrap();

    let mut time_list_1 = poll.add(timer1).unwrap();

    // Created up front, but registered with the poll only after the polling
    // thread has been started.
    let timer2 =
        OrderTimerDeque
            ::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>
            ::new("test_label2".into(), 4, false, true).unwrap();

    let c_poll = poll.clone();
    let c_exit_flag = exit_flag.clone();

    // The handle is deliberately left unused: the thread is not joined and the
    // test relies on the exit flag plus `interrupt_poll` at the end.
    // NOTE(review): joining would surface a panic inside the poller (e.g. a
    // failing `unwrap`); confirm that `interrupt_poll` reliably wakes `poll`
    // before adding a `join()`.
    let _poll_thread =
        std::thread::spawn(move ||
            {
                while !c_exit_flag.load(Ordering::Relaxed)
                {
                    // `None` carries no events; just re-check the exit flag.
                    let Some(items) =
                        c_poll.poll(None).unwrap()
                    else
                        { continue; };

                    println!("ev: {:?}", items);

                    tx.send(items).unwrap();
                }
            }
        );

    // Short pause before touching the poll from this thread again.
    std::thread::sleep(Duration::from_micros(100));

    // adding a new timer to the poll while the other thread is already polling
    let mut time_list_2 = poll.add(timer2).unwrap();

    // ---- PLAIN TIMEOUT (ticket deque) ----
    // setting timer with offset 2 sec
    let tss_time = AbsoluteTime::now() + Duration::from_secs(2);
    let tss_set = DequeOnce::new(tss_time);

    let ticket = time_list_1.get_inner_mut().add(tss_set).unwrap();

    // receive timer timeout
    let res = rx.recv_timeout(Duration::from_secs(3)).unwrap();
    let tss_time_e = AbsoluteTime::now();

    assert_eq!(tss_time_e.seconds_cmp(&tss_time), cmp::Ordering::Equal);
    println!("{}, {}, {} {} {:?}", tss_time, tss_time_e, tss_time_e-tss_time, ticket, res);

    for item in res
    {
        let handled = time_list_1.get_inner_mut().handle_timer_event(item).unwrap();
        println!("res: {:?}", handled);
    }

    // ---- PLAIN TIMEOUT (consumer deque) ----
    let tss_time2 = AbsoluteTime::now() + Duration::from_secs(2);
    let tss_set2 = DequeOnce::new(tss_time2);

    let item2 = Arc::new(TestItem::new(3));
    time_list_2.get_inner_mut().add(item2.clone(), tss_set2).unwrap();

    // receive timer timeout
    let res = rx.recv_timeout(Duration::from_secs(4)).unwrap();
    println!("{:?}", res);

    for item in res
    {
        let handled = time_list_2.get_inner_mut().handle_timer_event(item).unwrap();
        println!("res: {:?}", handled);
    }

    // ---- POSTPONE ----
    let tss = AbsoluteTime::now();
    let mut tss_time = tss + Duration::from_secs(3);
    let tss_set = DequeOnce::new(tss_time);

    let ticket = time_list_1.get_inner_mut().add(tss_set).unwrap();

    // The timer is armed for +3s, so nothing may arrive during the first second.
    assert!(
        rx.recv_timeout(Duration::from_secs(1)).is_err(),
        "postpone: received a timer event within 1s although the timeout was set to 3s"
    );

    time_list_1.get_inner_mut().postpone(&ticket.get_deque_id(), Duration::from_secs(2).into()).unwrap();

    // receive timer timeout
    let res = rx.recv_timeout(Duration::from_secs(6)).unwrap();

    println!("{:?}", res);

    // Expected expiry is the original deadline shifted by the postponed 2s.
    let tss_now = AbsoluteTime::now();
    tss_time += Duration::from_secs(2);

    assert_eq!(tss_now.seconds_cmp(&tss_time), cmp::Ordering::Equal);

    for item in res
    {
        let handled = time_list_1.get_inner_mut().handle_timer_event(item).unwrap();
        println!("res: {:?}", handled);
    }

    // ---- RESCHED ----
    let tss = AbsoluteTime::now();
    let mut tss_time = tss + Duration::from_secs(3);
    let tss_set = DequeOnce::new(tss_time);

    let ticket = time_list_1.get_inner_mut().add(tss_set).unwrap();

    // Same guard as above: no event is expected before the reschedule happens.
    assert!(
        rx.recv_timeout(Duration::from_secs(1)).is_err(),
        "reschedule: received a timer event within 1s although the timeout was set to 3s"
    );

    // New absolute deadline: the original one moved 2s further out.
    tss_time += Duration::from_secs(2);
    let tss_set2 = DequeOnce::new(tss_time);

    time_list_1.get_inner_mut().reschedule(&ticket.get_deque_id(), tss_set2).unwrap();

    // receive timer timeout
    let res = rx.recv_timeout(Duration::from_secs(6)).unwrap();

    println!("{:?}", res);

    let tss_now = AbsoluteTime::now();

    assert_eq!(tss_now.seconds_cmp(&tss_time), cmp::Ordering::Equal);

    for item in res
    {
        let handled = time_list_1.get_inner_mut().handle_timer_event(item).unwrap();
        println!("res: {:?}", handled);
    }

    // ---- SHUTDOWN ----
    // Raise the flag first, then interrupt the poll so the thread gets a
    // chance to observe it.
    exit_flag.store(true, Ordering::Relaxed);

    poll.interrupt_poll();

    // Brief pause before the test returns; the thread is not joined.
    std::thread::sleep(Duration::from_millis(1));
}