cool_task/
scheduler.rs

1//! 定时任务调度器
2
3use crate::job::{JobOptions, JobResult};
4use crate::queue::Queue;
5use cron::Schedule;
6use parking_lot::RwLock;
7use std::collections::HashMap;
8use std::str::FromStr;
9use std::sync::Arc;
10use std::time::Duration;
11
12/// 定时任务
13#[derive(Clone)]
14pub struct ScheduledJob {
15    /// 任务名称
16    pub name: String,
17    /// Cron 表达式
18    pub cron: String,
19    /// 任务数据
20    pub data: serde_json::Value,
21    /// 任务选项
22    pub options: JobOptions,
23    /// 是否启用
24    pub enabled: bool,
25}
26
27/// 定时任务调度器
28pub struct Scheduler {
29    /// 队列
30    queue: Arc<Queue>,
31    /// 定时任务映射
32    jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
33    /// 是否运行中
34    running: Arc<RwLock<bool>>,
35}
36
37impl Scheduler {
38    /// 创建调度器
39    pub fn new(queue: Queue) -> Self {
40        Self {
41            queue: Arc::new(queue),
42            jobs: Arc::new(RwLock::new(HashMap::new())),
43            running: Arc::new(RwLock::new(false)),
44        }
45    }
46
47    /// 添加定时任务
48    pub fn add(&self, id: &str, job: ScheduledJob) -> JobResult<()> {
49        // 验证 Cron 表达式
50        Schedule::from_str(&job.cron).map_err(|e| {
51            crate::job::JobError::Other(anyhow::anyhow!("无效的 Cron 表达式: {}", e))
52        })?;
53
54        let mut jobs = self.jobs.write();
55        jobs.insert(id.to_string(), job);
56        Ok(())
57    }
58
59    /// 移除定时任务
60    pub fn remove(&self, id: &str) {
61        let mut jobs = self.jobs.write();
62        jobs.remove(id);
63    }
64
65    /// 启用定时任务
66    pub fn enable(&self, id: &str) {
67        let mut jobs = self.jobs.write();
68        if let Some(job) = jobs.get_mut(id) {
69            job.enabled = true;
70        }
71    }
72
73    /// 禁用定时任务
74    pub fn disable(&self, id: &str) {
75        let mut jobs = self.jobs.write();
76        if let Some(job) = jobs.get_mut(id) {
77            job.enabled = false;
78        }
79    }
80
81    /// 获取所有定时任务
82    pub fn list(&self) -> Vec<(String, ScheduledJob)> {
83        let jobs = self.jobs.read();
84        jobs.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
85    }
86
87    /// 启动调度器
88    pub async fn start(&self) {
89        {
90            let mut running = self.running.write();
91            if *running {
92                return;
93            }
94            *running = true;
95        }
96
97        tracing::info!("定时任务调度器已启动");
98
99        let queue = Arc::clone(&self.queue);
100        let jobs = Arc::clone(&self.jobs);
101        let running = Arc::clone(&self.running);
102
103        tokio::spawn(async move {
104            loop {
105                if !*running.read() {
106                    break;
107                }
108
109                let now = chrono::Utc::now();
110
111                // 检查所有定时任务
112                let jobs_snapshot: Vec<(String, ScheduledJob)> = {
113                    let jobs = jobs.read();
114                    jobs.iter()
115                        .filter(|(_, j)| j.enabled)
116                        .map(|(k, v)| (k.clone(), v.clone()))
117                        .collect()
118                };
119
120                for (id, job) in jobs_snapshot {
121                    if let Ok(schedule) = Schedule::from_str(&job.cron) {
122                        // 检查是否应该在当前时间执行
123                        if let Some(next) = schedule.upcoming(chrono::Utc).next() {
124                            let diff = (next - now).num_seconds();
125                            if (0..=1).contains(&diff) {
126                                tracing::info!("触发定时任务: {} ({})", job.name, id);
127
128                                // 添加任务到队列
129                                if let Err(e) = queue
130                                    .add(&job.name, job.data.clone(), job.options.clone())
131                                    .await
132                                {
133                                    tracing::error!("添加定时任务失败: {} - {}", id, e);
134                                }
135                            }
136                        }
137                    }
138                }
139
140                // 每秒检查一次
141                tokio::time::sleep(Duration::from_secs(1)).await;
142            }
143
144            tracing::info!("定时任务调度器已停止");
145        });
146    }
147
148    /// 停止调度器
149    pub fn stop(&self) {
150        let mut running = self.running.write();
151        *running = false;
152    }
153
154    /// 是否运行中
155    pub fn is_running(&self) -> bool {
156        *self.running.read()
157    }
158}