1use std::str::FromStr;
2use std::sync::Arc;
3use std::time::{Duration, Instant};
4
5use chrono::{DateTime, Utc};
6use tokio::sync::RwLock;
7use tracing::{Instrument, Span, field};
8use uuid::Uuid;
9
10use super::registry::CronRegistry;
11use crate::cluster::LeaderElection;
12use forge_core::CircuitBreakerClient;
13use forge_core::cron::CronContext;
14
/// Lifecycle state of a single cron run.
///
/// Each variant has a lowercase textual form (see [`CronStatus::as_str`]);
/// the same strings are used as status values in the SQL statements in this
/// file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CronStatus {
    /// Textual form `"pending"`; the state given to a freshly built record.
    Pending,
    /// Textual form `"running"`.
    Running,
    /// Textual form `"completed"`.
    Completed,
    /// Textual form `"failed"`.
    Failed,
}

impl CronStatus {
    /// Returns the lowercase textual form of this status.
    ///
    /// The returned string is accepted back by the `FromStr` implementation.
    pub fn as_str(&self) -> &'static str {
        match *self {
            CronStatus::Pending => "pending",
            CronStatus::Running => "running",
            CronStatus::Completed => "completed",
            CronStatus::Failed => "failed",
        }
    }
}
39
/// Error produced when a string does not name a known cron status.
///
/// The wrapped `String` is the input that was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseCronStatusError(pub String);

impl std::fmt::Display for ParseCronStatusError {
    /// Writes `invalid cron status: '<input>'`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(rejected) = self;
        write!(f, "invalid cron status: '{}'", rejected)
    }
}

impl std::error::Error for ParseCronStatusError {}
50
51impl FromStr for CronStatus {
52 type Err = ParseCronStatusError;
53
54 fn from_str(s: &str) -> Result<Self, Self::Err> {
55 match s {
56 "pending" => Ok(Self::Pending),
57 "running" => Ok(Self::Running),
58 "completed" => Ok(Self::Completed),
59 "failed" => Ok(Self::Failed),
60 _ => Err(ParseCronStatusError(s.to_string())),
61 }
62 }
63}
64
65#[derive(Debug, Clone)]
67pub struct CronRecord {
68 pub id: Uuid,
70 pub cron_name: String,
72 pub scheduled_time: DateTime<Utc>,
74 pub timezone: String,
76 pub status: CronStatus,
78 pub node_id: Option<Uuid>,
80 pub started_at: Option<DateTime<Utc>>,
82 pub completed_at: Option<DateTime<Utc>>,
84 pub error: Option<String>,
86}
87
88impl CronRecord {
89 pub fn new(
91 cron_name: impl Into<String>,
92 scheduled_time: DateTime<Utc>,
93 timezone: impl Into<String>,
94 ) -> Self {
95 Self {
96 id: Uuid::new_v4(),
97 cron_name: cron_name.into(),
98 scheduled_time,
99 timezone: timezone.into(),
100 status: CronStatus::Pending,
101 node_id: None,
102 started_at: None,
103 completed_at: None,
104 error: None,
105 }
106 }
107}
108
109#[derive(Clone)]
111pub struct CronRunnerConfig {
112 pub poll_interval: Duration,
114 pub node_id: Uuid,
116 pub is_leader: bool,
118 pub leader_election: Option<Arc<LeaderElection>>,
120 pub run_stale_threshold: Duration,
122}
123
124impl Default for CronRunnerConfig {
125 fn default() -> Self {
126 Self {
127 poll_interval: Duration::from_secs(1),
128 node_id: Uuid::new_v4(),
129 is_leader: true,
130 leader_election: None,
131 run_stale_threshold: Duration::from_secs(15 * 60),
132 }
133 }
134}
135
136pub struct CronRunner {
138 registry: Arc<CronRegistry>,
139 pool: sqlx::PgPool,
140 http_client: CircuitBreakerClient,
141 config: CronRunnerConfig,
142 is_running: Arc<RwLock<bool>>,
143}
144
145impl CronRunner {
146 pub fn new(
148 registry: Arc<CronRegistry>,
149 pool: sqlx::PgPool,
150 http_client: CircuitBreakerClient,
151 config: CronRunnerConfig,
152 ) -> Self {
153 Self {
154 registry,
155 pool,
156 http_client,
157 config,
158 is_running: Arc::new(RwLock::new(false)),
159 }
160 }
161
162 pub async fn run(&self) -> forge_core::Result<()> {
164 {
165 let mut running = self.is_running.write().await;
166 if *running {
167 return Ok(());
168 }
169 *running = true;
170 }
171
172 tracing::debug!("Cron runner starting");
173
174 loop {
175 if !*self.is_running.read().await {
176 break;
177 }
178
179 if self.is_leader()
180 && let Err(e) = self.tick().await
181 {
182 tracing::warn!(error = %e, "Cron tick failed");
183 }
184
185 tokio::time::sleep(self.config.poll_interval).await;
186 }
187
188 tracing::debug!("Cron runner stopped");
189 Ok(())
190 }
191
192 pub async fn stop(&self) {
194 let mut running = self.is_running.write().await;
195 *running = false;
196 }
197
198 fn is_leader(&self) -> bool {
199 self.config
200 .leader_election
201 .as_ref()
202 .map(|e| e.is_leader())
203 .unwrap_or(self.config.is_leader)
204 }
205
206 async fn tick(&self) -> forge_core::Result<()> {
208 let tick_span = tracing::info_span!(
209 "cron.tick",
210 cron.tick_id = %Uuid::new_v4(),
211 cron.jobs_checked = field::Empty,
212 cron.jobs_executed = field::Empty,
213 );
214
215 async {
216 let now = Utc::now();
217 let window_start = now
219 - chrono::Duration::from_std(self.config.poll_interval * 2)
220 .unwrap_or(chrono::Duration::seconds(2));
221
222 let cron_list = self.registry.list();
223 let mut jobs_executed = 0u32;
224
225 Span::current().record("cron.jobs_checked", cron_list.len());
226
227 if cron_list.is_empty() {
228 tracing::trace!("Cron tick: no crons registered");
229 } else {
230 tracing::trace!(
231 cron_count = cron_list.len(),
232 "Cron tick checking {} registered crons",
233 cron_list.len()
234 );
235 }
236
237 for entry in cron_list {
238 let info = &entry.info;
239
240 let scheduled_times = info
241 .schedule
242 .between_in_tz(window_start, now, info.timezone);
243
244 if scheduled_times.len() > 1 {
246 tracing::info!(
247 cron.name = info.name,
248 cron.missed_count = scheduled_times.len() - 1,
249 "Detected missed cron runs"
250 );
251 Span::current().record("cron.missed_runs", scheduled_times.len() - 1);
252 }
253
254 if !scheduled_times.is_empty() {
255 tracing::trace!(
256 cron = info.name,
257 schedule = info.schedule.expression(),
258 scheduled_count = scheduled_times.len(),
259 "Found scheduled cron runs"
260 );
261 }
262
263 for scheduled in scheduled_times {
264 if let Ok(Some(run_id)) =
266 self.try_claim(info.name, scheduled, info.timezone).await
267 {
268 self.execute_cron(entry, run_id, scheduled, false).await;
269 jobs_executed += 1;
270 }
271 }
272
273 if info.catch_up
275 && let Err(e) = self.handle_catch_up(entry).await
276 {
277 tracing::warn!(
278 cron = info.name,
279 error = %e,
280 "Failed to process catch-up runs"
281 );
282 }
283 }
284
285 Span::current().record("cron.jobs_executed", jobs_executed);
286 Ok(())
287 }
288 .instrument(tick_span)
289 .await
290 }
291
292 async fn try_claim(
296 &self,
297 cron_name: &str,
298 scheduled_time: DateTime<Utc>,
299 _timezone: &str,
300 ) -> forge_core::Result<Option<Uuid>> {
301 let claim_id = Uuid::new_v4();
302 let stale_threshold = chrono::Duration::from_std(self.config.run_stale_threshold)
303 .unwrap_or(chrono::Duration::minutes(15));
304
305 let result = sqlx::query(
307 r#"
308 INSERT INTO forge_cron_runs (id, cron_name, scheduled_time, status, node_id, started_at)
309 VALUES ($1, $2, $3, 'running', $4, NOW())
310 ON CONFLICT (cron_name, scheduled_time) DO UPDATE
311 SET
312 id = EXCLUDED.id,
313 status = 'running',
314 node_id = EXCLUDED.node_id,
315 started_at = NOW(),
316 completed_at = NULL,
317 error = NULL
318 WHERE forge_cron_runs.status = 'running'
319 AND forge_cron_runs.started_at < NOW() - $5
320 "#,
321 )
322 .bind(claim_id)
323 .bind(cron_name)
324 .bind(scheduled_time)
325 .bind(self.config.node_id)
326 .bind(stale_threshold)
327 .execute(&self.pool)
328 .await
329 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
330
331 if result.rows_affected() > 0 {
332 Ok(Some(claim_id))
333 } else {
334 Ok(None)
335 }
336 }
337
338 async fn execute_cron(
340 &self,
341 entry: &super::registry::CronEntry,
342 run_id: Uuid,
343 scheduled_time: DateTime<Utc>,
344 is_catch_up: bool,
345 ) {
346 let info = &entry.info;
347 let start_time = Instant::now();
348
349 let exec_span = tracing::info_span!(
350 "cron.execute",
351 cron.name = info.name,
352 cron.run_id = %run_id,
353 cron.schedule = info.schedule.expression(),
354 cron.timezone = info.timezone,
355 cron.scheduled_time = %scheduled_time,
356 cron.is_catch_up = is_catch_up,
357 cron.duration_ms = field::Empty,
358 cron.status = field::Empty,
359 otel.name = %format!("cron {}", info.name),
360 );
361
362 async {
363 tracing::trace!("Executing cron");
364
365 if is_catch_up {
366 tracing::info!(
367 cron.name = info.name,
368 cron.scheduled_time = %scheduled_time,
369 "Executing catch-up run"
370 );
371 }
372
373 let ctx = CronContext::new(
374 run_id,
375 info.name.to_string(),
376 scheduled_time,
377 info.timezone.to_string(),
378 is_catch_up,
379 self.pool.clone(),
380 self.http_client.inner().clone(),
381 );
382
383 let handler = entry.handler.clone();
385 let result = tokio::time::timeout(info.timeout, handler(&ctx)).await;
386
387 let duration_ms = start_time.elapsed().as_millis() as u64;
388 Span::current().record("cron.duration_ms", duration_ms);
389
390 match result {
391 Ok(Ok(())) => {
392 Span::current().record("cron.status", "completed");
393 tracing::debug!(cron.duration_ms = duration_ms, "Cron executed");
394 self.mark_completed(run_id, info.name).await;
395 }
396 Ok(Err(e)) => {
397 Span::current().record("cron.status", "failed");
398 tracing::warn!(
399 cron.duration_ms = duration_ms,
400 error = %e,
401 "Cron failed"
402 );
403 self.mark_failed(run_id, info.name, &e.to_string()).await;
404 }
405 Err(_) => {
406 Span::current().record("cron.status", "timeout");
407 tracing::warn!(
408 cron.duration_ms = duration_ms,
409 cron.timeout_ms = info.timeout.as_millis() as u64,
410 "Cron timed out"
411 );
412 self.mark_failed(run_id, info.name, "Execution timed out")
413 .await;
414 }
415 }
416 }
417 .instrument(exec_span)
418 .await
419 }
420
421 async fn mark_completed(&self, run_id: Uuid, cron_name: &str) {
423 if let Err(e) = sqlx::query(
424 r#"
425 UPDATE forge_cron_runs
426 SET status = 'completed', completed_at = NOW()
427 WHERE id = $1 AND node_id = $2
428 "#,
429 )
430 .bind(run_id)
431 .bind(self.config.node_id)
432 .execute(&self.pool)
433 .await
434 {
435 tracing::error!(cron = cron_name, error = %e, "Failed to mark cron completed");
436 }
437 }
438
439 async fn mark_failed(&self, run_id: Uuid, cron_name: &str, error: &str) {
441 if let Err(e) = sqlx::query(
442 r#"
443 UPDATE forge_cron_runs
444 SET status = 'failed', completed_at = NOW(), error = $3
445 WHERE id = $1 AND node_id = $2
446 "#,
447 )
448 .bind(run_id)
449 .bind(self.config.node_id)
450 .bind(error)
451 .execute(&self.pool)
452 .await
453 {
454 tracing::error!(cron = cron_name, error = %e, "Failed to mark cron failed");
455 }
456 }
457
458 async fn handle_catch_up(&self, entry: &super::registry::CronEntry) -> forge_core::Result<()> {
460 let info = &entry.info;
461 let now = Utc::now();
462
463 let catch_up_span = tracing::info_span!(
464 "cron.catch_up",
465 cron.name = info.name,
466 cron.missed_count = field::Empty,
467 cron.executed_count = field::Empty,
468 );
469
470 async {
471 let last_run: Option<(DateTime<Utc>,)> = sqlx::query_as(
473 r#"
474 SELECT scheduled_time
475 FROM forge_cron_runs
476 WHERE cron_name = $1 AND status = 'completed'
477 ORDER BY scheduled_time DESC
478 LIMIT 1
479 "#,
480 )
481 .bind(info.name)
482 .fetch_optional(&self.pool)
483 .await
484 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
485
486 let start_time = last_run
487 .map(|(t,)| t)
488 .unwrap_or(now - chrono::Duration::days(1));
489
490 let missed_times = info.schedule.between_in_tz(start_time, now, info.timezone);
492
493 let to_catch_up: Vec<_> = missed_times
495 .into_iter()
496 .take(info.catch_up_limit as usize)
497 .collect();
498
499 Span::current().record("cron.missed_count", to_catch_up.len());
500
501 if !to_catch_up.is_empty() {
502 tracing::info!(
503 cron.name = info.name,
504 cron.catch_up_count = to_catch_up.len(),
505 cron.catch_up_limit = info.catch_up_limit,
506 "Processing catch-up runs"
507 );
508 }
509
510 let mut executed_count = 0u32;
511 for scheduled in to_catch_up {
512 if let Some(run_id) = self.try_claim(info.name, scheduled, info.timezone).await? {
514 self.execute_cron(entry, run_id, scheduled, true).await;
515 executed_count += 1;
516 }
517 }
518
519 Span::current().record("cron.executed_count", executed_count);
520 Ok(())
521 }
522 .instrument(catch_up_span)
523 .await
524 }
525}
526
#[cfg(test)]
mod tests {
    use super::*;

    /// Each status maps to its lowercase name and parses back from it;
    /// unknown names are rejected.
    #[test]
    fn test_cron_status_conversion() {
        let cases = [
            (CronStatus::Pending, "pending"),
            (CronStatus::Running, "running"),
            (CronStatus::Completed, "completed"),
            (CronStatus::Failed, "failed"),
        ];

        for (status, text) in cases {
            assert_eq!(status.as_str(), text);
            assert_eq!(text.parse::<CronStatus>(), Ok(status));
        }

        assert!("invalid".parse::<CronStatus>().is_err());
    }

    /// A new record keeps the supplied name and timezone, starts pending and
    /// has no node assigned.
    #[test]
    fn test_cron_record_creation() {
        let scheduled_for = Utc::now();
        let record = CronRecord::new("daily_cleanup", scheduled_for, "UTC");

        assert_eq!(record.cron_name, "daily_cleanup");
        assert_eq!(record.timezone, "UTC");
        assert_eq!(record.status, CronStatus::Pending);
        assert!(record.node_id.is_none());
    }

    /// The default config polls every second and assumes leadership.
    #[test]
    fn test_cron_runner_config_default() {
        let CronRunnerConfig {
            poll_interval,
            is_leader,
            ..
        } = CronRunnerConfig::default();

        assert_eq!(poll_interval, Duration::from_secs(1));
        assert!(is_leader);
    }
}