use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use serde::Serialize;
use crate::cron::CronExpr;
use crate::jobs::{JobHandler, JobQueue};
#[allow(dead_code)]
struct ScheduledTask {
name: String,
cron: CronExpr,
handler: JobHandler,
enabled: bool,
last_run: Option<u64>,
}
#[derive(Debug, Clone, Serialize)]
pub struct TaskInfo {
pub name: String,
pub enabled: bool,
pub last_run: Option<u64>,
}
pub struct Scheduler {
tasks: Mutex<Vec<ScheduledTask>>,
job_queue: Arc<JobQueue>,
running: AtomicBool,
}
impl Scheduler {
pub fn new(job_queue: Arc<JobQueue>) -> Self {
Self {
tasks: Mutex::new(Vec::new()),
job_queue,
running: AtomicBool::new(true),
}
}
pub fn schedule(&self, name: &str, cron_expr: &str, handler: JobHandler) -> Result<(), String> {
let cron = CronExpr::parse(cron_expr)?;
self.job_queue.register(name, Arc::clone(&handler));
self.tasks.lock().unwrap().push(ScheduledTask {
name: name.to_string(),
cron,
handler,
enabled: true,
last_run: None,
});
Ok(())
}
pub fn start(self: Arc<Self>) -> SchedulerHandle {
let scheduler = Arc::clone(&self);
let handle = std::thread::spawn(move || {
while scheduler.running.load(Ordering::Relaxed) {
scheduler.tick();
for _ in 0..30 {
if !scheduler.running.load(Ordering::Relaxed) {
return;
}
std::thread::sleep(Duration::from_secs(1));
}
}
});
SchedulerHandle {
scheduler: self,
handle: Some(handle),
}
}
pub fn tick(&self) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.tick_at(now);
}
fn tick_at(&self, now: u64) {
let current_minute = now / 60;
let mut tasks = self.tasks.lock().unwrap();
for task in tasks.iter_mut() {
if !task.enabled {
continue;
}
let last_minute = task.last_run.map(|t| t / 60).unwrap_or(0);
if current_minute > last_minute && task.cron.matches(now) {
task.last_run = Some(now);
self.job_queue.enqueue(
&task.name,
serde_json::json!({
"scheduled": true,
"timestamp": now,
}),
);
}
}
}
pub fn list_tasks(&self) -> Vec<TaskInfo> {
self.tasks
.lock()
.unwrap()
.iter()
.map(|t| TaskInfo {
name: t.name.clone(),
enabled: t.enabled,
last_run: t.last_run,
})
.collect()
}
pub fn set_enabled(&self, name: &str, enabled: bool) -> bool {
let mut tasks = self.tasks.lock().unwrap();
if let Some(task) = tasks.iter_mut().find(|t| t.name == name) {
task.enabled = enabled;
true
} else {
false
}
}
pub fn trigger(&self, name: &str) -> bool {
let tasks = self.tasks.lock().unwrap();
if tasks.iter().any(|t| t.name == name) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
drop(tasks);
self.job_queue.enqueue(
name,
serde_json::json!({
"scheduled": true,
"manual_trigger": true,
"timestamp": now,
}),
);
true
} else {
false
}
}
}
pub struct SchedulerHandle {
scheduler: Arc<Scheduler>,
#[allow(dead_code)]
handle: Option<std::thread::JoinHandle<()>>,
}
impl SchedulerHandle {
pub fn stop(&self) {
self.scheduler.running.store(false, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::jobs::JobResult;
#[test]
fn schedule_registers_handler() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
sched
.schedule("cleanup", "*/5 * * * *", Arc::new(|_| JobResult::Success))
.unwrap();
let tasks = sched.list_tasks();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0].name, "cleanup");
assert!(tasks[0].enabled);
assert!(tasks[0].last_run.is_none());
let stats = q.stats();
assert!(stats.handlers.contains(&"cleanup".to_string()));
}
#[test]
fn schedule_rejects_bad_cron() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
let result = sched.schedule("bad", "not a cron", Arc::new(|_| JobResult::Success));
assert!(result.is_err());
}
#[test]
fn tick_enqueues_matching_tasks() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
sched
.schedule("every_min", "* * * * *", Arc::new(|_| JobResult::Success))
.unwrap();
sched.tick_at(1705314600);
assert_eq!(q.pending_count(), 1);
}
#[test]
fn tick_deduplicates_within_same_minute() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
sched
.schedule("dedup", "* * * * *", Arc::new(|_| JobResult::Success))
.unwrap();
sched.tick_at(1705314600);
sched.tick_at(1705314615);
assert_eq!(q.pending_count(), 1);
}
#[test]
fn tick_enqueues_again_next_minute() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
sched
.schedule("repeat", "* * * * *", Arc::new(|_| JobResult::Success))
.unwrap();
sched.tick_at(1705314600);
sched.tick_at(1705314660);
assert_eq!(q.pending_count(), 2);
}
#[test]
fn tick_skips_disabled_tasks() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
sched
.schedule("disabled", "* * * * *", Arc::new(|_| JobResult::Success))
.unwrap();
sched.set_enabled("disabled", false);
sched.tick_at(1705314600);
assert_eq!(q.pending_count(), 0);
}
#[test]
fn set_enabled_returns_false_for_unknown() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
assert!(!sched.set_enabled("nonexistent", false));
}
#[test]
fn trigger_enqueues_immediately() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
sched
.schedule("manual", "0 0 1 1 *", Arc::new(|_| JobResult::Success))
.unwrap();
assert!(sched.trigger("manual"));
assert_eq!(q.pending_count(), 1);
}
#[test]
fn trigger_returns_false_for_unknown() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
assert!(!sched.trigger("nonexistent"));
}
#[test]
fn tick_does_not_match_wrong_schedule() {
let q = Arc::new(JobQueue::new(100));
let sched = Scheduler::new(Arc::clone(&q));
sched
.schedule("yearly", "0 0 1 1 *", Arc::new(|_| JobResult::Success))
.unwrap();
sched.tick_at(1705314600);
assert_eq!(q.pending_count(), 0);
}
}