use super::{Clock, ClockPtr, Interval, MyError, Scheduler, SyncUnsafeRcRefCell, SyncUnsafeWeakRefCell, Tick};
use anyhow::{Ok, Result as AnyResult};
use log::{debug, error, info, trace, warn};
use rblist::{self, BList, Node, Scale};
use std::fmt::Formatter;
#[cfg(all(feature = "log_precision", debug_assertions))]
use std::time::Instant;
use std::{fmt::Debug, ptr::NonNull};
pub(super) struct TimerHandle {
pub(super) name: String,
pub(super) expire_at: Tick,
pub(super) expires: Interval,
pub(super) callback: Box<dyn FnOnce() + Send + Sync + 'static>,
#[cfg(all(feature = "log_precision", debug_assertions))]
pub(super) created_at: Instant,
}
impl Debug for TimerHandle {
#[cfg(not(feature = "log_precision"))]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TimerHandle")
.field("name", &self.name)
.field("expire_at", &self.expire_at)
.field("expires", &self.expires)
.finish()
}
#[cfg(all(feature = "log_precision", debug_assertions))]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TimerHandle")
.field("name", &self.name)
.field("expire_at", &self.expire_at)
.field("expires", &self.expires)
.field("created_at", &self.created_at)
.field("elapsed", &self.created_at.elapsed())
.finish()
}
}
impl Default for TimerHandle {
fn default() -> Self {
Self {
name: "".into(),
callback: Box::new(|| {}),
expire_at: 0.into(),
expires: 0.into(),
#[cfg(all(feature = "log_precision", debug_assertions))]
created_at: Instant::now(),
}
}
}
impl TimerHandle {
pub(super) fn expired(&self, cur_ticks: Tick) -> bool {
cur_ticks >= self.expire_at
}
pub(super) fn fire(self) {
trace!("Timer[{:?}] expired.", self);
(self.callback)();
}
}
pub(in super::super) struct Timer {
clk: Option<ClockPtr>,
#[cfg(test)]
cid: usize,
q: SyncUnsafeWeakRefCell<TPL>,
n: Node,
}
impl Timer {
pub(in super::super) fn cancel(&mut self) -> AnyResult<()> {
if let Some(clk) = self.clk.take() {
Clock::cancel_timer(clk, self)
} else {
trace!("Timer has been canceled");
Ok(())
}
}
pub(super) fn stop(&mut self) -> AnyResult<()> {
let tpl = self
.q
.unsafe_upgrade()
.ok_or(MyError::LaterOperation("cancel".into()))?;
let mut tpl = tpl.unsafe_try_borrow_mut()?;
tpl.cancel_timer(self)
}
pub(super) fn is_running(&self) -> bool {
self.q.unsafe_upgrade().map_or(false, |q| {
q.unsafe_try_borrow().map_or_else(
|e| {
warn!(
"Failed to borrow the q with error: {:?}, assuming it is not empty.",
e
);
true
},
|v| !v.is_empty(),
)
})
}
}
pub(in super::super) struct AutoDropTimer {
t: Option<Timer>,
}
impl Debug for AutoDropTimer {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.t.fmt(f)
}
}
impl AutoDropTimer {
pub(in super::super) fn from_timer(t: Timer) -> Self {
Self { t: Some(t) }
}
pub(in super::super) fn cancel(&mut self) -> AnyResult<()> {
if let Some(mut t) = self.t.take() {
t.cancel()
} else {
trace!("Timer has been canceled");
Ok(())
}
}
pub(in super::super) fn take(&mut self) -> Option<Timer> {
self.t.take()
}
}
impl Drop for AutoDropTimer {
fn drop(&mut self) {
let _ = self.cancel();
}
}
impl Debug for Timer {
#[cfg(not(test))]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Timer").field("tid", &self.n).finish()
}
#[cfg(test)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Timer")
.field("cid", &self.cid)
.field("tid", &self.n)
.finish()
}
}
pub(in super::super) struct TPL {
me: SyncUnsafeWeakRefCell<TPL>,
clk: NonNull<Clock>,
schedule_at: Tick,
scheduled: Option<(SyncUnsafeWeakRefCell<Scheduler>, Node)>,
interval: Interval,
q: rblist::BList<TimerHandle>,
}
impl Debug for TPL {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TPL")
.field("next_expire", &self.schedule_at)
.field("interval", &self.interval)
.finish()
}
}
impl TPL {
pub(super) fn new(clk: *mut Clock, now: Tick, interval: Interval) -> SyncUnsafeRcRefCell<Self> {
assert!(interval.to_i64() > 0);
SyncUnsafeRcRefCell::new_cyclic(|this| {
Self {
me: this,
clk: NonNull::new(clk).unwrap(),
schedule_at: now,
interval,
scheduled: None,
q: rblist::BList::new(Scale::Huge),
}
})
}
fn this(&self) -> SyncUnsafeRcRefCell<Self> {
self.me.unsafe_upgrade().unwrap()
}
pub(super) fn interval(&self) -> Interval {
self.interval
}
pub(super) fn schedule_at(&self) -> Tick {
self.schedule_at
}
pub(super) fn len(&self) -> isize {
self.q.len()
}
fn clock(&mut self) -> &mut Clock {
unsafe { self.clk.as_mut() }
}
pub(super) fn new_timer(
&mut self,
expires: Interval,
callback: Box<dyn FnOnce() + Send + Sync>,
now: Tick,
name: String,
) -> AnyResult<Timer> {
assert_eq!(expires, self.interval);
let expire_at = now.until(expires);
let th = TimerHandle {
name,
expire_at,
expires,
callback,
#[cfg(all(feature = "log_precision", debug_assertions))]
created_at: Instant::now(),
};
let n = self.q.push_back(th)?;
if self.scheduled.is_none() {
assert_eq!(self.q.len(), 1);
self.schedule_at = expire_at;
let tpl = self.this();
self.scheduled = Some(self.clock().schedule_tpl(expire_at, tpl)?);
} else {
if expire_at < self.schedule_at {
let schedule_at = self.schedule_at;
self.scheduled
.take()
.and_then(|(s, n)| self.clock().deschedule_tpl(schedule_at, s, n).ok());
self.schedule_at = expire_at;
let tpl = self.this();
self.scheduled = Some(self.clock().schedule_tpl(expire_at, tpl)?);
} }
Ok(Timer {
clk: Some(self.clock().this()),
#[cfg(test)]
cid: self.clock().id(),
q: SyncUnsafeWeakRefCell::unsafe_clone(&self.me),
n,
})
}
pub(super) fn cancel_timer(&mut self, t: &Timer) -> AnyResult<()> {
let cth = self
.q
.remove(&t.n)
.map_err(|_| MyError::CancelAfterExpired)?;
let schedule_at = self.schedule_at;
if cth.expire_at == schedule_at {
if let Some(front) = self.q.front() {
let expire_at = front.expire_at;
if schedule_at != expire_at {
trace!("The least expire_at is changed from {:?} to {:?} after the front timer is canceled, update the position in conductor.", schedule_at, expire_at);
self.scheduled
.take()
.and_then(|(s, n)| self.clock().deschedule_tpl(schedule_at, s, n).ok());
self.schedule_at = expire_at;
let tpl = self.this();
self.scheduled = Some(self.clock().schedule_tpl(expire_at, tpl)?);
} else {
debug!(
"No change in expire_at({:?}), do nothing, remaining timers in the queue: {}",
schedule_at, self.q.len()
);
}
} else {
assert!(self.q.is_empty());
info!("No more timers in the queue, remove from conductor, remove the whole TPL.");
self.scheduled
.take()
.and_then(|(s, n)| self.clock().deschedule_tpl(schedule_at, s, n).ok());
let _ = self.this(); let interval = self.interval;
self.clock().remove_tpl(interval)?;
}
}
Ok(())
}
pub(super) fn on_tick(
&mut self,
now: Tick,
list: &mut BList<TimerHandle>,
cap: &mut isize,
) -> u32 {
let mut inspected = 0;
let mut oth = self.q.front();
while (*cap > 0) && oth.is_some() {
let th = oth.unwrap();
inspected += 1;
if th.expired(now) {
*cap -= 1;
let _ = list
.push_back(self.q.pop_front().expect("Front node must be present"))
.inspect_err(|e| error!("Failed to insert node to list due to {:?}", e));
} else {
if inspected > 1 {
debug!("Some timer has expired, now it must be expired later, prepare to reschedule");
assert_ne!(th.expire_at, self.schedule_at);
self.schedule_at = th.expire_at;
} else {
if th.expire_at != self.schedule_at {
self.schedule_at = th.expire_at;
info!("The first timer is not expired, but not equalt to scheduled time, the expired timer processed in the previous round equals to the max allowed, hence left a gap here.");
} else {
debug!("No timer is expired, no change in schedule_at");
}
}
break;
}
oth = self.q.front();
}
inspected
}
pub(super) fn is_empty(&self) -> bool {
self.q.is_empty()
}
}
#[cfg(all(test, feature = "mock_clock"))]
mod tests {
use super::*;
extern crate env_logger;
use env_logger::{Builder, Env};
use log::info;
use std::sync::{Arc, LazyLock, Mutex, Once};
use std::time::Duration;
use taskchain::{CondvarPair, Kinds, Signal, TaskChain};
static SEQUENTIAL: LazyLock<Arc<CondvarPair>> =
LazyLock::new(|| Arc::new(CondvarPair::new(Signal::TRIGGER(Kinds::ANY))));
static INIT: Once = Once::new();
fn initialize() {
INIT.call_once(|| {
let _ = Builder::from_env(Env::default().default_filter_or("debug")).try_init();
});
}
#[test]
fn test_timer_handle() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let expire_done = Arc::new(Mutex::new(false));
let local = expire_done.clone();
let t = TimerHandle {
name: "test".to_string(),
expire_at: Tick(100),
expires: Interval(10),
callback: Box::new(move || {
info!("Timer fired ");
*expire_done.lock().unwrap() = true;
}),
#[cfg(all(feature = "log_precision", debug_assertions))]
created_at: Instant::now(),
};
for i in 0..100 {
assert!(!t.expired(Tick(i)));
}
assert!(t.expired(Tick(100)));
assert!(t.expired(Tick(101)));
t.fire();
assert!(*local.lock().unwrap());
}
#[test]
fn test_tpl_new() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let now = Tick(100);
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let tpl = tpl.unsafe_try_borrow().unwrap();
assert_eq!(tpl.interval, interval);
assert_eq!(tpl.schedule_at, now);
assert!(tpl.scheduled.is_none());
}
#[test]
fn test_new_timer() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let now = Tick(100);
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let mut tpl = tpl.unsafe_try_borrow_mut().unwrap();
let expires = Interval(1000);
let expire_done = Arc::new(Mutex::new(false));
let _ = expire_done.clone();
let t = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired ");
*expire_done.lock().unwrap() = true;
}),
now,
"test".to_string(),
)
.unwrap();
assert!(t.is_running());
assert_eq!(tpl.len(), 1);
assert_eq!(tpl.interval, interval);
assert_eq!(tpl.schedule_at, now.until(interval));
assert!(tpl.scheduled.is_some());
}
#[test]
#[should_panic]
fn test_new_timer_with_wrong_interval() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let now = Tick(100);
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let mut tpl = tpl.unsafe_try_borrow_mut().unwrap();
let expires = Interval(100);
let expire_done = Arc::new(Mutex::new(false));
let _ = expire_done.clone();
let _t = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired ");
*expire_done.lock().unwrap() = true;
}),
now,
"test".to_string(),
)
.unwrap();
}
#[test]
fn test_new_multi_timers() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let mut now = Tick(100);
let orig = now;
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let mut tpl = tpl.unsafe_try_borrow_mut().unwrap();
let expires = Interval(1000);
let exp_t1 = "T1 expired";
let exp_t2 = "T2 expired";
let exp_t3 = "T3 expired";
let exp_t4 = "T4 expired";
let expire_done = Arc::new(Mutex::new(0));
let local = expire_done.clone();
let t1 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t1);
*expire_done.lock().unwrap() = 1;
}),
now,
"t1".to_string(),
)
.unwrap();
now = now.until(1.into());
let expire_done = local.clone();
let t2 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t2);
*expire_done.lock().unwrap() = 1;
}),
now,
"t2".to_string(),
)
.unwrap();
let expire_done = local.clone();
let t3 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t3);
*expire_done.lock().unwrap() = 1;
}),
now,
"t3".to_string(),
)
.unwrap();
now = now.until(10.into());
let expire_done = local.clone();
let t4 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t4);
*expire_done.lock().unwrap() = 1;
}),
now,
"t4".to_string(),
)
.unwrap();
assert!(t1.is_running() && t2.is_running() && t3.is_running() && t4.is_running());
assert_eq!(tpl.len(), 4);
assert_eq!(tpl.interval, interval);
assert_eq!(tpl.schedule_at, orig.until(interval));
assert!(tpl.scheduled.is_some());
}
#[test]
fn test_timer_drop() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let now = Tick(100);
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let mut tpl = tpl.unsafe_try_borrow_mut().unwrap();
let expires = Interval(1000);
let expire_done = Arc::new(Mutex::new(false));
let _ = expire_done.clone();
{
let t = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired ");
*expire_done.lock().unwrap() = true;
}),
now,
"test".to_string(),
)
.unwrap();
let adt = AutoDropTimer::from_timer(t);
assert!(adt.t.as_ref().unwrap().is_running());
assert_eq!(tpl.len(), 1);
assert_eq!(tpl.interval, interval);
assert_eq!(tpl.schedule_at, now.until(interval));
assert!(tpl.scheduled.is_some());
drop(guard); }
let _g = clock.lock().unwrap();
assert_eq!(tpl.interval, interval);
assert_eq!(tpl.schedule_at, now.until(interval));
}
#[test]
fn test_cancel_timer() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let now = Tick(100);
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let mut tpl = tpl.unsafe_try_borrow_mut().unwrap();
let expires = Interval(1000);
let expire_done = Arc::new(Mutex::new(false));
let _ = expire_done.clone();
let t = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired ");
*expire_done.lock().unwrap() = true;
}),
now,
"test".to_string(),
)
.unwrap();
assert!(t.is_running());
assert!(tpl.len() == 1);
assert_eq!(tpl.interval, interval);
assert_eq!(tpl.schedule_at, now.until(interval));
assert!(tpl.scheduled.is_some());
let _ = tpl.cancel_timer(&t);
assert!(tpl.len() == 0);
assert_eq!(tpl.interval, interval);
assert_eq!(tpl.schedule_at, now.until(interval));
assert!(tpl.scheduled.is_none());
}
#[test]
fn test_new_multi_timers_combination() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let mut now = Tick(100);
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let mut tpl = tpl.unsafe_try_borrow_mut().unwrap();
let expires = Interval(1000);
let exp_t1 = "T1 expired";
let exp_t2 = "T2 expired";
let exp_t3 = "T3 expired";
let exp_t4 = "T4 expired";
let expire_done = Arc::new(Mutex::new(0));
let local = expire_done.clone();
let t1 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t1);
*expire_done.lock().unwrap() = 1;
}),
now,
"t1".to_string(),
)
.unwrap();
let t1_exp = now.until(interval);
now = now.until(interval);
let expire_done = local.clone();
let t2 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t2);
*expire_done.lock().unwrap() = 2;
}),
now,
"t2".to_string(),
)
.unwrap();
let t2_exp = now.until(interval);
assert!(t1.is_running() && t2.is_running());
assert!(tpl.len() == 2);
assert_eq!(tpl.schedule_at, t1_exp);
let mut list = BList::<TimerHandle>::new(Scale::Small);
let mut cap = 100;
tpl.on_tick(now, &mut list, &mut cap);
assert_eq!(cap, 99);
assert_eq!(list.len(), 1);
assert_eq!(tpl.len(), 1);
assert_eq!(tpl.schedule_at, t2_exp);
list.pop_front().unwrap().fire();
assert_eq!(*local.lock().unwrap(), 1);
now = now.until(interval);
let expire_done = local.clone();
let t3 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t3);
*expire_done.lock().unwrap() = 3;
}),
now,
"t3".to_string(),
)
.unwrap();
let t3_exp = now.until(interval);
assert_eq!(tpl.schedule_at, t2_exp);
assert!(tpl.scheduled.is_some());
now = now.until(10.into());
let expire_done = local.clone();
let t4 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t4);
*expire_done.lock().unwrap() = 4;
}),
now,
"t4".to_string(),
)
.unwrap();
let _t4_exp = now.until(interval);
assert_eq!(tpl.schedule_at, t2_exp);
assert!(tpl.scheduled.is_some());
assert_eq!(tpl.len(), 3);
let _ = tpl.cancel_timer(&t2);
assert_eq!(tpl.len(), 2);
assert_eq!(tpl.schedule_at, t3_exp);
let _ = tpl.cancel_timer(&t4);
assert_eq!(tpl.len(), 1);
assert_eq!(tpl.schedule_at, t3_exp);
let _ = tpl.cancel_timer(&t3);
assert_eq!(tpl.len(), 0);
assert!(tpl.scheduled.is_none());
}
#[test]
fn test_new_multi_timers_combination2() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let mut now = Tick(100);
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let mut tpl = tpl.unsafe_try_borrow_mut().unwrap();
let expires = Interval(1000);
let exp_t1 = "T1 expired";
let exp_t2 = "T2 expired";
let _exp_t3 = "T3 expired";
let _exp_t4 = "T4 expired";
let expire_done = Arc::new(Mutex::new(0));
let local = expire_done.clone();
let t1 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t1);
*expire_done.lock().unwrap() = 1;
}),
now,
"t1".to_string(),
)
.unwrap();
let t1_exp = now.until(interval);
let expire_done = local.clone();
let t2 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t2);
*expire_done.lock().unwrap() = 2;
}),
now,
"t2".to_string(),
)
.unwrap();
let _t2_exp = now.until(interval);
let expire_done = local.clone();
now = now.until(1.into());
let _t3 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t2);
*expire_done.lock().unwrap() = 3;
}),
now,
"t3".to_string(),
)
.unwrap();
let _t3_exp = now.until(interval);
assert!(t1.is_running() && t2.is_running());
assert!(tpl.len() == 3);
assert_eq!(tpl.schedule_at, t1_exp);
let mut list = BList::<TimerHandle>::new(Scale::Small);
let mut cap = 0;
now = now.until(Interval(interval.0 + 1));
tpl.on_tick(now, &mut list, &mut cap);
assert_eq!(cap, 0);
assert_eq!(list.len(), 0);
assert_eq!(tpl.len(), 3);
assert_eq!(tpl.schedule_at, t1_exp);
cap = 2;
tpl.on_tick(now, &mut list, &mut cap);
assert_eq!(cap, 0);
assert_eq!(list.len(), 2);
assert_eq!(tpl.len(), 1);
assert_eq!(tpl.schedule_at, t1_exp);
list.pop_front().unwrap().fire();
assert_eq!(*local.lock().unwrap(), 1);
list.pop_front().unwrap().fire();
assert_eq!(*local.lock().unwrap(), 2);
cap = 2;
tpl.on_tick(now, &mut list, &mut cap);
assert_eq!(cap, 1);
assert_eq!(list.len(), 1);
assert_eq!(tpl.len(), 0);
list.pop_front().unwrap().fire();
assert_eq!(*local.lock().unwrap(), 3);
}
#[test]
fn test_new_multi_timers_combination3() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clock = Clock::new(None).unwrap();
let mut guard = clock.lock().unwrap();
let clk: *mut Clock = &mut *guard;
let mut now = Tick(100);
let interval = Interval(1000);
let tpl = TPL::new(clk, now, interval);
let mut tpl = tpl.unsafe_try_borrow_mut().unwrap();
let expires = Interval(1000);
let exp_t1 = "T1 expired";
let exp_t2 = "T2 expired";
let exp_t3 = "T3 expired";
let _exp_t4 = "T4 expired";
let expire_done = Arc::new(Mutex::new(0));
let local = expire_done.clone();
let t1 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t1);
*expire_done.lock().unwrap() = 1;
}),
now,
"t1".to_string(),
)
.unwrap();
let t1_exp = now.until(interval);
let expire_done = local.clone();
let t2 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t2);
*expire_done.lock().unwrap() = 2;
}),
now,
"t2".to_string(),
)
.unwrap();
let _t2_exp = now.until(interval);
let expire_done = local.clone();
now = now.until(2.into());
let t3 = tpl
.new_timer(
expires,
Box::new(move || {
info!("Timer fired:{} ", exp_t3);
*expire_done.lock().unwrap() = 3;
}),
now,
"t3".to_string(),
)
.unwrap();
let t3_exp = now.until(interval);
assert!(t1.is_running() && t2.is_running());
assert!(tpl.len() == 3);
assert_eq!(tpl.schedule_at, t1_exp);
let _ = tpl.cancel_timer(&t2);
assert_eq!(tpl.len(), 2);
let mut list = BList::<TimerHandle>::new(Scale::Small);
let mut cap = 2;
now = now.until(1.into());
info!("now: {:?}", now);
tpl.on_tick(now, &mut list, &mut cap);
assert_eq!(cap, 2);
assert_eq!(list.len(), 0);
assert_eq!(tpl.len(), 2);
assert_eq!(tpl.schedule_at, t1_exp);
now = now.until(998.into());
info!("now: {:?}", now);
tpl.on_tick(now, &mut list, &mut cap);
assert_eq!(cap, 1);
assert_eq!(list.len(), 1);
assert_eq!(tpl.len(), 1);
assert_eq!(tpl.schedule_at, t3_exp);
list.pop_front().unwrap().fire();
assert_eq!(*local.lock().unwrap(), 1);
now = now.until(1.into());
tpl.on_tick(now, &mut list, &mut cap);
assert_eq!(cap, 0);
assert_eq!(list.len(), 1);
assert_eq!(tpl.len(), 0);
assert_eq!(tpl.schedule_at, t3_exp);
list.pop_front().unwrap().fire();
assert_eq!(*local.lock().unwrap(), 3);
let r = tpl.cancel_timer(&t1);
assert!(r.is_err());
info!("r: {:?}", r);
let r = tpl.cancel_timer(&t3);
assert!(r.is_err());
info!("r: {:?}", r);
}
}