use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::SystemTime;
use parking_lot::Mutex;
use thiserror::Error;
use crate::qname::QName;
use crate::traits::background::{CancellationToken, Schedule};
/// Lifecycle state of a job tracked by [`Scheduler`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum SchedulerJobStatus {
    /// Waiting for its fire time; the only state `Scheduler::tick_at` dispatches from.
    Pending,
    /// Dispatched by a tick or `Scheduler::mark_started`, and not yet reported finished.
    Running,
    /// Last run succeeded and no further fire time was scheduled.
    Idle,
    /// Last run failed and no recurring fire time was computed.
    ///
    /// NOTE(review): `tick_at` only dispatches `Pending` jobs, so nothing visible here re-runs
    /// a job in this state — confirm where retries are driven from.
    FailedRetrying,
    /// Set by `Scheduler::cancel`, or by `tick_at` when a pending job's token was tripped.
    Cancelled,
}
/// Bookkeeping for one job held by [`Scheduler`]; [`Scheduler::list`] returns clones of these.
#[derive(Clone, Debug)]
pub struct SchedulerJobRecord {
    /// Job identifier. Not checked for uniqueness on insert; id lookups act on the first match.
    pub id: QName,
    /// Current lifecycle state.
    pub status: SchedulerJobStatus,
    /// When the job next becomes due. `None` is treated by `tick_at` as "due immediately".
    pub next_fire_at: Option<SystemTime>,
    /// When the job was last marked `Running`.
    pub last_started_at: Option<SystemTime>,
    /// When the job was last reported finished.
    pub last_finished_at: Option<SystemTime>,
    /// Failed runs since the last successful one (saturating).
    pub consecutive_failures: u32,
    /// Schedule used to compute `next_fire_at`.
    pub schedule: Schedule,
    /// Cancellation token tripped by `Scheduler::cancel`; not part of `PartialEq`.
    pub cancel: CancellationToken,
}
impl SchedulerJobRecord {
    /// Builds a fresh `Pending` record on a [`Schedule::Manual`] schedule, evaluated at the
    /// current wall-clock time.
    #[must_use]
    pub fn pending(id: QName) -> Self {
        let now = SystemTime::now();
        Self::pending_with_schedule(id, Schedule::Manual, now)
    }

    /// Builds a fresh `Pending` record for `schedule`, evaluated at `now`.
    ///
    /// `next_fire_at` is seeded from `schedule.next_after(now)`. Both timestamps start unset,
    /// the failure counter starts at zero, and the record gets its own new cancellation token.
    #[must_use]
    pub fn pending_with_schedule(id: QName, schedule: Schedule, now: SystemTime) -> Self {
        Self {
            // Evaluated before `schedule` is moved into the record below.
            next_fire_at: schedule.next_after(now),
            id,
            status: SchedulerJobStatus::Pending,
            last_started_at: None,
            last_finished_at: None,
            consecutive_failures: 0,
            schedule,
            cancel: CancellationToken::new(),
        }
    }
}
/// In-memory job scheduler: a list of [`SchedulerJobRecord`]s behind a mutex plus a pause flag.
///
/// Ticks are driven by the caller via [`Scheduler::tick`] / [`Scheduler::tick_at`], which
/// return the ids of due jobs for the caller to run.
#[derive(Debug)]
pub struct Scheduler {
    // All registered jobs, in insertion order.
    records: Mutex<Vec<SchedulerJobRecord>>,
    // While `true`, ticks dispatch nothing; `Scheduler::new` starts it as `true`.
    paused: AtomicBool,
}
impl Default for Scheduler {
fn default() -> Self {
Self::new()
}
}
impl Scheduler {
#[must_use]
pub fn new() -> Self {
Self {
records: Mutex::new(Vec::new()),
paused: AtomicBool::new(true),
}
}
pub fn add_job(&self, id: QName) {
self.add_scheduled_job(id, Schedule::Manual);
}
pub fn add_scheduled_job(&self, id: QName, schedule: Schedule) {
let now = SystemTime::now();
self.records
.lock()
.push(SchedulerJobRecord::pending_with_schedule(id, schedule, now));
}
pub fn cancel(&self, id: &QName) -> bool {
let mut records = self.records.lock();
let Some(r) = records.iter_mut().find(|r| &r.id == id) else {
return false;
};
r.status = SchedulerJobStatus::Cancelled;
r.cancel.cancel();
true
}
#[must_use]
pub fn list(&self) -> Vec<SchedulerJobRecord> {
self.records.lock().clone()
}
#[must_use]
pub fn cancel_token_for(&self, id: &QName) -> Option<CancellationToken> {
self.records
.lock()
.iter()
.find(|r| &r.id == id)
.map(|r| r.cancel.clone())
}
pub fn resume(&self) {
self.paused.store(false, Ordering::SeqCst);
}
pub fn tick(&self) -> Vec<QName> {
self.tick_at(SystemTime::now())
}
pub fn tick_at(&self, now: SystemTime) -> Vec<QName> {
if self.is_paused() {
return Vec::new();
}
let mut records = self.records.lock();
let mut due: Vec<QName> = Vec::new();
for r in records.iter_mut() {
if !matches!(r.status, SchedulerJobStatus::Pending) {
continue;
}
if r.cancel.is_cancelled() {
r.status = SchedulerJobStatus::Cancelled;
continue;
}
if let Some(fire_at) = r.next_fire_at
&& fire_at > now
{
continue;
}
r.status = SchedulerJobStatus::Running;
r.last_started_at = Some(now);
due.push(r.id.clone());
}
due
}
#[must_use]
pub fn running_count(&self) -> usize {
self.records
.lock()
.iter()
.filter(|r| matches!(r.status, SchedulerJobStatus::Running))
.count()
}
#[must_use]
pub fn pending_count(&self) -> usize {
self.records
.lock()
.iter()
.filter(|r| matches!(r.status, SchedulerJobStatus::Pending))
.count()
}
pub fn requeue_orphaned_runs(&self) -> usize {
let mut records = self.records.lock();
let mut count = 0;
for r in records.iter_mut() {
if matches!(r.status, SchedulerJobStatus::Running) {
r.status = SchedulerJobStatus::Pending;
count += 1;
}
}
count
}
pub fn pause(&self) {
self.paused.store(true, Ordering::SeqCst);
}
#[must_use]
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::SeqCst)
}
pub fn mark_started(&self, id: &QName) {
let mut records = self.records.lock();
if let Some(r) = records.iter_mut().find(|r| &r.id == id) {
r.status = SchedulerJobStatus::Running;
r.last_started_at = Some(SystemTime::now());
}
}
pub fn mark_finished(&self, id: &QName, success: bool) {
let now = SystemTime::now();
let mut records = self.records.lock();
let Some(r) = records.iter_mut().find(|r| &r.id == id) else {
return;
};
r.last_finished_at = Some(now);
let next = r.schedule.next_after(now);
let has_next =
matches!(r.schedule, Schedule::Periodic(_) | Schedule::Cron(_)) && next.is_some();
if has_next {
r.status = SchedulerJobStatus::Pending;
r.next_fire_at = next;
} else {
r.status = if success {
SchedulerJobStatus::Idle
} else {
SchedulerJobStatus::FailedRetrying
};
if success {
r.next_fire_at = None;
}
}
if success {
r.consecutive_failures = 0;
} else {
r.consecutive_failures = r.consecutive_failures.saturating_add(1);
}
}
}
/// Field-wise equality over everything except the cancellation token.
///
/// The exhaustive destructuring below forces this impl to be revisited when a field is added.
/// NOTE(review): `cancel` is presumably skipped because `CancellationToken` has no meaningful
/// equality — confirm.
impl PartialEq for SchedulerJobRecord {
    fn eq(&self, other: &Self) -> bool {
        let Self {
            id,
            status,
            next_fire_at,
            last_started_at,
            last_finished_at,
            consecutive_failures,
            schedule,
            cancel: _,
        } = self;
        id == &other.id
            && status == &other.status
            && next_fire_at == &other.next_fire_at
            && last_started_at == &other.last_started_at
            && last_finished_at == &other.last_finished_at
            && consecutive_failures == &other.consecutive_failures
            && schedule == &other.schedule
    }
}
/// Error returned by [`SchedulerPersistence`] backends.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SchedulerPersistenceError {
    /// Backend-specific failure with a human-readable description.
    #[error("scheduler persistence: {0}")]
    Backend(String),
}
/// Storage hook for scheduler job state.
///
/// Implementors must be `Send + Sync + Debug`. `record_scheduled` and `flush_checkpoint` have
/// no-op defaults; the other methods must be provided.
pub trait SchedulerPersistence: Send + Sync + std::fmt::Debug {
    /// Records that a job was registered with the given schedule. Default: no-op, `Ok(())`.
    fn record_scheduled(
        &self,
        _id: &QName,
        _schedule: &Schedule,
    ) -> Result<(), SchedulerPersistenceError> {
        Ok(())
    }
    /// Records that job `id` started running at `started_at`.
    fn record_started(
        &self,
        id: &QName,
        started_at: SystemTime,
    ) -> Result<(), SchedulerPersistenceError>;
    /// Records that job `id` finished at `finished_at`; `success` reports the outcome.
    fn record_finished(
        &self,
        id: &QName,
        finished_at: SystemTime,
        success: bool,
    ) -> Result<(), SchedulerPersistenceError>;
    /// Records that job `id` was cancelled.
    fn cancel(&self, id: &QName) -> Result<(), SchedulerPersistenceError>;
    /// Loads every persisted job record.
    ///
    /// NOTE(review): presumably used to rebuild scheduler state at startup — confirm callers.
    fn load_all(&self) -> Result<Vec<SchedulerJobRecord>, SchedulerPersistenceError>;
    /// Flushes any buffered state. Default: no-op, `Ok(())`.
    fn flush_checkpoint(&self) -> Result<(), SchedulerPersistenceError> {
        Ok(())
    }
}
/// No-op [`SchedulerPersistence`] backend.
///
/// Despite the name, nothing is retained: every write succeeds and is discarded, and
/// `load_all` always yields an empty list.
#[derive(Default, Debug)]
pub struct MemoryPersistence;

impl SchedulerPersistence for MemoryPersistence {
    fn load_all(&self) -> Result<Vec<SchedulerJobRecord>, SchedulerPersistenceError> {
        Ok(vec![])
    }

    fn record_started(&self, _: &QName, _: SystemTime) -> Result<(), SchedulerPersistenceError> {
        Ok(())
    }

    fn record_finished(
        &self,
        _: &QName,
        _: SystemTime,
        _: bool,
    ) -> Result<(), SchedulerPersistenceError> {
        Ok(())
    }

    fn cancel(&self, _: &QName) -> Result<(), SchedulerPersistenceError> {
        Ok(())
    }
}
/// Control surface over a scheduler: register, cancel and list jobs, plus optional hooks a
/// host may override.
pub trait SchedulerControl: Send + Sync + std::fmt::Debug {
    /// Registers job `id` to run on `schedule`.
    fn add_scheduled_job(&self, id: QName, schedule: Schedule);
    /// Cancels job `id`, returning whether such a job was found.
    fn cancel(&self, id: &QName) -> bool;
    /// Returns a snapshot of all job records.
    fn list(&self) -> Vec<SchedulerJobRecord>;
    /// Submits `_cypher` (presumably Cypher query text) for execution by a host.
    ///
    /// Default: always fails with error code `0xD20`, since no host is wired in.
    fn submit_cypher(&self, _cypher: &str) -> Result<(), crate::FnError> {
        Err(crate::FnError::new(
            0xD20,
            "scheduler: submit_cypher not supported by this control (no host wired)",
        ))
    }
    /// Flushes scheduler state. Default: no-op, `Ok(())`.
    fn flush_checkpoint(&self) -> Result<(), crate::FnError> {
        Ok(())
    }
}
impl SchedulerControl for Scheduler {
fn add_scheduled_job(&self, id: QName, schedule: Schedule) {
Self::add_scheduled_job(self, id, schedule);
}
fn cancel(&self, id: &QName) -> bool {
Self::cancel(self, id)
}
fn list(&self) -> Vec<SchedulerJobRecord> {
Self::list(self)
}
}
/// Cheaply clonable handle sharing one [`Scheduler`] through an `Arc`.
#[derive(Debug, Clone)]
pub struct SchedulerHandle {
    inner: Arc<Scheduler>,
}

impl SchedulerHandle {
    /// Wraps an already-shared scheduler.
    #[must_use]
    pub fn new(scheduler: Arc<Scheduler>) -> Self {
        SchedulerHandle { inner: scheduler }
    }

    /// Borrows the underlying scheduler.
    #[must_use]
    pub fn scheduler(&self) -> &Scheduler {
        self.inner.as_ref()
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `Scheduler`: pause/resume, registration and cancellation, tick dispatch,
    //! run bookkeeping, and the `Schedule` variants.
    use super::*;

    // A new scheduler starts paused and empty.
    #[test]
    fn scheduler_default_is_paused() {
        let s = Scheduler::new();
        assert!(s.is_paused());
        assert!(s.list().is_empty());
    }

    #[test]
    fn scheduler_resume_pause_round_trip() {
        let s = Scheduler::new();
        s.resume();
        assert!(!s.is_paused());
        s.pause();
        assert!(s.is_paused());
    }

    // Cancelling updates both the status and the record's cancellation token.
    #[test]
    fn add_job_and_cancel() {
        let s = Scheduler::new();
        s.add_job(QName::builtin("ttl_sweep"));
        assert_eq!(s.list().len(), 1);
        assert!(s.cancel(&QName::builtin("ttl_sweep")));
        let recs = s.list();
        assert_eq!(recs[0].status, SchedulerJobStatus::Cancelled);
        assert!(recs[0].cancel.is_cancelled());
    }

    #[test]
    fn cancel_unknown_job_returns_false() {
        let s = Scheduler::new();
        assert!(!s.cancel(&QName::builtin("nope")));
    }

    // Failures accumulate on a Manual job; a single success resets the counter and idles it.
    #[test]
    fn run_lifecycle_increments_failures_then_resets() {
        let s = Scheduler::new();
        let id = QName::builtin("flaky");
        s.add_job(id.clone());
        s.mark_started(&id);
        s.mark_finished(&id, false);
        s.mark_started(&id);
        s.mark_finished(&id, false);
        let recs = s.list();
        assert_eq!(recs[0].consecutive_failures, 2);
        assert_eq!(recs[0].status, SchedulerJobStatus::FailedRetrying);
        s.mark_started(&id);
        s.mark_finished(&id, true);
        let recs = s.list();
        assert_eq!(recs[0].consecutive_failures, 0);
        assert_eq!(recs[0].status, SchedulerJobStatus::Idle);
    }

    // The scheduler is never resumed here, so nothing is dispatched.
    #[test]
    fn tick_returns_empty_when_paused() {
        let s = Scheduler::new();
        s.add_job(QName::builtin("job1"));
        assert!(s.tick().is_empty());
    }

    #[test]
    fn tick_dispatches_pending_jobs_when_resumed() {
        let s = Scheduler::new();
        s.add_job(QName::builtin("job1"));
        s.add_job(QName::builtin("job2"));
        s.resume();
        let due = s.tick();
        assert_eq!(due.len(), 2);
        assert!(due.iter().any(|q| q.local() == "job1"));
        assert!(due.iter().any(|q| q.local() == "job2"));
        assert_eq!(s.running_count(), 2);
        assert_eq!(s.pending_count(), 0);
    }

    #[test]
    fn tick_skips_cancelled_jobs() {
        let s = Scheduler::new();
        s.add_job(QName::builtin("doomed"));
        s.cancel(&QName::builtin("doomed"));
        s.resume();
        let due = s.tick();
        assert!(due.is_empty(), "cancelled job should not be dispatched");
    }

    // A dispatched job is Running, so later ticks skip it; a finished Manual job is not
    // rescheduled either.
    #[test]
    fn second_tick_returns_empty_until_jobs_marked_pending() {
        let s = Scheduler::new();
        s.add_job(QName::builtin("once"));
        s.resume();
        assert_eq!(s.tick().len(), 1);
        assert!(s.tick().is_empty());
        s.mark_finished(&QName::builtin("once"), true);
        assert!(s.tick().is_empty());
    }

    // A run left Running (presumably by an interrupted worker) becomes dispatchable again
    // after requeueing.
    #[test]
    fn requeue_orphaned_runs_moves_running_back_to_pending() {
        let s = Scheduler::new();
        s.add_job(QName::builtin("orphan"));
        s.resume();
        s.tick();
        assert_eq!(s.running_count(), 1);
        let count = s.requeue_orphaned_runs();
        assert_eq!(count, 1);
        assert_eq!(s.running_count(), 0);
        assert_eq!(s.pending_count(), 1);
        assert_eq!(s.tick().len(), 1);
    }

    // Drives time explicitly through `tick_at` so the test does not depend on sleeping.
    #[test]
    fn schedule_once_fires_only_after_instant() {
        use std::time::Duration;
        let s = Scheduler::new();
        s.resume();
        let future = SystemTime::now() + Duration::from_secs(60);
        s.add_scheduled_job(QName::builtin("once"), Schedule::Once(future));
        let due_now = s.tick_at(SystemTime::now());
        assert!(
            due_now.is_empty(),
            "Once job should not fire before its instant"
        );
        let due_after = s.tick_at(future + Duration::from_secs(1));
        assert_eq!(due_after.len(), 1);
        assert_eq!(due_after[0].local(), "once");
    }

    // A successfully finished Once job goes Idle with no next fire, even far in the future.
    #[test]
    fn schedule_once_does_not_reschedule_after_finish() {
        use std::time::Duration;
        let s = Scheduler::new();
        s.resume();
        let past = SystemTime::now() - Duration::from_secs(1);
        s.add_scheduled_job(QName::builtin("once"), Schedule::Once(past));
        let due = s.tick_at(SystemTime::now());
        assert_eq!(due.len(), 1);
        s.mark_finished(&QName::builtin("once"), true);
        let recs = s.list();
        assert_eq!(recs[0].status, SchedulerJobStatus::Idle);
        assert!(recs[0].next_fire_at.is_none());
        assert!(
            s.tick_at(SystemTime::now() + Duration::from_secs(3600))
                .is_empty()
        );
    }

    // Not due halfway through the first 10 s period, due just after it, and re-armed
    // (Pending with a next fire time) once finished.
    #[test]
    fn schedule_periodic_reschedules_after_finish() {
        use std::time::Duration;
        let s = Scheduler::new();
        s.resume();
        let start = SystemTime::now();
        s.add_scheduled_job(
            QName::builtin("ticker"),
            Schedule::Periodic(Duration::from_secs(10)),
        );
        assert!(s.tick_at(start + Duration::from_secs(5)).is_empty());
        let due = s.tick_at(start + Duration::from_secs(11));
        assert_eq!(due.len(), 1);
        s.mark_finished(&QName::builtin("ticker"), true);
        let recs = s.list();
        assert_eq!(recs[0].status, SchedulerJobStatus::Pending);
        assert!(recs[0].next_fire_at.is_some());
    }

    // Cron schedules get a concrete first fire time at registration.
    #[test]
    fn schedule_cron_emits_future_fire() {
        use std::time::Duration;
        let s = Scheduler::new();
        s.resume();
        s.add_scheduled_job(
            QName::builtin("every_min"),
            Schedule::Cron(smol_str::SmolStr::new("0 * * * * *")),
        );
        let recs = s.list();
        let next = recs[0].next_fire_at.expect("cron must produce a next fire");
        // Loose lower bound: the fire time must not lie meaningfully in the past.
        assert!(next > SystemTime::now() - Duration::from_secs(1));
    }

    #[test]
    fn manual_schedule_is_immediately_due() {
        let s = Scheduler::new();
        s.resume();
        s.add_scheduled_job(QName::builtin("legacy"), Schedule::Manual);
        let due = s.tick();
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].local(), "legacy");
    }

    // Finishing a Manual job (success or failure) takes it out of the Running count.
    #[test]
    fn pending_count_and_running_count_track_lifecycle() {
        let s = Scheduler::new();
        for n in 0..5 {
            s.add_job(QName::builtin(format!("job{n}")));
        }
        s.resume();
        assert_eq!(s.pending_count(), 5);
        assert_eq!(s.running_count(), 0);
        let due = s.tick();
        assert_eq!(due.len(), 5);
        assert_eq!(s.pending_count(), 0);
        assert_eq!(s.running_count(), 5);
        s.mark_finished(&QName::builtin("job0"), true);
        s.mark_finished(&QName::builtin("job1"), false);
        assert_eq!(s.running_count(), 3, "two have finished");
    }
}