use std::{
cmp::{Ordering as CmpOrdering, Reverse},
collections::BinaryHeap,
panic::{AssertUnwindSafe, catch_unwind},
sync::{
Arc, Condvar, Mutex,
atomic::{AtomicBool, Ordering},
},
time::{Duration, Instant},
};
use crate::Cancellable;
use crate::stream::runtime::dispatch_stream_job;
/// Shared state of the timer service: a mutex-guarded queue of pending timers plus the
/// condition variable used to wake the worker thread spawned by `launch`.
pub(super) struct TimerDriver {
/// Pending entries, the id counter and the stop flag.
inner: Mutex<TimerInner>,
/// Notified by `schedule_kind` after a push and by `stop`; the worker in `run` waits on it.
condvar: Condvar,
/// Test-only: set to `true` when the worker thread starts and reset to `false` when it exits.
#[cfg(test)]
live: Arc<AtomicBool>,
/// Test-only: the thread name that was passed to `launch`.
#[cfg(test)]
thread_name: Arc<str>,
}
// Opaque `Debug` output: only the type name is printed, every field is elided.
impl std::fmt::Debug for TimerDriver {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("TimerDriver");
        repr.finish_non_exhaustive()
    }
}
/// Mutable driver state; every access goes through `TimerDriver::inner`.
struct TimerInner {
/// Pending timers. `Reverse` turns the max-heap into a min-heap, so `peek`/`pop` yield the
/// entry with the earliest deadline (ties broken by the smaller id).
heap: BinaryHeap<Reverse<TimerEntry>>,
/// Id handed to the next scheduled entry; advanced with wrapping addition.
next_id: u64,
/// Set by `stop`; once `true` the worker loop in `run` returns.
stopped: bool,
}
/// One pending timer in the heap. Ordering and equality use only `deadline` and `id`.
struct TimerEntry {
/// Instant at or after which the entry is popped and fired.
deadline: Instant,
/// Sequence number assigned by `schedule_kind`; tie-breaker between equal deadlines.
id: u64,
/// What to run and how (or whether) to reschedule it.
kind: TimerKind,
}
/// Payload of a timer entry.
///
/// Every variant carries two flags that `fire` consults before running the task:
/// `cancelled` (shared with the `Cancellable` returned to the scheduler's caller) and
/// `shutdown` (supplied by the caller of the `schedule_*` methods).
enum TimerKind {
/// Runs `task` a single time.
Once {
// NOTE(review): the only constructor visible in this file always stores `Some`;
// `fire` silently does nothing for `None`.
task: Option<Box<dyn FnOnce() + Send>>,
cancelled: Arc<AtomicBool>,
shutdown: Arc<AtomicBool>,
},
/// Runs `task` repeatedly; the next run is scheduled `delay` after the previous run
/// finished.
FixedDelay {
task: Arc<dyn Fn() + Send + Sync>,
delay: Duration,
cancelled: Arc<AtomicBool>,
shutdown: Arc<AtomicBool>,
},
/// Runs `task` repeatedly; the next deadline is the previous deadline plus `interval`,
/// but never earlier than the moment of rescheduling.
FixedRate {
task: Arc<dyn Fn() + Send + Sync>,
interval: Duration,
cancelled: Arc<AtomicBool>,
shutdown: Arc<AtomicBool>,
},
}
// Equality considers only `(deadline, id)`; the task payload in `kind` is never compared.
// This keeps `Eq` consistent with the `Ord` implementation for the same type.
impl PartialEq for TimerEntry {
    fn eq(&self, other: &Self) -> bool {
        (self.deadline, self.id) == (other.deadline, other.id)
    }
}

impl Eq for TimerEntry {}
// Entries are totally ordered, so the partial comparison simply defers to `Ord`.
impl PartialOrd for TimerEntry {
    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
        Some(Ord::cmp(self, other))
    }
}
// Orders entries by deadline first; equal deadlines fall back to the id.
impl Ord for TimerEntry {
    fn cmp(&self, other: &Self) -> CmpOrdering {
        match self.deadline.cmp(&other.deadline) {
            CmpOrdering::Equal => self.id.cmp(&other.id),
            by_deadline => by_deadline,
        }
    }
}
impl TimerDriver {
/// Creates a driver with an empty queue and starts its worker thread.
///
/// The worker thread is named `name` and executes `run` until `stop` is called. The join
/// handle is discarded, so the thread is detached. In test builds the `live` flag is raised
/// when the thread starts and lowered by a drop guard when the thread exits.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
pub(super) fn launch(name: &str) -> Arc<Self> {
    // Lowers the `live` flag when dropped, i.e. whenever the worker thread unwinds or returns.
    #[cfg(test)]
    struct LiveGuard(Arc<AtomicBool>);
    #[cfg(test)]
    impl Drop for LiveGuard {
        fn drop(&mut self) {
            self.0.store(false, Ordering::SeqCst);
        }
    }

    let initial_state = TimerInner {
        heap: BinaryHeap::new(),
        next_id: 0,
        stopped: false,
    };
    let driver = Arc::new(Self {
        inner: Mutex::new(initial_state),
        condvar: Condvar::new(),
        #[cfg(test)]
        live: Arc::new(AtomicBool::new(false)),
        #[cfg(test)]
        thread_name: Arc::from(name),
    });

    let worker = Arc::clone(&driver);
    let thread_body = move || {
        // The guard is created before the flag is raised so the flag is always reset.
        #[cfg(test)]
        let _guard = LiveGuard(Arc::clone(&worker.live));
        #[cfg(test)]
        worker.live.store(true, Ordering::SeqCst);
        worker.run()
    };
    std::thread::Builder::new()
        .name(String::from(name))
        .spawn(thread_body)
        .expect("timer driver thread spawns");

    driver
}
/// Worker loop: repeatedly waits for the earliest entry to become due and fires it.
///
/// Returns as soon as the `stopped` flag is observed. While the heap is empty the thread
/// blocks on the condition variable; otherwise it sleeps until the earliest deadline or
/// until it is notified. Every wake-up (timeout, notification or spurious) re-enters the
/// inner loop, which re-checks `stopped` and the heap head from scratch. A poisoned mutex
/// is recovered rather than propagated.
fn run(self: Arc<Self>) {
    loop {
        let due = {
            let mut inner = self.inner.lock().unwrap_or_else(|p| p.into_inner());
            loop {
                if inner.stopped {
                    return;
                }
                // Copy the deadline out so the heap borrow ends before the guard is
                // handed to the condition variable.
                let next_deadline = inner.heap.peek().map(|Reverse(head)| head.deadline);
                match next_deadline {
                    None => {
                        inner = self.condvar.wait(inner).unwrap_or_else(|p| p.into_inner());
                    }
                    Some(deadline) => {
                        let now = Instant::now();
                        if deadline <= now {
                            let Reverse(entry) = inner.heap.pop().expect("just peeked");
                            break entry;
                        }
                        // Whether the wait timed out is irrelevant: the loop re-evaluates
                        // the heap head either way.
                        let (guard, _) = self
                            .condvar
                            .wait_timeout(inner, deadline - now)
                            .unwrap_or_else(|p| p.into_inner());
                        inner = guard;
                    }
                }
            }
        };
        // The lock is released here, so firing never runs under the driver mutex.
        self.fire(due);
    }
}
/// Hands a due entry to `dispatch_stream_job`, unless it was cancelled or shut down.
///
/// * `Once` tasks run a single time; a panic inside the task is caught and discarded.
/// * `FixedDelay` tasks are re-queued for "now + delay" after the task returns.
/// * `FixedRate` tasks are re-queued for "previous deadline + interval", clamped so the new
///   deadline is never in the past.
///
/// Periodic tasks re-check both flags inside the dispatched job, before and after running,
/// and are not re-queued when the task panicked.
fn fire(self: &Arc<Self>, entry: TimerEntry) {
    let shutting_down = match &entry.kind {
        TimerKind::Once { shutdown, .. }
        | TimerKind::FixedDelay { shutdown, .. }
        | TimerKind::FixedRate { shutdown, .. } => shutdown.load(Ordering::SeqCst),
    };
    if shutting_down {
        return;
    }

    // Copied out up front because `entry.kind` is moved by the match below.
    let fired_deadline = entry.deadline;

    match entry.kind {
        TimerKind::Once {
            task, cancelled, ..
        } => {
            if cancelled.load(Ordering::SeqCst) {
                return;
            }
            if let Some(job) = task {
                dispatch_stream_job(Box::new(move || {
                    let _ = catch_unwind(AssertUnwindSafe(job));
                }));
            }
        }
        TimerKind::FixedDelay {
            task,
            delay,
            cancelled,
            shutdown,
        } => {
            if cancelled.load(Ordering::SeqCst) {
                return;
            }
            let driver = Arc::clone(self);
            dispatch_stream_job(Box::new(move || {
                let halted =
                    || cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst);
                if halted() {
                    return;
                }
                let panicked = catch_unwind(AssertUnwindSafe(|| task())).is_err();
                if panicked || halted() {
                    return;
                }
                let next_deadline = Instant::now() + delay;
                let next_kind = TimerKind::FixedDelay {
                    task,
                    delay,
                    cancelled,
                    shutdown,
                };
                driver.schedule_kind(next_deadline, next_kind);
            }));
        }
        TimerKind::FixedRate {
            task,
            interval,
            cancelled,
            shutdown,
        } => {
            if cancelled.load(Ordering::SeqCst) {
                return;
            }
            let driver = Arc::clone(self);
            dispatch_stream_job(Box::new(move || {
                let halted =
                    || cancelled.load(Ordering::SeqCst) || shutdown.load(Ordering::SeqCst);
                if halted() {
                    return;
                }
                let panicked = catch_unwind(AssertUnwindSafe(|| task())).is_err();
                if panicked || halted() {
                    return;
                }
                let next_deadline = (fired_deadline + interval).max(Instant::now());
                let next_kind = TimerKind::FixedRate {
                    task,
                    interval,
                    cancelled,
                    shutdown,
                };
                driver.schedule_kind(next_deadline, next_kind);
            }));
        }
    }
}
/// Queues `kind` to fire at `deadline` and wakes the worker thread.
///
/// Each entry receives the next sequence id (wrapping on overflow), which orders entries
/// that share a deadline. If the driver has already been stopped, the worker loop has
/// exited and nothing would ever drain the heap, so the entry is discarded instead of
/// being retained until the driver itself is dropped.
fn schedule_kind(&self, deadline: Instant, kind: TimerKind) {
    let mut inner = self.inner.lock().unwrap_or_else(|p| p.into_inner());
    if inner.stopped {
        // Release the lock first: dropping `kind` runs the destructors of caller-supplied
        // closures, which must not execute under the driver mutex.
        drop(inner);
        drop(kind);
        return;
    }
    let id = inner.next_id;
    inner.next_id = id.wrapping_add(1);
    inner.heap.push(Reverse(TimerEntry { deadline, id, kind }));
    drop(inner);
    self.condvar.notify_one();
}
/// Schedules `task` to run once, `delay` from now.
///
/// `shutdown` is stored with the entry and checked when the entry fires. `keep_alive` is
/// passed to `Cancellable::new_with_keep_alive`; the returned handle shares its
/// `cancelled` flag with the queued entry.
///
/// # Panics
///
/// May panic if `Instant::now() + delay` is not representable.
pub(super) fn schedule_once(
    self: &Arc<Self>,
    delay: Duration,
    task: impl FnOnce() + Send + 'static,
    shutdown: Arc<AtomicBool>,
    keep_alive: Arc<dyn Send + Sync>,
) -> Cancellable {
    let handle = Cancellable::new_with_keep_alive(Some(keep_alive));
    let deadline = Instant::now() + delay;
    let boxed_task: Box<dyn FnOnce() + Send> = Box::new(task);
    let kind = TimerKind::Once {
        task: Some(boxed_task),
        cancelled: Arc::clone(&handle.cancelled),
        shutdown,
    };
    self.schedule_kind(deadline, kind);
    handle
}
/// Schedules `task` to run repeatedly with a fixed pause between runs.
///
/// The first run is due `initial_delay` from now; each later run is queued `delay` after
/// the previous run returned. `shutdown` is stored with the entry and checked around every
/// run. `keep_alive` is passed to `Cancellable::new_with_keep_alive`; the returned handle
/// shares its `cancelled` flag with the queued entry.
///
/// # Panics
///
/// Panics if `delay` is zero, and may panic if `Instant::now() + initial_delay` is not
/// representable.
pub(super) fn schedule_with_fixed_delay(
    self: &Arc<Self>,
    initial_delay: Duration,
    delay: Duration,
    task: impl Fn() + Send + Sync + 'static,
    shutdown: Arc<AtomicBool>,
    keep_alive: Arc<dyn Send + Sync>,
) -> Cancellable {
    assert!(delay > Duration::ZERO);
    let handle = Cancellable::new_with_keep_alive(Some(keep_alive));
    let first_deadline = Instant::now() + initial_delay;
    let shared_task: Arc<dyn Fn() + Send + Sync> = Arc::new(task);
    let kind = TimerKind::FixedDelay {
        task: shared_task,
        delay,
        cancelled: Arc::clone(&handle.cancelled),
        shutdown,
    };
    self.schedule_kind(first_deadline, kind);
    handle
}
/// Schedules `task` to run repeatedly at a fixed rate.
///
/// The first run is due `initial_delay` from now; each later deadline is the previous
/// deadline plus `interval`, clamped to the rescheduling moment when that lies in the past.
/// `shutdown` is stored with the entry and checked around every run. `keep_alive` is passed
/// to `Cancellable::new_with_keep_alive`; the returned handle shares its `cancelled` flag
/// with the queued entry.
///
/// # Panics
///
/// Panics if `interval` is zero, and may panic if `Instant::now() + initial_delay` is not
/// representable.
pub(super) fn schedule_at_fixed_rate(
    self: &Arc<Self>,
    initial_delay: Duration,
    interval: Duration,
    task: impl Fn() + Send + Sync + 'static,
    shutdown: Arc<AtomicBool>,
    keep_alive: Arc<dyn Send + Sync>,
) -> Cancellable {
    assert!(interval > Duration::ZERO);
    let handle = Cancellable::new_with_keep_alive(Some(keep_alive));
    let first_deadline = Instant::now() + initial_delay;
    let shared_task: Arc<dyn Fn() + Send + Sync> = Arc::new(task);
    let kind = TimerKind::FixedRate {
        task: shared_task,
        interval,
        cancelled: Arc::clone(&handle.cancelled),
        shutdown,
    };
    self.schedule_kind(first_deadline, kind);
    handle
}
/// Stops the driver: marks it stopped, discards every pending entry and wakes the worker
/// so it can observe the flag and exit.
///
/// The pending entries are moved out of the heap and dropped only after the mutex has been
/// released. Dropping an entry runs the destructors of caller-supplied closures; doing that
/// under the lock would deadlock if such a destructor called back into this driver.
pub(super) fn stop(&self) {
    let mut inner = self.inner.lock().unwrap_or_else(|p| p.into_inner());
    inner.stopped = true;
    let abandoned = std::mem::take(&mut inner.heap);
    drop(inner);
    // Wake the worker before paying for the destructors of the abandoned tasks.
    self.condvar.notify_all();
    drop(abandoned);
}
}
// Test-only accessors for the worker thread's liveness flag and name.
#[cfg(test)]
impl TimerDriver {
    /// Returns the current value of the `live` flag.
    pub(super) fn is_live(&self) -> bool {
        let flag: &AtomicBool = &self.live;
        flag.load(Ordering::SeqCst)
    }

    /// Returns the thread name that was passed to `launch`.
    pub(super) fn thread_name(&self) -> &str {
        &*self.thread_name
    }
}