1use std::collections::HashMap;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use tokio::sync::{Mutex, RwLock};
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, error, info, warn};
12
13use crate::cron::types::{
14 CreateCronJobRequest, CronJob, CronJobDto, CronJobLifecycleStatus, CronJobState, CronPayload,
15 CronRunSnapshot, CronSchedule, CronStore, CronTrigger, UpdateCronJobRequest,
16};
17
18fn now_ms() -> i64 {
19 SystemTime::now()
20 .duration_since(UNIX_EPOCH)
21 .unwrap()
22 .as_millis() as i64
23}
24
25fn normalize_cron_expr(expr: &str) -> String {
26 let fields: Vec<&str> = expr.split_whitespace().collect();
27 if fields.len() == 5 {
28 format!("0 {}", fields.join(" "))
29 } else {
30 fields.join(" ")
31 }
32}
33
34fn compute_next_run(schedule: &CronSchedule, now_ms: i64) -> Option<i64> {
35 match schedule {
36 CronSchedule::At { at_ms } => {
37 if *at_ms > now_ms {
38 Some(*at_ms)
39 } else {
40 None
41 }
42 }
43 CronSchedule::Every { every_ms } => {
44 if *every_ms <= 0 {
45 None
46 } else {
47 Some(now_ms + every_ms)
48 }
49 }
50 CronSchedule::Cron { expr, tz } => {
51 match cron::Schedule::try_from(normalize_cron_expr(expr).as_str()) {
52 Ok(schedule) => {
53 let seconds = now_ms / 1000;
54 let nanoseconds = ((now_ms % 1000) * 1_000_000) as u32;
55 let now_utc = chrono::DateTime::from_timestamp(seconds, nanoseconds)
56 .unwrap_or_else(chrono::Utc::now);
57
58 if let Some(tz_str) = tz {
59 if let Ok(tz) = tz_str.parse::<chrono_tz::Tz>() {
60 let dt = now_utc.with_timezone(&tz);
61 return schedule
62 .after(&dt)
63 .next()
64 .map(|next| next.timestamp_millis());
65 }
66 warn!("Invalid timezone '{}', falling back to UTC", tz_str);
67 }
68
69 schedule
70 .after(&now_utc)
71 .next()
72 .map(|next| next.timestamp_millis())
73 }
74 Err(e) => {
75 warn!("Invalid cron expression '{}': {}", expr, e);
76 None
77 }
78 }
79 }
80 }
81}
82
83pub type JobCallback = Arc<
84 dyn Fn(
85 CronJob,
86 CancellationToken,
87 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Option<String>> + Send>>
88 + Send
89 + Sync,
90>;
91
92#[derive(Clone)]
93struct ActiveCronRun {
94 snapshot: CronRunSnapshot,
95 cancel_token: CancellationToken,
96}
97
98pub struct CronService {
99 store_path: PathBuf,
100 on_job: Option<JobCallback>,
101 store: Arc<RwLock<Option<CronStore>>>,
102 timer_task: Arc<Mutex<Option<JoinHandle<()>>>>,
103 running: Arc<RwLock<bool>>,
104 active_runs: Arc<RwLock<HashMap<String, ActiveCronRun>>>,
105}
106
107impl CronService {
108 pub fn new(store_path: PathBuf, on_job: Option<JobCallback>) -> Self {
109 Self {
110 store_path,
111 on_job,
112 store: Arc::new(RwLock::new(None)),
113 timer_task: Arc::new(Mutex::new(None)),
114 running: Arc::new(RwLock::new(false)),
115 active_runs: Arc::new(RwLock::new(HashMap::new())),
116 }
117 }
118
119 async fn load_store(&self) -> CronStore {
120 {
121 let store_guard = self.store.read().await;
122 if let Some(store) = store_guard.as_ref() {
123 return store.clone();
124 }
125 }
126
127 let store = if self.store_path.exists() {
128 match tokio::fs::read_to_string(&self.store_path).await {
129 Ok(content) => match serde_json::from_str::<CronStore>(&content) {
130 Ok(store) => {
131 debug!("Loaded {} cron jobs from disk", store.jobs.len());
132 store
133 }
134 Err(e) => {
135 warn!("Failed to parse cron store: {}", e);
136 CronStore::default()
137 }
138 },
139 Err(e) => {
140 warn!("Failed to read cron store: {}", e);
141 CronStore::default()
142 }
143 }
144 } else {
145 CronStore::default()
146 };
147
148 let mut store_guard = self.store.write().await;
149 *store_guard = Some(store.clone());
150 store
151 }
152
153 async fn save_store(&self) {
154 let store = {
155 let store_guard = self.store.read().await;
156 match store_guard.as_ref() {
157 Some(s) => s.clone(),
158 None => return,
159 }
160 };
161
162 if let Some(parent) = self.store_path.parent() {
163 let _ = tokio::fs::create_dir_all(parent).await;
164 }
165
166 match serde_json::to_string_pretty(&store) {
167 Ok(content) => {
168 if let Err(e) = tokio::fs::write(&self.store_path, content).await {
169 error!("Failed to save cron store: {}", e);
170 }
171 }
172 Err(e) => error!("Failed to serialize cron store: {}", e),
173 }
174 }
175
176 pub async fn start(&self) {
177 *self.running.write().await = true;
178
179 let mut store = self.load_store().await;
180 self.recompute_next_runs(&mut store).await;
181
182 let mut store_guard = self.store.write().await;
183 *store_guard = Some(store);
184 drop(store_guard);
185
186 self.save_store().await;
187 self.arm_timer().await;
188 }
189
190 pub async fn stop(&self) {
191 *self.running.write().await = false;
192
193 {
194 let mut timer_guard = self.timer_task.lock().await;
195 if let Some(task) = timer_guard.take() {
196 task.abort();
197 }
198 }
199
200 let tokens = {
201 let active_guard = self.active_runs.read().await;
202 active_guard
203 .values()
204 .map(|run| run.cancel_token.clone())
205 .collect::<Vec<_>>()
206 };
207 for token in tokens {
208 token.cancel();
209 }
210 }
211
212 async fn recompute_next_runs(&self, store: &mut CronStore) {
213 let now = now_ms();
214 for job in &mut store.jobs {
215 if job.enabled {
216 job.state.next_run_at_ms = compute_next_run(&job.schedule, now);
217 } else {
218 job.state.next_run_at_ms = None;
219 }
220 }
221 }
222
223 async fn get_next_wake_ms(&self) -> Option<i64> {
224 let store_guard = self.store.read().await;
225 let store = store_guard.as_ref()?;
226
227 store
228 .jobs
229 .iter()
230 .filter(|job| job.enabled && job.state.next_run_at_ms.is_some())
231 .filter_map(|job| job.state.next_run_at_ms)
232 .min()
233 }
234
235 async fn arm_timer(&self) {
236 {
237 let mut timer_guard = self.timer_task.lock().await;
238 if let Some(task) = timer_guard.take() {
239 task.abort();
240 }
241 }
242
243 let next_wake = self.get_next_wake_ms().await;
244 let is_running = *self.running.read().await;
245 if !is_running {
246 return;
247 }
248
249 let Some(next_wake) = next_wake else {
250 return;
251 };
252
253 let delay_ms = (next_wake - now_ms()).max(0);
254 let delay = tokio::time::Duration::from_millis(delay_ms as u64);
255 let service = Arc::new(CronServiceHandle {
256 store: Arc::clone(&self.store),
257 timer_task: Arc::clone(&self.timer_task),
258 running: Arc::clone(&self.running),
259 on_job: self.on_job.clone(),
260 store_path: self.store_path.clone(),
261 active_runs: Arc::clone(&self.active_runs),
262 });
263
264 let task = tokio::spawn(async move {
265 tokio::time::sleep(delay).await;
266 let is_running = *service.running.read().await;
267 if is_running {
268 service.on_timer().await;
269 }
270 });
271
272 let mut timer_guard = self.timer_task.lock().await;
273 *timer_guard = Some(task);
274 }
275
276 fn to_job_dto(job: &CronJob, active_run: Option<CronRunSnapshot>) -> CronJobDto {
277 let computed_status = if active_run.is_some() {
278 CronJobLifecycleStatus::Running
279 } else if !job.enabled {
280 CronJobLifecycleStatus::Paused
281 } else if job.state.last_status.as_deref() == Some("error") {
282 CronJobLifecycleStatus::Failed
283 } else if job.state.last_run_at_ms.is_some() {
284 CronJobLifecycleStatus::Completed
285 } else {
286 CronJobLifecycleStatus::Scheduled
287 };
288
289 CronJobDto {
290 job: job.clone(),
291 is_running: active_run.is_some(),
292 active_run,
293 computed_status,
294 }
295 }
296
297 async fn active_snapshot_for(&self, job_id: &str) -> Option<CronRunSnapshot> {
298 let active_guard = self.active_runs.read().await;
299 active_guard.get(job_id).map(|run| run.snapshot.clone())
300 }
301
302 async fn register_active_run(
303 &self,
304 job_id: &str,
305 trigger: CronTrigger,
306 ) -> Result<CronRunSnapshot, String> {
307 let mut active_guard = self.active_runs.write().await;
308 if active_guard.contains_key(job_id) {
309 return Err(format!("job {} is already running", job_id));
310 }
311
312 let timestamp = now_ms();
313 let snapshot = CronRunSnapshot {
314 run_id: uuid::Uuid::new_v4().to_string(),
315 job_id: job_id.to_string(),
316 started_at_ms: timestamp,
317 last_heartbeat_at_ms: timestamp,
318 trigger,
319 cancelable: true,
320 };
321
322 active_guard.insert(
323 job_id.to_string(),
324 ActiveCronRun {
325 snapshot: snapshot.clone(),
326 cancel_token: CancellationToken::new(),
327 },
328 );
329 Ok(snapshot)
330 }
331
332 async fn cancel_token_for(&self, job_id: &str) -> Option<CancellationToken> {
333 let active_guard = self.active_runs.read().await;
334 active_guard.get(job_id).map(|run| run.cancel_token.clone())
335 }
336
337 async fn clear_active_run(&self, job_id: &str) {
338 let mut active_guard = self.active_runs.write().await;
339 active_guard.remove(job_id);
340 }
341
342 async fn execute_job_with_trigger(
343 &self,
344 mut job: CronJob,
345 trigger: CronTrigger,
346 ) -> Result<CronJobDto, String> {
347 info!(
348 "Cron: executing job '{}' ({}) trigger={:?}",
349 job.name, job.id, trigger
350 );
351 let _ = self.register_active_run(&job.id, trigger).await?;
352 let cancel_token = self
353 .cancel_token_for(&job.id)
354 .await
355 .ok_or_else(|| format!("missing cancel token for {}", job.id))?;
356
357 let callback_result = if cancel_token.is_cancelled() {
358 Err("job cancelled before start".to_string())
359 } else if let Some(callback) = &self.on_job {
360 match (callback)(job.clone(), cancel_token.clone()).await {
361 Some(response) if cancel_token.is_cancelled() => Err("job cancelled".to_string()),
362 Some(response) if response.to_ascii_lowercase().starts_with("error") => {
363 Err(response)
364 }
365 Some(_) | None => Ok(()),
366 }
367 } else if cancel_token.is_cancelled() {
368 Err("job cancelled".to_string())
369 } else {
370 Ok(())
371 };
372
373 let now = now_ms();
374 job.state.last_run_at_ms = Some(now);
375 job.updated_at_ms = now;
376 match callback_result {
377 Ok(()) => {
378 job.state.last_status = Some("ok".to_string());
379 job.state.last_error = None;
380 info!(
381 "Cron: job '{}' ({}) completed successfully",
382 job.name, job.id
383 );
384 }
385 Err(err) => {
386 job.state.last_status = Some("error".to_string());
387 job.state.last_error = Some(err);
388 warn!("Cron: job '{}' ({}) failed", job.name, job.id);
389 }
390 }
391
392 let should_remove = match &job.schedule {
393 CronSchedule::At { .. } => {
394 if job.delete_after_run && job.state.last_status.as_deref() == Some("ok") {
395 true
396 } else {
397 job.enabled = false;
398 job.state.next_run_at_ms = None;
399 false
400 }
401 }
402 _ => {
403 job.state.next_run_at_ms = if job.enabled {
404 compute_next_run(&job.schedule, now_ms())
405 } else {
406 None
407 };
408 false
409 }
410 };
411
412 {
413 let mut store_guard = self.store.write().await;
414 if let Some(store) = store_guard.as_mut() {
415 if should_remove {
416 store.jobs.retain(|existing| existing.id != job.id);
417 } else if let Some(existing) =
418 store.jobs.iter_mut().find(|existing| existing.id == job.id)
419 {
420 *existing = job.clone();
421 }
422 }
423 }
424
425 self.clear_active_run(&job.id).await;
426 self.save_store().await;
427 self.arm_timer().await;
428
429 self.get_job(&job.id)
430 .await
431 .ok_or_else(|| format!("job {} no longer exists after execution", job.id))
432 }
433
434 pub async fn list_jobs(&self, include_disabled: bool) -> Vec<CronJob> {
435 let store = self.load_store().await;
436 let mut jobs: Vec<CronJob> = if include_disabled {
437 store.jobs
438 } else {
439 store.jobs.into_iter().filter(|job| job.enabled).collect()
440 };
441
442 jobs.sort_by_key(|job| job.state.next_run_at_ms.unwrap_or(i64::MAX));
443 jobs
444 }
445
446 pub async fn list_job_views(&self, include_disabled: bool) -> Vec<CronJobDto> {
447 let jobs = self.list_jobs(include_disabled).await;
448 let active_guard = self.active_runs.read().await;
449 jobs.into_iter()
450 .map(|job| {
451 let active = active_guard.get(&job.id).map(|run| run.snapshot.clone());
452 Self::to_job_dto(&job, active)
453 })
454 .collect()
455 }
456
457 pub async fn get_job(&self, job_id: &str) -> Option<CronJobDto> {
458 let store = self.load_store().await;
459 let job = store.jobs.into_iter().find(|job| job.id == job_id)?;
460 let active = self.active_snapshot_for(job_id).await;
461 Some(Self::to_job_dto(&job, active))
462 }
463
464 #[allow(clippy::too_many_arguments)]
465 pub async fn add_job(
466 &self,
467 name: String,
468 schedule: CronSchedule,
469 message: String,
470 deliver: bool,
471 channel: Option<String>,
472 to: Option<String>,
473 delete_after_run: bool,
474 ) -> CronJob {
475 let now = now_ms();
476 let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
477 let job = CronJob {
478 id: id.clone(),
479 name: name.clone(),
480 enabled: true,
481 schedule: schedule.clone(),
482 payload: CronPayload {
483 kind: "agent_turn".to_string(),
484 message,
485 deliver,
486 channel,
487 to,
488 },
489 state: CronJobState {
490 next_run_at_ms: compute_next_run(&schedule, now),
491 ..Default::default()
492 },
493 created_at_ms: now,
494 updated_at_ms: now,
495 delete_after_run,
496 };
497
498 {
499 let mut store_guard = self.store.write().await;
500 if let Some(store) = store_guard.as_mut() {
501 store.jobs.push(job.clone());
502 } else {
503 let mut store = CronStore::default();
504 store.jobs.push(job.clone());
505 *store_guard = Some(store);
506 }
507 }
508
509 self.save_store().await;
510 self.arm_timer().await;
511 info!("Cron: added job '{}' ({})", name, id);
512 job
513 }
514
515 pub async fn create_job(&self, request: CreateCronJobRequest) -> Result<CronJobDto, String> {
516 let now = now_ms();
517 let id = uuid::Uuid::new_v4().to_string()[..8].to_string();
518 let schedule = request.schedule.clone();
519 let job = CronJob {
520 id,
521 name: request.name,
522 enabled: request.enabled,
523 schedule,
524 payload: request.payload,
525 state: CronJobState {
526 next_run_at_ms: if request.enabled {
527 compute_next_run(&request.schedule, now)
528 } else {
529 None
530 },
531 ..Default::default()
532 },
533 created_at_ms: now,
534 updated_at_ms: now,
535 delete_after_run: request.delete_after_run,
536 };
537
538 {
539 let mut store_guard = self.store.write().await;
540 if let Some(store) = store_guard.as_mut() {
541 store.jobs.push(job.clone());
542 } else {
543 let mut store = CronStore::default();
544 store.jobs.push(job.clone());
545 *store_guard = Some(store);
546 }
547 }
548
549 self.save_store().await;
550 self.arm_timer().await;
551 Ok(Self::to_job_dto(&job, None))
552 }
553
554 pub async fn update_job(
555 &self,
556 job_id: &str,
557 request: UpdateCronJobRequest,
558 ) -> Result<CronJobDto, String> {
559 let updated_job = {
560 let mut store_guard = self.store.write().await;
561 let store = store_guard
562 .as_mut()
563 .ok_or_else(|| "cron store not initialized".to_string())?;
564 let job = store
565 .jobs
566 .iter_mut()
567 .find(|job| job.id == job_id)
568 .ok_or_else(|| format!("job {} not found", job_id))?;
569
570 job.name = request.name;
571 job.schedule = request.schedule;
572 job.payload = request.payload;
573 job.delete_after_run = request.delete_after_run;
574 job.enabled = request.enabled;
575 job.updated_at_ms = now_ms();
576 job.state.next_run_at_ms = if job.enabled {
577 compute_next_run(&job.schedule, now_ms())
578 } else {
579 None
580 };
581 job.clone()
582 };
583
584 self.save_store().await;
585 self.arm_timer().await;
586 Ok(Self::to_job_dto(
587 &updated_job,
588 self.active_snapshot_for(job_id).await,
589 ))
590 }
591
592 pub async fn delete_job(&self, job_id: &str) -> Result<(), String> {
593 let _ = self.stop_run(job_id).await;
594 let removed = {
595 let mut store_guard = self.store.write().await;
596 if let Some(store) = store_guard.as_mut() {
597 let before = store.jobs.len();
598 store.jobs.retain(|job| job.id != job_id);
599 store.jobs.len() < before
600 } else {
601 false
602 }
603 };
604
605 self.clear_active_run(job_id).await;
606
607 if removed {
608 self.save_store().await;
609 self.arm_timer().await;
610 Ok(())
611 } else {
612 Err(format!("job {} not found", job_id))
613 }
614 }
615
616 pub async fn remove_job(&self, job_id: &str) -> bool {
617 self.delete_job(job_id).await.is_ok()
618 }
619
620 pub async fn enable_job(&self, job_id: &str, enabled: bool) -> Option<CronJob> {
621 let job = {
622 let mut store_guard = self.store.write().await;
623 if let Some(store) = store_guard.as_mut() {
624 if let Some(job) = store.jobs.iter_mut().find(|job| job.id == job_id) {
625 job.enabled = enabled;
626 job.updated_at_ms = now_ms();
627 job.state.next_run_at_ms = if enabled {
628 compute_next_run(&job.schedule, now_ms())
629 } else {
630 None
631 };
632 Some(job.clone())
633 } else {
634 None
635 }
636 } else {
637 None
638 }
639 };
640
641 if job.is_some() {
642 self.save_store().await;
643 self.arm_timer().await;
644 }
645
646 job
647 }
648
649 pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> Result<CronJobDto, String> {
650 let updated = self
651 .enable_job(job_id, enabled)
652 .await
653 .ok_or_else(|| format!("job {} not found", job_id))?;
654 Ok(Self::to_job_dto(
655 &updated,
656 self.active_snapshot_for(job_id).await,
657 ))
658 }
659
660 pub async fn run_job(&self, job_id: &str, force: bool) -> bool {
661 self.run_job_now(job_id, force).await.is_ok()
662 }
663
664 pub async fn run_job_now(&self, job_id: &str, force: bool) -> Result<CronJobDto, String> {
665 let job = {
666 let store_guard = self.store.read().await;
667 if let Some(store) = store_guard.as_ref() {
668 store
669 .jobs
670 .iter()
671 .find(|job| job.id == job_id)
672 .filter(|job| force || job.enabled)
673 .cloned()
674 } else {
675 None
676 }
677 }
678 .ok_or_else(|| format!("job {} not found or disabled", job_id))?;
679
680 self.execute_job_with_trigger(job, CronTrigger::Manual)
681 .await
682 }
683
684 pub async fn stop_run(&self, job_id: &str) -> Result<CronRunSnapshot, String> {
685 let active = {
686 let active_guard = self.active_runs.read().await;
687 active_guard.get(job_id).cloned()
688 }
689 .ok_or_else(|| format!("job {} is not running", job_id))?;
690
691 active.cancel_token.cancel();
692 Ok(active.snapshot)
693 }
694
695 pub async fn status(&self) -> serde_json::Value {
696 let jobs = self.list_job_views(true).await;
697 let is_running = *self.running.read().await;
698 let next_wake = self.get_next_wake_ms().await;
699
700 serde_json::json!({
701 "enabled": is_running,
702 "jobs": jobs.len(),
703 "runningJobs": jobs.iter().filter(|job| job.is_running).count(),
704 "nextWakeAtMs": next_wake,
705 })
706 }
707}
708
709struct CronServiceHandle {
710 store: Arc<RwLock<Option<CronStore>>>,
711 timer_task: Arc<Mutex<Option<JoinHandle<()>>>>,
712 running: Arc<RwLock<bool>>,
713 on_job: Option<JobCallback>,
714 store_path: PathBuf,
715 active_runs: Arc<RwLock<HashMap<String, ActiveCronRun>>>,
716}
717
718impl CronServiceHandle {
719 async fn execute_due_job(&self, mut job: CronJob) {
720 info!("Cron: due job fired '{}' ({})", job.name, job.id);
721 let mut active_guard = self.active_runs.write().await;
722 if active_guard.contains_key(&job.id) {
723 return;
724 }
725
726 let timestamp = now_ms();
727 active_guard.insert(
728 job.id.clone(),
729 ActiveCronRun {
730 snapshot: CronRunSnapshot {
731 run_id: uuid::Uuid::new_v4().to_string(),
732 job_id: job.id.clone(),
733 started_at_ms: timestamp,
734 last_heartbeat_at_ms: timestamp,
735 trigger: CronTrigger::Scheduled,
736 cancelable: true,
737 },
738 cancel_token: CancellationToken::new(),
739 },
740 );
741 let cancel_token = active_guard
742 .get(&job.id)
743 .map(|run| run.cancel_token.clone())
744 .expect("inserted active run");
745 drop(active_guard);
746
747 let result = if cancel_token.is_cancelled() {
748 Err("job cancelled before start".to_string())
749 } else if let Some(callback) = &self.on_job {
750 match (callback)(job.clone(), cancel_token.clone()).await {
751 Some(response) if cancel_token.is_cancelled() => Err("job cancelled".to_string()),
752 Some(response) if response.to_ascii_lowercase().starts_with("error") => {
753 Err(response)
754 }
755 Some(_) | None => Ok(()),
756 }
757 } else if cancel_token.is_cancelled() {
758 Err("job cancelled".to_string())
759 } else {
760 Ok(())
761 };
762
763 job.state.last_run_at_ms = Some(timestamp);
764 job.updated_at_ms = now_ms();
765 match result {
766 Ok(()) => {
767 job.state.last_status = Some("ok".to_string());
768 job.state.last_error = None;
769 info!(
770 "Cron: scheduled job '{}' ({}) completed successfully",
771 job.name, job.id
772 );
773 }
774 Err(err) => {
775 job.state.last_status = Some("error".to_string());
776 job.state.last_error = Some(err);
777 warn!("Cron: scheduled job '{}' ({}) failed", job.name, job.id);
778 }
779 }
780
781 let should_remove = match &job.schedule {
782 CronSchedule::At { .. } => {
783 if job.delete_after_run && job.state.last_status.as_deref() == Some("ok") {
784 true
785 } else {
786 job.enabled = false;
787 job.state.next_run_at_ms = None;
788 false
789 }
790 }
791 _ => {
792 job.state.next_run_at_ms = if job.enabled {
793 compute_next_run(&job.schedule, now_ms())
794 } else {
795 None
796 };
797 false
798 }
799 };
800
801 {
802 let mut store_guard = self.store.write().await;
803 if let Some(store) = store_guard.as_mut() {
804 if should_remove {
805 store.jobs.retain(|existing| existing.id != job.id);
806 } else if let Some(existing) =
807 store.jobs.iter_mut().find(|existing| existing.id == job.id)
808 {
809 *existing = job.clone();
810 }
811 }
812 }
813
814 let mut active_guard = self.active_runs.write().await;
815 active_guard.remove(&job.id);
816 }
817
818 async fn on_timer(&self) {
819 let now = now_ms();
820 let due_jobs = {
821 let store_guard = self.store.read().await;
822 if let Some(store) = store_guard.as_ref() {
823 store
824 .jobs
825 .iter()
826 .filter(|job| {
827 job.enabled
828 && job.state.next_run_at_ms.is_some()
829 && now >= job.state.next_run_at_ms.unwrap()
830 })
831 .cloned()
832 .collect::<Vec<_>>()
833 } else {
834 Vec::new()
835 }
836 };
837
838 for job in due_jobs {
839 self.execute_due_job(job).await;
840 }
841
842 if let Some(parent) = self.store_path.parent() {
843 let _ = tokio::fs::create_dir_all(parent).await;
844 }
845 {
846 let store_guard = self.store.read().await;
847 if let Some(store) = store_guard.as_ref() {
848 if let Ok(content) = serde_json::to_string_pretty(store) {
849 let _ = tokio::fs::write(&self.store_path, content).await;
850 }
851 }
852 }
853
854 self.rearm_timer();
855 }
856
857 fn rearm_timer(&self) {
858 let store = Arc::clone(&self.store);
859 let timer_task = Arc::clone(&self.timer_task);
860 let running = Arc::clone(&self.running);
861 let on_job = self.on_job.clone();
862 let store_path = self.store_path.clone();
863 let active_runs = Arc::clone(&self.active_runs);
864
865 tokio::spawn(async move {
866 {
867 let mut timer_guard = timer_task.lock().await;
868 if let Some(task) = timer_guard.take() {
869 task.abort();
870 }
871 }
872
873 let is_running = *running.read().await;
874 if !is_running {
875 return;
876 }
877
878 let next_wake = {
879 let store_guard = store.read().await;
880 store_guard.as_ref().and_then(|cron_store| {
881 cron_store
882 .jobs
883 .iter()
884 .filter(|job| job.enabled && job.state.next_run_at_ms.is_some())
885 .filter_map(|job| job.state.next_run_at_ms)
886 .min()
887 })
888 };
889
890 let Some(next_wake) = next_wake else {
891 return;
892 };
893
894 let delay_ms = (next_wake - now_ms()).max(0);
895 let delay = tokio::time::Duration::from_millis(delay_ms as u64);
896 let service = Arc::new(CronServiceHandle {
897 store: Arc::clone(&store),
898 timer_task: Arc::clone(&timer_task),
899 running: Arc::clone(&running),
900 on_job: on_job.clone(),
901 store_path: store_path.clone(),
902 active_runs: Arc::clone(&active_runs),
903 });
904
905 let task = tokio::spawn(async move {
906 tokio::time::sleep(delay).await;
907 let is_running = *service.running.read().await;
908 if is_running {
909 service.on_timer().await;
910 }
911 });
912
913 let mut timer_guard = timer_task.lock().await;
914 *timer_guard = Some(task);
915 });
916 }
917}
918
919#[cfg(test)]
920mod tests {
921 use super::*;
922 use tempfile::TempDir;
923
924 #[test]
925 fn test_compute_next_run_every() {
926 let schedule = CronSchedule::every(5000);
927 assert_eq!(compute_next_run(&schedule, 1000), Some(6000));
928 }
929
930 #[test]
931 fn test_compute_next_run_cron_five_fields_supported() {
932 let schedule = CronSchedule::cron("* * * * *".to_string(), None);
933 let next = compute_next_run(&schedule, 0).unwrap();
934 assert_eq!(next, 60_000);
935 }
936
937 #[tokio::test]
938 async fn test_enable_disable_updates_status() {
939 let temp_dir = TempDir::new().unwrap();
940 let store_path = temp_dir.path().join("cron.json");
941 let service = CronService::new(store_path, None);
942 service.start().await;
943
944 let job = service
945 .create_job(CreateCronJobRequest {
946 name: "Test".to_string(),
947 schedule: CronSchedule::every(5000),
948 payload: CronPayload::default(),
949 delete_after_run: false,
950 enabled: true,
951 })
952 .await
953 .unwrap();
954
955 let paused = service.set_job_enabled(&job.job.id, false).await.unwrap();
956 assert_eq!(paused.computed_status, CronJobLifecycleStatus::Paused);
957
958 let scheduled = service.set_job_enabled(&job.job.id, true).await.unwrap();
959 assert_eq!(scheduled.computed_status, CronJobLifecycleStatus::Scheduled);
960 service.stop().await;
961 }
962
963 #[tokio::test]
964 async fn test_delete_running_job_removes_it() {
965 let temp_dir = TempDir::new().unwrap();
966 let store_path = temp_dir.path().join("cron.json");
967 let callback: JobCallback = Arc::new(|_job, token| {
968 Box::pin(async move {
969 tokio::select! {
970 _ = tokio::time::sleep(tokio::time::Duration::from_millis(200)) => Some("done".to_string()),
971 _ = token.cancelled() => Some("Error: cancelled".to_string()),
972 }
973 })
974 });
975 let service = Arc::new(CronService::new(store_path, Some(callback)));
976 service.start().await;
977
978 let job = service
979 .create_job(CreateCronJobRequest {
980 name: "Long".to_string(),
981 schedule: CronSchedule::every(5000),
982 payload: CronPayload::default(),
983 delete_after_run: false,
984 enabled: true,
985 })
986 .await
987 .unwrap();
988
989 let job_id = job.job.id.clone();
990 let service_for_run = Arc::clone(&service);
991 let job_id_for_run = job_id.clone();
992 let runner = tokio::spawn(async move {
993 let _ = service_for_run.run_job_now(&job_id_for_run, true).await;
994 });
995
996 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
997 service.delete_job(&job_id).await.unwrap();
998 let jobs = service.list_job_views(true).await;
999 assert!(jobs.into_iter().all(|job| job.job.id != job_id));
1000
1001 let _ = runner.await;
1002 service.stop().await;
1003 }
1004}