//! Cron service for scheduling agent tasks.
//!
//! Source path: `agent_diva_core/cron/service.rs`.
2
3use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use tokio::sync::{Mutex, RwLock};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, error, info, warn};
12
13use crate::cron::types::{
14    CreateCronJobRequest, CronJob, CronJobDto, CronJobLifecycleStatus, CronJobState, CronPayload,
15    CronRunSnapshot, CronSchedule, CronStore, CronTrigger, UpdateCronJobRequest,
16};
17
18fn now_ms() -> i64 {
19    SystemTime::now()
20        .duration_since(UNIX_EPOCH)
21        .unwrap()
22        .as_millis() as i64
23}
24
/// Normalizes a cron expression for the `cron` crate.
///
/// Whitespace runs are collapsed to single spaces. A classic 5-field expression
/// (minute hour day-of-month month day-of-week) gets a leading `0` seconds field,
/// because the `cron` crate expects seconds first. Any other field count is passed
/// through unchanged.
fn normalize_cron_expr(expr: &str) -> String {
    let parts: Vec<&str> = expr.split_whitespace().collect();
    let joined = parts.join(" ");
    match parts.len() {
        5 => format!("0 {joined}"),
        _ => joined,
    }
}
33
34fn compute_next_run(schedule: &CronSchedule, now_ms: i64) -> Option<i64> {
35    match schedule {
36        CronSchedule::At { at_ms } => {
37            if *at_ms > now_ms {
38                Some(*at_ms)
39            } else {
40                None
41            }
42        }
43        CronSchedule::Every { every_ms } => {
44            if *every_ms <= 0 {
45                None
46            } else {
47                Some(now_ms + every_ms)
48            }
49        }
50        CronSchedule::Cron { expr, tz } => {
51            match cron::Schedule::try_from(normalize_cron_expr(expr).as_str()) {
52                Ok(schedule) => {
53                    let seconds = now_ms / 1000;
54                    let nanoseconds = ((now_ms % 1000) * 1_000_000) as u32;
55                    let now_utc = chrono::DateTime::from_timestamp(seconds, nanoseconds)
56                        .unwrap_or_else(chrono::Utc::now);
57
58                    if let Some(tz_str) = tz {
59                        if let Ok(tz) = tz_str.parse::<chrono_tz::Tz>() {
60                            let dt = now_utc.with_timezone(&tz);
61                            return schedule
62                                .after(&dt)
63                                .next()
64                                .map(|next| next.timestamp_millis());
65                        }
66                        warn!("Invalid timezone '{}', falling back to UTC", tz_str);
67                    }
68
69                    schedule
70                        .after(&now_utc)
71                        .next()
72                        .map(|next| next.timestamp_millis())
73                }
74                Err(e) => {
75                    warn!("Invalid cron expression '{}': {}", expr, e);
76                    None
77                }
78            }
79        }
80    }
81}
82
83pub type JobCallback = Arc<
84    dyn Fn(
85            CronJob,
86            CancellationToken,
87        ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<String>> + Send>>
88        + Send
89        + Sync,
90>;
91
92#[derive(Clone)]
93struct ActiveCronRun {
94    snapshot: CronRunSnapshot,
95    cancel_token: CancellationToken,
96}
97
98pub struct CronService {
99    store_path: PathBuf,
100    on_job: Option<JobCallback>,
101    store: Arc<RwLock<Option<CronStore>>>,
102    timer_task: Arc<Mutex<Option<JoinHandle<()>>>>,
103    running: Arc<RwLock<bool>>,
104    active_runs: Arc<RwLock<HashMap<String, ActiveCronRun>>>,
105}
106
107impl CronService {
108    pub fn new(store_path: PathBuf, on_job: Option<JobCallback>) -> Self {
109        Self {
110            store_path,
111            on_job,
112            store: Arc::new(RwLock::new(None)),
113            timer_task: Arc::new(Mutex::new(None)),
114            running: Arc::new(RwLock::new(false)),
115            active_runs: Arc::new(RwLock::new(HashMap::new())),
116        }
117    }
118
119    async fn load_store(&self) -> CronStore {
120        {
121            let store_guard = self.store.read().await;
122            if let Some(store) = store_guard.as_ref() {
123                return store.clone();
124            }
125        }
126
127        let store = if self.store_path.exists() {
128            match tokio::fs::read_to_string(&self.store_path).await {
129                Ok(content) => match serde_json::from_str::<CronStore>(&content) {
130                    Ok(store) => {
131                        debug!("Loaded {} cron jobs from disk", store.jobs.len());
132                        store
133                    }
134                    Err(e) => {
135                        warn!("Failed to parse cron store: {}", e);
136                        CronStore::default()
137                    }
138                },
139                Err(e) => {
140                    warn!("Failed to read cron store: {}", e);
141                    CronStore::default()
142                }
143            }
144        } else {
145            CronStore::default()
146        };
147
148        let mut store_guard = self.store.write().await;
149        *store_guard = Some(store.clone());
150        store
151    }
152
153    async fn save_store(&self) {
154        let store = {
155            let store_guard = self.store.read().await;
156            match store_guard.as_ref() {
157                Some(s) => s.clone(),
158                None => return,
159            }
160        };
161
162        if let Some(parent) = self.store_path.parent() {
163            let _ = tokio::fs::create_dir_all(parent).await;
164        }
165
166        match serde_json::to_string_pretty(&store) {
167            Ok(content) => {
168                if let Err(e) = tokio::fs::write(&self.store_path, content).await {
169                    error!("Failed to save cron store: {}", e);
170                }
171            }
172            Err(e) => error!("Failed to serialize cron store: {}", e),
173        }
174    }
175
176    pub async fn start(&self) {
177        *self.running.write().await = true;
178
179        let mut store = self.load_store().await;
180        self.recompute_next_runs(&mut store).await;
181
182        let mut store_guard = self.store.write().await;
183        *store_guard = Some(store);
184        drop(store_guard);
185
186        self.save_store().await;
187        self.arm_timer().await;
188    }
189
190    pub async fn stop(&self) {
191        *self.running.write().await = false;
192
193        {
194            let mut timer_guard = self.timer_task.lock().await;
195            if let Some(task) = timer_guard.take() {
196                task.abort();
197            }
198        }
199
200        let tokens = {
201            let active_guard = self.active_runs.read().await;
202            active_guard
203                .values()
204                .map(|run| run.cancel_token.clone())
205                .collect::<Vec<_>>()
206        };
207        for token in tokens {
208            token.cancel();
209        }
210    }
211
212    async fn recompute_next_runs(&self, store: &mut CronStore) {
213        let now = now_ms();
214        for job in &mut store.jobs {
215            if job.enabled {
216                job.state.next_run_at_ms = compute_next_run(&job.schedule, now);
217            } else {
218                job.state.next_run_at_ms = None;
219            }
220        }
221    }
222
223    async fn get_next_wake_ms(&self) -> Option<i64> {
224        let store_guard = self.store.read().await;
225        let store = store_guard.as_ref()?;
226
227        store
228            .jobs
229            .iter()
230            .filter(|job| job.enabled && job.state.next_run_at_ms.is_some())
231            .filter_map(|job| job.state.next_run_at_ms)
232            .min()
233    }
234
235    async fn arm_timer(&self) {
236        {
237            let mut timer_guard = self.timer_task.lock().await;
238            if let Some(task) = timer_guard.take() {
239                task.abort();
240            }
241        }
242
243        let next_wake = self.get_next_wake_ms().await;
244        let is_running = *self.running.read().await;
245        if !is_running {
246            return;
247        }
248
249        let Some(next_wake) = next_wake else {
250            return;
251        };
252
253        let delay_ms = (next_wake - now_ms()).max(0);
254        let delay = tokio::time::Duration::from_millis(delay_ms as u64);
255        let service = Arc::new(CronServiceHandle {
256            store: Arc::clone(&self.store),
257            timer_task: Arc::clone(&self.timer_task),
258            running: Arc::clone(&self.running),
259            on_job: self.on_job.clone(),
260            store_path: self.store_path.clone(),
261            active_runs: Arc::clone(&self.active_runs),
262        });
263
264        let task = tokio::spawn(async move {
265            tokio::time::sleep(delay).await;
266            let is_running = *service.running.read().await;
267            if is_running {
268                service.on_timer().await;
269            }
270        });
271
272        let mut timer_guard = self.timer_task.lock().await;
273        *timer_guard = Some(task);
274    }
275
276    fn to_job_dto(job: &CronJob, active_run: Option<CronRunSnapshot>) -> CronJobDto {
277        let computed_status = if active_run.is_some() {
278            CronJobLifecycleStatus::Running
279        } else if !job.enabled {
280            CronJobLifecycleStatus::Paused
281        } else if job.state.last_status.as_deref() == Some("error") {
282            CronJobLifecycleStatus::Failed
283        } else if job.state.last_run_at_ms.is_some() {
284            CronJobLifecycleStatus::Completed
285        } else {
286            CronJobLifecycleStatus::Scheduled
287        };
288
289        CronJobDto {
290            job: job.clone(),
291            is_running: active_run.is_some(),
292            active_run,
293            computed_status,
294        }
295    }
296
297    async fn active_snapshot_for(&self, job_id: &str) -> Option<CronRunSnapshot> {
298        let active_guard = self.active_runs.read().await;
299        active_guard.get(job_id).map(|run| run.snapshot.clone())
300    }
301
302    async fn register_active_run(
303        &self,
304        job_id: &str,
305        trigger: CronTrigger,
306    ) -> Result<CronRunSnapshot, String> {
307        let mut active_guard = self.active_runs.write().await;
308        if active_guard.contains_key(job_id) {
309            return Err(format!("job {} is already running", job_id));
310        }
311
312        let timestamp = now_ms();
313        let snapshot = CronRunSnapshot {
314            run_id: uuid::Uuid::new_v4().to_string(),
315            job_id: job_id.to_string(),
316            started_at_ms: timestamp,
317            last_heartbeat_at_ms: timestamp,
318            trigger,
319            cancelable: true,
320        };
321
322        active_guard.insert(
323            job_id.to_string(),
324            ActiveCronRun {
325                snapshot: snapshot.clone(),
326                cancel_token: CancellationToken::new(),
327            },
328        );
329        Ok(snapshot)
330    }
331
332    async fn cancel_token_for(&self, job_id: &str) -> Option<CancellationToken> {
333        let active_guard = self.active_runs.read().await;
334        active_guard.get(job_id).map(|run| run.cancel_token.clone())
335    }
336
337    async fn clear_active_run(&self, job_id: &str) {
338        let mut active_guard = self.active_runs.write().await;
339        active_guard.remove(job_id);
340    }
341
342    async fn execute_job_with_trigger(
343        &self,
344        mut job: CronJob,
345        trigger: CronTrigger,
346    ) -> Result<CronJobDto, String> {
347        info!(
348            "Cron: executing job '{}' ({}) trigger={:?}",
349            job.name, job.id, trigger
350        );
351        let _ = self.register_active_run(&job.id, trigger).await?;
352        let cancel_token = self
353            .cancel_token_for(&job.id)
354            .await
355            .ok_or_else(|| format!("missing cancel token for {}", job.id))?;
356
357        let callback_result = if cancel_token.is_cancelled() {
358            Err("job cancelled before start".to_string())
359        } else if let Some(callback) = &self.on_job {
360            match (callback)(job.clone(), cancel_token.clone()).await {
361                Some(response) if cancel_token.is_cancelled() => Err("job cancelled".to_string()),
362                Some(response) if response.to_ascii_lowercase().starts_with("error") => {
363                    Err(response)
364                }
365                Some(_) | None => Ok(()),
366            }
367        } else if cancel_token.is_cancelled() {
368            Err("job cancelled".to_string())
369        } else {
370            Ok(())
371        };
372
373        let now = now_ms();
374        job.state.last_run_at_ms = Some(now);
375        job.updated_at_ms = now;
376        match callback_result {
377            Ok(()) => {
378                job.state.last_status = Some("ok".to_string());
379                job.state.last_error = None;
380                info!(
381                    "Cron: job '{}' ({}) completed successfully",
382                    job.name, job.id
383                );
384            }
385            Err(err) => {
386                job.state.last_status = Some("error".to_string());
387                job.state.last_error = Some(err);
388                warn!("Cron: job '{}' ({}) failed", job.name, job.id);
389            }
390        }
391
392        let should_remove = match &job.schedule {
393            CronSchedule::At { .. } => {
394                if job.delete_after_run && job.state.last_status.as_deref() == Some("ok") {
395                    true
396                } else {
397                    job.enabled = false;
398                    job.state.next_run_at_ms = None;
399                    false
400                }
401            }
402            _ => {
403                job.state.next_run_at_ms = if job.enabled {
404                    compute_next_run(&job.schedule, now_ms())
405                } else {
406                    None
407                };
408                false
409            }
410        };
411
412        {
413            let mut store_guard = self.store.write().await;
414            if let Some(store) = store_guard.as_mut() {
415                if should_remove {
416                    store.jobs.retain(|existing| existing.id != job.id);
417                } else if let Some(existing) =
418                    store.jobs.iter_mut().find(|existing| existing.id == job.id)
419                {
420                    *existing = job.clone();
421                }
422            }
423        }
424
425        self.clear_active_run(&job.id).await;
426        self.save_store().await;
427        self.arm_timer().await;
428
429        self.get_job(&job.id)
430            .await
431            .ok_or_else(|| format!("job {} no longer exists after execution", job.id))
432    }
433
434    pub async fn list_jobs(&self, include_disabled: bool) -> Vec<CronJob> {
435        let store = self.load_store().await;
436        let mut jobs: Vec<CronJob> = if include_disabled {
437            store.jobs
438        } else {
439            store.jobs.into_iter().filter(|job| job.enabled).collect()
440        };
441
442        jobs.sort_by_key(|job| job.state.next_run_at_ms.unwrap_or(i64::MAX));
443        jobs
444    }
445
446    pub async fn list_job_views(&self, include_disabled: bool) -> Vec<CronJobDto> {
447        let jobs = self.list_jobs(include_disabled).await;
448        let active_guard = self.active_runs.read().await;
449        jobs.into_iter()
450            .map(|job| {
451                let active = active_guard.get(&job.id).map(|run| run.snapshot.clone());
452                Self::to_job_dto(&job, active)
453            })
454            .collect()
455    }
456
457    pub async fn get_job(&self, job_id: &str) -> Option<CronJobDto> {
458        let store = self.load_store().await;
459        let job = store.jobs.into_iter().find(|job| job.id == job_id)?;
460        let active = self.active_snapshot_for(job_id).await;
461        Some(Self::to_job_dto(&job, active))
462    }
463
464    #[allow(clippy::too_many_arguments)]
465    pub async fn add_job(
466        &self,
467        name: String,
468        schedule: CronSchedule,
469        message: String,
470        deliver: bool,
471        channel: Option<String>,
472        to: Option<String>,
473        delete_after_run: bool,
474    ) -> CronJob {
475        let now = now_ms();
476        let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
477        let job = CronJob {
478            id: id.clone(),
479            name: name.clone(),
480            enabled: true,
481            schedule: schedule.clone(),
482            payload: CronPayload {
483                kind: "agent_turn".to_string(),
484                message,
485                deliver,
486                channel,
487                to,
488            },
489            state: CronJobState {
490                next_run_at_ms: compute_next_run(&schedule, now),
491                ..Default::default()
492            },
493            created_at_ms: now,
494            updated_at_ms: now,
495            delete_after_run,
496        };
497
498        {
499            let mut store_guard = self.store.write().await;
500            if let Some(store) = store_guard.as_mut() {
501                store.jobs.push(job.clone());
502            } else {
503                let mut store = CronStore::default();
504                store.jobs.push(job.clone());
505                *store_guard = Some(store);
506            }
507        }
508
509        self.save_store().await;
510        self.arm_timer().await;
511        info!("Cron: added job '{}' ({})", name, id);
512        job
513    }
514
515    pub async fn create_job(&self, request: CreateCronJobRequest) -> Result<CronJobDto, String> {
516        let now = now_ms();
517        let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
518        let schedule = request.schedule.clone();
519        let job = CronJob {
520            id,
521            name: request.name,
522            enabled: request.enabled,
523            schedule,
524            payload: request.payload,
525            state: CronJobState {
526                next_run_at_ms: if request.enabled {
527                    compute_next_run(&request.schedule, now)
528                } else {
529                    None
530                },
531                ..Default::default()
532            },
533            created_at_ms: now,
534            updated_at_ms: now,
535            delete_after_run: request.delete_after_run,
536        };
537
538        {
539            let mut store_guard = self.store.write().await;
540            if let Some(store) = store_guard.as_mut() {
541                store.jobs.push(job.clone());
542            } else {
543                let mut store = CronStore::default();
544                store.jobs.push(job.clone());
545                *store_guard = Some(store);
546            }
547        }
548
549        self.save_store().await;
550        self.arm_timer().await;
551        Ok(Self::to_job_dto(&job, None))
552    }
553
554    pub async fn update_job(
555        &self,
556        job_id: &str,
557        request: UpdateCronJobRequest,
558    ) -> Result<CronJobDto, String> {
559        let updated_job = {
560            let mut store_guard = self.store.write().await;
561            let store = store_guard
562                .as_mut()
563                .ok_or_else(|| "cron store not initialized".to_string())?;
564            let job = store
565                .jobs
566                .iter_mut()
567                .find(|job| job.id == job_id)
568                .ok_or_else(|| format!("job {} not found", job_id))?;
569
570            job.name = request.name;
571            job.schedule = request.schedule;
572            job.payload = request.payload;
573            job.delete_after_run = request.delete_after_run;
574            job.enabled = request.enabled;
575            job.updated_at_ms = now_ms();
576            job.state.next_run_at_ms = if job.enabled {
577                compute_next_run(&job.schedule, now_ms())
578            } else {
579                None
580            };
581            job.clone()
582        };
583
584        self.save_store().await;
585        self.arm_timer().await;
586        Ok(Self::to_job_dto(
587            &updated_job,
588            self.active_snapshot_for(job_id).await,
589        ))
590    }
591
592    pub async fn delete_job(&self, job_id: &str) -> Result<(), String> {
593        let _ = self.stop_run(job_id).await;
594        let removed = {
595            let mut store_guard = self.store.write().await;
596            if let Some(store) = store_guard.as_mut() {
597                let before = store.jobs.len();
598                store.jobs.retain(|job| job.id != job_id);
599                store.jobs.len() < before
600            } else {
601                false
602            }
603        };
604
605        self.clear_active_run(job_id).await;
606
607        if removed {
608            self.save_store().await;
609            self.arm_timer().await;
610            Ok(())
611        } else {
612            Err(format!("job {} not found", job_id))
613        }
614    }
615
616    pub async fn remove_job(&self, job_id: &str) -> bool {
617        self.delete_job(job_id).await.is_ok()
618    }
619
620    pub async fn enable_job(&self, job_id: &str, enabled: bool) -> Option<CronJob> {
621        let job = {
622            let mut store_guard = self.store.write().await;
623            if let Some(store) = store_guard.as_mut() {
624                if let Some(job) = store.jobs.iter_mut().find(|job| job.id == job_id) {
625                    job.enabled = enabled;
626                    job.updated_at_ms = now_ms();
627                    job.state.next_run_at_ms = if enabled {
628                        compute_next_run(&job.schedule, now_ms())
629                    } else {
630                        None
631                    };
632                    Some(job.clone())
633                } else {
634                    None
635                }
636            } else {
637                None
638            }
639        };
640
641        if job.is_some() {
642            self.save_store().await;
643            self.arm_timer().await;
644        }
645
646        job
647    }
648
649    pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> Result<CronJobDto, String> {
650        let updated = self
651            .enable_job(job_id, enabled)
652            .await
653            .ok_or_else(|| format!("job {} not found", job_id))?;
654        Ok(Self::to_job_dto(
655            &updated,
656            self.active_snapshot_for(job_id).await,
657        ))
658    }
659
660    pub async fn run_job(&self, job_id: &str, force: bool) -> bool {
661        self.run_job_now(job_id, force).await.is_ok()
662    }
663
664    pub async fn run_job_now(&self, job_id: &str, force: bool) -> Result<CronJobDto, String> {
665        let job = {
666            let store_guard = self.store.read().await;
667            if let Some(store) = store_guard.as_ref() {
668                store
669                    .jobs
670                    .iter()
671                    .find(|job| job.id == job_id)
672                    .filter(|job| force || job.enabled)
673                    .cloned()
674            } else {
675                None
676            }
677        }
678        .ok_or_else(|| format!("job {} not found or disabled", job_id))?;
679
680        self.execute_job_with_trigger(job, CronTrigger::Manual)
681            .await
682    }
683
684    pub async fn stop_run(&self, job_id: &str) -> Result<CronRunSnapshot, String> {
685        let active = {
686            let active_guard = self.active_runs.read().await;
687            active_guard.get(job_id).cloned()
688        }
689        .ok_or_else(|| format!("job {} is not running", job_id))?;
690
691        active.cancel_token.cancel();
692        Ok(active.snapshot)
693    }
694
695    pub async fn status(&self) -> serde_json::Value {
696        let jobs = self.list_job_views(true).await;
697        let is_running = *self.running.read().await;
698        let next_wake = self.get_next_wake_ms().await;
699
700        serde_json::json!({
701            "enabled": is_running,
702            "jobs": jobs.len(),
703            "runningJobs": jobs.iter().filter(|job| job.is_running).count(),
704            "nextWakeAtMs": next_wake,
705        })
706    }
707}
708
709struct CronServiceHandle {
710    store: Arc<RwLock<Option<CronStore>>>,
711    timer_task: Arc<Mutex<Option<JoinHandle<()>>>>,
712    running: Arc<RwLock<bool>>,
713    on_job: Option<JobCallback>,
714    store_path: PathBuf,
715    active_runs: Arc<RwLock<HashMap<String, ActiveCronRun>>>,
716}
717
718impl CronServiceHandle {
719    async fn execute_due_job(&self, mut job: CronJob) {
720        info!("Cron: due job fired '{}' ({})", job.name, job.id);
721        let mut active_guard = self.active_runs.write().await;
722        if active_guard.contains_key(&job.id) {
723            return;
724        }
725
726        let timestamp = now_ms();
727        active_guard.insert(
728            job.id.clone(),
729            ActiveCronRun {
730                snapshot: CronRunSnapshot {
731                    run_id: uuid::Uuid::new_v4().to_string(),
732                    job_id: job.id.clone(),
733                    started_at_ms: timestamp,
734                    last_heartbeat_at_ms: timestamp,
735                    trigger: CronTrigger::Scheduled,
736                    cancelable: true,
737                },
738                cancel_token: CancellationToken::new(),
739            },
740        );
741        let cancel_token = active_guard
742            .get(&job.id)
743            .map(|run| run.cancel_token.clone())
744            .expect("inserted active run");
745        drop(active_guard);
746
747        let result = if cancel_token.is_cancelled() {
748            Err("job cancelled before start".to_string())
749        } else if let Some(callback) = &self.on_job {
750            match (callback)(job.clone(), cancel_token.clone()).await {
751                Some(response) if cancel_token.is_cancelled() => Err("job cancelled".to_string()),
752                Some(response) if response.to_ascii_lowercase().starts_with("error") => {
753                    Err(response)
754                }
755                Some(_) | None => Ok(()),
756            }
757        } else if cancel_token.is_cancelled() {
758            Err("job cancelled".to_string())
759        } else {
760            Ok(())
761        };
762
763        job.state.last_run_at_ms = Some(timestamp);
764        job.updated_at_ms = now_ms();
765        match result {
766            Ok(()) => {
767                job.state.last_status = Some("ok".to_string());
768                job.state.last_error = None;
769                info!(
770                    "Cron: scheduled job '{}' ({}) completed successfully",
771                    job.name, job.id
772                );
773            }
774            Err(err) => {
775                job.state.last_status = Some("error".to_string());
776                job.state.last_error = Some(err);
777                warn!("Cron: scheduled job '{}' ({}) failed", job.name, job.id);
778            }
779        }
780
781        let should_remove = match &job.schedule {
782            CronSchedule::At { .. } => {
783                if job.delete_after_run && job.state.last_status.as_deref() == Some("ok") {
784                    true
785                } else {
786                    job.enabled = false;
787                    job.state.next_run_at_ms = None;
788                    false
789                }
790            }
791            _ => {
792                job.state.next_run_at_ms = if job.enabled {
793                    compute_next_run(&job.schedule, now_ms())
794                } else {
795                    None
796                };
797                false
798            }
799        };
800
801        {
802            let mut store_guard = self.store.write().await;
803            if let Some(store) = store_guard.as_mut() {
804                if should_remove {
805                    store.jobs.retain(|existing| existing.id != job.id);
806                } else if let Some(existing) =
807                    store.jobs.iter_mut().find(|existing| existing.id == job.id)
808                {
809                    *existing = job.clone();
810                }
811            }
812        }
813
814        let mut active_guard = self.active_runs.write().await;
815        active_guard.remove(&job.id);
816    }
817
818    async fn on_timer(&self) {
819        let now = now_ms();
820        let due_jobs = {
821            let store_guard = self.store.read().await;
822            if let Some(store) = store_guard.as_ref() {
823                store
824                    .jobs
825                    .iter()
826                    .filter(|job| {
827                        job.enabled
828                            && job.state.next_run_at_ms.is_some()
829                            && now >= job.state.next_run_at_ms.unwrap()
830                    })
831                    .cloned()
832                    .collect::<Vec<_>>()
833            } else {
834                Vec::new()
835            }
836        };
837
838        for job in due_jobs {
839            self.execute_due_job(job).await;
840        }
841
842        if let Some(parent) = self.store_path.parent() {
843            let _ = tokio::fs::create_dir_all(parent).await;
844        }
845        {
846            let store_guard = self.store.read().await;
847            if let Some(store) = store_guard.as_ref() {
848                if let Ok(content) = serde_json::to_string_pretty(store) {
849                    let _ = tokio::fs::write(&self.store_path, content).await;
850                }
851            }
852        }
853
854        self.rearm_timer();
855    }
856
857    fn rearm_timer(&self) {
858        let store = Arc::clone(&self.store);
859        let timer_task = Arc::clone(&self.timer_task);
860        let running = Arc::clone(&self.running);
861        let on_job = self.on_job.clone();
862        let store_path = self.store_path.clone();
863        let active_runs = Arc::clone(&self.active_runs);
864
865        tokio::spawn(async move {
866            {
867                let mut timer_guard = timer_task.lock().await;
868                if let Some(task) = timer_guard.take() {
869                    task.abort();
870                }
871            }
872
873            let is_running = *running.read().await;
874            if !is_running {
875                return;
876            }
877
878            let next_wake = {
879                let store_guard = store.read().await;
880                store_guard.as_ref().and_then(|cron_store| {
881                    cron_store
882                        .jobs
883                        .iter()
884                        .filter(|job| job.enabled && job.state.next_run_at_ms.is_some())
885                        .filter_map(|job| job.state.next_run_at_ms)
886                        .min()
887                })
888            };
889
890            let Some(next_wake) = next_wake else {
891                return;
892            };
893
894            let delay_ms = (next_wake - now_ms()).max(0);
895            let delay = tokio::time::Duration::from_millis(delay_ms as u64);
896            let service = Arc::new(CronServiceHandle {
897                store: Arc::clone(&store),
898                timer_task: Arc::clone(&timer_task),
899                running: Arc::clone(&running),
900                on_job: on_job.clone(),
901                store_path: store_path.clone(),
902                active_runs: Arc::clone(&active_runs),
903            });
904
905            let task = tokio::spawn(async move {
906                tokio::time::sleep(delay).await;
907                let is_running = *service.running.read().await;
908                if is_running {
909                    service.on_timer().await;
910                }
911            });
912
913            let mut timer_guard = timer_task.lock().await;
914            *timer_guard = Some(task);
915        });
916    }
917}
918
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds an enabled job request that recurs every 5 seconds.
    fn every_five_seconds(name: &str) -> CreateCronJobRequest {
        CreateCronJobRequest {
            name: name.to_string(),
            schedule: CronSchedule::every(5000),
            payload: CronPayload::default(),
            delete_after_run: false,
            enabled: true,
        }
    }

    #[test]
    fn test_compute_next_run_every() {
        let every_five = CronSchedule::every(5000);
        let next = compute_next_run(&every_five, 1000);
        assert_eq!(next, Some(6000));
    }

    #[test]
    fn test_compute_next_run_cron_five_fields_supported() {
        // A 5-field expression gains an implicit "0" seconds field, so from the epoch
        // the next firing is exactly one minute later.
        let every_minute = CronSchedule::cron("* * * * *".to_string(), None);
        assert_eq!(compute_next_run(&every_minute, 0), Some(60_000));
    }

    #[tokio::test]
    async fn test_enable_disable_updates_status() {
        let dir = TempDir::new().unwrap();
        let service = CronService::new(dir.path().join("cron.json"), None);
        service.start().await;

        let created = service
            .create_job(every_five_seconds("Test"))
            .await
            .unwrap();
        let id = created.job.id.as_str();

        let paused = service.set_job_enabled(id, false).await.unwrap();
        assert_eq!(paused.computed_status, CronJobLifecycleStatus::Paused);

        let resumed = service.set_job_enabled(id, true).await.unwrap();
        assert_eq!(resumed.computed_status, CronJobLifecycleStatus::Scheduled);

        service.stop().await;
    }

    #[tokio::test]
    async fn test_delete_running_job_removes_it() {
        let dir = TempDir::new().unwrap();
        let callback: JobCallback = Arc::new(|_job, token| {
            Box::pin(async move {
                tokio::select! {
                    _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)) => Some("done".to_string()),
                    _ = token.cancelled() => Some("Error: cancelled".to_string()),
                }
            })
        });
        let service = Arc::new(CronService::new(
            dir.path().join("cron.json"),
            Some(callback),
        ));
        service.start().await;

        let job_id = service
            .create_job(every_five_seconds("Long"))
            .await
            .unwrap()
            .job
            .id;

        // Start a long manual run in the background so the delete hits a running job.
        let runner = {
            let service = Arc::clone(&service);
            let job_id = job_id.clone();
            tokio::spawn(async move {
                let _ = service.run_job_now(&job_id, true).await;
            })
        };

        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        service.delete_job(&job_id).await.unwrap();

        let remaining = service.list_job_views(true).await;
        assert!(!remaining.iter().any(|view| view.job.id == job_id));

        let _ = runner.await;
        service.stop().await;
    }
}