1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use chrono::{DateTime, Utc};
6use tokio::sync::RwLock;
7use tracing::{Instrument, Span, field};
8use uuid::Uuid;
9
10use super::registry::CronRegistry;
11use crate::cluster::LeaderElection;
12use forge_core::CircuitBreakerClient;
13use forge_core::cron::CronContext;
14
/// Lifecycle state of a single cron run.
///
/// The `running`, `completed` and `failed` string forms returned by
/// [`CronStatus::as_str`] are the same literals the runner's SQL writes to the
/// `forge_cron_runs.status` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CronStatus {
    /// Not started yet; the initial state of a freshly built [`CronRecord`].
    Pending,
    /// Claimed by a node and currently executing.
    Running,
    /// The handler returned successfully.
    Completed,
    /// The handler returned an error or exceeded its timeout.
    Failed,
}

impl CronStatus {
    /// Returns the lowercase, database-facing name of this status.
    pub fn as_str(&self) -> &'static str {
        match *self {
            CronStatus::Pending => "pending",
            CronStatus::Running => "running",
            CronStatus::Completed => "completed",
            CronStatus::Failed => "failed",
        }
    }
}

/// Error produced when a string is not one of the [`CronStatus`] names.
///
/// The wrapped `String` is the rejected input, kept verbatim.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseCronStatusError(pub String);

impl std::fmt::Display for ParseCronStatusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rejected = &self.0;
        write!(f, "invalid cron status: '{}'", rejected)
    }
}

impl std::error::Error for ParseCronStatusError {}

impl FromStr for CronStatus {
    type Err = ParseCronStatusError;

    /// Accepts exactly the names produced by [`CronStatus::as_str`].
    /// Matching is case-sensitive and does not trim whitespace.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        [Self::Pending, Self::Running, Self::Completed, Self::Failed]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
            .ok_or_else(|| ParseCronStatusError(s.to_owned()))
    }
}
64
65#[derive(Debug, Clone)]
67pub struct CronRecord {
68 pub id: Uuid,
70 pub cron_name: String,
72 pub scheduled_time: DateTime<Utc>,
74 pub timezone: String,
76 pub status: CronStatus,
78 pub node_id: Option<Uuid>,
80 pub started_at: Option<DateTime<Utc>>,
82 pub completed_at: Option<DateTime<Utc>>,
84 pub error: Option<String>,
86}
87
88impl CronRecord {
89 pub fn new(
91 cron_name: impl Into<String>,
92 scheduled_time: DateTime<Utc>,
93 timezone: impl Into<String>,
94 ) -> Self {
95 Self {
96 id: Uuid::new_v4(),
97 cron_name: cron_name.into(),
98 scheduled_time,
99 timezone: timezone.into(),
100 status: CronStatus::Pending,
101 node_id: None,
102 started_at: None,
103 completed_at: None,
104 error: None,
105 }
106 }
107}
108
109#[derive(Clone)]
111pub struct CronRunnerConfig {
112 pub poll_interval: Duration,
114 pub node_id: Uuid,
116 pub is_leader: bool,
118 pub leader_election: Option<Arc<LeaderElection>>,
120 pub run_stale_threshold: Duration,
122}
123
124impl Default for CronRunnerConfig {
125 fn default() -> Self {
126 Self {
127 poll_interval: Duration::from_secs(1),
128 node_id: Uuid::new_v4(),
129 is_leader: true,
130 leader_election: None,
131 run_stale_threshold: Duration::from_secs(15 * 60),
132 }
133 }
134}
135
136pub struct CronRunner {
138 registry: Arc<CronRegistry>,
139 pool: sqlx::PgPool,
140 http_client: CircuitBreakerClient,
141 config: CronRunnerConfig,
142 is_running: Arc<RwLock<bool>>,
143}
144
145impl CronRunner {
146 pub fn new(
148 registry: Arc<CronRegistry>,
149 pool: sqlx::PgPool,
150 http_client: CircuitBreakerClient,
151 config: CronRunnerConfig,
152 ) -> Self {
153 Self {
154 registry,
155 pool,
156 http_client,
157 config,
158 is_running: Arc::new(RwLock::new(false)),
159 }
160 }
161
162 pub async fn run(&self) -> forge_core::Result<()> {
164 {
165 let mut running = self.is_running.write().await;
166 if *running {
167 return Ok(());
168 }
169 *running = true;
170 }
171
172 tracing::debug!("Cron runner starting");
173
174 loop {
175 if !*self.is_running.read().await {
176 break;
177 }
178
179 if self.is_leader()
180 && let Err(e) = self.tick().await
181 {
182 tracing::warn!(error = %e, "Cron tick failed");
183 }
184
185 tokio::time::sleep(self.config.poll_interval).await;
186 }
187
188 tracing::debug!("Cron runner stopped");
189 Ok(())
190 }
191
192 pub async fn stop(&self) {
194 let mut running = self.is_running.write().await;
195 *running = false;
196 }
197
198 fn is_leader(&self) -> bool {
199 self.config
200 .leader_election
201 .as_ref()
202 .map(|e| e.is_leader())
203 .unwrap_or(self.config.is_leader)
204 }
205
206 async fn tick(&self) -> forge_core::Result<()> {
208 let tick_span = tracing::info_span!(
209 "cron.tick",
210 cron.tick_id = %Uuid::new_v4(),
211 cron.jobs_checked = field::Empty,
212 cron.jobs_executed = field::Empty,
213 );
214
215 async {
216 let now = Utc::now();
217 let window_start = now
219 - chrono::Duration::from_std(self.config.poll_interval * 2)
220 .unwrap_or(chrono::Duration::seconds(2));
221
222 let cron_list = self.registry.list();
223 let mut jobs_executed = 0u32;
224
225 Span::current().record("cron.jobs_checked", cron_list.len());
226
227 if cron_list.is_empty() {
228 tracing::trace!("Cron tick: no crons registered");
229 } else {
230 tracing::trace!(
231 cron_count = cron_list.len(),
232 "Cron tick checking {} registered crons",
233 cron_list.len()
234 );
235 }
236
237 for entry in cron_list {
238 let info = &entry.info;
239
240 let scheduled_times = info
241 .schedule
242 .between_in_tz(window_start, now, info.timezone);
243
244 if scheduled_times.len() > 1 {
246 tracing::info!(
247 cron.name = info.name,
248 cron.missed_count = scheduled_times.len() - 1,
249 "Detected missed cron runs"
250 );
251 Span::current().record("cron.missed_runs", scheduled_times.len() - 1);
252 }
253
254 if !scheduled_times.is_empty() {
255 tracing::trace!(
256 cron = info.name,
257 schedule = info.schedule.expression(),
258 scheduled_count = scheduled_times.len(),
259 "Found scheduled cron runs"
260 );
261 }
262
263 for scheduled in scheduled_times {
264 if let Ok(Some(run_id)) =
266 self.try_claim(info.name, scheduled, info.timezone).await
267 {
268 self.execute_cron(entry, run_id, scheduled, false).await;
269 jobs_executed += 1;
270 }
271 }
272
273 if info.catch_up
275 && let Err(e) = self.handle_catch_up(entry).await
276 {
277 tracing::warn!(
278 cron = info.name,
279 error = %e,
280 "Failed to process catch-up runs"
281 );
282 }
283 }
284
285 Span::current().record("cron.jobs_executed", jobs_executed);
286 Ok(())
287 }
288 .instrument(tick_span)
289 .await
290 }
291
292 async fn try_claim(
296 &self,
297 cron_name: &str,
298 scheduled_time: DateTime<Utc>,
299 _timezone: &str,
300 ) -> forge_core::Result<Option<Uuid>> {
301 let claim_id = Uuid::new_v4();
302 let stale_threshold = chrono::Duration::from_std(self.config.run_stale_threshold)
303 .unwrap_or(chrono::Duration::minutes(15));
304
305 let result = sqlx::query(
307 r#"
308 INSERT INTO forge_cron_runs (id, cron_name, scheduled_time, status, node_id, started_at)
309 VALUES ($1, $2, $3, 'running', $4, NOW())
310 ON CONFLICT (cron_name, scheduled_time) DO UPDATE
311 SET
312 id = EXCLUDED.id,
313 status = 'running',
314 node_id = EXCLUDED.node_id,
315 started_at = NOW(),
316 completed_at = NULL,
317 error = NULL
318 WHERE forge_cron_runs.status = 'running'
319 AND forge_cron_runs.started_at < NOW() - $5
320 "#,
321 )
322 .bind(claim_id)
323 .bind(cron_name)
324 .bind(scheduled_time)
325 .bind(self.config.node_id)
326 .bind(stale_threshold)
327 .execute(&self.pool)
328 .await
329 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
330
331 if result.rows_affected() > 0 {
332 Ok(Some(claim_id))
333 } else {
334 Ok(None)
335 }
336 }
337
338 async fn execute_cron(
340 &self,
341 entry: &super::registry::CronEntry,
342 run_id: Uuid,
343 scheduled_time: DateTime<Utc>,
344 is_catch_up: bool,
345 ) {
346 let info = &entry.info;
347 let start_time = Instant::now();
348
349 let exec_span = tracing::info_span!(
350 "cron.execute",
351 cron.name = info.name,
352 cron.run_id = %run_id,
353 cron.schedule = info.schedule.expression(),
354 cron.timezone = info.timezone,
355 cron.scheduled_time = %scheduled_time,
356 cron.is_catch_up = is_catch_up,
357 cron.duration_ms = field::Empty,
358 cron.status = field::Empty,
359 otel.name = %format!("cron {}", info.name),
360 );
361
362 async {
363 tracing::trace!("Executing cron");
364
365 if is_catch_up {
366 tracing::info!(
367 cron.name = info.name,
368 cron.scheduled_time = %scheduled_time,
369 "Executing catch-up run"
370 );
371 }
372
373 let mut ctx = CronContext::new(
374 run_id,
375 info.name.to_string(),
376 scheduled_time,
377 info.timezone.to_string(),
378 is_catch_up,
379 self.pool.clone(),
380 self.http_client.clone(),
381 );
382 ctx.set_http_timeout(info.http_timeout);
383
384 let handler = entry.handler.clone();
386 let result = tokio::time::timeout(info.timeout, handler(&ctx)).await;
387
388 let duration_ms = start_time.elapsed().as_millis() as u64;
389 Span::current().record("cron.duration_ms", duration_ms);
390
391 match result {
392 Ok(Ok(())) => {
393 Span::current().record("cron.status", "completed");
394 tracing::debug!(cron.duration_ms = duration_ms, "Cron executed");
395 self.mark_completed(run_id, info.name).await;
396 }
397 Ok(Err(e)) => {
398 Span::current().record("cron.status", "failed");
399 tracing::error!(
400 cron.duration_ms = duration_ms,
401 error = %e,
402 "Cron failed"
403 );
404 self.mark_failed(run_id, info.name, &e.to_string()).await;
405 }
406 Err(_) => {
407 Span::current().record("cron.status", "timeout");
408 tracing::error!(
409 cron.duration_ms = duration_ms,
410 cron.timeout_ms = info.timeout.as_millis() as u64,
411 "Cron timed out"
412 );
413 self.mark_failed(run_id, info.name, "Execution timed out")
414 .await;
415 }
416 }
417 }
418 .instrument(exec_span)
419 .await
420 }
421
422 async fn mark_completed(&self, run_id: Uuid, cron_name: &str) {
424 if let Err(e) = sqlx::query(
425 r#"
426 UPDATE forge_cron_runs
427 SET status = 'completed', completed_at = NOW()
428 WHERE id = $1 AND node_id = $2
429 "#,
430 )
431 .bind(run_id)
432 .bind(self.config.node_id)
433 .execute(&self.pool)
434 .await
435 {
436 tracing::error!(cron = cron_name, error = %e, "Failed to mark cron completed");
437 }
438 }
439
440 async fn mark_failed(&self, run_id: Uuid, cron_name: &str, error: &str) {
442 if let Err(e) = sqlx::query(
443 r#"
444 UPDATE forge_cron_runs
445 SET status = 'failed', completed_at = NOW(), error = $3
446 WHERE id = $1 AND node_id = $2
447 "#,
448 )
449 .bind(run_id)
450 .bind(self.config.node_id)
451 .bind(error)
452 .execute(&self.pool)
453 .await
454 {
455 tracing::error!(cron = cron_name, error = %e, "Failed to mark cron failed");
456 }
457 }
458
459 async fn handle_catch_up(&self, entry: &super::registry::CronEntry) -> forge_core::Result<()> {
461 let info = &entry.info;
462 let now = Utc::now();
463
464 let catch_up_span = tracing::info_span!(
465 "cron.catch_up",
466 cron.name = info.name,
467 cron.missed_count = field::Empty,
468 cron.executed_count = field::Empty,
469 );
470
471 async {
472 let last_run = sqlx::query_scalar!(
474 r#"
475 SELECT scheduled_time
476 FROM forge_cron_runs
477 WHERE cron_name = $1 AND status = 'completed'
478 ORDER BY scheduled_time DESC
479 LIMIT 1
480 "#,
481 info.name
482 )
483 .fetch_optional(&self.pool)
484 .await
485 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
486
487 let start_time = last_run.unwrap_or(now - chrono::Duration::days(1));
488
489 let missed_times = info.schedule.between_in_tz(start_time, now, info.timezone);
491
492 let to_catch_up: Vec<_> = missed_times
494 .into_iter()
495 .take(info.catch_up_limit as usize)
496 .collect();
497
498 Span::current().record("cron.missed_count", to_catch_up.len());
499
500 if !to_catch_up.is_empty() {
501 tracing::info!(
502 cron.name = info.name,
503 cron.catch_up_count = to_catch_up.len(),
504 cron.catch_up_limit = info.catch_up_limit,
505 "Processing catch-up runs"
506 );
507 }
508
509 let mut executed_count = 0u32;
510 for scheduled in to_catch_up {
511 if let Some(run_id) = self.try_claim(info.name, scheduled, info.timezone).await? {
513 self.execute_cron(entry, run_id, scheduled, true).await;
514 executed_count += 1;
515 }
516 }
517
518 Span::current().record("cron.executed_count", executed_count);
519 Ok(())
520 }
521 .instrument(catch_up_span)
522 .await
523 }
524}
525
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status maps to its lowercase name and parses back to itself.
    #[test]
    fn test_cron_status_conversion() {
        let cases = [
            (CronStatus::Pending, "pending"),
            (CronStatus::Running, "running"),
            (CronStatus::Completed, "completed"),
            (CronStatus::Failed, "failed"),
        ];
        for (status, text) in cases {
            assert_eq!(status.as_str(), text);
            assert_eq!(text.parse::<CronStatus>(), Ok(status));
        }
        assert!("invalid".parse::<CronStatus>().is_err());
    }

    /// Parsing is case-sensitive and the error carries the rejected input.
    #[test]
    fn test_cron_status_parse_error() {
        let err = "Running".parse::<CronStatus>().unwrap_err();
        assert_eq!(err, ParseCronStatusError("Running".to_string()));
        assert_eq!(err.to_string(), "invalid cron status: 'Running'");
        assert!("".parse::<CronStatus>().is_err());
    }

    /// A new record is pending with no node, timestamps or error.
    #[test]
    fn test_cron_record_creation() {
        let record = CronRecord::new("daily_cleanup", Utc::now(), "UTC");
        assert_eq!(record.cron_name, "daily_cleanup");
        assert_eq!(record.timezone, "UTC");
        assert_eq!(record.status, CronStatus::Pending);
        assert!(record.node_id.is_none());
        assert!(record.started_at.is_none());
        assert!(record.completed_at.is_none());
        assert!(record.error.is_none());
    }

    /// Defaults: 1s polling, static leader, no election, 15-minute stale threshold.
    #[test]
    fn test_cron_runner_config_default() {
        let config = CronRunnerConfig::default();
        assert_eq!(config.poll_interval, Duration::from_secs(1));
        assert!(config.is_leader);
        assert!(config.leader_election.is_none());
        assert_eq!(config.run_stale_threshold, Duration::from_secs(15 * 60));
    }
}