use crate::queue::Notifier;
use crate::NotificationId;
use crossbeam_queue::SegQueue;
use std::cmp::Ordering as CmpOrdering;
use std::collections::{BTreeSet, HashSet};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::ops::{Add, Sub};
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use std::thread::JoinHandle;
use std::{
fmt, thread,
time::{Duration, Instant},
};
#[derive(Debug)]
pub struct NotificationScheduler {
notifier: Arc<dyn Notifier>,
scheduler: Arc<Scheduler>,
}
impl NotificationScheduler {
pub fn new(notifier: Arc<dyn Notifier>, scheduler: Arc<Scheduler>) -> NotificationScheduler {
NotificationScheduler {
notifier,
scheduler,
}
}
pub fn notify_with_fixed_interval<I: Into<Option<Duration>>>(
&self,
id: NotificationId,
interval: Duration,
initial_delay: I,
name: Option<String>,
) -> ScheduleEntryId {
let notifier = Arc::clone(&self.notifier);
let entry = ScheduleEntry::with_interval(interval, initial_delay, name, move || {
let _ = notifier.notify(id);
});
let id = entry.id;
self.scheduler.schedule(entry);
id
}
pub fn notify_once_after_delay(
&self,
id: NotificationId,
delay: Duration,
name: Option<String>,
) -> ScheduleEntryId {
let notifier = Arc::clone(&self.notifier);
let entry = ScheduleEntry::one_time(delay, name, move || {
let _ = notifier.notify(id);
});
let id = entry.id;
self.scheduler.schedule(entry);
id
}
pub fn cancel(&self, id: ScheduleEntryId) {
self.scheduler.cancel(id);
}
}
type Callback = dyn Fn() + Send + Sync + 'static;
#[derive(Clone)]
pub struct ScheduleEntry {
start: Instant,
interval: Option<Duration>,
callback: Arc<Callback>,
pub name: Option<String>,
pub id: ScheduleEntryId,
}
impl ScheduleEntry {
pub fn with_interval<I, F>(
interval: Duration,
initial_delay: I,
name: Option<String>,
callback: F,
) -> ScheduleEntry
where
I: Into<Option<Duration>>,
F: Fn() + Send + Sync + 'static,
{
let now = Instant::now();
ScheduleEntry {
start: initial_delay.into().map(|d| now.add(d)).unwrap_or(now),
interval: Some(interval),
callback: Arc::new(callback),
name,
id: ScheduleEntryId::gen_next(),
}
}
pub fn one_time<F>(delay: Duration, name: Option<String>, callback: F) -> ScheduleEntry
where
F: Fn() + Send + Sync + 'static,
{
ScheduleEntry {
start: Instant::now().add(delay),
interval: None,
callback: Arc::new(callback),
name,
id: ScheduleEntryId::gen_next(),
}
}
}
impl fmt::Debug for ScheduleEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ScheduleEntry")
.field("start", &self.start)
.field("interval", &self.interval)
.field("name", &self.name)
.field("id", &self.id)
.finish()
}
}
impl Hash for ScheduleEntry {
fn hash<H: Hasher>(&self, hasher: &mut H) {
self.start.hash(hasher);
self.id.hash(hasher);
}
}
impl Eq for ScheduleEntry {}
impl PartialEq for ScheduleEntry {
fn eq(&self, other: &Self) -> bool {
self.start == other.start && self.id == other.id
}
}
impl Ord for ScheduleEntry {
fn cmp(&self, other: &Self) -> CmpOrdering {
self.start.cmp(&other.start).then(self.id.cmp(&other.id))
}
}
impl PartialOrd for ScheduleEntry {
fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
Some(self.cmp(other))
}
}
static NEXT_SCHEDULE_ENTRY_ID: AtomicU32 = AtomicU32::new(1);
#[derive(Copy, Clone, Debug, PartialEq, Ord, PartialOrd, Eq, Hash)]
pub struct ScheduleEntryId(u32);
impl ScheduleEntryId {
pub fn gen_next() -> ScheduleEntryId {
let id = NEXT_SCHEDULE_ENTRY_ID.fetch_add(1, Ordering::SeqCst);
ScheduleEntryId(id)
}
pub fn id(&self) -> u32 {
self.0
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum SchedulerStatus {
Active,
Parked,
ParkedTimeout,
}
#[derive(Debug)]
pub struct Scheduler {
shutdown: Arc<AtomicBool>,
thread_handle: JoinHandle<()>,
schedule_queue: Arc<SegQueue<ScheduleEntry>>,
cancel_queue: Arc<SegQueue<ScheduleEntryId>>,
name: String,
status: Arc<RwLock<SchedulerStatus>>,
entry_count: Arc<AtomicU32>,
}
impl Default for Scheduler {
fn default() -> Scheduler {
Scheduler::new(None)
}
}
static SCHEDULER_THREAD_ID: AtomicU32 = AtomicU32::new(1);
impl Scheduler {
pub fn name(&self) -> &str {
self.name.as_str()
}
pub fn schedule(&self, entry: ScheduleEntry) {
self.schedule_queue.push(entry);
self.thread_handle.thread().unpark();
}
pub fn cancel(&self, id: ScheduleEntryId) {
self.cancel_queue.push(id);
self.thread_handle.thread().unpark();
}
pub fn status(&self) -> SchedulerStatus {
*(self.status.read().unwrap())
}
pub fn entry_count(&self) -> u32 {
self.entry_count.load(Ordering::SeqCst)
}
pub fn new(name: Option<String>) -> Scheduler {
let t_id = SCHEDULER_THREAD_ID.fetch_add(1, Ordering::SeqCst);
let name_prefix = "mio-misc-scheduler";
let name = name
.map(|n| format!("{}-{}-{}", name_prefix, n, t_id))
.unwrap_or_else(|| format!("{}-{}", name_prefix, t_id));
let name_clone = name.clone();
let shut_down = Arc::new(AtomicBool::new(false));
let shutdown_clone = Arc::clone(&shut_down);
let entry_count = Arc::new(AtomicU32::new(0));
let entry_count_clone = Arc::clone(&entry_count);
let schedule_queue = Arc::new(SegQueue::new());
let schedule_queue_clone = Arc::clone(&schedule_queue);
let cancel_queue = Arc::new(SegQueue::new());
let cancel_queue_clone = Arc::clone(&cancel_queue);
let status = Arc::new(RwLock::new(SchedulerStatus::Active));
let status_clone = Arc::clone(&status);
let thread_handle = thread::Builder::new()
.name(name.clone())
.spawn(move || {
let mut entries: BTreeSet<ScheduleEntry> = BTreeSet::new();
let mut entries_to_cancel: HashSet<ScheduleEntryId> = HashSet::new();
while !shut_down.load(Ordering::SeqCst) {
while let Some(entry_id) = cancel_queue.pop() {
trace!(
"{}: cancelling scheduler entry with id {:?};",
name,
entry_id
);
let _ = entries_to_cancel.insert(entry_id);
}
if let Some(entry) = schedule_queue.pop() {
trace!("{}: scheduling entry; {:?};", name, entry);
if entries.insert(entry) {
entry_count.fetch_add(1, Ordering::SeqCst);
}
}
if let Some(entry) = entries.iter().next().cloned() {
let now = Instant::now();
if now.ge(&entry.start) {
entries.remove(&entry);
if !entries_to_cancel.contains(&entry.id) {
trace!("{}: executing scheduler entry; {:?}", name, entry);
let cb = Arc::clone(&entry.callback);
cb();
if let Some(interval) = entry.interval {
let updated_entry = ScheduleEntry {
start: Instant::now().add(interval),
interval: entry.interval,
callback: entry.callback,
name: entry.name,
id: entry.id,
};
entries.insert(updated_entry);
}
} else {
trace!("{}: cancelling scheduler entry; {:?}", name, entry);
if entries_to_cancel.remove(&entry.id) {
entry_count.fetch_sub(1, Ordering::SeqCst);
}
}
} else {
let timeout_dur = entry.start.sub(now);
trace!("{}: parking scheduler for {:?}", name, timeout_dur);
*status.write().unwrap() = SchedulerStatus::ParkedTimeout;
thread::park_timeout(timeout_dur);
*status.write().unwrap() = SchedulerStatus::Active;
}
} else {
trace!("{}: parking scheduler until being un-parked", name);
*status.write().unwrap() = SchedulerStatus::Parked;
thread::park();
*status.write().unwrap() = SchedulerStatus::Active;
}
}
})
.unwrap();
Scheduler {
shutdown: shutdown_clone,
thread_handle,
schedule_queue: schedule_queue_clone,
cancel_queue: cancel_queue_clone,
name: name_clone,
status: status_clone,
entry_count: entry_count_clone,
}
}
}
impl Drop for Scheduler {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::SeqCst);
self.thread_handle.thread().unpark();
}
}