use super::{
Counter, Interval, MyError, NonBlockTickBridge, Stats, SyncUnsafeRcRefCell, SyncUnsafeWeakRefCell, Tick,
Ticker, Timer, TimerHandle, DURATION_PER_TICK, TPL,
};
use anyhow::Result as AnyResult;
use std::cmp::min;
use std::pin::Pin;
use std::rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::{collections::BTreeMap, time::Duration};
use log::{debug, error, info, trace, warn};
use rblist::{BList, Node, Scale};
/// Source of clock identifiers; every constructor takes the next value with `fetch_add`,
/// so the first clock created gets id 1.
// NOTE(review): an atomic does not need `static mut`; a plain `static` would make the
// `unsafe` blocks around `CLOCK_ID.fetch_add` unnecessary — confirm nothing outside this
// file relies on the mutability before changing it.
static mut CLOCK_ID: AtomicUsize = AtomicUsize::new(1);
/// Upper bound on the per-round budget (`cap`) computed in `Clock::schedule`.
const MAX_COUNT_PER_SCHEDULE: u32 = 101;
/// Once `Clock::on_tick` has inspected at least this many timers it stops for the current tick.
const MAX_TIMERS_PER_TICK: u32 = 10007;
/// A list of timer production lines (`TPL`) that the conductor stores under a single tick.
pub(super) type Scheduler = BList<Option<SyncUnsafeRcRefCell<TPL>>>;
/// Pinned `'static` handle to a mutex-protected [`Clock`].
pub(in super::super) type ClockPtr = Pin<&'static Mutex<Clock>>;
/// A clock that owns timer production lines and the schedulers that expire them.
///
/// Instances are created on the heap by the constructors and handed out as [`ClockPtr`].
pub(in super::super) struct Clock {
    /// Pinned handle to this very clock; filled in by the constructors after allocation.
    me: Option<ClockPtr>,
    /// Identifier taken from `CLOCK_ID` at construction time.
    id: usize,
    /// The most recent tick stored by `on_tick` (zero right after allocation).
    now: Tick,
    /// Schedulers ordered by the tick they are keyed under.
    conductor: BTreeMap<Tick, SyncUnsafeRcRefCell<Scheduler>>,
    /// One timer production line per interval.
    tpls: BTreeMap<Interval, SyncUnsafeRcRefCell<TPL>>,
    /// Counters for created / canceled / expired timers and per-tick statistics.
    sts: Stats,
    /// `on_tick` only warns about overload once `now` is greater than this value.
    ovld_threshold: u32,
    /// Number of live timers: incremented on creation, decremented on cancel and expiry.
    num: isize,
    /// Marks the type as `!Unpin`.
    _pin: core::marker::PhantomPinned,
}
impl Clock {
pub(in super::super) fn new(tb: Option<Box<NonBlockTickBridge>>) -> Option<ClockPtr> {
let boxed = Box::new(Mutex::new(Self {
me: None,
id: unsafe { CLOCK_ID.fetch_add(1, Ordering::Relaxed) },
now: 0.into(),
conductor: BTreeMap::new(),
tpls: BTreeMap::new(),
sts: Stats::new(0.into()),
ovld_threshold: 1000,
num: 0,
_pin: core::marker::PhantomPinned,
}));
let rp = Box::into_raw(boxed);
let ptr: &'static Mutex<Self> = unsafe { &*rp };
let me = Pin::static_ref(ptr);
let id;
{
let mut guard = ptr.lock().unwrap();
let clock = &mut *guard;
clock.me = Some(me);
id = clock.id;
}
let r = Ticker::add_clock(me, tb);
if !r.is_ok() {
unsafe {
let _ = Box::from_raw(rp);
};
return None;
}
let r = r.unwrap();
if !r.0 {
unsafe {
let _ = Box::from_raw(rp);
};
warn!("A replacement clock is returned.");
return Some(r.1);
}
let mut success = false;
let wait =
((Duration::from_secs(5).as_micros()) / (DURATION_PER_TICK.as_micros() / 2)) as u32;
for _ in 0..wait {
std::thread::sleep(DURATION_PER_TICK / 2);
{
let mut guard = ptr.lock().unwrap(); let clock = &mut *guard;
if clock.now != 0.into() {
success = true;
clock.sts.reset(clock.now);
break;
}
}
}
assert!(
success,
"Failed to initialize the clock in {:?} , the clock is leaked",
DURATION_PER_TICK * wait / 2
);
info!("Clock {} initialized successfully", id);
Some(me)
}
/// Builds a leaked, ticker-less clock for tests.
///
/// The clock starts at tick 1 with its statistics reset to that tick, and its overload
/// threshold is `u32::MAX`. Nothing else holds the clock while it is being set up, so no
/// polling is needed (the former wait loop always succeeded on its first iteration).
///
/// # Panics
/// Panics when the clock mutex is poisoned or when tick 1 compares equal to tick 0.
pub(in super::super) fn new_for_testing_without_ticker() -> ClockPtr {
    let boxed = Box::new(Mutex::new(Self {
        me: None,
        // `CLOCK_ID` is declared `static mut`, hence the `unsafe`; the operation itself
        // is an atomic read-modify-write.
        id: unsafe { CLOCK_ID.fetch_add(1, Ordering::Relaxed) },
        now: 0.into(),
        conductor: BTreeMap::new(),
        tpls: BTreeMap::new(),
        sts: Stats::new(0.into()),
        ovld_threshold: u32::MAX,
        num: 0,
        _pin: core::marker::PhantomPinned,
    }));
    // The allocation is leaked on purpose: a test clock lives for the rest of the process.
    let ptr: &'static Mutex<Self> = Box::leak(boxed);
    let me = Pin::static_ref(ptr);
    {
        let mut guard = ptr.lock().unwrap();
        let clock = &mut *guard;
        clock.me = Some(me);
        clock.now = 1.into();
        assert!(
            clock.now != 0.into(),
            "A test clock must start at a non-zero tick"
        );
        clock.sts.reset(clock.now);
    }
    me
}
/// Creates a timer named `name` that runs `f` after `duration`.
///
/// The timer is added to the production line for the matching interval; that line is
/// created on first use. On success the `Created` counter and the live-timer count are
/// bumped.
///
/// # Errors
/// Fails when the production line cannot be borrowed mutably or when it refuses the timer.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn new_timer<F>(
    clk: ClockPtr,
    duration: Duration,
    f: F,
    name: String,
) -> AnyResult<Timer>
where
    F: FnOnce() + Send + Sync + 'static,
{
    let interval: Interval = duration.into();
    let mut locked = clk.lock().unwrap();
    let clock = &mut *locked;
    let now = clock.now;
    // A new production line receives a raw pointer to the clock that owns it.
    let owner: *mut Clock = clock;
    let line = clock
        .tpls
        .entry(interval)
        .or_insert_with(|| TPL::new(owner, now, interval));
    let mut tpl = line.unsafe_try_borrow_mut()?;
    let timer = tpl.new_timer(interval, Box::new(f), now, name)?;
    clock.sts.push_counter(1, Counter::Created);
    clock.num += 1;
    Ok(timer)
}
/// Stops `t` while holding the clock lock and records the cancellation.
///
/// # Errors
/// Propagates the error returned by `Timer::stop`; nothing is recorded in that case.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn cancel_timer(clk: ClockPtr, t: &mut Timer) -> AnyResult<()> {
    debug!("Cancel timer: {:?}", t);
    let mut locked = clk.lock().unwrap();
    t.stop()?;
    locked.peg_timer_canceled();
    Ok(())
}
/// Records one canceled timer: bumps the `Canceled` counter and lowers the live count.
///
/// # Panics
/// Panics when the live-timer count would become negative.
pub(super) fn peg_timer_canceled(&mut self) {
    self.sts.push_counter(1, Counter::Canceled);
    let remaining = self.num - 1;
    self.num = remaining;
    assert!(remaining >= 0, "The number of timers must be non-negative");
}
/// Removes and returns the timer production line registered for `interval`.
///
/// # Errors
/// Returns `MyError::TplRemoved` when no production line exists for `interval`.
pub(super) fn remove_tpl(&mut self, interval: Interval) -> AnyResult<SyncUnsafeRcRefCell<TPL>> {
    // Build the error lazily so the success path does not construct (and drop) one.
    self.tpls
        .remove(&interval)
        .ok_or_else(|| MyError::TplRemoved(interval.to_i64()).into())
}
/// Advances the clock to `now` and fires the timers that `schedule` reports as expired.
///
/// Handlers are fired after the clock lock has been released; the lock is re-acquired
/// for every further scheduling round within the same tick.
///
/// # Errors
/// Propagates any error returned by `schedule`.
///
/// # Panics
/// Panics when the clock mutex is poisoned or when the live-timer count turns negative.
pub(in super::super) fn on_tick(clk: ClockPtr, now: Tick) -> AnyResult<()> {
let mut guard = clk.lock().unwrap(); let mut clock = &mut *guard;
let (ovld, _) = clock.sts.on_tick(now);
// `clock.now` still holds the previous tick here; it is overwritten just below.
if ovld > 0 && clock.now > (clock.ovld_threshold as u64).into() {
warn!("The clock is overloaded for {} seconds", ovld);
}
clock.now = now;
// Total number of timers inspected during this tick, across all rounds.
let mut inspected_now = 0;
loop {
let (inspected, mut expired, done) = clock.schedule()?;
inspected_now += inspected;
clock
.sts
.push_counter(expired.len() as u32, Counter::Expired);
clock.sts.push_inspect(inspected);
clock.num -= expired.len() as isize;
assert!(clock.num >= 0, "The number of timers must be non-negative");
// Release the lock before running the handlers.
drop(guard);
while let Some(th) = expired.pop_front() {
th.fire();
}
// Stop for this tick once the per-tick inspection limit has been reached.
if inspected_now >= MAX_TIMERS_PER_TICK {
warn!(
"Too many timers {} is processed in one tick, the limit is {}",
inspected_now, MAX_TIMERS_PER_TICK
);
break;
}
// `done` is the flag returned by `schedule` (true when its budget was not used up).
if done {
break;
}
// Another round is needed: take the lock again.
guard = clk.lock().unwrap(); clock = &mut *guard;
}
Ok(())
}
/// Returns the pinned handle to this clock.
///
/// # Panics
/// Panics when the handle has not been set; both constructors set it right after
/// allocation. (The former `assert!` after `unwrap()` could never fire.)
pub(super) fn this(&self) -> ClockPtr {
    self.me
        .expect("The clock's self handle must be set by its constructor")
}
/// Number of live timers currently tracked by this clock.
pub(super) fn len(&self) -> isize {
    let live_timers = self.num;
    live_timers
}
/// Identifier assigned to this clock when it was constructed.
pub(super) fn id(&self) -> usize {
    let clock_id = self.id;
    clock_id
}
/// Number of timer production lines (one per interval) held by this clock.
pub(super) fn num_type(&self) -> isize {
    let lines = self.tpls.len();
    lines as isize
}
/// Number of schedulers currently stored in the conductor.
pub(super) fn num_scheduler(&self) -> isize {
    let schedulers = self.conductor.len();
    schedulers as isize
}
/// Smallest tick that has a scheduler in the conductor, or `None` when it is empty.
pub(super) fn min_scheduler(&self) -> Option<Tick> {
    self.conductor.keys().next().copied()
}
/// Largest tick that has a scheduler in the conductor, or `None` when it is empty.
pub(super) fn max_scheduler(&self) -> Option<Tick> {
    self.conductor.keys().next_back().copied()
}
/// Locks `clk` and returns its live-timer count.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn clock_len(clk: ClockPtr) -> isize {
    clk.get_ref().lock().unwrap().len()
}
/// Locks `clk` and returns how many timer production lines it holds.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn clock_type_num(clk: ClockPtr) -> isize {
    clk.get_ref().lock().unwrap().num_type()
}
/// Locks `clk` and returns how many schedulers its conductor holds.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn clock_scheduler_num(clk: ClockPtr) -> isize {
    clk.get_ref().lock().unwrap().num_scheduler()
}
/// Locks `clk` and returns the smallest and largest scheduler ticks of its conductor,
/// or `None` when the conductor is empty.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn clock_min_max_scheduler(clk: ClockPtr) -> Option<(Tick, Tick)> {
    let locked = clk.lock().unwrap();
    // Both ends exist exactly when the conductor is non-empty, so pairing them is lossless.
    let lowest = locked.min_scheduler();
    let highest = locked.max_scheduler();
    lowest.zip(highest)
}
/// Locks `clk` and returns the smallest scheduler tick of its conductor, if any.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn clock_min_scheduler(clk: ClockPtr) -> Option<Tick> {
    clk.get_ref().lock().unwrap().min_scheduler()
}
/// Locks `clk` and returns the largest scheduler tick of its conductor, if any.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn clock_max_scheduler(clk: ClockPtr) -> Option<Tick> {
    clk.get_ref().lock().unwrap().max_scheduler()
}
/// Locks `clk` and returns the tick it currently stores.
///
/// # Panics
/// Panics when the clock mutex is poisoned.
pub(in super::super) fn now(clk: ClockPtr) -> Tick {
    clk.get_ref().lock().unwrap().now
}
}
impl Clock {
/// Appends `tpl` to the scheduler stored under `expire_at`, creating that scheduler
/// when the conductor has none for the tick.
///
/// On success returns a weak handle to the scheduler together with the list node that
/// now holds the production line.
///
/// # Errors
/// Fails when the scheduler cannot be borrowed mutably or when the list rejects the push.
pub(super) fn schedule_tpl(
    &mut self,
    expire_at: Tick,
    tpl: SyncUnsafeRcRefCell<TPL>,
) -> AnyResult<(SyncUnsafeWeakRefCell<Scheduler>, Node)> {
    let outcome = {
        let slot = self
            .conductor
            .entry(expire_at)
            .or_insert_with(|| SyncUnsafeRcRefCell::new(BList::new(Scale::MTiny)));
        let mut list = slot.unsafe_try_borrow_mut()?;
        let pushed = list.push_back(Some(tpl.unsafe_clone()));
        match pushed {
            Ok(node) => Ok((slot.unsafe_downgrade(), node)),
            Err(e) => Err(e),
        }
    };
    // Logged on both the success and the failure path.
    trace!(
        "A tpl with expire_at {:?} is scheduled, now the remaining conductor includes:{:?}",
        expire_at,
        self.conductor.keys(),
    );
    outcome
}
/// Removes node `n` from the scheduler stored under `expire_at`, and drops that
/// scheduler from the conductor once it is empty.
///
/// # Errors
/// Returns `MyError::SchedulerRemoved` when the weak `scheduler` handle is dead, and
/// propagates borrow or list-removal failures.
///
/// # Panics
/// Panics when the conductor has no scheduler for `expire_at`, or when the one it has
/// is not the scheduler the weak handle points at.
pub(super) fn deschedule_tpl(
    &mut self,
    expire_at: Tick,
    scheduler: SyncUnsafeWeakRefCell<Scheduler>,
    n: Node,
) -> AnyResult<()> {
    let sch = scheduler
        .unsafe_upgrade()
        .ok_or_else(|| MyError::SchedulerRemoved(expire_at.to_u64()))?;
    // Format the panic message only when the lookup actually fails; the previous
    // `expect(format!(..).as_str())` allocated the string on every call.
    let registered = self
        .conductor
        .get(&expire_at)
        .unwrap_or_else(|| panic!("The scheduler of {:?} must be presented", expire_at));
    assert!(
        rc::Rc::ptr_eq(&sch.0, &registered.0),
        "The scheduler handle must match the conductor entry"
    );
    let mut scheduler = sch.unsafe_try_borrow_mut()?;
    scheduler.remove(&n)?;
    if scheduler.is_empty() {
        // `sch` still holds a strong reference, so the list outlives this removal.
        self.conductor.remove(&expire_at);
    }
    trace!(
        "The remaining schedulers:{:?} after removing the tpl with expire_at {:?}",
        self.conductor.keys(),
        expire_at,
    );
    Ok(())
}
/// Runs one scheduling round at the clock's current tick.
///
/// Schedulers whose key is not later than `self.now` are popped from the conductor in
/// key order and their production lines are ticked until the round budget `cap` is used
/// up. Returns `(inspected, expired, done)`: the sum of the values returned by
/// `TPL::on_tick`, the list handed to `TPL::on_tick` to collect timer handles, and
/// whether budget was left over (`cap > 0`).
///
/// # Errors
/// Propagates failures to mutably borrow a scheduler or a production line.
// NOTE(review): on those error paths the scheduler popped from the conductor is not put
// back — confirm callers treat such an error as fatal.
fn schedule(&mut self) -> AnyResult<(u32, BList<TimerHandle>, bool)> {
debug!("Run schedule at {:?}", self.now);
let mut inspected = 0;
let mut list: BList<TimerHandle> = BList::new(Scale::Small);
// Budget for this round: the list capacity, capped at MAX_COUNT_PER_SCHEDULE.
let mut cap = min(list.capacity() as isize, MAX_COUNT_PER_SCHEDULE as isize);
while (cap > 0) && self.conductor.first_key_value().is_some() {
let at = self.conductor.first_key_value().unwrap().0;
// The conductor is ordered, so nothing later can be due either.
if *at > self.now {
debug!(
"Scheduler: {:?} is not ready yet now {:?}, put it back to the conductor.",
*at, self.now
);
break;
}
let (at, sch) = self.conductor.pop_first().expect("Must be present");
let expire_at = at;
debug!(
"Schedule the scheduler {:?} now {:?}.",
expire_at, self.now
);
let mut scheduler = sch.unsafe_try_borrow_mut()?;
// NOTE(review): if this loop ends because `cap` reached 0 right after a `pop_front`
// while the scheduler still holds entries, `sch` is not re-inserted into the
// conductor — confirm `TPL::on_tick` cannot produce that state.
while (cap > 0) && scheduler.front_mut().is_some() {
let front = scheduler.front_mut().unwrap();
if front.is_none() {
error!("The first timer production line is empty.");
scheduler.pop_front();
continue; }
let t = front.as_mut().unwrap();
let mut tpl = t.unsafe_try_borrow_mut()?;
let cur_schedule_at = tpl.schedule_at();
assert!(cur_schedule_at <= self.now);
inspected += tpl.on_tick(self.now, &mut list, &mut cap);
if tpl.is_empty() {
// The production line has no timers left: forget its interval entirely.
info!(
"The timer production line {:?} is empty, remove the type of tpl completely.",
tpl
);
self.tpls.remove(&tpl.interval());
drop(tpl);
scheduler.pop_front();
} else {
let new_schedule_at = tpl.schedule_at();
if new_schedule_at != cur_schedule_at {
// The line moved to a later tick: take it out of this scheduler and file it
// under its new tick. A failure to do so is only logged.
assert!(new_schedule_at > cur_schedule_at);
info!("No more expired timers are waiting for processing.Reschedule the timer line to the relative scheduler at {:?}", new_schedule_at);
let nt = t.unsafe_clone();
drop(tpl);
scheduler.pop_front();
let _ = self.schedule_tpl(new_schedule_at, nt).inspect_err(|e| {
error!(
"Failed to reschedule the timer line to {:?} due to {:?}",
new_schedule_at, e
)
}); } else {
// Same tick as before: the budget must be exhausted, so park the scheduler
// back in the conductor for the next round.
assert_eq!(cap, 0);
info!("The current round has reached the max allowed number, some expired timers are waiting for processing.Insert the scheduler back to the conductor to be scheduled in the next round");
self.conductor.insert(cur_schedule_at, sch.unsafe_clone());
break;
}
}
if scheduler.is_empty() {
info!(
"All the timers in the schduler at {:?} has been gone, check next one",
cur_schedule_at
);
break;
}
}
}
trace!("Remaining schedulers:{:?}", self.conductor.keys());
Ok((inspected, list, cap > 0))
}
}
#[cfg(all(test, not(feature = "mock_clock")))]
mod tests {
use super::*;
extern crate env_logger;
use env_logger::{Builder, Env};
use log::{info, warn};
use std::sync::{Arc, LazyLock, Mutex, Once};
use std::time::Duration;
use taskchain::{CondvarPair, Kinds, Signal, TaskChain};
/// Ensures the logger setup in `initialize` runs at most once per test process.
static INIT: Once = Once::new();
/// Condition-variable pair shared by the `TaskChain` every test creates.
// NOTE(review): presumably this serializes the tests — confirm against the `taskchain` crate.
static SEQUENTIAL: LazyLock<Arc<CondvarPair>> =
LazyLock::new(|| Arc::new(CondvarPair::new(Signal::TRIGGER(Kinds::ANY))));
/// Installs the `env_logger` backend once, defaulting to the `warn` filter.
/// A failure to install the logger is ignored.
fn initialize() {
    INIT.call_once(|| {
        let env = Env::default().default_filter_or("warn");
        let _ = Builder::from_env(env).try_init();
    });
}
/// Creates a fresh ticker-less clock for a test.
fn new_test_clock() -> Pin<&'static Mutex<Clock>> {
    let clock = Clock::new_for_testing_without_ticker();
    clock
}
/// Every freshly built test clock starts at tick 1.
#[test]
fn test_clock_initialization() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    // Two independent clocks, checked one after the other.
    for _ in 0..2 {
        let clock = new_test_clock();
        let locked = clock.lock().unwrap();
        assert_eq!(locked.now, 1.into());
    }
}
/// `this` can be called repeatedly on a constructed clock without panicking.
#[test]
fn test_this() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    let clock = new_test_clock();
    let locked = clock.lock().unwrap();
    for _ in 0..2 {
        let _ = locked.this();
    }
}
/// Scheduling a production line and descheduling it again leaves no scheduler behind.
#[test]
fn test_schedule_and_deschedule_tpl() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    let clock = new_test_clock();
    let mut locked = clock.lock().unwrap();
    let inner = &mut *locked;
    let line = TPL::new(inner, inner.now, 1.into());
    let due = Tick::from(10);
    let (weak, node) = inner.schedule_tpl(due, line.unsafe_clone()).unwrap();
    assert!(weak.unsafe_upgrade().is_some());
    inner.deschedule_tpl(due, weak, node).unwrap();
    // The only entry was removed, so the scheduler itself must be gone.
    assert!(!locked.conductor.contains_key(&due));
}
/// A single new timer is running and is counted once, under one production line.
#[test]
fn test_new_timer() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    let clk = new_test_clock();
    let timer =
        Clock::new_timer(clk, Duration::from_secs(10), || {}, String::from("timer")).unwrap();
    assert!(timer.is_running());
    let (live, lines) = (Clock::clock_len(clk), Clock::clock_type_num(clk));
    assert_eq!(live, 1);
    assert_eq!(lines, 1);
    info!("Timer: {:?}", timer);
}
/// Timers sharing a duration share one production line; a new duration adds a second one.
#[test]
fn test_new_multi_timer() {
    let mut pl = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    pl.wait(Duration::ZERO);
    initialize();
    let clk = new_test_clock();
    // Each failure message names its own timer (they all used to say "the first timer").
    let duration = Duration::from_secs(10);
    Clock::new_timer(clk, duration, || {}, "t1".into()).expect("Failed to create timer t1");
    Clock::new_timer(clk, duration, || {}, "t2".into()).expect("Failed to create timer t2");
    assert_eq!(Clock::clock_len(clk), 2);
    assert_eq!(Clock::clock_type_num(clk), 1);
    let duration = Duration::from_secs(20);
    Clock::new_timer(clk, duration, || {}, "t3".into()).expect("Failed to create timer t3");
    Clock::new_timer(clk, duration, || {}, "t4".into()).expect("Failed to create timer t4");
    assert_eq!(Clock::clock_len(clk), 4);
    assert_eq!(Clock::clock_type_num(clk), 2);
}
/// Canceling timers lowers the live count, and a production line disappears once its
/// last timer has been canceled.
#[test]
fn test_cancel_timer() {
    let mut pl = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    pl.wait(Duration::ZERO);
    initialize();
    let clk = new_test_clock();
    // Each failure message names its own timer (they all used to say "the first timer").
    let duration = Duration::from_secs(10);
    let mut t1 =
        Clock::new_timer(clk, duration, || {}, "t1".into()).expect("Failed to create timer t1");
    let mut t2 =
        Clock::new_timer(clk, duration, || {}, "t2".into()).expect("Failed to create timer t2");
    assert_eq!(Clock::clock_len(clk), 2);
    assert_eq!(Clock::clock_type_num(clk), 1);
    let duration = Duration::from_secs(20);
    let mut t3 =
        Clock::new_timer(clk, duration, || {}, "t3".into()).expect("Failed to create timer t3");
    let mut t4 =
        Clock::new_timer(clk, duration, || {}, "t4".into()).expect("Failed to create timer t4");
    assert_eq!(Clock::clock_len(clk), 4);
    assert_eq!(Clock::clock_type_num(clk), 2);
    Clock::cancel_timer(clk, &mut t1).expect("Failed to cancel timer t1");
    assert_eq!(Clock::clock_len(clk), 3);
    assert_eq!(Clock::clock_type_num(clk), 2);
    // The 10-second line is gone once both of its timers are canceled.
    Clock::cancel_timer(clk, &mut t2).expect("Failed to cancel timer t2");
    assert_eq!(Clock::clock_len(clk), 2);
    assert_eq!(Clock::clock_type_num(clk), 1);
    Clock::cancel_timer(clk, &mut t3).expect("Failed to cancel timer t3");
    assert_eq!(Clock::clock_len(clk), 1);
    assert_eq!(Clock::clock_type_num(clk), 1);
    Clock::cancel_timer(clk, &mut t4).expect("Failed to cancel timer t4");
    assert_eq!(Clock::clock_len(clk), 0);
    assert_eq!(Clock::clock_type_num(clk), 0);
}
/// Builds the message logged when timer number `n` expires, e.g. `"Timer3 expired"`.
fn timer_log(n: i32) -> String {
    // `format!` already yields an owned `String`; no intermediate buffer is needed.
    format!("Timer{} expired", n)
}
/// Creates `n` timers of the given `duration` on `clk`, named after `group` and their
/// 1-based index, and returns them in creation order.
///
/// # Panics
/// Panics when any timer cannot be created.
fn insert_timer(
    clk: Pin<&'static Mutex<Clock>>,
    duration: Duration,
    n: i32,
    group: String,
) -> Vec<Timer> {
    (1..n + 1)
        .map(|i| {
            let label = group.clone();
            let name = format!("[Group:{}-Timer:{}]", group, i);
            Clock::new_timer(
                clk,
                duration,
                move || info!("[Group:{}-Timer:{}] expired.", label, i),
                name,
            )
            .expect("Failed to create the timer")
        })
        .collect()
}
/// Two timers of one interval stay pending for one step and expire together on the next.
#[test]
fn test_timer_expire_1() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    let clk = new_test_clock();
    // Asserts (live timers, production lines, schedulers).
    let check = |len: isize, types: isize, schedulers: isize| {
        assert_eq!(Clock::clock_len(clk), len);
        assert_eq!(Clock::clock_type_num(clk), types);
        assert_eq!(Clock::clock_scheduler_num(clk), schedulers);
    };
    let second = Duration::from_secs(1);
    let lifetime = second * 2;
    let step = Interval::from(second);
    let mut now = Tick::from(2);
    Clock::on_tick(clk, now).expect("Failed on tick");
    let _timers = insert_timer(clk, lifetime, 2, "1".to_string());
    let expire_at = now.until(Interval::from(lifetime));
    check(2, 1, 1);
    assert_eq!(
        Clock::clock_min_max_scheduler(clk).unwrap(),
        (expire_at, expire_at)
    );
    // One step later nothing has expired yet.
    now = now.until(step);
    Clock::on_tick(clk, now).expect("Failed on tick");
    check(2, 1, 1);
    assert_eq!(
        Clock::clock_min_max_scheduler(clk).unwrap(),
        (expire_at, expire_at)
    );
    // The second step reaches the expiry tick.
    now = now.until(step);
    Clock::on_tick(clk, now).expect("Failed on tick");
    check(0, 0, 0);
    assert!(Clock::clock_min_max_scheduler(clk).is_none());
}
/// Two batches of one interval created a step apart expire a step apart; cancel fails
/// for expired or already canceled timers.
#[test]
fn test_timer_expire_2() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    let clk = new_test_clock();
    // Asserts (live timers, production lines, schedulers).
    let check = |len: isize, types: isize, schedulers: isize| {
        assert_eq!(Clock::clock_len(clk), len);
        assert_eq!(Clock::clock_type_num(clk), types);
        assert_eq!(Clock::clock_scheduler_num(clk), schedulers);
    };
    // Asserts the smallest and largest scheduler tick.
    let span = |lo: Tick, hi: Tick| {
        assert_eq!(Clock::clock_min_max_scheduler(clk).unwrap(), (lo, hi));
    };
    let count = 2;
    let second = Duration::from_secs(1);
    let lifetime = second * 2;
    let step = Interval::from(second);
    let mut now = Tick::from(2);
    Clock::on_tick(clk, now).expect("Failed on tick");
    let mut timers1 = insert_timer(clk, lifetime, count, "1".to_string());
    let expire_at1 = now.until(Interval::from(lifetime));
    now = now.until(step);
    Clock::on_tick(clk, now).expect("Failed on tick");
    let mut timers2 = insert_timer(clk, lifetime, count, "2".to_string());
    let expire_at2 = now.until(Interval::from(lifetime));
    check(4, 1, 1);
    span(expire_at1, expire_at1);
    // First batch expires.
    now = now.until(step);
    Clock::on_tick(clk, now).expect("Failed on tick");
    check(2, 1, 1);
    span(expire_at2, expire_at2);
    // Just short of the second batch's expiry.
    now = now.until(step - 1.into());
    Clock::on_tick(clk, now).expect("Failed on tick");
    check(2, 1, 1);
    span(expire_at2, expire_at2);
    assert!(Clock::cancel_timer(clk, &mut timers1[0]).is_err());
    assert!(Clock::cancel_timer(clk, &mut timers2[0]).is_ok());
    assert!(Clock::cancel_timer(clk, &mut timers2[0]).is_err());
    check(1, 1, 1);
    span(expire_at2, expire_at2);
    now = now.until(step);
    Clock::on_tick(clk, now).expect("Failed on tick");
    check(0, 0, 0);
    assert!(Clock::clock_min_max_scheduler(clk).is_none());
    assert!(Clock::cancel_timer(clk, &mut timers2[1]).is_err());
}
fn test_timer_expire_3_4_5_helper(clk: Pin<&'static Mutex<Clock>>, n: i32) {
warn!("test_timer_expire_3_4_5_helper with n={}", n);
let mut now = Tick::from(2);
let base_exp = Duration::from_secs(1);
let exp = [base_exp * 4, base_exp * 5, base_exp * 6];
let step = Interval::from(base_exp);
Clock::on_tick(clk, now).expect("Failed on tick");
let _timers31 = insert_timer(clk, exp[2], n, "31".to_string());
let expire_at31 = now.until(Interval::from(exp[2]));
now = now.until(step); Clock::on_tick(clk, now).expect("Failed on tick");
let _timers21 = insert_timer(clk, exp[1], n, "21".to_string());
let expire_at21 = now.until(Interval::from(exp[1]));
now = now.until(step); Clock::on_tick(clk, now).expect("Failed on tick");
let _timers11 = insert_timer(clk, exp[0], n, "11".into());
let expire_at11 = now.until(Interval::from(exp[0]));
assert_eq!(Clock::clock_len(clk), (n * 3) as isize);
assert_eq!(Clock::clock_type_num(clk), 3);
assert_eq!(Clock::clock_scheduler_num(clk), 1);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at11, expire_at11)
);
assert_eq!(expire_at11, expire_at21);
assert_eq!(expire_at11, expire_at31);
now = now.until(step); Clock::on_tick(clk, now).expect("Failed on tick");
let _timers12 = insert_timer(clk, exp[0], n, "12".into());
let expire_at12 = now.until(Interval::from(exp[0])); let _timers22 = insert_timer(clk, exp[1], n, "22".into());
let expire_at22 = now.until(Interval::from(exp[1])); let _timers32 = insert_timer(clk, exp[2], n, "32".into());
let expire_at32 = now.until(Interval::from(exp[2])); assert_eq!(Clock::clock_len(clk), (n * 3 * 2) as isize);
assert_eq!(Clock::clock_type_num(clk), 3);
assert_eq!(Clock::clock_scheduler_num(clk), 1);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at11, expire_at11)
);
now = now.until(step * 3); let expiring = n * 3;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
}
assert_eq!(Clock::clock_len(clk), (n * 3) as isize);
assert_eq!(Clock::clock_type_num(clk), 3);
assert_eq!(Clock::clock_scheduler_num(clk), 3);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at12, expire_at32)
);
now = now.until(step); let expiring = n;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
}
assert_eq!(Clock::clock_len(clk), (n * 2) as isize);
assert_eq!(Clock::clock_type_num(clk), 2);
assert_eq!(Clock::clock_scheduler_num(clk), 2);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at22, expire_at32)
);
now = now.until(step); let expiring = n;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
}
assert_eq!(Clock::clock_len(clk), n as isize);
assert_eq!(Clock::clock_type_num(clk), 1);
assert_eq!(Clock::clock_scheduler_num(clk), 1);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at32, expire_at32)
);
now = now.until(step); let expiring = n;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
}
assert_eq!(Clock::clock_len(clk), 0);
assert_eq!(Clock::clock_type_num(clk), 0);
assert_eq!(Clock::clock_scheduler_num(clk), 0);
assert!(Clock::clock_min_max_scheduler(clk).is_none());
}
fn test_timer_expire_3_4_5_recreate_helper(clk: Pin<&'static Mutex<Clock>>, n: i32) {
warn!("test_timer_expire_3_4_5_recreate_helper with n={}", n);
let mut now = Tick::from(2);
let base_exp = Duration::from_secs(1);
let exp = [base_exp * 4, base_exp * 5, base_exp * 6];
let step = Interval::from(base_exp);
Clock::on_tick(clk, now).expect("Failed on tick");
let _timers31 = insert_timer(clk, exp[2], n, "31".to_string());
let expire_at31 = now.until(Interval::from(exp[2]));
now = now.until(step); Clock::on_tick(clk, now).expect("Failed on tick");
let _timers21 = insert_timer(clk, exp[1], n, "21".to_string());
let expire_at21 = now.until(Interval::from(exp[1]));
now = now.until(step); Clock::on_tick(clk, now).expect("Failed on tick");
let _timers11 = insert_timer(clk, exp[0], n, "11".into());
let expire_at11 = now.until(Interval::from(exp[0]));
assert_eq!(Clock::clock_len(clk), (n * 3) as isize);
assert_eq!(Clock::clock_type_num(clk), 3);
assert_eq!(Clock::clock_scheduler_num(clk), 1);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at11, expire_at11)
);
assert_eq!(expire_at11, expire_at21);
assert_eq!(expire_at11, expire_at31);
now = now.until(step); Clock::on_tick(clk, now).expect("Failed on tick");
let _timers12 = insert_timer(clk, exp[0], n, "12".into());
let expire_at12 = now.until(Interval::from(exp[0])); let _timers22 = insert_timer(clk, exp[1], n, "22".into());
let expire_at22 = now.until(Interval::from(exp[1])); let _timers32 = insert_timer(clk, exp[2], n, "32".into());
let expire_at32 = now.until(Interval::from(exp[2])); assert_eq!(Clock::clock_len(clk), (n * 3 * 2) as isize);
assert_eq!(Clock::clock_type_num(clk), 3);
assert_eq!(Clock::clock_scheduler_num(clk), 1);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at11, expire_at11)
);
now = now.until(step * 3); let expiring = n * 3;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
}
assert_eq!(Clock::clock_len(clk), (n * 3) as isize);
assert_eq!(Clock::clock_type_num(clk), 3);
assert_eq!(Clock::clock_scheduler_num(clk), 3);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at12, expire_at32)
);
now = now.until(step); let expiring = n;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
} assert_eq!(Clock::clock_len(clk), (n * 2) as isize);
assert_eq!(Clock::clock_type_num(clk), 2);
assert_eq!(Clock::clock_scheduler_num(clk), 2);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at22, expire_at32)
);
now = now.until(step); let expiring = n;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
}
assert_eq!(Clock::clock_len(clk), n as isize);
assert_eq!(Clock::clock_type_num(clk), 1);
assert_eq!(Clock::clock_scheduler_num(clk), 1);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at32, expire_at32)
);
let _timers13 = insert_timer(clk, exp[0], n, "13".into());
let expire_at13 = now.until(Interval::from(exp[0])); let _timers33 = insert_timer(clk, exp[2], n, "33".into());
let expire_at33 = now.until(Interval::from(exp[2])); assert_eq!(Clock::clock_len(clk), (n * 3) as isize);
assert_eq!(Clock::clock_type_num(clk), 2);
assert_eq!(Clock::clock_scheduler_num(clk), 2);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at32, expire_at13)
);
now = now.until(step); let expiring = n;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
} assert_eq!(Clock::clock_len(clk), (n * 2) as isize);
assert_eq!(Clock::clock_type_num(clk), 2);
assert_eq!(Clock::clock_scheduler_num(clk), 2);
assert_eq!(
Clock::clock_min_max_scheduler(clk).unwrap(),
(expire_at13, expire_at33)
);
now = now.until(step * 6); let expiring = n * 2;
let round = {
if expiring == 0 {
1
} else {
((expiring - 1) / MAX_TIMERS_PER_TICK as i32) + 1
}
};
for _ in 0..round {
Clock::on_tick(clk, now).expect("Failed on tick");
}
assert_eq!(Clock::clock_len(clk), 0);
assert_eq!(Clock::clock_type_num(clk), 0);
assert_eq!(Clock::clock_scheduler_num(clk), 0);
assert!(Clock::clock_min_max_scheduler(clk).is_none());
}
/// Runs the 3/4/5 expiry scenario with two timers per batch.
#[test]
fn test_timer_expire_3() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    let clk = new_test_clock();
    test_timer_expire_3_4_5_helper(clk, 2);
}
/// Runs the 3/4/5 re-create scenario with two timers per batch.
#[test]
fn test_timer_expire_3_recreate() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    let clk = new_test_clock();
    test_timer_expire_3_4_5_recreate_helper(clk, 2);
}
/// Runs the 3/4/5 expiry scenario for batch sizes around fractions of the per-round
/// budget `MAX_COUNT_PER_SCHEDULE`, each on a fresh clock.
#[test]
fn test_timer_expire_4() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    // 1 and 3, then each pivot with its two neighbours.
    let mut counts = vec![1, 3];
    for pivot in [
        MAX_COUNT_PER_SCHEDULE / 3,
        MAX_COUNT_PER_SCHEDULE / 2,
        MAX_COUNT_PER_SCHEDULE * 2 / 3,
        MAX_COUNT_PER_SCHEDULE,
    ] {
        counts.extend([pivot - 1, pivot, pivot + 1]);
    }
    for count in counts {
        test_timer_expire_3_4_5_helper(new_test_clock(), count as i32);
    }
}
/// Runs the 3/4/5 re-create scenario for batch sizes around fractions of the per-round
/// budget `MAX_COUNT_PER_SCHEDULE`, each on a fresh clock.
#[test]
fn test_timer_expire_4_recreate() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    // 1 and 3, then each pivot with its two neighbours.
    let mut counts = vec![1, 3];
    for pivot in [
        MAX_COUNT_PER_SCHEDULE / 3,
        MAX_COUNT_PER_SCHEDULE / 2,
        MAX_COUNT_PER_SCHEDULE * 2 / 3,
        MAX_COUNT_PER_SCHEDULE,
    ] {
        counts.extend([pivot - 1, pivot, pivot + 1]);
    }
    for count in counts {
        test_timer_expire_3_4_5_recreate_helper(new_test_clock(), count as i32);
    }
}
/// Runs the 3/4/5 expiry scenario with batch sizes just below, at and just above the
/// per-tick inspection limit `MAX_TIMERS_PER_TICK`, each on a fresh clock.
#[test]
fn test_timer_expire_5() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    for count in (MAX_TIMERS_PER_TICK - 1)..=(MAX_TIMERS_PER_TICK + 1) {
        test_timer_expire_3_4_5_helper(new_test_clock(), count as i32);
    }
}
/// Runs the 3/4/5 re-create scenario with batch sizes just below, at and just above the
/// per-tick inspection limit `MAX_TIMERS_PER_TICK`, each on a fresh clock.
#[test]
fn test_timer_expire_5_recreate() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    for count in (MAX_TIMERS_PER_TICK - 1)..=(MAX_TIMERS_PER_TICK + 1) {
        test_timer_expire_3_4_5_recreate_helper(new_test_clock(), count as i32);
    }
}
/// Mixes creation, expiry and cancellation of 10- and 20-second timers on one clock and
/// checks the live-timer and production-line counts after every step.
#[test]
fn test_timer_combined() {
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    let clk = new_test_clock();
    // Creates timer number `idx` named `name`; its handler logs `timer_log(idx)`.
    let spawn = |lifetime: Duration, idx: i32, name: &str| -> Timer {
        Clock::new_timer(clk, lifetime, move || info!("{}", timer_log(idx)), name.into())
            .expect("Failed to create the timer")
    };
    // Asserts (live timers, production lines).
    let check = |len: isize, types: isize| {
        assert_eq!(Clock::clock_len(clk), len);
        assert_eq!(Clock::clock_type_num(clk), types);
    };
    let tick = |at: Tick| Clock::on_tick(clk, at).expect("Failed on tick");

    let short = Duration::from_secs(10);
    let long = Duration::from_secs(20);
    let interval = Interval::from(short / 2);
    let mut now = Tick::from(1);
    now = now.until(interval);

    let mut t1 = spawn(short, 1, "t1");
    let mut t2 = spawn(short, 2, "t2");
    check(2, 1);
    let mut t3 = spawn(long, 3, "t3");
    let mut t4 = spawn(long, 4, "t4");
    check(4, 2);
    tick(now);
    check(4, 2);

    // The 10-second timers have expired by now, so canceling them must fail.
    now = now.until(interval);
    tick(now);
    let r = Clock::cancel_timer(clk, &mut t1);
    assert!(r.is_err());
    info!("Error to cancle: {:?}", r);
    let r = Clock::cancel_timer(clk, &mut t2);
    assert!(r.is_err());
    info!("Error to cancle: {:?}", r);
    check(2, 1);
    Clock::cancel_timer(clk, &mut t3).expect("Failed to cancel the timer");
    check(1, 1);

    // The remaining 20-second timer expires.
    now = now.until(Interval(interval.0 * 2));
    tick(now);
    let r = Clock::cancel_timer(clk, &mut t4);
    assert!(r.is_err());
    info!("Error to cancel: {:?}", r);
    check(0, 0);
    tick(now);
    now = now.until(interval);
    tick(now);
    check(0, 0);

    // Re-create 20-second timers on the now empty clock.
    let _t5 = spawn(long, 5, "t5");
    now = now.until(interval * 2);
    tick(now);
    let mut t6 = spawn(long, 6, "t6");
    check(2, 1);
    now = now.until(interval * 2);
    tick(now);
    check(1, 1);
    let _t7 = spawn(long, 7, "t7");
    check(2, 1);
    now = now.until(interval);
    tick(now);
    check(2, 1);
    Clock::cancel_timer(clk, &mut t6).expect("Failed to cancel the timer");
    check(1, 1);
    now = now.until(interval * 4);
    Clock::on_tick(clk, now).expect("Error happened in on_tick");
    check(0, 0);
}
}