1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use tokio::sync::RwLock;
7use uuid::Uuid;
8
9use super::registry::CronRegistry;
10use forge_core::CircuitBreakerClient;
11use forge_core::cron::CronContext;
12
/// State of a single cron run.
///
/// The textual form of each variant (see `as_str`) matches the status
/// strings written to the `forge_cron_runs` table by the runner.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CronStatus {
    /// Initial state of a freshly created `CronRecord`; also the fallback
    /// produced when parsing an unrecognized status string.
    Pending,
    /// The run has been claimed and is executing.
    Running,
    /// The run finished successfully.
    Completed,
    /// The run returned an error or timed out.
    Failed,
}
25
26impl CronStatus {
27 pub fn as_str(&self) -> &'static str {
29 match self {
30 Self::Pending => "pending",
31 Self::Running => "running",
32 Self::Completed => "completed",
33 Self::Failed => "failed",
34 }
35 }
36}
37
38impl FromStr for CronStatus {
39 type Err = std::convert::Infallible;
40
41 fn from_str(s: &str) -> Result<Self, Self::Err> {
42 Ok(match s {
43 "pending" => Self::Pending,
44 "running" => Self::Running,
45 "completed" => Self::Completed,
46 "failed" => Self::Failed,
47 _ => Self::Pending,
48 })
49 }
50}
51
52#[derive(Debug, Clone)]
54pub struct CronRecord {
55 pub id: Uuid,
57 pub cron_name: String,
59 pub scheduled_time: DateTime<Utc>,
61 pub timezone: String,
63 pub status: CronStatus,
65 pub node_id: Option<Uuid>,
67 pub started_at: Option<DateTime<Utc>>,
69 pub completed_at: Option<DateTime<Utc>>,
71 pub error: Option<String>,
73}
74
75impl CronRecord {
76 pub fn new(
78 cron_name: impl Into<String>,
79 scheduled_time: DateTime<Utc>,
80 timezone: impl Into<String>,
81 ) -> Self {
82 Self {
83 id: Uuid::new_v4(),
84 cron_name: cron_name.into(),
85 scheduled_time,
86 timezone: timezone.into(),
87 status: CronStatus::Pending,
88 node_id: None,
89 started_at: None,
90 completed_at: None,
91 error: None,
92 }
93 }
94}
95
96#[derive(Debug, Clone)]
98pub struct CronRunnerConfig {
99 pub poll_interval: Duration,
101 pub node_id: Uuid,
103 pub is_leader: bool,
105}
106
107impl Default for CronRunnerConfig {
108 fn default() -> Self {
109 Self {
110 poll_interval: Duration::from_secs(1),
111 node_id: Uuid::new_v4(),
112 is_leader: true,
113 }
114 }
115}
116
117pub struct CronRunner {
119 registry: Arc<CronRegistry>,
120 pool: sqlx::PgPool,
121 http_client: CircuitBreakerClient,
122 config: CronRunnerConfig,
123 is_running: Arc<RwLock<bool>>,
124}
125
126impl CronRunner {
127 pub fn new(
129 registry: Arc<CronRegistry>,
130 pool: sqlx::PgPool,
131 http_client: CircuitBreakerClient,
132 config: CronRunnerConfig,
133 ) -> Self {
134 Self {
135 registry,
136 pool,
137 http_client,
138 config,
139 is_running: Arc::new(RwLock::new(false)),
140 }
141 }
142
143 pub async fn run(&self) -> forge_core::Result<()> {
145 {
146 let mut running = self.is_running.write().await;
147 if *running {
148 return Ok(());
149 }
150 *running = true;
151 }
152
153 tracing::info!("Cron runner starting");
154
155 loop {
156 if !*self.is_running.read().await {
157 break;
158 }
159
160 if self.config.is_leader {
161 if let Err(e) = self.tick().await {
162 tracing::error!(error = %e, "Cron tick failed");
163 }
164 }
165
166 tokio::time::sleep(self.config.poll_interval).await;
167 }
168
169 tracing::info!("Cron runner stopped");
170 Ok(())
171 }
172
173 pub async fn stop(&self) {
175 let mut running = self.is_running.write().await;
176 *running = false;
177 }
178
179 async fn tick(&self) -> forge_core::Result<()> {
181 let now = Utc::now();
182 let window_start = now
184 - chrono::Duration::from_std(self.config.poll_interval * 2)
185 .unwrap_or(chrono::Duration::seconds(2));
186
187 let cron_list = self.registry.list();
188
189 if cron_list.is_empty() {
190 tracing::trace!("Cron tick: no crons registered");
191 } else {
192 tracing::trace!(
193 cron_count = cron_list.len(),
194 "Cron tick checking {} registered crons",
195 cron_list.len()
196 );
197 }
198
199 for entry in cron_list {
200 let info = &entry.info;
201
202 let scheduled_times = info
203 .schedule
204 .between_in_tz(window_start, now, info.timezone);
205 if !scheduled_times.is_empty() {
206 tracing::trace!(
207 cron = info.name,
208 schedule = info.schedule.expression(),
209 scheduled_count = scheduled_times.len(),
210 "Found scheduled cron runs"
211 );
212 }
213
214 for scheduled in scheduled_times {
215 if let Ok(claimed) = self.try_claim(info.name, scheduled, info.timezone).await {
217 if claimed {
218 self.execute_cron(entry, scheduled, false).await;
220 }
221 }
222 }
223
224 if info.catch_up {
226 if let Err(e) = self.handle_catch_up(entry).await {
227 tracing::warn!(
228 cron = info.name,
229 error = %e,
230 "Failed to process catch-up runs"
231 );
232 }
233 }
234 }
235
236 Ok(())
237 }
238
239 async fn try_claim(
241 &self,
242 cron_name: &str,
243 scheduled_time: DateTime<Utc>,
244 _timezone: &str,
245 ) -> forge_core::Result<bool> {
246 let result = sqlx::query(
248 r#"
249 INSERT INTO forge_cron_runs (id, cron_name, scheduled_time, status, node_id, started_at)
250 VALUES ($1, $2, $3, 'running', $4, NOW())
251 ON CONFLICT (cron_name, scheduled_time) DO NOTHING
252 "#,
253 )
254 .bind(Uuid::new_v4())
255 .bind(cron_name)
256 .bind(scheduled_time)
257 .bind(self.config.node_id)
258 .execute(&self.pool)
259 .await
260 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
261
262 Ok(result.rows_affected() > 0)
263 }
264
265 async fn execute_cron(
267 &self,
268 entry: &super::registry::CronEntry,
269 scheduled_time: DateTime<Utc>,
270 is_catch_up: bool,
271 ) {
272 let info = &entry.info;
273 let run_id = Uuid::new_v4();
274
275 tracing::debug!(
276 cron = info.name,
277 scheduled_time = %scheduled_time,
278 is_catch_up = is_catch_up,
279 "Executing cron"
280 );
281
282 let ctx = CronContext::new(
283 run_id,
284 info.name.to_string(),
285 scheduled_time,
286 info.timezone.to_string(),
287 is_catch_up,
288 self.pool.clone(),
289 self.http_client.inner().clone(),
290 );
291
292 let handler = entry.handler.clone();
294 let result = tokio::time::timeout(info.timeout, handler(&ctx)).await;
295
296 match result {
297 Ok(Ok(())) => {
298 tracing::info!(
299 cron = info.name,
300 scheduled_time = %scheduled_time,
301 "Cron executed"
302 );
303 self.mark_completed(info.name, scheduled_time).await;
304 }
305 Ok(Err(e)) => {
306 tracing::error!(cron = info.name, error = %e, "Cron failed");
307 self.mark_failed(info.name, scheduled_time, &e.to_string())
308 .await;
309 }
310 Err(_) => {
311 tracing::error!(cron = info.name, "Cron timed out");
312 self.mark_failed(info.name, scheduled_time, "Execution timed out")
313 .await;
314 }
315 }
316 }
317
318 async fn mark_completed(&self, cron_name: &str, scheduled_time: DateTime<Utc>) {
320 let _ = sqlx::query(
321 r#"
322 UPDATE forge_cron_runs
323 SET status = 'completed', completed_at = NOW()
324 WHERE cron_name = $1 AND scheduled_time = $2
325 "#,
326 )
327 .bind(cron_name)
328 .bind(scheduled_time)
329 .execute(&self.pool)
330 .await;
331 }
332
333 async fn mark_failed(&self, cron_name: &str, scheduled_time: DateTime<Utc>, error: &str) {
335 let _ = sqlx::query(
336 r#"
337 UPDATE forge_cron_runs
338 SET status = 'failed', completed_at = NOW(), error = $3
339 WHERE cron_name = $1 AND scheduled_time = $2
340 "#,
341 )
342 .bind(cron_name)
343 .bind(scheduled_time)
344 .bind(error)
345 .execute(&self.pool)
346 .await;
347 }
348
349 async fn handle_catch_up(&self, entry: &super::registry::CronEntry) -> forge_core::Result<()> {
351 let info = &entry.info;
352 let now = Utc::now();
353
354 let last_run: Option<(DateTime<Utc>,)> = sqlx::query_as(
356 r#"
357 SELECT scheduled_time
358 FROM forge_cron_runs
359 WHERE cron_name = $1 AND status = 'completed'
360 ORDER BY scheduled_time DESC
361 LIMIT 1
362 "#,
363 )
364 .bind(info.name)
365 .fetch_optional(&self.pool)
366 .await
367 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
368
369 let start_time = last_run
370 .map(|(t,)| t)
371 .unwrap_or(now - chrono::Duration::days(1));
372
373 let missed_times = info.schedule.between_in_tz(start_time, now, info.timezone);
375
376 let to_catch_up: Vec<_> = missed_times
378 .into_iter()
379 .take(info.catch_up_limit as usize)
380 .collect();
381
382 for scheduled in to_catch_up {
383 if self.try_claim(info.name, scheduled, info.timezone).await? {
385 self.execute_cron(entry, scheduled, true).await;
386 }
387 }
388
389 Ok(())
390 }
391}
392
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status maps to its lowercase name and parses back from it.
    #[test]
    fn test_cron_status_conversion() {
        let pairs = [
            (CronStatus::Pending, "pending"),
            (CronStatus::Running, "running"),
            (CronStatus::Completed, "completed"),
            (CronStatus::Failed, "failed"),
        ];

        for &(status, text) in pairs.iter() {
            assert_eq!(status.as_str(), text);
        }

        for &(status, text) in pairs.iter() {
            assert_eq!(text.parse::<CronStatus>(), Ok(status));
        }
    }

    /// A new record keeps the supplied name and timezone and starts out
    /// pending with no node assigned.
    #[test]
    fn test_cron_record_creation() {
        let scheduled_for = Utc::now();
        let record = CronRecord::new("daily_cleanup", scheduled_for, "UTC");

        assert_eq!(record.cron_name, "daily_cleanup");
        assert_eq!(record.timezone, "UTC");
        assert_eq!(record.status, CronStatus::Pending);
        assert!(record.node_id.is_none());
    }

    /// The default configuration polls every second in leader mode.
    #[test]
    fn test_cron_runner_config_default() {
        let CronRunnerConfig {
            poll_interval,
            is_leader,
            ..
        } = CronRunnerConfig::default();

        assert_eq!(poll_interval, Duration::from_secs(1));
        assert!(is_leader);
    }
}