use std::{borrow::Borrow, collections::HashSet, fmt, os::fd::AsRawFd, sync::{atomic::{AtomicBool, Ordering}, mpsc, Arc}, time::Duration};
use std::hash::Hash;
use crate::{common, deque_timeout::OrderdTimerDequeOnce, timer_portable::{poll::{PollEventType, PollResult}, timer::{AbsoluteTime, RelativeTime}}, OrderedTimerDeque, TimerDequeConsumer, TimerPoll};
/// Minimal payload used by the timer tests: a newtype around a numeric id.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct TestItem(u64);

impl fmt::Display for TestItem
{
    /// Renders the item as `0 = <id>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        let TestItem(id) = self;

        write!(f, "0 = {}", id)
    }
}
/// Wraps an `(Arc<TestItem>, AbsoluteTime)` pair so it can be stored in a
/// `HashSet` and looked up by the `Arc<TestItem>` alone (see the `Borrow`
/// impl); hashing and equality consider only the item's id, never the time.
struct ArcWrap((Arc<TestItem>, AbsoluteTime));
impl Hash for ArcWrap
{
fn hash<H: std::hash::Hasher>(&self, state: &mut H)
{
self.0.0.as_ref().0.hash(state);
}
}
// Marker impl: equality is defined by the `PartialEq` impl (comparison of the
// items' `u64` ids); `Eq` is required to use `ArcWrap` as a `HashSet` element.
impl Eq for ArcWrap {}
impl PartialEq for ArcWrap
{
fn eq(&self, other: &Self) -> bool
{
self.0.0.as_ref().0 == other.0.0.as_ref().0
}
}
impl PartialEq<Arc<TestItem>> for ArcWrap
{
fn eq(&self, other: &Arc<TestItem>) -> bool
{
self.0.0.as_ref().0 == other.0
}
}
impl Borrow<Arc<TestItem>> for ArcWrap
{
fn borrow(&self) -> &Arc<TestItem>
{
return &self.0.0;
}
}
impl AsRef<Arc<TestItem>> for ArcWrap
{
    /// Returns a reference to the item half of the wrapped pair.
    fn as_ref(&self) -> &Arc<TestItem>
    {
        let (item, _) = &self.0;

        item
    }
}
/// Schedules 100 one-shot timers and drains them while a worker thread does
/// the polling.
///
/// The worker polls `ev_watch` and forwards the fd of every `TimerTimeout`
/// event over `snd`, then blocks on `rcv2` until the main thread has handled
/// that event. The main thread therefore MUST acknowledge every fd it
/// receives, otherwise both threads wait on each other forever.
///
/// # Panics
///
/// Panics if any timer/poll call fails, if the poller reports a removed timer
/// or a sub-error, if a timed-out item was never scheduled, or if some
/// scheduled items were not reported by the time the queue is empty.
#[test]
fn thread_test()
{
    let ev_watch = TimerPoll::new().unwrap();

    let mut time_list =
        OrderedTimerDeque
            ::<TimerDequeConsumer<Arc<TestItem>, OrderdTimerDequeOnce>>
            ::new("test_label".into(), 4, false).unwrap();

    ev_watch.add(&time_list).unwrap();

    // The worker owns the strong reference; the main thread keeps a weak one
    // which it upgrades to clear the run flag once the queue is drained.
    let thread1_ctrl = Arc::new(AtomicBool::new(true));
    let thread1_weak = Arc::downgrade(&thread1_ctrl);

    // `snd`/`rcv`: worker -> main, carries the fd that timed out.
    // `snd2`/`rcv2`: main -> worker, acknowledges that the event was handled.
    let (snd, rcv) = mpsc::channel::<i32>();
    let (snd2, rcv2) = mpsc::channel::<()>();

    let thread_hndl =
        std::thread::spawn(move ||
            {
                while thread1_ctrl.load(Ordering::Acquire)
                {
                    // Poll timeout argument of 100 (presumably milliseconds -
                    // TODO confirm) so the run flag is re-checked regularly.
                    let events = ev_watch.poll(Some(100)).unwrap().into_inner();

                    for event in events
                    {
                        match event
                        {
                            PollEventType::TimerTimeout(tfd) =>
                            {
                                snd.send(tfd).unwrap();

                                // Wait for the acknowledgement; an error only
                                // means the main thread is gone, which the
                                // run flag / send above will surface.
                                let _ = rcv2.recv();
                            },
                            PollEventType::TimerRemoved(tfd) =>
                            {
                                panic!("timer removed : {}!", tfd);
                            },
                            PollEventType::SubError(e) =>
                            {
                                panic!("error: {}", e);
                            }
                        }
                    }
                }
            }
        );

    let mut items: HashSet<ArcWrap> = HashSet::with_capacity(100);

    // Ten items per second: item `i` expires `1 + i / 10` seconds from now.
    for i in 0..100
    {
        let tss_set =
            AbsoluteTime::now() + RelativeTime::new_time(1 + i / 10, 0);

        let ent = Arc::new(TestItem(i as u64));

        time_list.add_to_timer(ent.clone(), tss_set.clone()).unwrap();

        items.insert(ArcWrap((ent, tss_set)));
    }

    while time_list.timer_queue_len() > 0
    {
        let poll_event = rcv.recv().unwrap();

        if poll_event != time_list.as_raw_fd()
        {
            println!("unknown fd!");

            // The worker is blocked waiting for an acknowledgement of this
            // event; skipping it here would deadlock both threads.
            snd2.send(()).unwrap();

            continue;
        }

        let events = time_list.wait_for_event().unwrap();

        let mut timeout_items = Vec::<Arc<TestItem>>::with_capacity(10);

        time_list.timeout_event_handler(events, &mut timeout_items).unwrap();

        let cur_tm = common::get_current_timestamp().timestamp();
        println!("--- curtime: {}", cur_tm);

        for tm in timeout_items
        {
            // Single lookup: remove the entry and keep it for reporting.
            let itm =
                items
                    .take(&tm)
                    .expect("timed-out item was never scheduled or timed out twice");

            println!("timeout id: {}, time: {}", tm.0, itm.0.1);
        }

        snd2.send(()).unwrap();
    }

    // Every scheduled item must have been reported exactly once by now.
    assert!(
        items.is_empty(),
        "{} scheduled item(s) were never reported as timed out",
        items.len()
    );

    thread1_weak.upgrade().unwrap().store(false, Ordering::Release);

    thread_hndl.join().unwrap();
}
/// Registers a timer deque with the poller, arms a single 10-second timer and
/// then drops the deque from another thread one second later while the main
/// thread is blocked in `poll`. The poll result is only printed, not checked.
#[test]
fn thread_test_cancel()
{
    let poller = TimerPoll::new().unwrap();

    let mut deque =
        OrderedTimerDeque
            ::<TimerDequeConsumer<Arc<TestItem>, OrderdTimerDequeOnce>>
            ::new("test_label".into(), 4, false).unwrap();

    poller.add(&deque).unwrap();

    let deadline = AbsoluteTime::now() + RelativeTime::new_time(10, 0);
    let item = Arc::new(TestItem(2));

    deque.add_to_timer(item.clone(), deadline.clone()).unwrap();

    // The deque is moved into the thread and destroyed there after a delay,
    // i.e. while the poll below is still waiting.
    let dropper =
        std::thread::spawn(move ||
            {
                std::thread::sleep(Duration::from_secs(1));

                drop(deque);
            }
        );

    let poll_outcome = poller.poll(Some(1500));
    println!("poll: {:?}", poll_outcome);

    dropper.join().unwrap();
}