#![allow(dead_code)]
use std::time::Duration;
pub mod error;
mod inner;
use anyhow::Result as AnyResult;
pub use error::MyError;
pub use inner::Tick;
pub(self) use inner::{timer, ClockPtr};
use std::thread::JoinHandle;
#[cfg(test)]
use std::sync::Mutex;
/// Callback signature for a tick bridge: invoked with a [`Clock`] and a [`Tick`].
///
/// A boxed value of this type can optionally be passed to [`Clock::new`].
/// NOTE(review): the name suggests the callback must not block — confirm against `inner`.
pub type NonBlockTickBridge = dyn Fn(Clock, Tick) + Send + Sync;
/// Public handle to a timer registered on a [`Clock`].
///
/// This is a thin newtype over the crate-internal `timer::Timer`; all work is
/// delegated to the `inner` module.
pub struct Timer(timer::Timer);

impl Timer {
    /// Registers a new timer on `clk` by forwarding to `inner::Clock::new_timer`.
    ///
    /// * `duration` - delay handed to the inner clock.
    /// * `f` - callback handed to the inner clock; the tests in this file expect
    ///   it to run once the delay has elapsed unless the timer is cancelled first.
    /// * `name` - label handed to the inner clock.
    ///
    /// # Errors
    /// Propagates whatever error the inner clock reports.
    pub fn new<F>(clk: Clock, duration: Duration, f: F, name: String) -> AnyResult<Self>
    where
        F: FnOnce() + Send + Sync + 'static,
    {
        let raw = inner::Clock::new_timer(clk.0, duration, f, name)?;
        Ok(Self(raw))
    }

    /// Cancels the timer by delegating to the wrapped inner timer.
    ///
    /// # Errors
    /// Propagates the inner timer's cancellation error, if any.
    pub fn cancel(&mut self) -> AnyResult<()> {
        let Self(raw) = self;
        raw.cancel()
    }
}
pub struct AutoDropTimer(timer::AutoDropTimer);
impl AutoDropTimer {
pub fn from_timer(t: Timer) -> Self {
Self(timer::AutoDropTimer::from_timer(t.0))
}
pub fn new<F>(clk: Clock, duration: Duration, f: F, name: String) -> AnyResult<Self>
where
F: FnOnce() + Send + Sync + 'static,
{
Timer::new(clk, duration, f, name).map(|t| Self::from_timer(t))
}
pub fn cancel(&mut self) -> AnyResult<()> {
self.0.cancel()
}
pub fn take(&mut self) -> Option<Timer> {
self.0.take().map(|t| Timer(t))
}
pub fn expired(&mut self) {
let _ = self.0.take();
}
}
/// Copyable handle to a clock on which timers are registered.
///
/// Wraps the crate-internal `ClockPtr`; every method forwards to the `inner` module.
#[derive(Clone, Copy)]
pub struct Clock(ClockPtr);

impl Clock {
    /// Creates a clock, optionally supplying a tick bridge callback `tb`.
    ///
    /// Returns `None` when the inner constructor does not produce a clock.
    pub fn new(tb: Option<Box<NonBlockTickBridge>>) -> Option<Self> {
        let ptr = inner::Clock::new(tb)?;
        Some(Self(ptr))
    }

    /// Takes the ticker's join handle, as returned by `Ticker::take_join_handle`.
    pub fn take_thread_handle() -> Option<JoinHandle<()>> {
        use inner::ticker::Ticker;
        Ticker::take_join_handle()
    }

    /// Asks the ticker to terminate (forwards to `Ticker::terminate`).
    pub fn terminate_for_process_exit() {
        use inner::ticker::Ticker;
        Ticker::terminate();
    }

    /// Forwards `tick` to the inner clock.
    ///
    /// # Errors
    /// Propagates the error reported by the inner clock.
    pub fn on_tick(self, tick: Tick) -> AnyResult<()> {
        let Self(ptr) = self;
        inner::Clock::on_tick(ptr, tick)
    }

    /// Returns the inner clock's length; the tests in this file treat it as the
    /// number of timers currently registered.
    pub fn len(self) -> isize {
        let Self(ptr) = self;
        inner::Clock::clock_len(ptr)
    }

    /// Returns the inner clock's current time converted with `to_u64()`.
    /// NOTE(review): the unit of the returned value is not visible here — confirm in `inner`.
    pub fn now(self) -> u64 {
        let current = inner::Clock::now(self.0);
        current.to_u64()
    }

    /// Test-only: address of the mutex guarding the inner clock, as an opaque pointer.
    #[cfg(test)]
    pub fn inner_address(self) -> *const usize {
        let clock_mutex = self.0.get_ref() as *const Mutex<inner::Clock>;
        clock_mutex as *const usize
    }
}
#[cfg(all(test, not(feature = "mock_clock")))]
mod test {
use super::*;
extern crate env_logger;
use env_logger::{Builder, Env};
use log::{trace, warn};
use std::sync::atomic::AtomicIsize;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Condvar, LazyLock, Mutex, Once,
};
use std::thread::{self, sleep, JoinHandle};
use std::time::{Duration, Instant};
use taskchain::{CondvarPair, Kinds, Signal, TaskChain};
/// Guards the one-time logger set-up performed by `initialize`.
static INIT: Once = Once::new();
/// Sequence counter used to give timers and helper threads unique names.
///
/// An atomic already provides interior mutability, so a plain `static` is
/// sufficient; `static mut` would force `unsafe` on every access and creates
/// references to a mutable static.
static SEQ: AtomicUsize = AtomicUsize::new(1);
// Shared `CondvarPair` handed to the `TaskChain` that every test creates first.
// NOTE(review): presumably this serializes the tests so they do not run
// concurrently — confirm against the `taskchain` crate.
static SEQUENTIAL: LazyLock<Arc<CondvarPair>> =
LazyLock::new(|| Arc::new(CondvarPair::new(Signal::TRIGGER(Kinds::ANY))));
// Timer durations exercised by the tests, from 100 ms up to 10 s.
const EXPIRES: [Duration; 5] = [
Duration::from_millis(100),
Duration::from_millis(300),
Duration::from_secs(1),
Duration::from_secs(5),
Duration::from_secs(10),
];
/// Installs the `env_logger` logger once per process, defaulting the filter to `warn`.
///
/// The result of `try_init` is deliberately ignored.
fn initialize() {
    INIT.call_once(|| {
        let env = Env::default().default_filter_or("warn");
        let mut builder = Builder::from_env(env);
        let _ = builder.try_init();
    });
}
/// Registers a timer on `clk`, cancels it shortly before it is due and checks
/// that its callback never runs.
///
/// The checks run on a freshly spawned thread, released through a condvar
/// handshake once it has been spawned; its join handle is appended
/// to `vec` so the caller can join it and surface assertion failures.
///
/// * `clk` - clock the timer is registered on.
/// * `duration` - timer delay; the cancel happens about 100 ms before it elapses.
/// * `vec` - collector for the verification thread's join handle.
///
/// # Panics
/// Panics if the timer cannot be created or the verification thread cannot be spawned.
fn single_timer_cancel(clk: Clock, duration: Duration, vec: &mut Vec<JoinHandle<()>>) {
    // SAFETY: `SEQ` is an atomic counter, so `fetch_add` is a synchronized
    // read-modify-write that is sound from any thread.
    let seq = unsafe { SEQ.fetch_add(1, Ordering::Relaxed) };
    let name = format!("TimerForCancel-{:?}", seq);
    let thread_name = name.clone();

    // Number of times the timer callback ran; must stay at zero.
    let fired = Arc::new(AtomicUsize::new(0));
    let observed = Arc::clone(&fired);

    // Start signal: the verification thread waits until the flag becomes true.
    let start = Arc::new((Mutex::new(false), Condvar::new()));
    let start_for_thread = Arc::clone(&start);

    let mut timer = Timer::new(
        clk,
        duration,
        move || {
            fired.fetch_add(1, Ordering::SeqCst);
        },
        name,
    )
    .expect("Failed to create timer");

    let verification = thread::Builder::new()
        .name(thread_name)
        .spawn(move || {
            let (lock, cv) = &*start_for_thread;
            let _go = cv.wait_while(lock.lock().unwrap(), |go| !*go).unwrap();
            // `saturating_sub` keeps this from panicking for durations under 100 ms.
            sleep(duration.saturating_sub(Duration::from_millis(100)));
            assert_eq!(observed.load(Ordering::SeqCst), 0);
            assert!(timer.cancel().is_ok());
            // Give a wrongly surviving timer ample time to fire before the final check.
            sleep(Duration::from_secs(1));
            assert_eq!(observed.load(Ordering::SeqCst), 0);
        })
        .unwrap();

    {
        let (lock, cv) = &*start;
        *lock.lock().unwrap() = true;
        cv.notify_all();
    }
    vec.push(verification);
}
/// Registers a timer on `clk`, wraps it in an [`AutoDropTimer`] that is dropped
/// straight away, and checks that the callback never runs afterwards.
///
/// The checks run on a freshly spawned thread, released through a condvar
/// handshake; its join handle is appended to `vec` so the caller can join it.
///
/// * `clk` - clock the timer is registered on.
/// * `duration` - timer delay.
/// * `vec` - collector for the verification thread's join handle.
///
/// # Panics
/// Panics if the timer cannot be created or the verification thread cannot be spawned.
fn single_timer_drop(clk: Clock, duration: Duration, vec: &mut Vec<JoinHandle<()>>) {
    // SAFETY: `SEQ` is an atomic counter, so `fetch_add` is a synchronized
    // read-modify-write that is sound from any thread.
    let seq = unsafe { SEQ.fetch_add(1, Ordering::Relaxed) };
    let name = format!("TimerForDrop-{:?}", seq);
    let thread_name = name.clone();

    // Number of times the timer callback ran; must stay at zero.
    let fired = Arc::new(AtomicUsize::new(0));
    let observed = Arc::clone(&fired);

    // Start signal: the verification thread waits until the flag becomes true.
    let start = Arc::new((Mutex::new(false), Condvar::new()));
    let start_for_thread = Arc::clone(&start);

    let timer = Timer::new(
        clk,
        duration,
        move || {
            fired.fetch_add(1, Ordering::SeqCst);
        },
        name,
    )
    .expect("Failed to create timer");

    let verification = thread::Builder::new()
        .name(thread_name)
        .spawn(move || {
            let (lock, cv) = &*start_for_thread;
            let _go = cv.wait_while(lock.lock().unwrap(), |go| !*go).unwrap();
            // Wrap and drop immediately: the drop itself is what is under test.
            drop(AutoDropTimer::from_timer(timer));
            // `saturating_sub` keeps this from panicking for durations under 100 ms.
            sleep(duration.saturating_sub(Duration::from_millis(100)));
            assert_eq!(observed.load(Ordering::SeqCst), 0);
            // Wait past the nominal expiry before the final check.
            sleep(Duration::from_secs(1));
            assert_eq!(observed.load(Ordering::SeqCst), 0);
        })
        .unwrap();

    {
        let (lock, cv) = &*start;
        *lock.lock().unwrap() = true;
        cv.notify_all();
    }
    vec.push(verification);
}
/// Registers a timer on `clk` from a helper thread and logs a warning if its
/// callback has not run 100 ms after the nominal expiry.
///
/// A verification thread is spawned first and parked on a condvar; a second,
/// detached thread creates the timer, releases the verifier and then lingers
/// for one second. Only the verifier's handle is appended to `vec`. A late
/// timer is reported with `warn!`, not with an assertion.
fn single_timer_timeout(clk: Clock, duration: Duration, vec: &mut Vec<JoinHandle<()>>) {
    // SAFETY: `SEQ` is an atomic counter, so `fetch_add` is a synchronized
    // read-modify-write that is sound from any thread.
    let seq = unsafe { SEQ.fetch_add(1, Ordering::Relaxed) };
    let name = format!("TimerForExpire-{:?}", seq);
    let thread_name = name.clone();

    let fired = Arc::new(AtomicUsize::new(0));
    let observed = Arc::clone(&fired);
    let gate = Arc::new((Mutex::new(false), Condvar::new()));
    let gate_for_setter = Arc::clone(&gate);

    let verification = thread::Builder::new()
        .name(thread_name)
        .spawn(move || {
            let (lock, cv) = &*gate;
            let _go = cv.wait_while(lock.lock().unwrap(), |go| !*go).unwrap();
            let started = Instant::now();
            sleep(duration + Duration::from_millis(100));
            let elapsed = started.elapsed();
            let callback_runs = observed.load(Ordering::SeqCst);
            if callback_runs == 0 {
                warn!(
                    "timer of {:?} not triggered in {:?}, num of timers:{}",
                    duration,
                    elapsed,
                    clk.len()
                );
            }
        })
        .unwrap();

    thread::spawn(move || {
        let on_expire = move || {
            fired.fetch_add(1, Ordering::SeqCst);
        };
        // The returned handle is dropped at the end of this statement.
        Timer::new(clk, duration, on_expire, name).expect("Failed to create timer");
        {
            let (lock, cv) = &*gate_for_setter;
            *lock.lock().unwrap() = true;
            cv.notify_all();
        }
        sleep(Duration::from_secs(1));
    });

    vec.push(verification);
}
/// Runs every scenario in `fns` against up to `n_clock` freshly created clocks.
///
/// For each clock, the first `num_exp` entries of `EXPIRES` are walked; each
/// scenario is invoked `repeat` times per duration, followed by a one-second
/// pause. Clock creation stops at the first `Clock::new` that returns `None`.
/// Finally every collected verification thread is joined.
///
/// # Panics
/// Panics if `num_exp` exceeds `EXPIRES.len()` or if a joined thread panicked.
fn run_tests(
    n_clock: usize,
    repeat: usize,
    num_exp: usize,
    fns: Vec<Box<dyn Fn(Clock, Duration, &mut Vec<JoinHandle<()>>)>>,
) {
    let mut handles = Vec::<JoinHandle<()>>::new();
    for _ in 0..n_clock {
        let Some(clk) = Clock::new(None) else {
            break;
        };
        for &duration in &EXPIRES[0..num_exp] {
            for _ in 0..repeat {
                fns.iter().for_each(|scenario| scenario(clk, duration, &mut handles));
            }
            sleep(Duration::from_secs(1));
        }
    }
    for handle in handles {
        trace!("joining {:?}", handle);
        handle.join().unwrap();
    }
}
/// Exercises the expiry scenario on 10 clocks, 10 repetitions per duration.
#[test]
fn test_time_out() {
    // NOTE(review): presumably serializes this test with the others sharing
    // `SEQUENTIAL` — confirm against the `taskchain` crate.
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    warn!("Application started");
    let repeat: usize = 10;
    warn!("Repeat {} times", repeat);
    let scenarios: Vec<Box<dyn Fn(Clock, Duration, &mut Vec<JoinHandle<()>>)>> =
        vec![Box::new(single_timer_timeout)];
    run_tests(10, repeat, EXPIRES.len(), scenarios);
}
/// Exercises the cancel scenario on 10 clocks, 10 repetitions per duration.
#[test]
fn test_cancel() {
    // NOTE(review): presumably serializes this test with the others sharing
    // `SEQUENTIAL` — confirm against the `taskchain` crate.
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    warn!("Application started");
    let repeat = 10;
    warn!("Repeat {} times", repeat);
    let scenarios: Vec<Box<dyn Fn(Clock, Duration, &mut Vec<JoinHandle<()>>)>> =
        vec![Box::new(single_timer_cancel)];
    run_tests(10, repeat, EXPIRES.len(), scenarios);
}
/// Exercises the auto-drop scenario on 10 clocks, 10 repetitions per duration.
#[test]
fn test_drop() {
    // NOTE(review): presumably serializes this test with the others sharing
    // `SEQUENTIAL` — confirm against the `taskchain` crate.
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    warn!("Application started");
    let repeat: usize = 10;
    warn!("Repeat {} times", repeat);
    let scenarios: Vec<Box<dyn Fn(Clock, Duration, &mut Vec<JoinHandle<()>>)>> =
        vec![Box::new(single_timer_drop)];
    run_tests(10, repeat, EXPIRES.len(), scenarios);
}
/// Interleaves the expiry, cancel and drop scenarios on 10 clocks,
/// 10 repetitions per duration.
#[test]
fn test_combination() {
    // NOTE(review): presumably serializes this test with the others sharing
    // `SEQUENTIAL` — confirm against the `taskchain` crate.
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    warn!("Application started");
    let repeat: usize = 10;
    warn!("Repeat {} times", repeat);
    let scenarios: Vec<Box<dyn Fn(Clock, Duration, &mut Vec<JoinHandle<()>>)>> = vec![
        Box::new(single_timer_timeout),
        Box::new(single_timer_cancel),
        Box::new(single_timer_drop),
    ];
    run_tests(10, repeat, EXPIRES.len(), scenarios);
}
/// Messages delivered over a channel to an application thread in the
/// collocated-clock tests.
enum AppMessage {
/// A tick for the given clock; the receiver applies it with `Clock::on_tick`.
Tick(Clock, Tick),
/// Asks the receiving thread to leave its message loop.
Terminate,
}
/// Like `run_tests`, but each clock gets a tick bridge that forwards ticks
/// to a dedicated application thread.
///
/// For each of up to `n_clock` clocks a bounded channel and a consumer thread
/// are created; the bridge counts every tick and `try_send`s it to the consumer,
/// which applies it with `Clock::on_tick`. After all scenario threads have been
/// joined, every consumer is told to terminate, the function asserts that at
/// least one tick was bridged, and the consumers are joined.
///
/// # Panics
/// Panics if `num_exp` exceeds `EXPIRES.len()`, if a thread cannot be spawned or
/// panicked, or if no tick was ever bridged.
fn run_tests_with_collocated_clock(
    n_clock: usize,
    repeat: usize,
    num_exp: usize,
    fns: Vec<Box<dyn Fn(Clock, Duration, &mut Vec<JoinHandle<()>>)>>,
) {
    let mut verifiers = Vec::<JoinHandle<()>>::new();
    let mut app_threads = Vec::<JoinHandle<()>>::new();
    let mut senders = Vec::<crossbeam_channel::Sender<AppMessage>>::new();
    let ticks_seen = Arc::new(AtomicUsize::new(0));

    for i in 0..n_clock {
        let (tx, rx) = crossbeam_channel::bounded::<AppMessage>(1000);
        let consumer = thread::Builder::new()
            .name(format!("Thread{}", i))
            .spawn(move || {
                loop {
                    match rx.recv() {
                        Ok(AppMessage::Tick(clock, tick)) => {
                            let _ = clock.on_tick(tick);
                        }
                        // Stop on an explicit request or once every sender is gone.
                        Ok(AppMessage::Terminate) | Err(_) => return,
                    }
                }
            })
            .unwrap();
        app_threads.push(consumer);
        senders.push(tx.clone());

        let ticks_seen = Arc::clone(&ticks_seen);
        let bridge = move |c: Clock, t: Tick| {
            ticks_seen.fetch_add(1, Ordering::Relaxed);
            // A full channel simply drops the tick.
            let _ = tx.try_send(AppMessage::Tick(c, t));
        };

        let Some(clk) = Clock::new(Some(Box::new(bridge))) else {
            break;
        };
        for &duration in &EXPIRES[0..num_exp] {
            for _ in 0..repeat {
                fns.iter().for_each(|scenario| scenario(clk, duration, &mut verifiers));
            }
            sleep(Duration::from_secs(1));
        }
    }

    for handle in verifiers {
        trace!("joining {:?}", handle);
        handle.join().unwrap();
    }
    for sender in &senders {
        let _ = sender.send(AppMessage::Terminate);
    }
    assert!(ticks_seen.load(Ordering::Relaxed) > 0);
    for handle in app_threads {
        handle.join().unwrap();
    }
}
/// Interleaves the expiry, cancel and drop scenarios on 10 clocks whose ticks
/// are bridged to application threads, 10 repetitions per duration.
#[test]
fn test_combination_collocated() {
    // NOTE(review): presumably serializes this test with the others sharing
    // `SEQUENTIAL` — confirm against the `taskchain` crate.
    let mut chain = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    chain.wait(Duration::ZERO);
    initialize();
    warn!("Application started");
    let repeat: usize = 10;
    warn!("Repeat {} times", repeat);
    let scenarios: Vec<Box<dyn Fn(Clock, Duration, &mut Vec<JoinHandle<()>>)>> = vec![
        Box::new(single_timer_timeout),
        Box::new(single_timer_cancel),
        Box::new(single_timer_drop),
    ];
    run_tests_with_collocated_clock(10, repeat, EXPIRES.len(), scenarios);
}
/// Spawns a thread that registers `repeat` timers of length `duration` on `clk`
/// and records how far off each expiry was.
///
/// For every timer, `count` is incremented before registration and decremented
/// in the callback, so it tracks timers still outstanding. The callback pushes
/// `elapsed - duration` in microseconds onto `errors`. The outer thread's join
/// handle is appended to `jhs`.
///
/// # Panics
/// The spawned thread panics if a timer cannot be created.
fn cycle_for_precision(
    clk: Clock,
    duration: Duration,
    repeat: i32,
    count: Arc<AtomicIsize>,
    errors: Arc<Mutex<Vec<isize>>>,
    jhs: &mut Mutex<Vec<JoinHandle<()>>>,
) {
    let producer = thread::spawn(move || {
        thread::scope(|scope| {
            // SAFETY: `SEQ` is an atomic counter, so `fetch_add` is a synchronized
            // read-modify-write that is sound from any thread.
            let thread_seq = unsafe { SEQ.fetch_add(1, Ordering::Relaxed) };
            let builder = thread::Builder::new().name(format!("ThreadAddingExpire-{}", thread_seq));
            builder
                .spawn_scoped(scope, || {
                    for _ in 0..repeat {
                        let samples = Arc::clone(&errors);
                        let outstanding = Arc::clone(&count);
                        // SAFETY: as above, `SEQ` is an atomic counter.
                        let timer_seq = unsafe { SEQ.fetch_add(1, Ordering::Relaxed) };
                        let name = format!("TimerForExpire-{:?}", timer_seq);
                        // The reference instant is taken right before registration.
                        let registered_at = Instant::now();
                        count.fetch_add(1, Ordering::AcqRel);
                        let on_expire = move || {
                            {
                                // Lock first, then measure, then release before the decrement.
                                let mut guard = samples.lock().unwrap();
                                let actual_us = registered_at.elapsed().as_micros() as isize;
                                let nominal_us = duration.as_micros() as isize;
                                guard.push(actual_us - nominal_us);
                            }
                            outstanding.fetch_sub(1, Ordering::AcqRel);
                        };
                        Timer::new(clk, duration, on_expire, name)
                            .expect("Failed to create timer");
                    }
                })
                .unwrap();
        })
    });
    // `&mut Mutex` gives exclusive access, so no locking is needed here.
    jhs.get_mut().unwrap().push(producer);
}
/// Measures how precisely timers fire and logs summary statistics.
///
/// Up to 8 clocks are created; for each one, 2 thread groups register 2 timers
/// for every duration in `EXPIRES` through `cycle_for_precision`. Once all
/// timers have fired, the recorded errors (microseconds) are summarised and
/// logged in milliseconds.
///
/// # Panics
/// Panics if a timer-registering thread panicked or if no timer expired at all.
#[test]
fn test_precision() {
    // NOTE(review): presumably serializes this test with the others sharing
    // `SEQUENTIAL` — confirm against the `taskchain` crate.
    let mut pl = TaskChain::new(
        Arc::clone(&SEQUENTIAL),
        Kinds::ANY,
        Signal::TRIGGER(Kinds::ANY),
    );
    pl.wait(Duration::ZERO);
    initialize();

    let num_of_clock = 8;
    let num_of_thread_group = 2;
    let repeat = 2;
    let count = Arc::new(AtomicIsize::new(0));
    let errors = Arc::new(Mutex::new(Vec::<isize>::new()));
    let mut jhs = Mutex::new(Vec::<JoinHandle<()>>::new());

    for _ in 0..num_of_clock {
        let Some(clk) = Clock::new(None) else {
            break;
        };
        for _ in 0..num_of_thread_group {
            for d in EXPIRES {
                cycle_for_precision(clk, d, repeat, count.clone(), errors.clone(), &mut jhs);
            }
        }
    }

    // Join the registering threads before polling: this guarantees `count`
    // already covers every timer, and a failed registration fails the test
    // here instead of leaving `count` stuck above zero forever.
    for handle in jhs.into_inner().unwrap() {
        handle
            .join()
            .expect("timer-registering thread panicked");
    }

    // Wait until every registered timer has fired.
    while count.load(Ordering::Acquire) != 0 {
        sleep(Duration::from_secs(5));
    }

    let samples = errors.lock().unwrap();
    // Without samples the statistics below would divide by zero.
    assert!(
        !samples.is_empty(),
        "no timer expired, so there is nothing to measure"
    );
    let sum_of_squares = samples.iter().map(|&x| x * x).sum::<isize>() as usize;
    // NOTE(review): this is the mean of squared errors, so the value logged as
    // "std" is the RMS error around zero rather than a mean-centred deviation.
    let std = sum_of_squares / samples.len();
    let mean = samples.iter().sum::<isize>() / (samples.len() as isize);
    let min = *samples.iter().min().unwrap();
    let max = *samples.iter().max().unwrap();
    drop(samples);

    warn!(
        "Derivations: std:{}ms, mean:{}ms, min:{}ms, max:{}ms",
        (std as f64).sqrt() as usize / 1000,
        mean / 1000,
        min / 1000,
        max / 1000
    );
}
/// Checks how `AutoDropTimer` affects the clock's timer count (`Clock::len`)
/// on expiry, drop, cancel, `take()` and repeated cancel.
#[test]
fn test_autodrop_timer() {
// NOTE(review): presumably serializes this test with the others sharing
// `SEQUENTIAL` — confirm against the `taskchain` crate.
let mut pl = TaskChain::new(
Arc::clone(&SEQUENTIAL),
Kinds::ANY,
Signal::TRIGGER(Kinds::ANY),
);
pl.wait(Duration::ZERO);
initialize();
let clk = Clock::new(None).unwrap();
// Case 1: a timer that expires is no longer counted, even though its handle is still alive.
let _adt = AutoDropTimer::new(clk, EXPIRES[0], || {}, "test".to_string()).unwrap();
assert_eq!(clk.len(), 1);
sleep(EXPIRES[0] + Duration::from_millis(100));
assert_eq!(clk.len(), 0);
// Case 2: dropping the handle before expiry removes the timer.
{
let _adt = AutoDropTimer::new(clk, EXPIRES[1], || {}, "test".to_string()).unwrap();
assert_eq!(clk.len(), 1);
}
assert_eq!(clk.len(), 0);
// Case 3: an explicit cancel removes the timer.
let mut adt = AutoDropTimer::new(clk, EXPIRES[0], || {}, "test".to_string()).unwrap();
assert_eq!(clk.len(), 1);
adt.cancel().unwrap();
assert_eq!(clk.len(), 0);
// Case 4: after `take()`, neither cancelling nor dropping the wrapper (nor
// dropping the taken `Timer`) removes the timer — it stays counted.
{
let mut adt =
AutoDropTimer::new(clk, Duration::from_secs(1000), || {}, "test".to_string())
.unwrap();
assert_eq!(clk.len(), 1);
let _t = adt.take().unwrap();
let _ = adt.cancel();
}
assert_eq!(clk.len(), 1);
// Case 5: repeated cancels remove only this wrapper's timer; the timer left
// over from case 4 is still counted.
{
let mut adt =
AutoDropTimer::new(clk, Duration::from_secs(1000), || {}, "test".to_string())
.unwrap();
assert_eq!(clk.len(), 2);
let _ = adt.cancel();
let _ = adt.cancel();
let _ = adt.cancel();
}
assert_eq!(clk.len(), 1);
}
}