use std::{
collections::HashMap,
fmt,
sync::{
Arc, Weak,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::{Duration, Instant},
};
use crate::arch::sync::Mutex;
use super::task::{Priority, Task};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct TimerId(u64);
impl TimerId {
#[must_use]
pub fn new() -> Self {
static COUNTER: AtomicU64 = AtomicU64::new(1);
Self(COUNTER.fetch_add(1, Ordering::Relaxed))
}
#[inline]
#[must_use]
pub const fn as_u64(self) -> u64 {
self.0
}
#[inline]
#[must_use]
pub const fn from_raw(value: u64) -> Self {
Self(value)
}
}
impl Default for TimerId {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for TimerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Timer({})", self.0)
}
}
#[derive(Debug, Clone)]
pub struct TimerConfig {
pub delay: Duration,
pub interval: Option<Duration>,
pub priority: Priority,
}
impl Default for TimerConfig {
fn default() -> Self {
Self {
delay: Duration::ZERO,
interval: None,
priority: Priority::NORMAL,
}
}
}
pub(super) type BoxedOneShotCallback = Box<dyn FnOnce() + Send + 'static>;
pub(super) type BoxedPeriodicCallback = Arc<dyn Fn() + Send + Sync + 'static>;
pub(super) enum TimerWork {
OneShot(Option<BoxedOneShotCallback>),
Periodic(BoxedPeriodicCallback),
}
impl fmt::Debug for TimerWork {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::OneShot(_) => write!(f, "OneShot(...)"),
Self::Periodic(_) => write!(f, "Periodic(...)"),
}
}
}
pub(super) struct TimerEntry {
pub(super) id: TimerId,
pub(super) deadline: Instant,
pub(super) interval: Option<Duration>,
pub(super) priority: Priority,
pub(super) work: TimerWork,
pub(super) cancelled: AtomicBool,
}
impl TimerEntry {
#[inline]
pub(super) fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Acquire)
}
#[inline]
pub(super) fn cancel(&self) {
self.cancelled.store(true, Ordering::Release);
}
}
impl fmt::Debug for TimerEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TimerEntry")
.field("id", &self.id)
.field("deadline", &self.deadline)
.field("interval", &self.interval)
.field("priority", &self.priority)
.field("work", &self.work)
.field("cancelled", &self.is_cancelled())
.finish()
}
}
pub const DEFAULT_MAX_TIMERS: usize = 1024;
pub struct TimerWheel {
timers: Mutex<HashMap<TimerId, TimerEntry>>,
next_id: AtomicU64,
max_timers: usize,
dropped: AtomicU64,
}
impl TimerWheel {
#[must_use]
pub fn new() -> Self {
Self::with_max_timers(DEFAULT_MAX_TIMERS)
}
#[must_use]
pub fn with_max_timers(max_timers: usize) -> Self {
Self {
timers: Mutex::new(HashMap::new()),
next_id: AtomicU64::new(1),
max_timers,
dropped: AtomicU64::new(0),
}
}
pub fn schedule_oneshot<F>(
self: &Arc<Self>,
delay: Duration,
priority: Priority,
work: F,
) -> TimerHandle
where
F: FnOnce() + Send + 'static,
{
self.schedule_internal(delay, None, priority, TimerWork::OneShot(Some(Box::new(work))))
}
pub fn schedule_periodic<F>(
self: &Arc<Self>,
interval: Duration,
priority: Priority,
work: F,
) -> TimerHandle
where
F: Fn() + Send + Sync + 'static,
{
self.schedule_internal(
interval,
Some(interval),
priority,
TimerWork::Periodic(Arc::new(work)),
)
}
fn schedule_internal(
self: &Arc<Self>,
delay: Duration,
interval: Option<Duration>,
priority: Priority,
work: TimerWork,
) -> TimerHandle {
let id = TimerId(self.next_id.fetch_add(1, Ordering::Relaxed));
let deadline = Instant::now() + delay;
let mut timers = self.timers.lock();
if timers.len() >= self.max_timers {
self.dropped.fetch_add(1, Ordering::Relaxed);
return TimerHandle::failed(id);
}
let entry = TimerEntry {
id,
deadline,
interval,
priority,
work,
cancelled: AtomicBool::new(false),
};
timers.insert(id, entry);
drop(timers);
TimerHandle::new(id, Arc::downgrade(self))
}
pub fn cancel(&self, id: TimerId) -> bool {
let timers = self.timers.lock();
timers.get(&id).is_some_and(|entry| {
if entry.is_cancelled() {
false
} else {
entry.cancel();
true
}
})
}
pub fn is_pending(&self, id: TimerId) -> bool {
let timers = self.timers.lock();
timers.get(&id).is_some_and(|entry| !entry.is_cancelled())
}
#[cfg_attr(coverage_nightly, coverage(off))]
pub fn tick(&self, now: Instant) -> Vec<Task> {
let mut tasks = Vec::new();
let mut to_remove = Vec::new();
let mut to_reschedule = Vec::new();
{
let mut timers = self.timers.lock();
for (id, entry) in timers.iter_mut() {
if entry.is_cancelled() {
to_remove.push(*id);
continue;
}
if entry.deadline <= now {
match &mut entry.work {
TimerWork::OneShot(work_opt) => {
if let Some(work) = work_opt.take() {
let task = Task::with_priority(entry.priority, work);
tasks.push(task);
}
to_remove.push(*id);
}
TimerWork::Periodic(work) => {
let work_clone = Arc::clone(work);
let task = Task::with_priority(entry.priority, move || work_clone());
tasks.push(task);
if let Some(interval) = entry.interval {
to_reschedule.push((*id, now + interval));
}
}
}
}
}
for id in to_remove {
timers.remove(&id);
}
for (id, new_deadline) in to_reschedule {
if let Some(entry) = timers.get_mut(&id) {
entry.deadline = new_deadline;
}
}
}
tasks
}
#[must_use]
pub fn pending_count(&self) -> usize {
self.timers.lock().len()
}
#[must_use]
pub fn dropped_count(&self) -> u64 {
self.dropped.load(Ordering::Relaxed)
}
}
impl Default for TimerWheel {
fn default() -> Self {
Self::new()
}
}
impl fmt::Debug for TimerWheel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TimerWheel")
.field("pending", &self.pending_count())
.field("max_timers", &self.max_timers)
.field("dropped", &self.dropped_count())
.finish_non_exhaustive()
}
}
pub struct TimerHandle {
id: TimerId,
wheel: Option<Weak<TimerWheel>>,
failed: bool,
}
impl TimerHandle {
pub(crate) const fn new(id: TimerId, wheel: Weak<TimerWheel>) -> Self {
Self {
id,
wheel: Some(wheel),
failed: false,
}
}
pub(crate) const fn failed(id: TimerId) -> Self {
Self {
id,
wheel: None,
failed: true,
}
}
#[inline]
#[must_use]
pub const fn id(&self) -> TimerId {
self.id
}
#[inline]
#[must_use]
pub const fn is_failed(&self) -> bool {
self.failed
}
#[must_use]
pub fn detach(mut self) -> TimerId {
let id = self.id;
self.wheel = None; id
}
}
impl Drop for TimerHandle {
fn drop(&mut self) {
if let Some(ref weak) = self.wheel
&& let Some(wheel) = weak.upgrade()
{
wheel.cancel(self.id);
}
}
}
impl fmt::Debug for TimerHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TimerHandle")
.field("id", &self.id)
.field("failed", &self.failed)
.field("wheel_alive", &self.wheel.as_ref().is_some_and(|w| w.strong_count() > 0))
.finish()
}
}