use crate::job::{JobOptions, JobResult};
use crate::queue::Queue;
use cron::Schedule;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
/// A job definition that the scheduler enqueues whenever its cron schedule comes due.
#[derive(Clone)]
pub struct ScheduledJob {
/// Job name; passed to the queue as the job name and included in the trigger log line.
pub name: String,
/// Cron expression in the syntax accepted by `cron::Schedule::from_str`.
pub cron: String,
/// JSON payload handed to the queue each time the job is triggered.
pub data: serde_json::Value,
/// Queue options handed to the queue each time the job is triggered.
pub options: JobOptions,
/// The scheduler loop only considers jobs whose flag is `true`.
pub enabled: bool,
}
/// Cron-driven scheduler that adds registered jobs to a [`Queue`] from a background task.
pub struct Scheduler {
/// Queue that triggered jobs are added to; shared with the background task.
queue: Arc<Queue>,
/// Registered jobs keyed by the caller-supplied id; shared with the background task.
jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
/// `true` between `start` and `stop`; the background loop exits once it reads `false`.
running: Arc<RwLock<bool>>,
}
impl Scheduler {
pub fn new(queue: Queue) -> Self {
Self {
queue: Arc::new(queue),
jobs: Arc::new(RwLock::new(HashMap::new())),
running: Arc::new(RwLock::new(false)),
}
}
pub fn add(&self, id: &str, job: ScheduledJob) -> JobResult<()> {
Schedule::from_str(&job.cron).map_err(|e| {
crate::job::JobError::Other(anyhow::anyhow!("无效的 Cron 表达式: {}", e))
})?;
let mut jobs = self.jobs.write();
jobs.insert(id.to_string(), job);
Ok(())
}
pub fn remove(&self, id: &str) {
let mut jobs = self.jobs.write();
jobs.remove(id);
}
pub fn enable(&self, id: &str) {
let mut jobs = self.jobs.write();
if let Some(job) = jobs.get_mut(id) {
job.enabled = true;
}
}
pub fn disable(&self, id: &str) {
let mut jobs = self.jobs.write();
if let Some(job) = jobs.get_mut(id) {
job.enabled = false;
}
}
pub fn list(&self) -> Vec<(String, ScheduledJob)> {
let jobs = self.jobs.read();
jobs.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
}
pub async fn start(&self) {
{
let mut running = self.running.write();
if *running {
return;
}
*running = true;
}
tracing::info!("定时任务调度器已启动");
let queue = Arc::clone(&self.queue);
let jobs = Arc::clone(&self.jobs);
let running = Arc::clone(&self.running);
tokio::spawn(async move {
loop {
if !*running.read() {
break;
}
let now = chrono::Utc::now();
let jobs_snapshot: Vec<(String, ScheduledJob)> = {
let jobs = jobs.read();
jobs.iter()
.filter(|(_, j)| j.enabled)
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
};
for (id, job) in jobs_snapshot {
if let Ok(schedule) = Schedule::from_str(&job.cron) {
if let Some(next) = schedule.upcoming(chrono::Utc).next() {
let diff = (next - now).num_seconds();
if (0..=1).contains(&diff) {
tracing::info!("触发定时任务: {} ({})", job.name, id);
if let Err(e) = queue
.add(&job.name, job.data.clone(), job.options.clone())
.await
{
tracing::error!("添加定时任务失败: {} - {}", id, e);
}
}
}
}
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
tracing::info!("定时任务调度器已停止");
});
}
pub fn stop(&self) {
let mut running = self.running.write();
*running = false;
}
pub fn is_running(&self) -> bool {
*self.running.read()
}
}