use super::task::{Task, TimerTaskStatus};
use crate::asserted_short_name;
use log::{debug, info, log_enabled, warn};
use std::{
collections::BinaryHeap,
sync::mpsc::{channel, Receiver, Sender, TryRecvError},
thread::{park, park_timeout, Builder, JoinHandle},
time::Duration,
};
#[derive(Debug)]
enum Operation {
Execute(Task),
Terminate,
Clear,
}
pub struct Timer {
tx_task: Sender<Operation>,
jh_executor: JoinHandle<()>,
}
impl Timer {
pub fn new(name: &str) -> Self {
let (tx_task, rx_task) = channel();
let jh_executor = Executor::new(name, rx_task).spawn();
Timer { tx_task, jh_executor }
}
pub fn schedule<T: FnMut() -> TimerTaskStatus + Send + 'static>(&self, name: &str, repeat: Duration, task: T) {
let task = Box::new(task);
let task_schedule = Task::new(name, repeat, task);
self.tx_task.send(Operation::Execute(task_schedule)).unwrap();
self.jh_executor.thread().unpark();
}
pub fn terminate(self) {
self.tx_task.send(Operation::Terminate).unwrap();
self.jh_executor.thread().unpark();
self.jh_executor.join().unwrap();
}
pub fn clear(&self) {
self.tx_task.send(Operation::Clear).unwrap();
self.jh_executor.thread().unpark();
}
}
struct Executor {
name: String,
rx_task: Receiver<Operation>,
tasks: BinaryHeap<Task>,
}
impl Executor {
pub fn new(name: &str, rx_task: Receiver<Operation>) -> Self {
Executor {
name: name.to_owned(),
rx_task,
tasks: BinaryHeap::new(),
}
}
pub fn spawn(self) -> JoinHandle<()> {
Builder::new()
.name(self.name.to_string())
.spawn({
let mut e = self;
move || Executor::run(&mut e)
})
.unwrap()
}
fn run(&mut self) {
loop {
match self.rx_task.try_recv() {
Ok(Operation::Execute(task)) => {
if log_enabled!(log::Level::Info) {
info!("Adding Operation::Execute({})", task);
}
self.tasks.push(task);
}
Ok(operation) => {
if log_enabled!(log::Level::Info) {
info!("{:?} {} and dropping all schedules tasks", operation, asserted_short_name!("Executor", Self));
}
for task in self.tasks.drain() {
if log_enabled!(log::Level::Info) {
info!("Dropping task: {}", task);
drop(task);
}
}
if matches!(operation, Operation::Terminate) {
break;
}
}
Err(TryRecvError::Empty) => {}
Err(TryRecvError::Disconnected) => {
warn!("{} rx channel is disconnected, looks like you dropped tx. Terminating executing thread", asserted_short_name!("Executor", Self));
for task in self.tasks.drain() {
warn!("Dropping task: {}", task);
}
break;
}
}
use TimerTaskStatus::{Completed, RetryAfter, Terminate};
match self.tasks.pop() {
Some(mut task) => {
let now = chrono::Utc::now();
if task.execute_at() > &now {
let timeout = (*task.execute_at() - now).to_std().unwrap();
if log_enabled!(log::Level::Debug) {
debug!("Not due task: {}. Parking thread for: {:?}", task, timeout);
}
self.tasks.push(task);
park_timeout(timeout);
} else {
if log_enabled!(log::Level::Debug) {
debug!("Executing task: {} ", task);
}
match task.execute() {
Completed => {
if log_enabled!(log::Level::Debug) {
debug!("{:?} task: {}", Completed, task);
}
task.reschedule();
self.tasks.push(task);
}
Terminate => {
if log_enabled!(log::Level::Info) {
info!("{:?} task: {}, will no longer schedule. Task is getting dropped", Terminate, task);
}
drop(task);
}
RetryAfter(retry_after) => {
if log_enabled!(log::Level::Debug) {
debug!("{:?} task: {}", RetryAfter(retry_after), task);
}
task.reschedule_with_interval(retry_after);
self.tasks.push(task);
}
}
}
}
None => {
debug!("Executor has no tasks to run and will park thread until new task added.");
park();
}
}
}
}
}
#[cfg(test)]
mod test {
use more_asserts::assert_lt;
use super::*;
use crate::unittest::setup;
use std::{
sync::atomic::{AtomicU32, Ordering},
thread::sleep,
time::Instant,
};
#[test]
fn test_timer_terminate() {
setup::log::configure_level(log::LevelFilter::Debug);
let timer = Timer::new("unittest");
static TASK1_REMAINING_ITERATIONS: AtomicU32 = AtomicU32::new(5);
static TASK2_REMAINING_ITERATIONS: AtomicU32 = AtomicU32::new(3);
static REPEAT_INTERVAL: Duration = Duration::from_millis(100);
timer.schedule("task1", REPEAT_INTERVAL, || {
let iteration_remaining = TASK1_REMAINING_ITERATIONS.fetch_sub(1, Ordering::Relaxed) - 1;
info!("task1, iteration {}", iteration_remaining + 1);
if iteration_remaining == 0 {
TimerTaskStatus::Terminate
} else {
TimerTaskStatus::Completed
}
});
timer.schedule("task2", REPEAT_INTERVAL, || {
let iteration_remaining = TASK2_REMAINING_ITERATIONS.fetch_sub(1, Ordering::Relaxed) - 1;
info!("task2, iterations_remaining {}", iteration_remaining);
if iteration_remaining == 0 {
TimerTaskStatus::Terminate
} else {
TimerTaskStatus::Completed
}
});
let now = Instant::now();
while TASK1_REMAINING_ITERATIONS.load(Ordering::Relaxed) > 0 {}
let elapsed = now.elapsed();
timer.terminate();
let mut expected_completion = REPEAT_INTERVAL * 5;
expected_completion = expected_completion + expected_completion / 10; info!("elapsed: {:?}", elapsed);
info!("expected_completion: {:?}", expected_completion);
assert_lt!(elapsed, expected_completion);
assert_eq!(TASK1_REMAINING_ITERATIONS.load(Ordering::Relaxed), 0);
assert_eq!(TASK2_REMAINING_ITERATIONS.load(Ordering::Relaxed), 0);
}
#[test]
fn test_timer_clear() {
setup::log::configure_level(log::LevelFilter::Debug);
let timer = Timer::new("unittest");
static TASK1_REMAINING_ITERATIONS: AtomicU32 = AtomicU32::new(u32::MAX);
static TASK2_REMAINING_ITERATIONS: AtomicU32 = AtomicU32::new(3);
static REPEAT_INTERVAL: Duration = Duration::from_millis(100);
timer.schedule("task1", REPEAT_INTERVAL, || {
let iteration_remaining = TASK1_REMAINING_ITERATIONS.fetch_sub(1, Ordering::Relaxed) - 1;
info!("task1, iteration {}", iteration_remaining + 1);
TimerTaskStatus::Completed
});
let now = Instant::now();
while TASK1_REMAINING_ITERATIONS.load(Ordering::Relaxed) > u32::MAX - 5 {}
let elapsed = now.elapsed();
timer.clear();
info!("elapsed: {:?}", elapsed);
sleep(REPEAT_INTERVAL * 2);
let task1_remaining_after_clear = TASK1_REMAINING_ITERATIONS.load(Ordering::Relaxed);
info!("task1_remaining_after_clear: {}", task1_remaining_after_clear);
sleep(REPEAT_INTERVAL * 2);
let task1_remaining_after_sleep = TASK1_REMAINING_ITERATIONS.load(Ordering::Relaxed);
info!("task1_remaining_after_sleep: {}", task1_remaining_after_sleep);
assert_eq!(task1_remaining_after_clear, task1_remaining_after_sleep);
timer.schedule("task2", REPEAT_INTERVAL, || {
let iteration_remaining = TASK2_REMAINING_ITERATIONS.fetch_sub(1, Ordering::Relaxed) - 1;
info!("task2, iterations_remaining {}", iteration_remaining);
if iteration_remaining == 0 {
TimerTaskStatus::Terminate
} else {
TimerTaskStatus::Completed
}
});
sleep(REPEAT_INTERVAL * 2);
}
}