use super::super::{MyError, NonBlockTickBridge};
use super::{Clock, ClockPtr, Interval, Tick, DURATION_PER_TICK, TICKS_PER_SECOND};
use anyhow::Result as AnyResult;
use crossbeam_channel::Select;
use log::{error, info, trace, warn};
use once_cell::sync::Lazy;
use std::{
cmp::max,
sync::{
atomic::{AtomicUsize, Ordering},
Mutex, Once,
},
thread::{self, sleep, JoinHandle},
time::{Duration, Instant},
};
use thread_priority::*;
static TICKER: Lazy<Ticker> = Lazy::new(|| Ticker::new());
pub(super) static NUM_OF_BROKER: Lazy<usize> = Lazy::new(|| {
let mut num = num_cpus::get();
if num > 16 {
num = 16;
} else if num < 2 {
num = 2;
}
num
});
pub(super) static MAX_CLOCKS: Lazy<usize> =
Lazy::new(|| max(*NUM_OF_BROKER * 4, num_cpus::get() * 4));
pub(super) static MAX_DIRECT_CLOCKS: Lazy<usize> = Lazy::new(|| *NUM_OF_BROKER);
static WAIT_FOR_INIT_CALIBRATE: Once = Once::new();
static INIT: Once = Once::new();
enum Message {
Terminate,
Tick(Tick),
AddClock(LocalClock, Option<crossbeam_channel::Sender<AnyResult<()>>>),
}
type Channel = (
crossbeam_channel::Sender<Message>,
crossbeam_channel::Receiver<Message>,
);
const TICKS_TO_CALIBRATE: Interval = TICKS_PER_SECOND; const CHANNEL_SIZE: usize = 100;
struct LocalClock {
clk: ClockPtr,
tb: Option<Box<NonBlockTickBridge>>,
}
impl LocalClock {
fn new(clk: ClockPtr, tb: Option<Box<NonBlockTickBridge>>) -> Self {
Self { clk, tb }
}
fn on_tick(&self, tick: Tick) -> AnyResult<()> {
self.tb.as_ref().map_or_else(
|| -> AnyResult<()> {
Clock::on_tick(self.clk, tick)?;
Ok(())
},
|tb| -> AnyResult<()> {
tb(super::super::Clock(self.clk), tick);
Ok(())
},
)
}
}
pub(in super::super) struct Ticker {
sch: Vec<Channel>, cch_stopped: Mutex<bool>,
cch: Channel, next: AtomicUsize,
terminate: bool, jh: Mutex<Option<JoinHandle<()>>>,
num_of_broker: AtomicUsize,
num_of_clock: AtomicUsize,
num_per_broker: Mutex<Vec<usize>>,
direct_clocks: Mutex<(u32, Vec<ClockPtr>)>,
}
pub(super) struct Tcb {
pub(super) tick: Tick,
pub(super) dpt: Duration, pub(super) start: Instant,
pub(super) ttc: Tick, pub(super) calibrated: (Tick, Instant),
}
impl Ticker {
pub(in super::super) fn take_join_handle() -> Option<JoinHandle<()>> {
let mut lock = TICKER.jh.lock().unwrap();
let jh = &mut *lock;
jh.take()
}
pub(self) fn new() -> Self {
let mut t = Ticker {
sch: Vec::new(),
cch_stopped: Mutex::new(false),
cch: crossbeam_channel::bounded(CHANNEL_SIZE),
next: AtomicUsize::new(0),
terminate: false, jh: Mutex::new(None),
num_of_broker: AtomicUsize::new(0),
num_of_clock: AtomicUsize::new(0),
num_per_broker: Mutex::new(Vec::new()),
direct_clocks: Mutex::new((0, Vec::new())),
};
{
let mut npb_lock = t.num_per_broker.lock().unwrap();
let npb = &mut *npb_lock;
for _ in 0..(*NUM_OF_BROKER) {
t.sch.push(crossbeam_channel::bounded(CHANNEL_SIZE));
npb.push(0);
}
}
warn!("{} channels are created", t.sch.len());
*t.jh.lock().unwrap() = Self::start();
t
}
pub(super) fn start() -> Option<JoinHandle<()>> {
let mut jh: Option<JoinHandle<()>> = None;
INIT.call_once(|| {
let h = thread::spawn(|| {
let mut broker_handles: Vec<JoinHandle<()>> = Vec::new();
thread::scope(|s| {
ThreadBuilder::default()
.name("Ticker".to_string())
.priority(ThreadPriority::Max)
.spawn_scoped(s, |_| {
let tick = 1.into();
let time = Instant::now();
let mut tcb = Tcb {
tick,
dpt: DURATION_PER_TICK,
start: time,
ttc: tick.until(TICKS_TO_CALIBRATE),
calibrated: (tick, time),
};
Self::beat(&mut tcb);
})
.unwrap();
ThreadBuilder::default()
.name("Controller".to_string())
.priority(ThreadPriority::Max)
.spawn_scoped(s, |_| Self::on_control(&mut broker_handles))
.unwrap();
});
for jh in broker_handles {
let _ = jh.join();
}
warn!("All threads exited");
});
jh = Some(h);
});
return jh;
}
pub(super) fn beat(tcb: &mut Tcb) {
let txes: Vec<crossbeam_channel::Sender<Message>> =
TICKER.sch.iter().map(|(s, _)| s.clone()).collect();
loop {
sleep(tcb.dpt);
tcb.tick += 1;
if tcb.tick == tcb.ttc {
let now = Instant::now();
let elapsed = now.duration_since(tcb.calibrated.1);
let ticks = TICKS_TO_CALIBRATE;
assert_eq!(tcb.tick.since(tcb.calibrated.0), TICKS_TO_CALIBRATE);
let dpt = Duration::from_nanos(
(((Duration::from_secs((ticks.0 / TICKS_PER_SECOND.0) as u64)).as_nanos()
* tcb.dpt.as_nanos())
/ elapsed.as_nanos()) as u64,
);
trace!(
"calibrated us per tick: {}us -> {}us, {}us elapsed in {} ticks",
tcb.dpt.as_micros(),
dpt.as_micros(),
elapsed.as_micros(),
ticks.0
);
tcb.dpt = dpt;
tcb.ttc = tcb.tick.until(TICKS_TO_CALIBRATE);
tcb.calibrated = (tcb.tick, now);
} else if tcb.tick > tcb.ttc {
error!(
"The calibration time {:?} was passed by until {:?}",
tcb.ttc, tcb.tick
);
let now = Instant::now();
let elapsed = now.duration_since(tcb.calibrated.1);
let ticks = tcb.tick.since(tcb.calibrated.0);
let dpt = elapsed / (ticks.0 as u32);
warn!(
"calibrated dpt: {} -> {}, {} elapsed in {}",
tcb.dpt.as_millis(),
dpt.as_millis(),
elapsed.as_millis(),
ticks.0
);
tcb.dpt = dpt;
tcb.ttc = tcb.tick.until(TICKS_TO_CALIBRATE);
tcb.calibrated = (tcb.tick, now);
} else {
}
for i in 0..(*NUM_OF_BROKER) {
let r = txes[i].try_send(Message::Tick(tcb.tick));
if r.is_err() {
error!(
"Failed to send tick notification to {:?} with error {:?}, it could be an error or end of process.",
i, r
);
if TICKER.terminate {
warn!(
"The ticker is being terminated, should be on the end of process only."
);
return;
}
}
}
}
}
fn ticker_mut() -> &'static mut Ticker {
let t: *mut Ticker = &*TICKER as *const Ticker as *mut Ticker;
let mt = unsafe { t.as_mut().unwrap() };
mt
}
pub(super) fn on_message(id: usize) {
info!("TimerProcessor{} start", id);
if id >= TICKER.sch.len() {
assert!(id < TICKER.sch.len())
}
let rx = &TICKER.sch[id].1;
let mut clocks: Vec<LocalClock> = Vec::new();
while let Ok(msg) = rx.recv() {
match msg {
Message::Terminate => {
if id == 0 {
Self::ticker_mut().terminate = true;
}
warn!("TimerProcessor{} exit", id);
return;
}
Message::Tick(tick) => {
for clk in &clocks {
let _ = clk.on_tick(tick);
}
}
Message::AddClock(clk, _) => {
clocks.push(clk);
info!("{} ringers is added in TimerProcessor{}", clocks.len(), id);
}
}
}
}
pub(super) fn on_control(broker_handles: &mut Vec<JoinHandle<()>>) {
info!("Controller thread started");
let mut sel = Select::new();
let mut count = 0;
let now = Instant::now();
let cchi = sel.recv(&TICKER.cch.1);
let mut processors = 0;
assert_eq!(cchi, 0);
for i in 0..(*NUM_OF_BROKER) {
let rxi = (*NUM_OF_BROKER) - 1 - i;
let schi = sel.recv(&TICKER.sch[rxi].1);
assert_eq!(schi, i + 1);
assert_eq!(schi + rxi, (*NUM_OF_BROKER))
}
loop {
let _ = sel.ready();
for i in processors..*NUM_OF_BROKER {
count += TICKER.sch[i].1.try_iter().count();
}
let mut lock_cch = TICKER.cch_stopped.lock().unwrap();
let cch_stopped = &mut *lock_cch;
while let Ok(msg) = TICKER.cch.1.try_recv() {
match msg {
Message::Terminate => {
if Self::num_of_clock() == 0 {
Self::ticker_mut().terminate = true;
}
warn!("Terminate controller and the process");
return;
}
Message::Tick(_) => {
unreachable!()
}
Message::AddClock(clk, reply) => {
let n = TICKER.num_of_clock.fetch_add(1, Ordering::AcqRel);
if n >= (*MAX_CLOCKS) {
error!("Too many clocks are added through controller, something wrong in the implementation, max is {}", *MAX_CLOCKS);
TICKER.num_of_clock.fetch_sub(1, Ordering::AcqRel);
let _ = reply
.expect("Reply channel must be presented")
.send(Err(MyError::TooManyClocks.into()));
} else if n < *NUM_OF_BROKER {
assert_eq!(n, processors);
let schi = (*NUM_OF_BROKER) - n; sel.remove(schi);
let mh = ThreadBuilder::default()
.name(format!("TimerProcessor{}", n).to_string())
.priority(ThreadPriority::Max)
.spawn(move |_| {
Self::on_message(n);
})
.unwrap();
broker_handles.push(mh);
TICKER.num_of_broker.fetch_add(1, Ordering::Relaxed);
let next = TICKER.next.fetch_add(1, Ordering::AcqRel);
assert_eq!(next, n);
assert_eq!(
(*(Self::ticker_mut().num_per_broker.lock().unwrap()))[n],
0
);
(*(Self::ticker_mut().num_per_broker.lock().unwrap()))[n] += 1;
processors += 1;
TICKER.sch[n].0.send(Message::AddClock(clk, None)).unwrap();
if processors == *NUM_OF_BROKER {
*cch_stopped = true;
}
let _ = reply.expect("Reply channel must be presented").send(Ok(()));
} else {
*cch_stopped = true;
assert!(n >= *NUM_OF_BROKER);
assert_eq!(processors, *NUM_OF_BROKER);
warn!("Add clock in controller while all processors are up, add before the controller can exit. Exceptional situation.");
let r = Self::add_clock_to_processor(clk);
let _ = reply.expect("Reply channel must be presented").send(r);
}
}
}
}
if *cch_stopped {
warn!("All processors have been initiated, exit the controller after having run for {:?}. {} messages were dropped in controller",
now.elapsed(), count);
assert_eq!(TICKER.num_of_broker.load(Ordering::Relaxed), *NUM_OF_BROKER);
return;
}
}
}
pub(in super::super) fn terminate() {
let _ = TICKER.cch.0.try_send(Message::Terminate);
for i in 0..(*NUM_OF_BROKER) {
TICKER.sch[i].0.send(Message::Terminate).unwrap();
}
}
pub(super) fn min_clocks() -> (usize, usize) {
let lock = TICKER.num_per_broker.lock().unwrap();
let npb = &*lock;
if npb.len() == 0 {
return (0, 0);
}
let (mut at, mut min) = (0, npb[0]);
for i in 0..npb.len() {
if npb[i] < min {
at = i;
min = npb[i]
}
}
return (at, min);
}
pub(super) fn max_clocks() -> (usize, usize) {
let lock = TICKER.num_per_broker.lock().unwrap();
let npb = &*lock;
if npb.len() == 0 {
return (0, 0);
}
let (mut at, mut max) = (0, npb[0]);
for i in 0..npb.len() {
if npb[i] > max {
at = i;
max = npb[i]
}
}
(at, max)
}
pub(in super::super) fn num_of_clock() -> usize {
TICKER.num_of_clock.load(Ordering::Relaxed)
}
pub(super) fn num_of_broker() -> usize {
TICKER.num_of_broker.load(Ordering::Relaxed)
}
fn add_clock_to_processor(clk: LocalClock) -> AnyResult<()> {
let n = TICKER.num_of_clock.fetch_add(1, Ordering::AcqRel);
if n >= (*MAX_CLOCKS) && clk.tb.is_some() {
info!(
"Too many clocks is being added to bridged clocks, max allowed is {}",
*MAX_CLOCKS
);
TICKER.num_of_clock.fetch_sub(1, Ordering::AcqRel);
return Err(MyError::TooManyClocks.into());
}
let mut next = TICKER.next.fetch_add(1, Ordering::AcqRel);
if next >= *NUM_OF_BROKER {
next %= *NUM_OF_BROKER;
}
TICKER.sch[next]
.0
.try_send(Message::AddClock(clk, None))
.map_or_else(
|e| {
error!("Failed to add clock to processor, error:{:?}", e);
Err(MyError::QueueIsFull.into())
},
|_| {
(*(Self::ticker_mut().num_per_broker.lock().unwrap()))[next] += 1;
Ok(())
},
)
}
fn check_add_direct_clock(clk: ClockPtr) -> Option<ClockPtr> {
let mut lock = TICKER.direct_clocks.lock().unwrap();
let dc = &mut *lock;
if dc.1.len() < *MAX_DIRECT_CLOCKS {
dc.1.push(clk.clone());
None
} else {
if dc.0 >= dc.1.len() as u32 {
dc.0 = dc.0 % (dc.1.len() as u32);
}
let picked = dc.1[dc.0 as usize];
dc.0 += 1;
Some(picked)
}
}
pub(in super::super) fn add_clock(
clk: ClockPtr,
tb: Option<Box<NonBlockTickBridge>>,
) -> AnyResult<(bool, ClockPtr)> {
if tb.is_none() {
let existing_clk = Self::check_add_direct_clock(clk);
if existing_clk.is_some() {
return Ok((false, existing_clk.unwrap()));
}
info!("Continue to add the clock to the system");
}
let local_clk = LocalClock::new(clk, tb);
let lock_cch = TICKER.cch_stopped.lock().unwrap();
let cch_stopped = *lock_cch;
if !cch_stopped {
let (tx, rx) = crossbeam_channel::bounded::<AnyResult<()>>(1);
TICKER
.cch
.0
.try_send(Message::AddClock(local_clk, Some(tx)))?;
drop(lock_cch);
WAIT_FOR_INIT_CALIBRATE.call_once(|| {
warn!("Wait for 3 seconds for initial calibration");
thread::sleep(Duration::from_secs(3));
});
return rx
.recv_timeout(Duration::from_secs(1))?
.map(|_| (true, clk));
}
drop(lock_cch);
trace!("The controller has gone, handle the request here");
Self::add_clock_to_processor(local_clk).map(|_| (true, clk))
}
}
#[cfg(all(test, feature = "mock_clock"))]
mod tests {
use super::super::*;
extern crate env_logger;
use env_logger::{Builder, Env};
use crate::inner::ticker::{MAX_CLOCKS, MAX_DIRECT_CLOCKS, NUM_OF_BROKER};
use log::{info, warn};
use std::sync::{Arc, LazyLock, Mutex, Once};
use std::thread;
use std::time::{Duration, Instant};
use taskchain::{CondvarPair, Kinds, Signal, TaskChain};
static SEQUENTIAL: LazyLock<Arc<CondvarPair>> =
LazyLock::new(|| Arc::new(CondvarPair::new(Signal::TRIGGER(Kinds::ANY))));
static INIT: Once = Once::new();
fn initialize() {
INIT.call_once(|| {
let _ = Builder::from_env(Env::default().default_filter_or("warn")).try_init();
});
}
fn default_tb(c: crate::Clock, t: Tick) {
c.on_tick(t).unwrap();
}
#[test]
#[ignore="Needs to be run separately"]
fn test_waiting_once() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let c1 = Clock::new(Some(Box::new(move |c, t| default_tb(c, t)))).unwrap();
assert_eq!(Ticker::num_of_broker(), 0);
Ticker::add_clock(c1, Some(Box::new(move |c, t| default_tb(c, t)))).unwrap();
assert_eq!(Ticker::num_of_broker(), 1);
let now = Instant::now();
let c2 = Clock::new(Some(Box::new(move |c, t| default_tb(c, t)))).unwrap();
assert_eq!(Ticker::num_of_broker(), 1);
Ticker::add_clock(c2, Some(Box::new(move |c, t| default_tb(c, t)))).unwrap();
assert!(now.elapsed().as_millis() < 100);
assert_eq!(Ticker::num_of_broker(), 2);
}
#[test]
#[ignore="Needs to be run separately"]
fn test_unlimited_direct_clocks_1() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
for i in 0..*MAX_CLOCKS * 2 {
let c1 = Clock::new(None).unwrap();
let r = Ticker::add_clock(c1, None);
assert!(r.is_ok());
if i < *MAX_DIRECT_CLOCKS {
assert!(r.unwrap().0);
} else {
assert!(!r.unwrap().0);
}
}
assert_eq!(Ticker::num_of_clock(), *MAX_DIRECT_CLOCKS);
}
#[test]
#[ignore="Needs to be run separately"]
fn test_unlimited_direct_clocks_2() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
for i in 0..*MAX_CLOCKS * 2 {
let c1 = Clock::new(None).unwrap();
let r = Ticker::add_clock(c1, None);
assert!(r.is_ok());
if i < *MAX_DIRECT_CLOCKS {
assert!(r.unwrap().0);
} else {
assert!(!r.unwrap().0);
}
let c2 = Clock::new(None).unwrap();
let _ = Ticker::add_clock(c2, Some(Box::new(move |c, t| default_tb(c, t))));
}
assert_eq!(Ticker::num_of_clock(), *MAX_CLOCKS);
}
#[test]
#[ignore="Needs to be run separately"]
fn test_unlimited_direct_clocks_3() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
for _i in 0..*MAX_CLOCKS * 2 {
let c2 = Clock::new(None).unwrap();
let _ = Ticker::add_clock(c2, Some(Box::new(move |c, t| default_tb(c, t))));
}
assert_eq!(Ticker::num_of_clock(), *MAX_CLOCKS);
for i in 0..*MAX_CLOCKS * 2 {
let c1 = Clock::new(None).unwrap();
let r = Ticker::add_clock(c1, None);
assert!(r.is_ok());
if i < *MAX_DIRECT_CLOCKS {
assert!(r.unwrap().0);
} else {
assert!(!r.unwrap().0);
}
}
assert_eq!(Ticker::num_of_clock(), *MAX_CLOCKS + *MAX_DIRECT_CLOCKS);
}
#[test]
#[ignore="Needs to be run separately"]
fn test_precision() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let cpu = num_cpus::get();
info!("CPU: {}", cpu);
let jh1 = Ticker::take_join_handle();
assert!(jh1.is_some());
let jh2 = Ticker::take_join_handle();
assert!(jh2.is_none());
let clock = Clock::new(Some(Box::new(move |c, t| default_tb(c, t)))).unwrap();
Ticker::add_clock(clock, Some(Box::new(move |c, t| default_tb(c, t)))).unwrap();
thread::sleep(Duration::from_secs(10));
let precision = Clock::precision(clock); assert!(precision >= 400 && precision <= 600)
}
fn test_load_balance(n: usize) {
for _ in 0..n {
if let Some(clock) = Clock::new(Some(Box::new(move |c, t| default_tb(c, t)))) {
let n = Ticker::num_of_clock() + 1;
let r = Ticker::add_clock(clock, Some(Box::new(move |c, t| default_tb(c, t))));
if n <= *ticker::MAX_CLOCKS {
assert!(r.is_ok());
} else {
assert!(r.is_err());
assert_eq!(Ticker::num_of_broker(), *NUM_OF_BROKER);
}
}
}
let (_, min) = Ticker::min_clocks();
let (_, max) = Ticker::max_clocks();
assert!((max - min) <= 1);
}
#[test]
fn test_lb_in_range() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let mut n = 1;
test_load_balance(n);
assert_eq!(Ticker::num_of_clock(), 1);
test_load_balance(num_cpus::get());
n += num_cpus::get();
assert_eq!(Ticker::num_of_clock(), n);
test_load_balance(*ticker::MAX_CLOCKS - 1 - n);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS - 1);
test_load_balance(1);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS);
}
#[test]
#[ignore="Needs to be run separately"] fn test_lb_out_of_range() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
test_load_balance(1);
assert_eq!(Ticker::num_of_clock(), 1);
test_load_balance(*ticker::MAX_CLOCKS - 2);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS - 1);
test_load_balance(1);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS);
test_load_balance(1);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS);
test_load_balance(1);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS);
}
#[test]
fn test_lb_out_of_range_2() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
test_load_balance(*ticker::MAX_CLOCKS);
test_load_balance(1);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS);
assert_eq!(Ticker::num_of_broker(), *ticker::NUM_OF_BROKER);
test_load_balance(1);
test_load_balance(1);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS);
}
fn test_load_balance_with_clock(n: usize, vec: &Mutex<Vec<ClockPtr>>) {
for _ in 0..n {
if let Some(clock) = Clock::new(Some(Box::new(move |c, t| default_tb(c, t)))) {
let r = Ticker::add_clock(clock, Some(Box::new(move |c, t| default_tb(c, t))));
if r.is_ok() {
vec.lock().unwrap().push(clock);
}
}
}
assert!(vec.lock().unwrap().len() <= *ticker::MAX_CLOCKS);
}
#[test]
fn test_multi_threads() {
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let vec: Mutex<Vec<ClockPtr>> = Mutex::new(Vec::new());
thread::scope(|s| {
for _ in 0..*ticker::MAX_CLOCKS + 1 {
s.spawn(|| {
test_load_balance_with_clock(1, &vec);
});
}
s.spawn(|| {
thread::sleep(Duration::from_secs(10));
let (_, min) = Ticker::min_clocks();
let (_, max) = Ticker::max_clocks();
warn!(
"min: {}, max: {}, total: {}",
min,
max,
Ticker::num_of_clock()
);
assert!((max - min) <= 1);
assert_eq!(Ticker::num_of_clock(), *ticker::MAX_CLOCKS);
info!("Terminate the timers");
for v in &*vec.lock().unwrap() {
let precision = Clock::precision(*v);
assert!(precision >= 400 && precision <= 600);
}
Ticker::terminate();
});
});
let th = Ticker::take_join_handle().unwrap();
th.join().unwrap();
warn!("Test finished");
}
}