use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
#[allow(unused_imports)]
use zeph_db::sql;

use chrono::Utc;
use tokio::sync::{Mutex, mpsc, watch};
use tracing::Instrument;

use crate::error::SchedulerError;
use crate::sanitize::sanitize_task_prompt;
use crate::store::JobStore;
use crate::task::{ScheduledTask, TaskDescriptor, TaskHandler, TaskKind, TaskMode};
17
/// Control messages accepted by a running [`Scheduler`] over its mpsc channel.
pub enum SchedulerMessage {
    /// Register (or replace) the task described by the boxed descriptor.
    /// Boxed to keep the message small; descriptors carry arbitrary JSON config.
    Add(Box<TaskDescriptor>),
    /// Remove the task with this name from memory and delete its store row.
    Cancel(String),
}
55
/// Cron/oneshot task scheduler backed by a persistent [`JobStore`].
///
/// Holds the in-memory task list and handler registry; `next_run`
/// bookkeeping lives in the store so schedules survive restarts.
pub struct Scheduler {
    // In-memory tasks; kept in sync with store rows by init()/tick().
    tasks: Vec<ScheduledTask>,
    // Persistent backing store for job rows and run timestamps.
    store: JobStore,
    // Task handlers keyed by `TaskKind::as_str()`.
    handlers: HashMap<String, Box<dyn TaskHandler>>,
    // Flips to `true` when the daemon requests shutdown.
    shutdown_rx: watch::Receiver<bool>,
    // Control channel carrying runtime Add/Cancel messages.
    task_rx: mpsc::Receiver<SchedulerMessage>,
    // Optional sink for prompts produced by handler-less Custom oneshots.
    custom_task_tx: Option<mpsc::Sender<String>>,
    // Upper bound on tasks accepted via register_descriptor().
    max_tasks: usize,
    // Names of tasks currently executing; consulted by catch-up replay
    // and by the shutdown grace loop.
    in_flight: Arc<Mutex<HashSet<String>>>,
}
97
98impl Scheduler {
99 #[must_use]
104 pub fn new(
105 store: JobStore,
106 shutdown_rx: watch::Receiver<bool>,
107 ) -> (Self, mpsc::Sender<SchedulerMessage>) {
108 Self::with_max_tasks(store, shutdown_rx, 100)
109 }
110
111 #[must_use]
119 pub fn with_max_tasks(
120 store: JobStore,
121 shutdown_rx: watch::Receiver<bool>,
122 max_tasks: usize,
123 ) -> (Self, mpsc::Sender<SchedulerMessage>) {
124 let (tx, rx) = mpsc::channel(64);
125 let scheduler = Self {
126 tasks: Vec::new(),
127 store,
128 handlers: HashMap::new(),
129 shutdown_rx,
130 task_rx: rx,
131 custom_task_tx: None,
132 max_tasks,
133 in_flight: Arc::new(Mutex::new(HashSet::new())),
134 };
135 (scheduler, tx)
136 }
137
138 #[must_use]
140 pub fn with_custom_task_sender(mut self, tx: mpsc::Sender<String>) -> Self {
141 self.custom_task_tx = Some(tx);
142 self
143 }
144
    /// Registers a task in memory; it is persisted later by [`Self::init`].
    ///
    /// NOTE(review): unlike channel registration, this path does not
    /// enforce `max_tasks` or deduplicate by name — callers are expected
    /// to add a fixed set of static tasks before `init()`.
    pub fn add_task(&mut self, task: ScheduledTask) {
        self.tasks.push(task);
    }
153
154 pub fn register_handler(&mut self, kind: &TaskKind, handler: Box<dyn TaskHandler>) {
160 self.handlers.insert(kind.as_str().to_owned(), handler);
161 }
162
    /// Initializes the store, persists all statically added tasks, and
    /// hydrates previously persisted (e.g. CLI-added) periodic jobs back
    /// into the in-memory task list.
    ///
    /// For every periodic task without a persisted `next_run`, computes
    /// and stores the next cron occurrence so `tick()` has something to
    /// compare against.
    ///
    /// # Errors
    /// Propagates store initialization/read/write failures.
    pub async fn init(&mut self) -> Result<(), SchedulerError> {
        self.store.init().await?;
        let now = Utc::now();
        // Phase 1: upsert every statically registered task into the store.
        for task in &self.tasks {
            match &task.mode {
                TaskMode::Periodic { schedule } => {
                    self.store
                        .upsert_job_with_mode(
                            &task.name,
                            &schedule.to_string(),
                            task.kind.as_str(),
                            "periodic",
                            None,
                            "",
                        )
                        .await?;
                    // Seed next_run only if absent, so an existing schedule
                    // position survives restarts.
                    if self.store.get_next_run(&task.name).await?.is_none() {
                        match schedule.after(&now).next() {
                            Some(next) => {
                                self.store
                                    .set_next_run(&task.name, &next.to_rfc3339())
                                    .await?;
                            }
                            None => {
                                tracing::warn!(
                                    task = %task.name,
                                    "cron produces no future occurrence, skipping next_run"
                                );
                            }
                        }
                    }
                }
                TaskMode::OneShot { run_at } => {
                    // Oneshots have no cron expression; run_at is persisted
                    // directly.
                    self.store
                        .upsert_job_with_mode(
                            &task.name,
                            "",
                            task.kind.as_str(),
                            "oneshot",
                            Some(&run_at.to_rfc3339()),
                            "",
                        )
                        .await?;
                }
            }
        }

        // Phase 2: hydrate persisted periodic jobs that are not among the
        // static tasks (e.g. jobs added via CLI while the daemon was down).
        let stored_jobs = self.store.list_jobs_full().await?;
        let static_names: std::collections::HashSet<String> =
            self.tasks.iter().map(|t| t.name.clone()).collect();

        for job in stored_jobs {
            if job.task_mode != "periodic" || static_names.contains(&job.name) {
                continue;
            }
            // Hydrated jobs carry no config payload (Value::Null).
            match ScheduledTask::periodic(
                job.name.clone(),
                &job.cron_expr,
                crate::task::TaskKind::from_str_kind(&job.kind),
                serde_json::Value::Null,
            ) {
                Ok(task) => {
                    if self.store.get_next_run(&job.name).await?.is_none()
                        && let Some(schedule) = task.cron_schedule()
                    {
                        match schedule.after(&now).next() {
                            Some(next) => {
                                // Best-effort: a failed write only logs, the
                                // job is still hydrated.
                                if let Err(e) =
                                    self.store.set_next_run(&job.name, &next.to_rfc3339()).await
                                {
                                    tracing::warn!(
                                        task = %job.name,
                                        "failed to persist next_run for hydrated job: {e}"
                                    );
                                }
                            }
                            None => {
                                tracing::warn!(
                                    task = %job.name,
                                    "cron produces no future occurrence, skipping next_run"
                                );
                            }
                        }
                    }
                    tracing::debug!(task = %job.name, "hydrated CLI-added periodic job from store");
                    self.tasks.push(task);
                }
                Err(e) => {
                    // Bad persisted cron: skip the row rather than failing init.
                    tracing::warn!(
                        task = %job.name,
                        cron_expr = %job.cron_expr,
                        "skipping persisted job with invalid cron expression: {e}"
                    );
                }
            }
        }

        Ok(())
    }
280
281 pub async fn catch_up_missed(&mut self) -> Result<(), SchedulerError> {
294 let _span =
295 tracing::info_span!("scheduler.daemon.catch_up", tasks = self.tasks.len()).entered();
296
297 let now = chrono::Utc::now();
298 let mut replayed = 0usize;
299
300 let overdue: Vec<_> = {
302 let mut v = Vec::new();
303 for task in &self.tasks {
304 let TaskMode::Periodic { .. } = &task.mode else {
305 continue;
306 };
307 if let Ok(Some(ref s)) = self.store.get_next_run(&task.name).await
308 && s.parse::<chrono::DateTime<chrono::Utc>>()
309 .is_ok_and(|dt| dt <= now)
310 {
311 v.push(task.name.clone());
312 }
313 }
314 v
315 };
316
317 for name in &overdue {
318 {
320 let mut guard = self.in_flight.lock().await;
321 if guard.contains(name.as_str()) {
322 tracing::debug!(task = %name, "catch_up_missed: task in-flight, skipping");
323 continue;
324 }
325 guard.insert(name.clone());
326 }
327
328 let result = self.run_periodic_task_by_name(name, &now).await;
329
330 self.in_flight.lock().await.remove(name.as_str());
331
332 match result {
333 Ok(true) => replayed += 1,
334 Ok(false) => {}
335 Err(e) => tracing::warn!(task = %name, "catch_up_missed: handler error: {e}"),
336 }
337 }
338
339 tracing::info!(replayed, "catch_up_missed complete");
340 Ok(())
341 }
342
343 async fn run_periodic_task_by_name(
347 &self,
348 name: &str,
349 now: &chrono::DateTime<chrono::Utc>,
350 ) -> Result<bool, SchedulerError> {
351 let Some(task) = self.tasks.iter().find(|t| t.name == name) else {
352 return Ok(false);
353 };
354 let TaskMode::Periodic { schedule } = &task.mode else {
355 return Ok(false);
356 };
357 let Some(handler) = self.handlers.get(task.kind.as_str()) else {
358 tracing::debug!(task = %name, "catch_up_missed: no handler, skipping");
359 return Ok(false);
360 };
361
362 tracing::info!(task = %name, "catch_up_missed: executing overdue task");
363 handler.execute(&task.config).await?;
364
365 let next = schedule
366 .after(now)
367 .next()
368 .map(|dt| dt.to_rfc3339())
369 .unwrap_or_default();
370 self.store
371 .record_run(name, &now.to_rfc3339(), &next)
372 .await?;
373 Ok(true)
374 }
375
376 pub async fn run_with_interval_and_grace(&mut self, tick_secs: u64, grace_secs: u64) {
390 let secs = tick_secs.clamp(5, 3600);
391 let mut interval = tokio::time::interval(Duration::from_secs(secs));
392 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
393
394 loop {
395 tokio::select! {
396 _ = interval.tick() => {
397 let _tick_span = tracing::info_span!(
398 "scheduler.daemon.tick",
399 tasks = self.tasks.len()
400 ).entered();
401 self.drain_channel().await;
402 self.tick().await;
403 }
404 _ = self.shutdown_rx.changed() => {
405 if *self.shutdown_rx.borrow() {
406 tracing::info!("scheduler shutting down (grace {}s)", grace_secs);
407 if grace_secs > 0 {
408 let deadline = tokio::time::Instant::now()
409 + Duration::from_secs(grace_secs.min(60));
410 loop {
411 if self.in_flight.lock().await.is_empty() {
412 tracing::debug!("scheduler: no in-flight tasks, exiting immediately");
413 break;
414 }
415 if tokio::time::Instant::now() >= deadline {
416 tracing::warn!("scheduler: grace period elapsed with tasks still in-flight");
417 break;
418 }
419 tokio::time::sleep(Duration::from_millis(100)).await;
420 }
421 }
422 break;
423 }
424 }
425 }
426 }
427 }
428
429 pub async fn run_with_interval(&mut self, tick_secs: u64) {
437 let secs = tick_secs.max(1);
438 let mut interval = tokio::time::interval(Duration::from_secs(secs));
439 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
443 loop {
444 tokio::select! {
445 _ = interval.tick() => {
446 self.drain_channel().await;
447 self.tick().await;
448 }
449 _ = self.shutdown_rx.changed() => {
450 if *self.shutdown_rx.borrow() {
451 tracing::info!("scheduler shutting down");
452 break;
453 }
454 }
455 }
456 }
457 }
458
459 pub async fn run(&mut self) {
464 let mut interval = tokio::time::interval(Duration::from_mins(1));
465 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
466 loop {
467 tokio::select! {
468 _ = interval.tick() => {
469 self.drain_channel().await;
470 self.tick().await;
471 }
472 _ = self.shutdown_rx.changed() => {
473 if *self.shutdown_rx.borrow() {
474 tracing::info!("scheduler shutting down");
475 break;
476 }
477 }
478 }
479 }
480 }
481
482 async fn drain_channel(&mut self) {
483 while let Ok(msg) = self.task_rx.try_recv() {
484 match msg {
485 SchedulerMessage::Add(boxed) => {
486 let desc = *boxed;
487 self.register_descriptor(desc).await;
488 }
489 SchedulerMessage::Cancel(name) => {
490 self.tasks.retain(|t| t.name != name);
491 if let Err(e) = self.store.delete_job(&name).await {
492 tracing::warn!(task = %name, "failed to delete job from store: {e}");
493 }
494 }
495 }
496 }
497 }
498
    /// Persists a channel-delivered task descriptor and installs it in
    /// memory, replacing any existing task with the same name.
    ///
    /// New names are rejected (with a warning) once `max_tasks` is reached;
    /// same-name replacements are always allowed since they don't grow the
    /// list. A failed store write abandons the registration entirely.
    async fn register_descriptor(&mut self, desc: TaskDescriptor) {
        let is_new = !self.tasks.iter().any(|t| t.name == desc.name);
        if is_new && self.tasks.len() >= self.max_tasks {
            tracing::warn!(
                task = %desc.name,
                max_tasks = self.max_tasks,
                "max_tasks limit reached, dropping task"
            );
            return;
        }
        let now = Utc::now();
        match &desc.mode {
            TaskMode::Periodic { schedule } => {
                // Persist first; on failure the in-memory list is untouched.
                if let Err(e) = self
                    .store
                    .upsert_job_with_mode(
                        &desc.name,
                        &schedule.to_string(),
                        desc.kind.as_str(),
                        "periodic",
                        None,
                        "",
                    )
                    .await
                {
                    tracing::warn!(task = %desc.name, "failed to upsert job: {e}");
                    return;
                }
                // Seed next_run so the next tick can fire it (best-effort:
                // a failed write is silently ignored here).
                if let Some(next) = schedule.after(&now).next() {
                    let _ = self
                        .store
                        .set_next_run(&desc.name, &next.to_rfc3339())
                        .await;
                }
            }
            TaskMode::OneShot { run_at } => {
                // Oneshots persist run_at directly; no cron expression.
                if let Err(e) = self
                    .store
                    .upsert_job_with_mode(
                        &desc.name,
                        "",
                        desc.kind.as_str(),
                        "oneshot",
                        Some(&run_at.to_rfc3339()),
                        "",
                    )
                    .await
                {
                    tracing::warn!(task = %desc.name, "failed to upsert oneshot job: {e}");
                    return;
                }
            }
        }
        // Retain-then-push keeps at most one in-memory task per name.
        self.tasks.retain(|t| t.name != desc.name);
        self.tasks.push(ScheduledTask {
            name: desc.name,
            mode: desc.mode,
            kind: desc.kind,
            config: desc.config,
        });
    }
562
    /// Executes every due task once and updates store bookkeeping.
    ///
    /// A periodic task is due when its persisted `next_run` parses and is
    /// <= now; a oneshot when `run_at` <= now. Completed oneshots are
    /// removed from the in-memory list at the end of the pass.
    async fn tick(&mut self) {
        let now = Utc::now();
        let mut completed_oneshots: Vec<String> = Vec::new();

        for task in &self.tasks {
            let should_run = match &task.mode {
                TaskMode::Periodic { .. } => {
                    match self.store.get_next_run(&task.name).await {
                        Ok(Some(ref s)) => {
                            s.parse::<chrono::DateTime<Utc>>().is_ok_and(|dt| dt <= now)
                        }
                        Ok(None) => {
                            // Missing next_run: seed it for a later tick
                            // rather than firing immediately (see the
                            // PERF-SC-04 test below).
                            if let Some(schedule) = task.cron_schedule()
                                && let Some(next) = schedule.after(&now).next()
                            {
                                let _ = self
                                    .store
                                    .set_next_run(&task.name, &next.to_rfc3339())
                                    .await;
                            }
                            false
                        }
                        Err(e) => {
                            tracing::warn!(task = %task.name, "failed to check next_run: {e}");
                            false
                        }
                    }
                }
                TaskMode::OneShot { run_at } => *run_at <= now,
            };

            if should_run {
                if let Some(handler) = self.handlers.get(task.kind.as_str()) {
                    tracing::info!(task = %task.name, kind = task.kind.as_str(), "executing task");
                    match handler.execute(&task.config).await {
                        Ok(()) => match &task.mode {
                            TaskMode::Periodic { schedule } => {
                                // Record the run and advance next_run (""
                                // when the cron yields no future occurrence).
                                let next = schedule
                                    .after(&now)
                                    .next()
                                    .map(|dt| dt.to_rfc3339())
                                    .unwrap_or_default();
                                if let Err(e) = self
                                    .store
                                    .record_run(&task.name, &now.to_rfc3339(), &next)
                                    .await
                                {
                                    tracing::warn!(task = %task.name, "failed to record run: {e}");
                                }
                            }
                            TaskMode::OneShot { .. } => {
                                if let Err(e) = self.store.mark_done(&task.name).await {
                                    tracing::warn!(task = %task.name, "failed to mark done: {e}");
                                }
                                completed_oneshots.push(task.name.clone());
                            }
                        },
                        Err(e) => {
                            // Failed periodic runs are retried: next_run is
                            // not advanced, so the task stays due.
                            tracing::warn!(task = %task.name, "task execution failed: {e}");
                        }
                    }
                } else if let TaskMode::OneShot { .. } = &task.mode {
                    // Handler-less Custom oneshot: forward a sanitized prompt
                    // to the custom-task channel instead of executing.
                    if let (TaskKind::Custom(_), Some(tx)) = (&task.kind, &self.custom_task_tx) {
                        let raw =
                            task.config.get("task").and_then(|v| v.as_str()).unwrap_or(
                                "Execute the following scheduled task now: check status",
                            );
                        let prompt = sanitize_task_prompt(raw);
                        // Best-effort send: a full channel drops the prompt.
                        let _ = tx.try_send(prompt);
                        if let Err(e) = self.store.mark_done(&task.name).await {
                            tracing::warn!(task = %task.name, "failed to mark done: {e}");
                        }
                        completed_oneshots.push(task.name.clone());
                    } else {
                        tracing::debug!(
                            task = %task.name,
                            kind = task.kind.as_str(),
                            "no handler registered"
                        );
                    }
                } else {
                    tracing::debug!(task = %task.name, kind = task.kind.as_str(), "no handler registered");
                }
            }
        }

        self.tasks.retain(|t| !completed_oneshots.contains(&t.name));
    }
659}
660
#[cfg(test)]
mod tests {
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use chrono::Duration;

    use super::*;
    use crate::task::TaskHandler;
    use zeph_db::DbPool;

    /// Test handler that counts executions via a shared atomic.
    struct CountingHandler {
        count: Arc<AtomicU32>,
    }

    impl TaskHandler for CountingHandler {
        fn execute(
            &self,
            _config: &serde_json::Value,
        ) -> Pin<Box<dyn std::future::Future<Output = Result<(), SchedulerError>> + Send + '_>>
        {
            let count = self.count.clone();
            Box::pin(async move {
                count.fetch_add(1, Ordering::Relaxed);
                Ok(())
            })
        }
    }

    /// Fresh in-memory SQLite pool per test.
    async fn test_pool() -> DbPool {
        zeph_db::sqlx::SqlitePool::connect("sqlite::memory:")
            .await
            .unwrap()
    }

    // A backdated next_run must make the task fire exactly once on tick().
    #[tokio::test]
    async fn scheduler_init_and_tick() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "test",
            "* * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );

        scheduler.init().await.unwrap();

        // Backdate next_run so the task is overdue.
        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = '2000-01-01T00:00:00+00:00' WHERE name = 'test'"
        ))
        .execute(&pool)
        .await
        .unwrap();

        scheduler.tick().await;
        assert_eq!(count.load(Ordering::Relaxed), 1);
    }

    // PERF-SC-04: a NULL next_run must not fire (tick seeds it instead).
    #[tokio::test]
    async fn tick_does_not_fire_without_next_run() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "yearly",
            "0 0 1 1 * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );

        // Bypass init() so next_run can be forced to NULL below.
        scheduler.store.init().await.unwrap();
        scheduler
            .store
            .upsert_job("yearly", "0 0 1 1 * *", "health_check")
            .await
            .unwrap();
        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = NULL WHERE name = 'yearly'"
        ))
        .execute(&pool)
        .await
        .unwrap();

        scheduler.tick().await;
        assert_eq!(
            count.load(Ordering::Relaxed),
            0,
            "task without next_run must not fire (PERF-SC-04)"
        );
    }

    // init() must always compute and persist next_run for periodic tasks.
    #[tokio::test]
    async fn init_always_sets_next_run() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "periodic",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);
        scheduler.init().await.unwrap();

        let next: Option<String> = zeph_db::query_scalar(sql!(
            "SELECT next_run FROM scheduled_jobs WHERE name = 'periodic'"
        ))
        .fetch_optional(&pool)
        .await
        .unwrap()
        .flatten();
        assert!(
            next.is_some(),
            "next_run must be set after init() for periodic task"
        );
    }

    // A far-future next_run must keep the task from firing.
    #[tokio::test]
    async fn task_does_not_fire_before_next_run() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "future",
            "0 0 1 1 * *", TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );

        scheduler.init().await.unwrap();

        let far_future = "2099-01-01T00:00:00+00:00";
        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = ? WHERE name = 'future'"
        ))
        .bind(far_future)
        .execute(&pool)
        .await
        .unwrap();

        scheduler.tick().await;
        assert_eq!(
            count.load(Ordering::Relaxed),
            0,
            "should not fire before next_run"
        );
    }

    // After a successful run, next_run must be advanced past the backdated
    // trigger value.
    #[tokio::test]
    async fn next_run_advances_after_execution() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "adv",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: Arc::new(AtomicU32::new(0)),
            }),
        );

        scheduler.init().await.unwrap();

        zeph_db::query(sql!(
            "UPDATE scheduled_jobs SET next_run = '2000-01-01T00:00:00+00:00' WHERE name = 'adv'"
        ))
        .execute(&pool)
        .await
        .unwrap();

        scheduler.tick().await;

        let next: Option<String> = zeph_db::query_scalar(sql!(
            "SELECT next_run FROM scheduled_jobs WHERE name = 'adv'"
        ))
        .fetch_optional(&pool)
        .await
        .unwrap()
        .flatten();
        let next_str = next.expect("next_run should be set after execution");
        let next_dt = next_str
            .parse::<chrono::DateTime<Utc>>()
            .expect("should parse as RFC3339");
        let epoch_2001 = chrono::DateTime::parse_from_rfc3339("2001-01-01T00:00:00+00:00")
            .expect("static parse")
            .with_timezone(&Utc);
        assert!(
            next_dt > epoch_2001,
            "next_run must have advanced beyond the backdated value after firing"
        );
    }

    // run() must exit promptly once the shutdown watch flips to true.
    #[tokio::test]
    async fn scheduler_shutdown() {
        let pool = test_pool().await;
        let store = JobStore::new(pool);
        let (tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);
        scheduler.init().await.unwrap();

        let handle = tokio::spawn(async move { scheduler.run().await });
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        let _ = tx.send(true);
        tokio::time::timeout(std::time::Duration::from_secs(2), handle)
            .await
            .expect("scheduler should stop")
            .expect("task should complete");
    }

    // A oneshot whose run_at is in the past must fire on the next tick.
    #[tokio::test]
    async fn oneshot_fires_at_run_at() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let past = Utc::now() - Duration::hours(1);
        let task = ScheduledTask::oneshot(
            "os_fire",
            past,
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );
        scheduler.init().await.unwrap();
        scheduler.tick().await;

        assert_eq!(
            count.load(Ordering::Relaxed),
            1,
            "oneshot must fire when run_at is past"
        );
    }

    // A oneshot with a future run_at must not fire yet.
    #[tokio::test]
    async fn oneshot_does_not_fire_before_run_at() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let future = Utc::now() + Duration::hours(1);
        let task = ScheduledTask::oneshot(
            "os_future",
            future,
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        scheduler.add_task(task);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );
        scheduler.init().await.unwrap();
        scheduler.tick().await;

        assert_eq!(
            count.load(Ordering::Relaxed),
            0,
            "oneshot must not fire before run_at"
        );
    }

    // A fired oneshot must be removed from the in-memory task list.
    #[tokio::test]
    async fn oneshot_removed_after_execution() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let past = Utc::now() - Duration::seconds(1);
        let task = ScheduledTask::oneshot(
            "os_rm",
            past,
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        );
        scheduler.add_task(task);
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: Arc::new(AtomicU32::new(0)),
            }),
        );
        scheduler.init().await.unwrap();
        assert_eq!(scheduler.tasks.len(), 1);
        scheduler.tick().await;
        assert_eq!(
            scheduler.tasks.len(),
            0,
            "completed oneshot must be removed from tasks"
        );
    }

    // init() must load periodic jobs that exist only in the store (e.g.
    // added via CLI) and persist a future next_run for them.
    #[tokio::test]
    async fn init_hydrates_cli_added_periodic_jobs_from_store() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());

        store.init().await.unwrap();
        store
            .upsert_job_with_mode(
                "cli-job",
                "0 * * * * *",
                "health_check",
                "periodic",
                None,
                "",
            )
            .await
            .unwrap();

        // Second store handle over the same pool simulates a daemon restart.
        let store2 = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store2, rx);

        assert_eq!(
            scheduler.tasks.len(),
            0,
            "tasks must be empty before init()"
        );

        scheduler.init().await.unwrap();

        assert_eq!(
            scheduler.tasks.len(),
            1,
            "init() must hydrate the CLI-added periodic job from the store"
        );
        assert_eq!(
            scheduler.tasks[0].name, "cli-job",
            "hydrated task name must match the DB row"
        );

        let next_run = store.get_next_run("cli-job").await.unwrap();
        assert!(
            next_run.is_some(),
            "init() must compute and persist next_run for the hydrated job"
        );
        let dt = next_run
            .unwrap()
            .parse::<chrono::DateTime<chrono::Utc>>()
            .expect("next_run must be a valid RFC3339 timestamp");
        assert!(
            dt > chrono::Utc::now(),
            "next_run must be in the future after hydration"
        );
    }

    // Hydration must skip store rows whose name matches a static task.
    #[tokio::test]
    async fn init_does_not_duplicate_static_tasks_already_in_tasks() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, _msg_tx) = Scheduler::new(store, rx);

        let task = ScheduledTask::new(
            "static-job",
            "0 * * * * *",
            TaskKind::HealthCheck,
            serde_json::Value::Null,
        )
        .unwrap();
        scheduler.add_task(task);

        scheduler.init().await.unwrap();

        assert_eq!(
            scheduler.tasks.len(),
            1,
            "init() must not duplicate a static task that is already in self.tasks"
        );
    }

    // A descriptor sent over the control channel must be registered by
    // drain_channel() and then fire on tick().
    #[tokio::test]
    async fn channel_registration() {
        let pool = test_pool().await;
        let store = JobStore::new(pool.clone());
        let (_tx, rx) = watch::channel(false);
        let (mut scheduler, msg_tx) = Scheduler::new(store, rx);

        let count = Arc::new(AtomicU32::new(0));
        scheduler.register_handler(
            &TaskKind::HealthCheck,
            Box::new(CountingHandler {
                count: count.clone(),
            }),
        );
        scheduler.init().await.unwrap();

        let past = Utc::now() - Duration::hours(1);
        let desc = TaskDescriptor {
            name: "chan_task".to_owned(),
            mode: TaskMode::OneShot { run_at: past },
            kind: TaskKind::HealthCheck,
            config: serde_json::Value::Null,
        };
        msg_tx
            .send(SchedulerMessage::Add(Box::new(desc)))
            .await
            .unwrap();

        scheduler.drain_channel().await;
        scheduler.tick().await;

        assert_eq!(
            count.load(Ordering::Relaxed),
            1,
            "channel-registered task must fire"
        );
    }
}