1use crate::job::{JobOptions, JobResult};
4use crate::queue::Queue;
5use cron::Schedule;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::Duration;
11
12#[derive(Clone)]
14pub struct ScheduledJob {
15 pub name: String,
17 pub cron: String,
19 pub data: serde_json::Value,
21 pub options: JobOptions,
23 pub enabled: bool,
25}
26
27pub struct Scheduler {
29 queue: Arc<Queue>,
31 jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
33 running: Arc<RwLock<bool>>,
35}
36
37impl Scheduler {
38 pub fn new(queue: Queue) -> Self {
40 Self {
41 queue: Arc::new(queue),
42 jobs: Arc::new(RwLock::new(HashMap::new())),
43 running: Arc::new(RwLock::new(false)),
44 }
45 }
46
47 pub fn add(&self, id: &str, job: ScheduledJob) -> JobResult<()> {
49 Schedule::from_str(&job.cron).map_err(|e| {
51 crate::job::JobError::Other(anyhow::anyhow!("无效的 Cron 表达式: {}", e))
52 })?;
53
54 let mut jobs = self.jobs.write();
55 jobs.insert(id.to_string(), job);
56 Ok(())
57 }
58
59 pub fn remove(&self, id: &str) {
61 let mut jobs = self.jobs.write();
62 jobs.remove(id);
63 }
64
65 pub fn enable(&self, id: &str) {
67 let mut jobs = self.jobs.write();
68 if let Some(job) = jobs.get_mut(id) {
69 job.enabled = true;
70 }
71 }
72
73 pub fn disable(&self, id: &str) {
75 let mut jobs = self.jobs.write();
76 if let Some(job) = jobs.get_mut(id) {
77 job.enabled = false;
78 }
79 }
80
81 pub fn list(&self) -> Vec<(String, ScheduledJob)> {
83 let jobs = self.jobs.read();
84 jobs.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
85 }
86
87 pub async fn start(&self) {
89 {
90 let mut running = self.running.write();
91 if *running {
92 return;
93 }
94 *running = true;
95 }
96
97 tracing::info!("定时任务调度器已启动");
98
99 let queue = Arc::clone(&self.queue);
100 let jobs = Arc::clone(&self.jobs);
101 let running = Arc::clone(&self.running);
102
103 tokio::spawn(async move {
104 loop {
105 if !*running.read() {
106 break;
107 }
108
109 let now = chrono::Utc::now();
110
111 let jobs_snapshot: Vec<(String, ScheduledJob)> = {
113 let jobs = jobs.read();
114 jobs.iter()
115 .filter(|(_, j)| j.enabled)
116 .map(|(k, v)| (k.clone(), v.clone()))
117 .collect()
118 };
119
120 for (id, job) in jobs_snapshot {
121 if let Ok(schedule) = Schedule::from_str(&job.cron) {
122 if let Some(next) = schedule.upcoming(chrono::Utc).next() {
124 let diff = (next - now).num_seconds();
125 if (0..=1).contains(&diff) {
126 tracing::info!("触发定时任务: {} ({})", job.name, id);
127
128 if let Err(e) = queue
130 .add(&job.name, job.data.clone(), job.options.clone())
131 .await
132 {
133 tracing::error!("添加定时任务失败: {} - {}", id, e);
134 }
135 }
136 }
137 }
138 }
139
140 tokio::time::sleep(Duration::from_secs(1)).await;
142 }
143
144 tracing::info!("定时任务调度器已停止");
145 });
146 }
147
148 pub fn stop(&self) {
150 let mut running = self.running.write();
151 *running = false;
152 }
153
154 pub fn is_running(&self) -> bool {
156 *self.running.read()
157 }
158}