use std::{cmp, fmt, sync::{Arc, atomic::{AtomicBool, Ordering}, mpsc}, time::Duration};
use crate::{AbsoluteTime, TimerDequeConsumer, TimerDequeTicketIssuer, TimerPoll, deque_timeout::{DequeOnce, OrderTimerDeque}, timer_portable::{PollEventType, PolledTimerFd}};
/// Small payload type for the timer tests in this file.
///
/// The tests wrap it in an `Arc` and hand it to deques declared with
/// `TimerDequeConsumer<Arc<TestItem>, _>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TestItem {
    /// Numeric tag that tells one test item apart from another.
    n: usize,
}

impl TestItem {
    /// Creates a test item tagged with `n`.
    fn new(n: usize) -> Self {
        Self { n }
    }
}

impl fmt::Display for TestItem {
    /// Writes the item as `TestItem <n>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "TestItem {}", self.n)
    }
}
/// Single-threaded round trip through `TimerPoll` with two timer deques.
///
/// Three rounds are run. Each round queues a `DequeOnce` deadline two seconds
/// in the future, calls `poll` without a timeout and passes every returned
/// event to `handle_timer_event` of the deque that was armed:
///
/// 1. the ticket-issuing deque, before the consumer deque is added to the poll;
/// 2. the ticket-issuing deque again, after the consumer deque has been added;
/// 3. the consumer deque, carrying an `Arc<TestItem>`.
///
/// Round 1 additionally checks that the wake-up happens in the same second as
/// the requested deadline.
///
/// A `None` result from `poll` fails the test. Returning silently in that case
/// would let the test pass without verifying anything, whereas the other tests
/// in this file fail when an expected event does not arrive.
#[test]
fn test_poll() {
    let poll = TimerPoll::new().unwrap();

    let timer1 =
        OrderTimerDeque::<DequeOnce, TimerDequeTicketIssuer<_>>::new("test_label1".into(), 4, false, true)
            .unwrap();
    let mut time_list_1 = poll.add(timer1).unwrap();

    // Created up front, but only handed to the poll right before round 2.
    let timer2 =
        OrderTimerDeque::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>::new("test_label2".into(), 4, false, true)
            .unwrap();

    // ---- Round 1: ticket deque, only one deque registered ----
    let tss_time = AbsoluteTime::now() + Duration::from_secs(2);
    let tss_set = DequeOnce::new(tss_time);
    // Bound to a named (underscore-prefixed) variable rather than `_` so the
    // ticket stays alive until the end of the function, as it did before.
    // NOTE(review): whether dropping a ticket affects the queued timer is not
    // visible from this file - confirm before shortening its lifetime.
    let _ticket = time_list_1.get_inner_mut().add(tss_set).unwrap();

    let Some(items) = poll.poll(None).unwrap() else {
        panic!("round 1: poll returned without any timer event");
    };

    // The wake-up must land in the same second as the requested deadline.
    let tss_time_rs = AbsoluteTime::now();
    assert_eq!(tss_time_rs.seconds_cmp(&tss_time), cmp::Ordering::Equal);

    for item in items {
        let _res = time_list_1.get_inner_mut().handle_timer_event(item).unwrap();
    }

    // ---- Round 2: ticket deque again, with the second deque registered ----
    let mut time_list_2 = poll.add(timer2).unwrap();

    let tss_time = AbsoluteTime::now() + Duration::from_secs(2);
    let tss_set = DequeOnce::new(tss_time);
    let ticket = time_list_1.get_inner_mut().add(tss_set).unwrap();
    println!("2: ticket: {}", ticket);

    let Some(items) = poll.poll(None).unwrap() else {
        panic!("round 2: poll returned without any timer event");
    };
    println!("3: {:?}", items);

    for item in items {
        let _res = time_list_1.get_inner_mut().handle_timer_event(item).unwrap();
    }

    // ---- Round 3: consumer deque carrying an `Arc<TestItem>` ----
    let tss_time = AbsoluteTime::now() + Duration::from_secs(2);
    let tss_set = DequeOnce::new(tss_time);
    let tss_r = Arc::new(TestItem::new(6));
    time_list_2.get_inner_mut().add(tss_r.clone(), tss_set).unwrap();

    let Some(items) = poll.poll(None).unwrap() else {
        panic!("round 3: poll returned without any timer event");
    };
    println!("4: {:?}", items);

    for item in items {
        let _res = time_list_2.get_inner_mut().handle_timer_event(item).unwrap();
    }
}
/// Tests that drive the timer deques through a `mio::Poll` instead of the
/// crate's own poll. Compiled only on unix targets with the
/// `enable_mio_compat` feature enabled.
#[cfg(all(target_family = "unix", feature = "enable_mio_compat"))]
mod test_mio_compat {
    use std::{cmp, sync::Arc, time::Duration};

    use mio::{Events, Interest, Poll};

    use crate::{
        AbsoluteTime, TimerDequeConsumer, TimerDequeTicketIssuer, TimerFdMioCompat,
        deque_timeout::{DequeOnce, OrderTimerDeque, tests::TestItem},
    };

    /// Registers a ticket-issuing deque and a consumer deque with `mio` and
    /// runs three rounds. Each round queues a deadline two seconds ahead,
    /// blocks in `Poll::poll` without a timeout, and then checks that every
    /// reported event carries the token of the deque that was armed and that
    /// processing it yields exactly one result.
    #[test]
    fn test_poll_mio() {
        let mut mio_poll = Poll::new().unwrap();
        let mut ready = Events::with_capacity(2);

        let mut ticket_timer =
            OrderTimerDeque::<DequeOnce, TimerDequeTicketIssuer<_>>::new("test_label1".into(), 4, false, true)
                .unwrap();
        let mut consumer_timer =
            OrderTimerDeque::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>::new("test_label2".into(), 4, false, true)
                .unwrap();

        // ---- Round 1: only the ticket deque is registered ----
        let ticket_token = ticket_timer.get_token();
        mio_poll
            .registry()
            .register(&mut ticket_timer, ticket_token, Interest::READABLE)
            .unwrap();

        let deadline = AbsoluteTime::now() + Duration::from_secs(2);
        let once = DequeOnce::new(deadline);
        // Kept bound (underscore-prefixed, not `_`) so it lives to the end of
        // the test exactly like the original unused binding.
        let _first_ticket = ticket_timer.add(once).unwrap();

        mio_poll.poll(&mut ready, None).unwrap();

        // The wake-up must land in the same second as the requested deadline.
        let woke_at = AbsoluteTime::now();
        assert_eq!(woke_at.seconds_cmp(&deadline), cmp::Ordering::Equal);
        assert_eq!(ready.is_empty(), false);

        for event in ready.iter() {
            assert_eq!(event.token(), ticket_timer.get_token());

            let fired = ticket_timer.wait_for_event_and_process().unwrap().unwrap();
            assert_eq!(fired.len(), 1);
            println!("res: {:?}", fired);
        }

        // ---- Round 2: both deques registered, ticket deque armed ----
        let consumer_token = consumer_timer.get_token();
        mio_poll
            .registry()
            .register(&mut consumer_timer, consumer_token, Interest::READABLE)
            .unwrap();

        let deadline = AbsoluteTime::now() + Duration::from_secs(2);
        let once = DequeOnce::new(deadline);
        let second_ticket = ticket_timer.add(once).unwrap();
        println!("2: ticket: {}", second_ticket);

        mio_poll.poll(&mut ready, None).unwrap();
        assert_eq!(ready.is_empty(), false);

        for event in ready.iter() {
            assert_eq!(event.token(), ticket_timer.get_token());

            let fired = ticket_timer.wait_for_event_and_process().unwrap().unwrap();
            assert_eq!(fired.len(), 1);
            println!("res: {:?}", fired);
        }

        // ---- Round 3: consumer deque armed with an `Arc<TestItem>` ----
        let deadline = AbsoluteTime::now() + Duration::from_secs(2);
        let once = DequeOnce::new(deadline);
        let payload = Arc::new(TestItem::new(6));
        consumer_timer.add(payload.clone(), once).unwrap();

        mio_poll.poll(&mut ready, None).unwrap();
        assert_eq!(ready.is_empty(), false);

        for event in ready.iter() {
            assert_eq!(event.token(), consumer_timer.get_token());

            let fired = consumer_timer.wait_for_event_and_process().unwrap().unwrap();
            assert_eq!(fired.len(), 1);
            println!("res: {:?}", fired);
        }
    }
}
/// Runs `TimerPoll::poll` on a worker thread while the test thread arms timers.
///
/// The worker forwards every batch of poll events over an `mpsc` channel until
/// the shared exit flag is set. The test thread then walks through four
/// phases:
///
/// 1. queue a deadline on the ticket deque and check it fires in the expected
///    second;
/// 2. queue an `Arc<TestItem>` on the consumer deque and wait for its event;
/// 3. queue a deadline, `postpone` it by two seconds and check the new firing
///    second;
/// 4. queue a deadline, `reschedule` it two seconds later and check the new
///    firing second.
///
/// In phases 3 and 4 any event received during the first second fails the
/// test with `panic!("error")`.
#[test]
fn test_ticket_multithread() {
    let exit_flag = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<Vec<PollEventType>>();

    let poll = TimerPoll::new().unwrap();

    let timer1 =
        OrderTimerDeque::<DequeOnce, TimerDequeTicketIssuer<_>>::new("test_label1".into(), 4, false, true)
            .unwrap();
    let mut time_list_1 = poll.add(timer1).unwrap();

    // Added to the poll only after the worker thread has been started.
    let timer2 =
        OrderTimerDeque::<DequeOnce, TimerDequeConsumer<Arc<TestItem>, _>>::new("test_label2".into(), 4, false, true)
            .unwrap();

    let worker_poll = poll.clone();
    let stop_requested = exit_flag.clone();

    // The handle is never joined; it is only kept bound until the test ends.
    let _poll_thread = std::thread::spawn(move || {
        loop {
            if stop_requested.load(Ordering::Relaxed) {
                break;
            }

            match worker_poll.poll(None).unwrap() {
                Some(events) => {
                    println!("ev: {:?}", events);
                    tx.send(events).unwrap();
                }
                // Nothing to forward: go back and re-check the exit flag.
                None => continue,
            }
        }
    });

    std::thread::sleep(Duration::from_micros(100));

    let mut time_list_2 = poll.add(timer2).unwrap();

    // ---- Phase 1: plain deadline on the ticket deque ----
    let first_deadline = AbsoluteTime::now() + Duration::from_secs(2);
    let first_once = DequeOnce::new(first_deadline);
    let first_ticket = time_list_1.get_inner_mut().add(first_once).unwrap();

    let batch = rx.recv_timeout(Duration::from_secs(3)).unwrap();

    let first_fired_at = AbsoluteTime::now();
    assert_eq!(first_fired_at.seconds_cmp(&first_deadline), cmp::Ordering::Equal);
    println!(
        "{}, {}, {} {} {:?}",
        first_deadline,
        first_fired_at,
        first_fired_at - first_deadline,
        first_ticket,
        batch
    );

    for event in batch {
        let handled = time_list_1.get_inner_mut().handle_timer_event(event).unwrap();
        println!("res: {:?}", handled);
    }

    // ---- Phase 2: consumer deque carrying an `Arc<TestItem>` ----
    let consumer_deadline = AbsoluteTime::now() + Duration::from_secs(2);
    let consumer_once = DequeOnce::new(consumer_deadline);
    let payload = Arc::new(TestItem::new(3));
    time_list_2.get_inner_mut().add(payload.clone(), consumer_once).unwrap();

    let batch = rx.recv_timeout(Duration::from_secs(4)).unwrap();
    println!("{:?}", batch);

    for event in batch {
        let handled = time_list_2.get_inner_mut().handle_timer_event(event).unwrap();
        println!("res: {:?}", handled);
    }

    // ---- Phase 3: postpone a queued deadline by two seconds ----
    let mut postponed_deadline = AbsoluteTime::now() + Duration::from_secs(3);
    let postponed_once = DequeOnce::new(postponed_deadline);
    let postponed_ticket = time_list_1.get_inner_mut().add(postponed_once).unwrap();

    // The deadline is three seconds away, so nothing may arrive in the first one.
    if rx.recv_timeout(Duration::from_secs(1)).is_ok() {
        panic!("error");
    }

    time_list_1
        .get_inner_mut()
        .postpone(&postponed_ticket.get_deque_id(), Duration::from_secs(2).into())
        .unwrap();

    let batch = rx.recv_timeout(Duration::from_secs(6)).unwrap();
    println!("{:?}", batch);

    let postponed_fired_at = AbsoluteTime::now();
    // Expected firing time is the original deadline plus the postponement.
    postponed_deadline += Duration::from_secs(2);
    assert_eq!(postponed_fired_at.seconds_cmp(&postponed_deadline), cmp::Ordering::Equal);

    for event in batch {
        let handled = time_list_1.get_inner_mut().handle_timer_event(event).unwrap();
        println!("res: {:?}", handled);
    }

    // ---- Phase 4: reschedule a queued deadline two seconds later ----
    let mut rescheduled_deadline = AbsoluteTime::now() + Duration::from_secs(3);
    let initial_once = DequeOnce::new(rescheduled_deadline);
    let rescheduled_ticket = time_list_1.get_inner_mut().add(initial_once).unwrap();

    if rx.recv_timeout(Duration::from_secs(1)).is_ok() {
        panic!("error");
    }

    rescheduled_deadline += Duration::from_secs(2);
    let replacement_once = DequeOnce::new(rescheduled_deadline);
    time_list_1
        .get_inner_mut()
        .reschedule(&rescheduled_ticket.get_deque_id(), replacement_once)
        .unwrap();

    let batch = rx.recv_timeout(Duration::from_secs(6)).unwrap();
    println!("{:?}", batch);

    let rescheduled_fired_at = AbsoluteTime::now();
    assert_eq!(rescheduled_fired_at.seconds_cmp(&rescheduled_deadline), cmp::Ordering::Equal);

    for event in batch {
        let handled = time_list_1.get_inner_mut().handle_timer_event(event).unwrap();
        println!("res: {:?}", handled);
    }

    // ---- Shutdown: raise the flag, interrupt the poll, pause briefly ----
    exit_flag.store(true, Ordering::Relaxed);
    poll.interrupt_poll();
    std::thread::sleep(Duration::from_millis(1));
}