timer-deque-rs 0.8.0

An OS-based timer and timer queue that implements timeout queues of different types.
Documentation
use std::{borrow::Borrow, collections::HashSet, fmt, os::fd::AsRawFd, sync::{atomic::{AtomicBool, Ordering}, mpsc, Arc}, time::Duration};
use std::hash::Hash;
use crate::{common, deque_timeout::OrderdTimerDequeOnce, timer_portable::{poll::{PollEventType, PollResult}, timer::{AbsoluteTime, RelativeTime}}, OrderedTimerDeque, TimerDequeConsumer, TimerPoll};


/// Minimal payload stored in the timer queues under test; wraps a numeric id.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct TestItem(u64);

/// Renders the item as `0 = <id>`.
impl fmt::Display for TestItem
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result 
    {
        // Destructure instead of indexing so the field being printed has a name.
        let Self(id) = self;

        write!(f, "0 = {}", id)
    }
}

/// Bookkeeping entry for the tests: a shared [`TestItem`] paired with the
/// absolute time it was scheduled for. Stored in a `HashSet` by `thread_test`.
struct ArcWrap((Arc<TestItem>, AbsoluteTime));

impl Hash for ArcWrap
{
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) 
    {
        self.0.0.as_ref().0.hash(state);
    }
}

impl Eq for ArcWrap {}

impl PartialEq for ArcWrap
{
    fn eq(&self, other: &Self) -> bool 
    {
        self.0.0.as_ref().0 == other.0.0.as_ref().0
    }
}

/// Compares a wrapper directly against a bare shared item, by id only.
impl PartialEq<Arc<TestItem>> for ArcWrap
{
    fn eq(&self, other: &Arc<TestItem>) -> bool 
    {
        let held: &TestItem = &self.0.0;

        held.0 == other.0
    }
}

impl Borrow<Arc<TestItem>> for ArcWrap
{
    fn borrow(&self) -> &Arc<TestItem> 
    {
        return &self.0.0;
    }
}


/// Cheap reference conversion to the wrapped shared item.
impl AsRef<Arc<TestItem>> for ArcWrap
{
    fn as_ref(&self) -> &Arc<TestItem> 
    {
        // First tuple element of the inner pair is the item itself.
        let (item, _) = &self.0;

        item
    }
}

/// Polls a timer queue from a worker thread while the main thread consumes
/// the timed-out items.
///
/// Flow:
/// * the worker polls `ev_watch` (timeout `100`, presumably milliseconds) and
///   forwards the fd of every `TimerTimeout` event to the main thread, then
///   waits for an acknowledgement before looking at the next event;
/// * the main thread schedules 100 items on ten distinct deadlines
///   (`1 + i / 10`, ten items each), drains the queue on every notification
///   and acknowledges the worker;
/// * once the queue is empty the worker is stopped and joined, and the test
///   verifies that every scheduled item was reported exactly once.
#[test]
fn thread_test()
{
    let ev_watch = TimerPoll::new().unwrap();
    
    let mut time_list = 
        OrderedTimerDeque
            ::<TimerDequeConsumer<Arc<TestItem>, OrderdTimerDequeOnce>>
            ::new("test_label".into(), 4, false).unwrap();

    // add timer to event 
    ev_watch.add(&time_list).unwrap();

    // Run flag shared with the worker; cleared by the main thread on shutdown.
    let thread1_ctrl = Arc::new(AtomicBool::new(true));
    let worker_ctrl = Arc::clone(&thread1_ctrl);

    // worker -> main: fd of the timer that fired
    let (snd, rcv) = mpsc::channel::<i32>();

    // main -> worker: "event handled, continue polling"
    let (snd2, rcv2) = mpsc::channel::<()>();

    let thread_hndl = 
        std::thread::spawn(move ||
            {
                while worker_ctrl.load(Ordering::Acquire)
                {
                    let events = ev_watch.poll(Some(100)).unwrap().into_inner();

                    for event in events
                    {
                        match event
                        {
                            PollEventType::TimerTimeout(tfd) => 
                            {
                                snd.send(tfd).unwrap();

                                // Wait until the main thread has drained the
                                // queue. An `Err` only means the main thread
                                // dropped its sender during shutdown, which is
                                // fine: the run flag is re-checked right after.
                                let _ = rcv2.recv();
                            },
                            PollEventType::TimerRemoved(tfd) =>
                            {
                                panic!("timer removed : {}!", tfd);
                            },
                            PollEventType::SubError(e) =>
                            {
                                panic!("error: {}", e);
                            }
                        }
                    }
                }
            }
        );

    // Everything that was scheduled and has not been reported yet.
    let mut items: HashSet<ArcWrap> = HashSet::with_capacity(100);

    for i in 0..100 
    {
        let tss_set = 
            AbsoluteTime::now() + RelativeTime::new_time(1 + i / 10, 0);

        let ent = Arc::new(TestItem(i as u64));

        time_list.add_to_timer(Arc::clone(&ent), tss_set.clone()).unwrap();

        items.insert(ArcWrap((ent, tss_set)));
    }

    while time_list.timer_queue_len() > 0
    {
        let poll_event = rcv.recv().unwrap();

        if poll_event != time_list.as_raw_fd()
        {
            println!("unknown fd!");

            // The worker is blocked waiting for an acknowledgement; without
            // one both threads would wait on each other forever.
            snd2.send(()).unwrap();
            continue;
        }

        let events = time_list.wait_for_event().unwrap();
        let mut timeout_items = Vec::<Arc<TestItem>>::with_capacity(10);
        time_list.timeout_event_handler(events, &mut timeout_items).unwrap();

        let cur_tm = common::get_current_timestamp().timestamp();
        
        println!("--- curtime: {}", cur_tm);
        for tm in timeout_items
        {
            // `take` looks the entry up and removes it in one step; it panics
            // (via `unwrap`) on an unknown or already reported item.
            let itm = items.take(&tm).unwrap();

            println!("timeout id: {}, time: {}", tm.0, itm.0.1);
        }

        snd2.send(()).unwrap();
    }

    // Ask the worker to stop, then drop the acknowledgement sender so that a
    // worker which is waiting for an acknowledgement of a late event wakes up
    // instead of hanging the `join` below.
    thread1_ctrl.store(false, Ordering::Release);
    drop(snd2);

    thread_hndl.join().unwrap();

    assert!(
        items.is_empty(),
        "{} scheduled item(s) were never reported as timed out",
        items.len()
    );
}

/// Drops a registered timer queue from another thread while the main thread
/// is blocked in `poll`, and prints whatever `poll` returns.
///
/// The queue holds a single item scheduled `10` units ahead (presumably
/// seconds), the helper thread drops the queue after one second, and the
/// poll is given a timeout of `1500` (presumably milliseconds).
#[test]
fn thread_test_cancel()
{
    let ev_watch = TimerPoll::new().unwrap();
    
    let mut time_list = 
        OrderedTimerDeque
            ::<TimerDequeConsumer<Arc<TestItem>, OrderdTimerDequeOnce>>
            ::new("test_label".into(), 4, false).unwrap();

    // Register the queue with the poller before anything is scheduled.
    ev_watch.add(&time_list).unwrap();

    let deadline = AbsoluteTime::now() + RelativeTime::new_time(10, 0);
    let item = Arc::new(TestItem(2));

    time_list.add_to_timer(Arc::clone(&item), deadline.clone()).unwrap();

    // The queue is moved into the helper thread, which destroys it while the
    // main thread is still polling.
    let dropper = 
        std::thread::spawn(move ||
            {
                std::thread::sleep(Duration::from_secs(1));
                drop(time_list);
            }
        );

    let poll_outcome = ev_watch.poll(Some(1500));

    println!("poll: {:?}", poll_outcome);

    dropper.join().unwrap();
}