1use async_trait::async_trait;
4use itertools::Itertools;
5use std::{
6 collections::{BTreeMap, HashMap},
7 fmt::Debug,
8 sync::{Arc, Mutex, RwLock},
9};
10
11use chrono::{DateTime, Utc};
12use croner::Cron;
13use std::str::FromStr;
14
15use crate::prelude::*;
16use cloudillo_types::{lock, meta_adapter};
17
/// Unique identifier assigned to every scheduled task.
pub type TaskId = u64;

/// Distinguishes recurring tasks from one-shot tasks.
pub enum TaskType {
    /// Runs repeatedly according to a schedule.
    Periodic,
    /// Runs a single time.
    Once,
}
24
/// A parsed cron schedule; keeps both the original expression (used for
/// display, persistence and equality) and the compiled matcher.
#[derive(Debug, Clone)]
pub struct CronSchedule {
    // Original textual cron expression.
    expr: Box<str>,
    // Compiled representation used to compute occurrences.
    cron: Cron,
}
34
35impl CronSchedule {
36 pub fn parse(expr: &str) -> ClResult<Self> {
38 let cron = Cron::from_str(expr)
39 .map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
40 Ok(Self { expr: expr.into(), cron })
41 }
42
43 pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
48 let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
49
50 self.cron
51 .find_next_occurrence(&dt, false)
52 .map(|next| Timestamp(next.timestamp()))
53 .map_err(|e| {
54 tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
55 Error::ValidationError(format!("cron next_execution failed: {}", e))
56 })
57 }
58
59 pub fn to_cron_string(&self) -> String {
61 self.expr.to_string()
62 }
63}
64
// Two schedules are equal when their textual expressions match; the compiled
// `Cron` is derived from the expression and is intentionally ignored.
impl PartialEq for CronSchedule {
    fn eq(&self, other: &Self) -> bool {
        self.expr == other.expr
    }
}

impl Eq for CronSchedule {}
72
/// A unit of work the scheduler can execute.
///
/// Implementations must round-trip through a string (`serialize` / `build`)
/// so tasks can be persisted and reconstructed after a restart.
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
    /// Stable type tag used to look up the registered builder on load.
    fn kind() -> &'static str
    where
        Self: Sized;
    /// Reconstructs a task from its persisted `context` string.
    fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
    where
        Self: Sized;
    /// Serializes the task's parameters for persistence.
    fn serialize(&self) -> String;
    /// Executes the task against the shared application state.
    async fn run(&self, state: &S) -> ClResult<()>;

    /// Object-safe counterpart of [`Task::kind`].
    fn kind_of(&self) -> &'static str;
}
86
/// Persisted lifecycle state of a task.
#[derive(Debug)]
pub enum TaskStatus {
    Pending,
    Completed,
    Failed,
}

/// Raw task record as loaded from a [`TaskStore`].
pub struct TaskData {
    id: TaskId,
    // Type tag matching a registered builder (see `Task::kind`).
    kind: Box<str>,
    status: TaskStatus,
    // Serialized task parameters (see `Task::serialize`).
    input: Box<str>,
    // Ids of tasks that must finish before this one runs.
    deps: Box<[TaskId]>,
    // Persisted retry state, formatted "count,wait_min,wait_max,times".
    retry_data: Option<Box<str>>,
    // Cron expression for recurring tasks.
    cron_data: Option<Box<str>>,
    // Next planned execution time, when scheduled.
    next_at: Option<Timestamp>,
}
104
/// Persistence backend for scheduled tasks.
#[async_trait]
pub trait TaskStore<S: Clone>: Send + Sync {
    /// Persists a new task (optionally under a unique `key`) and returns its id.
    async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
    /// Marks a task as completed, storing its output.
    async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
    /// Loads every persisted task.
    async fn load(&self) -> ClResult<Vec<TaskData>>;
    /// Records a failure, optionally with the time of the next retry attempt.
    async fn update_task_error(
        &self,
        task_id: TaskId,
        output: &str,
        next_at: Option<Timestamp>,
    ) -> ClResult<()>;
    /// Finds a task previously added under `key`.
    async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
    /// Overwrites a task's persisted metadata.
    async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
}
119
/// Ephemeral [`TaskStore`] for tests and setups without persistence: it only
/// hands out ids; nothing is stored and nothing survives a restart.
pub struct InMemoryTaskStore {
    // Monotonic id counter; the next id handed out is `last_id + 1`.
    last_id: Mutex<TaskId>,
}

impl InMemoryTaskStore {
    /// Creates an empty store; the first assigned id is 1.
    pub fn new() -> Arc<Self> {
        Arc::new(Self { last_id: Mutex::new(0) })
    }
}
131
132#[async_trait]
133impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
134 async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
135 let mut last_id = lock!(self.last_id)?;
136 *last_id += 1;
137 Ok(*last_id)
138 }
139
140 async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
141 Ok(())
142 }
143
144 async fn load(&self) -> ClResult<Vec<TaskData>> {
145 Ok(vec![])
146 }
147
148 async fn update_task_error(
149 &self,
150 _task_id: TaskId,
151 _output: &str,
152 _next_at: Option<Timestamp>,
153 ) -> ClResult<()> {
154 Ok(())
155 }
156
157 async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
158 Ok(None)
160 }
161
162 async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
163 Ok(())
165 }
166}
167
/// [`TaskStore`] backed by the application's meta adapter for durable storage.
pub struct MetaAdapterTaskStore {
    meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
}

impl MetaAdapterTaskStore {
    /// Wraps the given meta adapter.
    pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
        Arc::new(Self { meta_adapter })
    }
}
179
#[async_trait]
impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
    /// Persists a new task via the meta adapter; when the task carries a cron
    /// schedule, the schedule is written in a second, follow-up patch because
    /// `create_task` does not accept it.
    async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
        let id = self
            .meta_adapter
            .create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
            .await?;

        if let Some(cron) = &task.cron {
            self.meta_adapter
                .update_task(
                    id,
                    &meta_adapter::TaskPatch {
                        cron: Patch::Value(cron.to_cron_string()),
                        ..Default::default()
                    },
                )
                .await?;
        }

        Ok(id)
    }

    /// Marks the task as finished, storing its output.
    async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
        self.meta_adapter.update_task_finished(id, output).await
    }

    /// Loads all persisted tasks and converts them into [`TaskData`] records.
    async fn load(&self) -> ClResult<Vec<TaskData>> {
        let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
        let tasks = tasks
            .into_iter()
            .map(|t| TaskData {
                id: t.task_id,
                kind: t.kind,
                // Status chars: 'P' -> pending, 'F' -> completed, anything
                // else -> failed.
                // NOTE(review): confirm 'F' means "finished" in the adapter's
                // schema and not "failed" — the mapping looks ambiguous.
                status: match t.status {
                    'P' => TaskStatus::Pending,
                    'F' => TaskStatus::Completed,
                    _ => TaskStatus::Failed,
                },
                input: t.input,
                deps: t.deps,
                retry_data: t.retry,
                cron_data: t.cron,
                next_at: t.next_at,
            })
            .collect();
        Ok(tasks)
    }

    /// Records a task failure and an optional next retry time.
    async fn update_task_error(
        &self,
        task_id: TaskId,
        output: &str,
        next_at: Option<Timestamp>,
    ) -> ClResult<()> {
        self.meta_adapter.update_task_error(task_id, output, next_at).await
    }

    /// Looks up a task by its unique key (used to deduplicate recurring tasks).
    async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
        let task_opt = self.meta_adapter.find_task_by_key(key).await?;

        match task_opt {
            Some(t) => Ok(Some((
                t.task_id,
                TaskData {
                    id: t.task_id,
                    kind: t.kind,
                    // Same status-char mapping as in `load`.
                    status: match t.status {
                        'P' => TaskStatus::Pending,
                        'F' => TaskStatus::Completed,
                        _ => TaskStatus::Failed,
                    },
                    input: t.input,
                    deps: t.deps,
                    retry_data: t.retry,
                    cron_data: t.cron,
                    next_at: t.next_at,
                },
            ))),
            None => Ok(None),
        }
    }

    /// Writes the task's current serialized state back to the store.
    async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
        use cloudillo_types::types::Patch;

        let mut patch = meta_adapter::TaskPatch {
            input: Patch::Value(task.task.serialize()),
            next_at: match task.next_at {
                Some(ts) => Patch::Value(ts),
                None => Patch::Null,
            },
            ..Default::default()
        };

        // Dependencies are only patched when present; an empty list leaves the
        // stored value untouched.
        if !task.deps.is_empty() {
            patch.deps = Patch::Value(task.deps.clone());
        }

        // Retry state is serialized as "count,wait_min,wait_max,times" — the
        // same format `Scheduler::load` parses.
        if let Some(ref retry) = task.retry {
            let retry_str = format!(
                "{},{},{},{}",
                task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
            );
            patch.retry = Patch::Value(retry_str);
        }

        if let Some(ref cron) = task.cron {
            patch.cron = Patch::Value(cron.to_cron_string());
        }

        self.meta_adapter.update_task(id, &patch).await
    }
}
301
/// Factory that rebuilds a task object from its id and serialized context.
type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
304
/// Exponential-backoff retry policy for failed tasks.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    // Minimum and maximum wait between attempts, in seconds.
    wait_min_max: (u64, u64),
    // Maximum number of retry attempts.
    times: u16,
}

impl Default for RetryPolicy {
    /// 60 s initial backoff, capped at one hour, up to 10 retries.
    fn default() -> Self {
        Self { wait_min_max: (60, 3600), times: 10 }
    }
}

impl RetryPolicy {
    /// Creates a policy waiting between `wait_min_max.0` and `wait_min_max.1`
    /// seconds, retrying at most `times` times.
    pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
        Self { wait_min_max, times }
    }

    /// Returns the wait in seconds before the given attempt:
    /// `min * 2^attempt_count`, capped at `max`.
    pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
        let (min, max) = self.wait_min_max;
        // `1u64 << attempt_count` overflows for attempt_count >= 64 (panic in
        // debug builds, silent wrap in release), and the multiplication can
        // overflow too. Saturate instead, so large attempt counts simply
        // yield the maximum wait.
        let factor = 1u64.checked_shl(u32::from(attempt_count)).unwrap_or(u64::MAX);
        min.saturating_mul(factor).min(max)
    }

    /// Whether another retry is allowed after `attempt_count` failed attempts.
    pub fn should_retry(&self, attempt_count: u16) -> bool {
        attempt_count < self.times
    }
}
335
/// Fluent builder returned by [`Scheduler::task`] for configuring a task
/// before it is queued.
pub struct TaskSchedulerBuilder<'a, S: Clone> {
    scheduler: &'a Scheduler<S>,
    task: Arc<dyn Task<S>>,
    // Optional unique key used to deduplicate recurring tasks.
    key: Option<String>,
    // Explicit first execution time, when set.
    next_at: Option<Timestamp>,
    // Ids of tasks that must complete before this one runs.
    deps: Vec<TaskId>,
    retry: Option<RetryPolicy>,
    cron: Option<CronSchedule>,
}
347
348impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
349 fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
351 Self {
352 scheduler,
353 task,
354 key: None,
355 next_at: None,
356 deps: Vec::new(),
357 retry: None,
358 cron: None,
359 }
360 }
361
362 pub fn key(mut self, key: impl Into<String>) -> Self {
364 self.key = Some(key.into());
365 self
366 }
367
368 pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
370 self.next_at = Some(timestamp);
371 self
372 }
373
374 pub fn schedule_after(mut self, seconds: i64) -> Self {
376 self.next_at = Some(Timestamp::from_now(seconds));
377 self
378 }
379
380 pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
382 self.deps = deps;
383 self
384 }
385
386 pub fn depends_on(mut self, dep: TaskId) -> Self {
388 self.deps.push(dep);
389 self
390 }
391
392 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
394 self.retry = Some(policy);
395 self
396 }
397
398 pub fn cron(mut self, expr: impl Into<String>) -> Self {
403 if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
404 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
407 self.cron = Some(cron_schedule);
408 }
409 self
410 }
411
412 pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
415 if hour <= 23 && minute <= 59 {
416 let expr = format!("{} {} * * *", minute, hour);
417 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
418 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
421 self.cron = Some(cron_schedule);
422 }
423 }
424 self
425 }
426
427 pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
431 if weekday <= 6 && hour <= 23 && minute <= 59 {
432 let expr = format!("{} {} * * {}", minute, hour, weekday);
433 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
434 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
437 self.cron = Some(cron_schedule);
438 }
439 }
440 self
441 }
442
443 pub async fn now(self) -> ClResult<TaskId> {
445 self.schedule().await
446 }
447
448 pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
450 self.next_at = Some(ts);
451 self.schedule().await
452 }
453
454 pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
456 self.next_at = Some(Timestamp::from_now(seconds));
457 self.schedule().await
458 }
459
460 pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
462 self.deps.push(dep);
463 self.schedule().await
464 }
465
466 pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
468 self.retry = Some(RetryPolicy::default());
469 self.schedule().await
470 }
471
472 pub async fn schedule(self) -> ClResult<TaskId> {
474 self.scheduler
475 .schedule_task_impl(
476 self.task,
477 self.key.as_deref(),
478 self.next_at,
479 if self.deps.is_empty() { None } else { Some(self.deps) },
480 self.retry,
481 self.cron,
482 )
483 .await
484 }
485}
486
/// In-memory metadata the scheduler keeps for every known task.
#[derive(Debug, Clone)]
pub struct TaskMeta<S: Clone> {
    pub task: Arc<dyn Task<S>>,
    /// Next planned execution time, if any.
    pub next_at: Option<Timestamp>,
    /// Ids of tasks that must complete before this one runs.
    pub deps: Vec<TaskId>,
    // Number of failed attempts so far; drives the retry backoff.
    retry_count: u16,
    pub retry: Option<RetryPolicy>,
    pub cron: Option<CronSchedule>,
}

// Registered task builders, keyed by `Task::kind`.
type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
// Time-ordered queue of scheduled tasks; the id in the key disambiguates
// tasks that share a timestamp.
type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
499
/// Async task scheduler supporting one-shot, delayed, dependent, retried and
/// cron-recurring tasks. Cheap to clone: all state lives behind `Arc`s, so
/// clones share the same queues.
#[derive(Clone)]
pub struct Scheduler<S: Clone> {
    // Builders used to reconstruct tasks loaded from the store.
    task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
    store: Arc<dyn TaskStore<S>>,
    // Tasks whose futures are currently executing.
    tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
    // Tasks blocked on unfinished dependencies.
    tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
    // Reverse dependency index: dependency id -> ids of tasks waiting on it.
    task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
    // Time-ordered queue consumed by the timer loop started in `start`.
    tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
    // Channel on which spawned task futures report completion.
    tx_finish: flume::Sender<TaskId>,
    rx_finish: flume::Receiver<TaskId>,
    // Wakes the timer loop whenever the scheduled queue changes.
    notify_schedule: Arc<tokio::sync::Notify>,
}
513
514impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
515 pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
516 let (tx_finish, rx_finish) = flume::unbounded();
517
518 let scheduler = Self {
519 task_builders: Arc::new(RwLock::new(HashMap::new())),
520 store,
521 tasks_running: Arc::new(Mutex::new(HashMap::new())),
522 tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
523 task_dependents: Arc::new(Mutex::new(HashMap::new())),
524 tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
525 tx_finish,
526 rx_finish,
527 notify_schedule: Arc::new(tokio::sync::Notify::new()),
528 };
529
530 Arc::new(scheduler)
533 }
534
    /// Starts the scheduler's background workers:
    /// 1. a completion listener that persists results, reschedules recurring
    ///    tasks and releases dependent tasks,
    /// 2. a timer loop that spawns tasks from the scheduled queue when due,
    /// 3. a one-shot job that loads persisted tasks from the store.
    ///
    /// Poisoned mutexes are recovered (`into_inner`) rather than propagated,
    /// so a panicking task cannot wedge the scheduler.
    pub fn start(&self, state: S) {
        let schedule = self.clone();
        let stat = state.clone();
        let rx_finish = self.rx_finish.clone();

        // Worker 1: completion listener.
        tokio::spawn(async move {
            while let Ok(id) = rx_finish.recv_async().await {
                debug!("Completed task {} (notified)", id);

                // Clone the metadata out so no lock is held across awaits.
                let task_meta_opt = {
                    let tasks_running = match schedule.tasks_running.lock() {
                        Ok(guard) => guard,
                        Err(poisoned) => {
                            error!("Mutex poisoned: tasks_running (recovering)");
                            poisoned.into_inner()
                        }
                    };
                    tasks_running.get(&id).cloned()
                };

                if let Some(task_meta) = task_meta_opt {
                    // Only remove the task from `tasks_running` once its
                    // store/queue transition succeeded.
                    let mut transition_ok = false;

                    if let Some(ref cron) = task_meta.cron {
                        // Recurring task: compute the next occurrence and
                        // re-queue instead of finishing.
                        let next_at = match cron.next_execution(Timestamp::now()) {
                            Ok(ts) => ts,
                            Err(e) => {
                                error!(
                                    "Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
                                    id, e
                                );
                                if let Err(e) = schedule.store.finished(id, "").await {
                                    error!("Failed to mark task {} as finished: {}", id, e);
                                }
                                continue;
                            }
                        };
                        info!(
                            "Recurring task {} completed, scheduling next execution at {}",
                            id, next_at
                        );

                        let mut updated_meta = task_meta.clone();
                        updated_meta.next_at = Some(next_at);

                        if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
                            error!("Failed to update recurring task {} next_at: {}", id, e);
                        }

                        // Remove from running before re-queueing, otherwise
                        // `add_queue` would treat it as still running.
                        match schedule.tasks_running.lock() {
                            Ok(mut tasks_running) => {
                                tasks_running.remove(&id);
                            }
                            Err(poisoned) => {
                                error!("Mutex poisoned: tasks_running (recovering)");
                                poisoned.into_inner().remove(&id);
                            }
                        }

                        match schedule.add_queue(id, updated_meta).await {
                            Ok(_) => transition_ok = true,
                            Err(e) => {
                                error!(
                                    "Failed to reschedule recurring task {}: {} - task lost!",
                                    id, e
                                );
                            }
                        }
                    } else {
                        // One-shot task: mark it finished in the store.
                        match schedule.store.finished(id, "").await {
                            Ok(()) => transition_ok = true,
                            Err(e) => {
                                error!(
                                    "Failed to mark task {} as finished: {} - task remains in running queue",
                                    id, e
                                );
                            }
                        }
                    }

                    if transition_ok {
                        match schedule.tasks_running.lock() {
                            Ok(mut tasks_running) => {
                                tasks_running.remove(&id);
                            }
                            Err(poisoned) => {
                                error!("Mutex poisoned: tasks_running (recovering)");
                                poisoned.into_inner().remove(&id);
                            }
                        }
                    }

                    // Wake up any tasks that were waiting on this one.
                    match schedule.release_dependents(id) {
                        Ok(ready_to_spawn) => {
                            for (dep_id, dep_task_meta) in ready_to_spawn {
                                match schedule.tasks_running.lock() {
                                    Ok(mut tasks_running) => {
                                        tasks_running.insert(dep_id, dep_task_meta.clone());
                                    }
                                    Err(poisoned) => {
                                        error!("Mutex poisoned: tasks_running (recovering)");
                                        poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
                                    }
                                }
                                schedule.spawn_task(
                                    stat.clone(),
                                    dep_task_meta.task.clone(),
                                    dep_id,
                                    dep_task_meta,
                                );
                            }
                        }
                        Err(e) => {
                            error!("Failed to release dependents of task {}: {}", id, e);
                        }
                    }
                } else {
                    warn!("Completed task {} not found in running queue", id);
                }
            }
        });

        // Worker 2: timer loop over the scheduled queue.
        let schedule = self.clone();
        tokio::spawn(async move {
            loop {
                let is_empty = match schedule.tasks_scheduled.lock() {
                    Ok(guard) => guard.is_empty(),
                    Err(poisoned) => {
                        error!("Mutex poisoned: tasks_scheduled (recovering)");
                        poisoned.into_inner().is_empty()
                    }
                };
                // Nothing queued: park until `add_queue` notifies us.
                if is_empty {
                    schedule.notify_schedule.notified().await;
                }
                let time = Timestamp::now();
                // Drain every due entry; the inner loop breaks with the next
                // future deadline (if any) so we know how long to sleep.
                if let Some((timestamp, _id)) = loop {
                    let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
                        Ok(guard) => guard,
                        Err(poisoned) => {
                            error!("Mutex poisoned: tasks_scheduled (recovering)");
                            poisoned.into_inner()
                        }
                    };
                    // The BTreeMap is ordered by (timestamp, id), so the first
                    // entry is always the earliest deadline.
                    if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
                        let (timestamp, id) = (timestamp, id);
                        if timestamp <= Timestamp::now() {
                            debug!("Spawning task id {} (from schedule)", id);
                            if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
                                let mut tasks_running = match schedule.tasks_running.lock() {
                                    Ok(guard) => guard,
                                    Err(poisoned) => {
                                        error!("Mutex poisoned: tasks_running (recovering)");
                                        poisoned.into_inner()
                                    }
                                };
                                tasks_running.insert(id, task.clone());
                                schedule.spawn_task(state.clone(), task.task.clone(), id, task);
                            } else {
                                error!("Task disappeared while being removed from schedule");
                                break None;
                            }
                        } else {
                            break Some((timestamp, id));
                        }
                    } else {
                        break None;
                    }
                } {
                    // Sleep until the next deadline, but wake early when the
                    // queue changes. A negative diff clamps to a zero wait.
                    let diff = timestamp.0 - time.0;
                    let wait =
                        tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
                    tokio::select! {
                        () = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
                    };
                }
            }
        });

        // Worker 3: one-shot load of persisted tasks (errors are logged inside).
        let schedule = self.clone();
        tokio::spawn(async move {
            let _ignore_err = schedule.load().await;
        });
    }
734
735 fn register_builder(
736 &self,
737 name: &'static str,
738 builder: &'static TaskBuilder<S>,
739 ) -> ClResult<&Self> {
740 let mut task_builders = self
741 .task_builders
742 .write()
743 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
744 task_builders.insert(name, Box::new(builder));
745 Ok(self)
746 }
747
    /// Registers the task type `T` so persisted tasks of this kind can be
    /// rebuilt when [`Scheduler`] loads them from the store.
    pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
        info!("Registering task type {}", T::kind());
        self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
        Ok(self)
    }

    /// Starts a fluent builder for scheduling `task`.
    pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
        TaskSchedulerBuilder::new(self, task)
    }
758
759 async fn schedule_task_impl(
762 &self,
763 task: Arc<dyn Task<S>>,
764 key: Option<&str>,
765 next_at: Option<Timestamp>,
766 deps: Option<Vec<TaskId>>,
767 retry: Option<RetryPolicy>,
768 cron: Option<CronSchedule>,
769 ) -> ClResult<TaskId> {
770 let task_meta = TaskMeta {
771 task: task.clone(),
772 next_at,
773 deps: deps.clone().unwrap_or_default(),
774 retry_count: 0,
775 retry,
776 cron,
777 };
778
779 if let Some(key) = key {
781 if let Some((existing_id, existing_data)) = self.store.find_by_key(key).await? {
782 let new_serialized = task.serialize();
783 let existing_serialized = existing_data.input.as_ref();
784
785 if new_serialized == existing_serialized {
787 info!(
788 "Recurring task '{}' already exists with identical parameters (id={})",
789 key, existing_id
790 );
791 self.store.update_task(existing_id, &task_meta).await?;
793 self.add_queue(existing_id, task_meta).await?;
795 return Ok(existing_id);
796 }
797 info!(
798 "Updating recurring task '{}' (id={}) - parameters changed",
799 key, existing_id
800 );
801 info!(" Old params: {}", existing_serialized);
802 info!(" New params: {}", new_serialized);
803
804 self.remove_from_queues(existing_id)?;
806
807 self.store.update_task(existing_id, &task_meta).await?;
809
810 self.add_queue(existing_id, task_meta).await?;
812
813 return Ok(existing_id);
814 }
815 }
816
817 let id = self.store.add(&task_meta, key).await?;
819 self.add_queue(id, task_meta).await
820 }
821
    /// Convenience wrapper: schedules `task` for immediate execution.
    pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
        self.task(task).now().await
    }

    /// Places a task into the correct in-memory queue (running / scheduled /
    /// waiting), removing any stale entries for the same id first.
    pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
        // If the task is currently executing, only update its metadata; it
        // will be rescheduled when the running instance completes.
        {
            let mut running = lock!(self.tasks_running, "tasks_running")?;
            if let Some(existing_meta) = running.get_mut(&id) {
                debug!(
                    "Task {} is already running, updating metadata (will reschedule on completion)",
                    id
                );
                *existing_meta = task_meta;
                return Ok(id);
            }
        }

        // Drop any previous scheduled entry for this id so it cannot run twice.
        {
            let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
            if let Some(key) = scheduled
                .iter()
                .find(|((_, tid), _)| *tid == id)
                .map(|((ts, tid), _)| (*ts, *tid))
            {
                scheduled.remove(&key);
                debug!("Removed existing scheduled entry for task {} before re-queueing", id);
            }
        }
        {
            let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
            if waiting.remove(&id).is_some() {
                debug!("Removed existing waiting entry for task {} before re-queueing", id);
            }
        }

        let deps = task_meta.deps.clone();

        // Dependencies take precedence over a scheduled time: such a task
        // goes to the waiting queue and its `next_at` is ignored.
        if !deps.is_empty() && task_meta.next_at.is_some() {
            warn!("Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue", id);
            lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
            debug!("Task {} is waiting for {:?}", id, &deps);
            for dep in deps {
                lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
            }
            return Ok(id);
        }

        // No deps and already due (a missing next_at counts as due): queue at
        // Timestamp(0) so the timer loop picks it up immediately.
        if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
            debug!("Spawning task {}", id);
            lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
            self.notify_schedule.notify_one();
        } else if let Some(next_at) = task_meta.next_at {
            debug!("Scheduling task {} for {}", id, next_at);
            lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
            self.notify_schedule.notify_one();
        } else {
            // Dependencies but no schedule: wait for them and index the
            // reverse-dependency map so completion can release this task.
            lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
            debug!("Task {} is waiting for {:?}", id, &deps);
            for dep in deps {
                lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
            }
        }
        Ok(id)
    }
892
    /// Removes a task from whichever queue currently holds it and returns its
    /// metadata. Queues are checked in order: waiting, scheduled, running.
    fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
        if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
            debug!("Removed task {} from waiting queue for update", task_id);
            return Ok(Some(task_meta));
        }

        {
            let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
            // The scheduled map is keyed by (timestamp, id); scan for the id.
            if let Some(key) = scheduled
                .iter()
                .find(|((_, id), _)| *id == task_id)
                .map(|((ts, id), _)| (*ts, *id))
            {
                if let Some(task_meta) = scheduled.remove(&key) {
                    debug!("Removed task {} from scheduled queue for update", task_id);
                    return Ok(Some(task_meta));
                }
            }
        }

        // Removing a running entry does not cancel the spawned future; the
        // task keeps executing but no longer appears in the running queue.
        if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
            warn!("Removed task {} from running queue during update", task_id);
            return Ok(Some(task_meta));
        }

        Ok(None)
    }
925
    /// Removes `completed_task_id` from the dependents index and clears it
    /// from every dependent task's dependency list.
    ///
    /// Returns the tasks whose dependency lists became empty — the caller is
    /// responsible for moving them to the running queue and spawning them.
    fn release_dependents(
        &self,
        completed_task_id: TaskId,
    ) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
        // Take ownership of the dependent list so the map lock is held briefly.
        let dependents = {
            let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
            deps_map.remove(&completed_task_id).unwrap_or_default()
        };

        if dependents.is_empty() {
            return Ok(Vec::new());
        }

        debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);

        let mut ready_to_spawn = Vec::new();

        for dependent_id in dependents {
            // Most dependents live in the waiting queue.
            {
                let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
                if let Some(task_meta) = waiting.get_mut(&dependent_id) {
                    task_meta.deps.retain(|x| *x != completed_task_id);

                    if task_meta.deps.is_empty() {
                        if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
                            debug!(
                                "Dependent task {} ready to spawn (all dependencies cleared)",
                                dependent_id
                            );
                            ready_to_spawn.push((dependent_id, task_to_spawn));
                        }
                    } else {
                        debug!(
                            "Task {} still has {} remaining dependencies",
                            dependent_id,
                            task_meta.deps.len()
                        );
                    }
                    continue;
                }
            }

            // A dependent may also sit in the scheduled queue; just trim its
            // dependency list there — the timer loop will start it.
            {
                let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
                if let Some(scheduled_key) = scheduled
                    .iter()
                    .find(|((_, id), _)| *id == dependent_id)
                    .map(|((ts, id), _)| (*ts, *id))
                {
                    if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
                        task_meta.deps.retain(|x| *x != completed_task_id);
                        let remaining = task_meta.deps.len();
                        if remaining == 0 {
                            debug!(
                                "Task {} in scheduled queue has no remaining dependencies",
                                dependent_id
                            );
                        } else {
                            debug!(
                                "Task {} in scheduled queue has {} remaining dependencies",
                                dependent_id, remaining
                            );
                        }
                    }
                    continue;
                }
            }

            warn!(
                "Dependent task {} of completed task {} not found in any queue",
                dependent_id, completed_task_id
            );
        }

        Ok(ready_to_spawn)
    }
1011
1012 async fn load(&self) -> ClResult<()> {
1013 let tasks = self.store.load().await?;
1014 debug!("Loaded {} tasks from store", tasks.len());
1015 for t in tasks {
1016 if let TaskStatus::Pending = t.status {
1017 debug!("Loading task {} {}", t.id, t.kind);
1018 let task = {
1019 let builder_map = self
1020 .task_builders
1021 .read()
1022 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1023 let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1024 format!("task builder not registered: {}", t.kind),
1025 ))?;
1026 builder(t.id, &t.input)?
1027 };
1028 let (retry_count, retry) = match t.retry_data {
1029 Some(retry_str) => {
1030 let (retry_count, retry_min, retry_max, retry_times) = retry_str
1031 .split(',')
1032 .collect_tuple()
1033 .ok_or(Error::Internal("invalid retry policy format".into()))?;
1034 let retry_count: u16 = retry_count
1035 .parse()
1036 .map_err(|_| Error::Internal("retry count must be u16".into()))?;
1037 let retry = RetryPolicy {
1038 wait_min_max: (
1039 retry_min
1040 .parse()
1041 .map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1042 retry_max
1043 .parse()
1044 .map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1045 ),
1046 times: retry_times
1047 .parse()
1048 .map_err(|_| Error::Internal("retry times must be u64".into()))?,
1049 };
1050 debug!("Loaded retry policy: {:?}", retry);
1051 (retry_count, Some(retry))
1052 }
1053 _ => (0, None),
1054 };
1055 let cron =
1057 t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1058
1059 let task_meta = TaskMeta {
1060 task,
1061 next_at: t.next_at,
1062 deps: t.deps.into(),
1063 retry_count,
1064 retry,
1065 cron,
1066 };
1067 self.add_queue(t.id, task_meta).await?;
1068 }
1069 }
1070 Ok(())
1071 }
1072
    /// Spawns the task's future on the tokio runtime. On success the listener
    /// started by `start` is notified via `tx_finish`; on failure the task is
    /// either re-queued per its retry policy or its final error is recorded.
    fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
        let tx_finish = self.tx_finish.clone();
        let store = self.store.clone();
        let scheduler = self.clone();
        tokio::spawn(async move {
            match task.run(&state).await {
                Ok(()) => {
                    debug!("Task {} completed successfully", id);
                    // Completion bookkeeping (store update, dependents) is
                    // handled by the listener; a dropped receiver is ignored.
                    tx_finish.send(id).unwrap_or(());
                }
                Err(e) => {
                    if let Some(retry_policy) = &task_meta.retry {
                        if retry_policy.should_retry(task_meta.retry_count) {
                            let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
                            let next_at = Timestamp::from_now(backoff.cast_signed());

                            info!(
                                "Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
                                id, task_meta.retry_count + 1, retry_policy.times, backoff, e
                            );

                            // Persist the error and retry time; best effort.
                            store
                                .update_task_error(id, &e.to_string(), Some(next_at))
                                .await
                                .unwrap_or(());

                            // Remove from running first so `add_queue` does
                            // not treat the task as still executing.
                            match scheduler.tasks_running.lock() {
                                Ok(mut tasks_running) => {
                                    tasks_running.remove(&id);
                                }
                                Err(poisoned) => {
                                    error!("Mutex poisoned: tasks_running (recovering)");
                                    poisoned.into_inner().remove(&id);
                                }
                            }

                            // Re-queue with an incremented attempt counter.
                            let mut retry_meta = task_meta.clone();
                            retry_meta.retry_count += 1;
                            retry_meta.next_at = Some(next_at);
                            scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
                        } else {
                            // Retry budget exhausted: record the final error.
                            error!(
                                "Task {} failed after {} retries: {}",
                                id, task_meta.retry_count, e
                            );
                            store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
                            tx_finish.send(id).unwrap_or(());
                        }
                    } else {
                        // No retry policy: fail immediately.
                        error!("Task {} failed: {}", id, e);
                        store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
                        tx_finish.send(id).unwrap_or(());
                    }
                }
            }
        });
    }
1136
1137 pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1140 let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1141 let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1142 let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1143 let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1144
1145 let mut stuck_tasks = Vec::new();
1147 let mut tasks_with_missing_deps = Vec::new();
1148
1149 {
1151 let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1152 let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1153
1154 for (id, task_meta) in waiting.iter() {
1155 if task_meta.deps.is_empty() {
1156 stuck_tasks.push(*id);
1157 warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1158 } else {
1159 for dep in &task_meta.deps {
1161 let dep_exists =
1163 self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
1164 || self
1165 .tasks_waiting
1166 .lock()
1167 .ok()
1168 .is_some_and(|w| w.contains_key(dep))
1169 || self.tasks_scheduled.lock().ok().is_some_and(|s| {
1170 s.iter().any(|((_, task_id), _)| task_id == dep)
1171 });
1172
1173 if !dep_exists {
1174 tasks_with_missing_deps.push((*id, *dep));
1175 warn!(
1176 "SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1177 id, dep
1178 );
1179 }
1180 }
1181 }
1182 }
1183 }
1184
1185 Ok(SchedulerHealth {
1186 waiting: waiting_count,
1187 scheduled: scheduled_count,
1188 running: running_count,
1189 dependents: dependents_count,
1190 stuck_tasks,
1191 tasks_with_missing_deps,
1192 })
1193 }
1194}
1195
/// Snapshot of scheduler queue sizes and detected anomalies, as produced by
/// `Scheduler::health_check`.
#[derive(Debug, Clone)]
pub struct SchedulerHealth {
    /// Number of tasks blocked on dependencies.
    pub waiting: usize,
    /// Number of tasks queued for a future time.
    pub scheduled: usize,
    /// Number of tasks currently executing.
    pub running: usize,
    /// Number of entries in the reverse dependency index.
    pub dependents: usize,
    /// Waiting tasks with no dependencies (can never be released).
    pub stuck_tasks: Vec<TaskId>,
    /// `(task, dependency)` pairs where the dependency is not in any queue.
    pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
}
1212
1213#[cfg(test)]
1214mod tests {
1215 use super::*;
1216 use serde::{Deserialize, Serialize};
1217
1218 type State = Arc<Mutex<Vec<u8>>>;
1219
    /// Minimal task used by the scheduler tests: sleeps proportionally to
    /// `num`, then appends `num` to the shared state vector.
    #[derive(Debug, Serialize, Deserialize)]
    struct TestTask {
        num: u8,
    }

    impl TestTask {
        pub fn new(num: u8) -> Arc<Self> {
            Arc::new(Self { num })
        }
    }

    #[async_trait]
    impl Task<State> for TestTask {
        fn kind() -> &'static str {
            "test"
        }

        fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
            // The context is just the number, as produced by `serialize`.
            let num: u8 = ctx
                .parse()
                .map_err(|_| Error::Internal("test task context must be u8".into()))?;
            let task = TestTask::new(num);
            Ok(task)
        }

        fn serialize(&self) -> String {
            self.num.to_string()
        }

        fn kind_of(&self) -> &'static str {
            "test"
        }

        async fn run(&self, state: &State) -> ClResult<()> {
            info!("Running task {}", self.num);
            // Sleep 200 ms per unit so completion order is observable.
            tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
            info!("Completed task {}", self.num);
            state.lock().unwrap().push(self.num);
            Ok(())
        }
    }
1261
    /// Test task that fails its first `fail_count` attempts and then
    /// succeeds, used to exercise the retry machinery.
    #[derive(Debug, Clone)]
    struct FailingTask {
        id: u8,
        // Number of attempts that should fail before succeeding.
        fail_count: u8,
        // Shared attempt counter so retries see previous attempts.
        attempt: Arc<Mutex<u8>>,
    }

    impl FailingTask {
        pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
            Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
        }
    }

    #[async_trait]
    impl Task<State> for FailingTask {
        fn kind() -> &'static str {
            "failing"
        }

        fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
            // Context format: "id,fail_count" (see `serialize`).
            let parts: Vec<&str> = ctx.split(',').collect();
            if parts.len() != 2 {
                return Err(Error::Internal("failing task context must have 2 parts".into()));
            }
            let id: u8 = parts[0]
                .parse()
                .map_err(|_| Error::Internal("failing task id must be u8".into()))?;
            let fail_count: u8 = parts[1]
                .parse()
                .map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
            Ok(FailingTask::new(id, fail_count))
        }

        fn serialize(&self) -> String {
            format!("{},{}", self.id, self.fail_count)
        }

        fn kind_of(&self) -> &'static str {
            "failing"
        }

        async fn run(&self, state: &State) -> ClResult<()> {
            let mut attempt = self.attempt.lock().unwrap();
            *attempt += 1;
            let current_attempt = *attempt;

            info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);

            // Fail until the configured number of attempts is exhausted.
            if current_attempt <= self.fail_count {
                error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
                return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
            }

            info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
            state.lock().unwrap().push(self.id);
            Ok(())
        }
    }
1320
1321 #[tokio::test]
1322 pub async fn test_scheduler() {
1323 let _ = tracing_subscriber::fmt().try_init();
1324
1325 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1326 let state: State = Arc::new(Mutex::new(Vec::new()));
1327 let scheduler = Scheduler::new(task_store);
1328 scheduler.start(state.clone());
1329 scheduler.register::<TestTask>().unwrap();
1330
1331 let _task1 = TestTask::new(1);
1332 let task2 = TestTask::new(1);
1333 let task3 = TestTask::new(1);
1334
1335 let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
1336 let task3_id = scheduler.add(task3).await.unwrap();
1337 scheduler
1338 .task(TestTask::new(1))
1339 .depend_on(vec![task2_id, task3_id])
1340 .schedule()
1341 .await
1342 .unwrap();
1343
1344 tokio::time::sleep(std::time::Duration::from_secs(4)).await;
1345 let task4 = TestTask::new(1);
1346 let task5 = TestTask::new(1);
1347 scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
1348 scheduler.task(task5).schedule_after(1).schedule().await.unwrap();
1349
1350 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1351
1352 let st = state.lock().unwrap();
1353 info!("res: {}", st.len());
1354 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1355 assert_eq!(str_vec.join(":"), "1:1:1:1:1");
1356 }
1357
1358 #[tokio::test]
1359 pub async fn test_retry_with_backoff() {
1360 let _ = tracing_subscriber::fmt().try_init();
1361
1362 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1363 let state: State = Arc::new(Mutex::new(Vec::new()));
1364 let scheduler = Scheduler::new(task_store);
1365 scheduler.start(state.clone());
1366 scheduler.register::<FailingTask>().unwrap();
1367
1368 let failing_task = FailingTask::new(42, 2);
1371 let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1372
1373 scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1374
1375 tokio::time::sleep(std::time::Duration::from_secs(6)).await;
1382
1383 let st = state.lock().unwrap();
1384 assert_eq!(st.len(), 1, "Task should have succeeded after retries");
1385 assert_eq!(st[0], 42);
1386 }
1387
1388 #[tokio::test]
1391 pub async fn test_builder_simple_schedule() {
1392 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1393 let state: State = Arc::new(Mutex::new(Vec::new()));
1394 let scheduler = Scheduler::new(task_store);
1395 scheduler.start(state.clone());
1396 scheduler.register::<TestTask>().unwrap();
1397
1398 let task = TestTask::new(1);
1400 let id = scheduler.task(task).now().await.unwrap();
1401
1402 assert!(id > 0, "Task ID should be positive");
1403
1404 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1405
1406 let st = state.lock().unwrap();
1407 assert_eq!(st.len(), 1, "Task should have executed");
1408 assert_eq!(st[0], 1);
1409 }
1410
1411 #[tokio::test]
1412 pub async fn test_builder_with_key() {
1413 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1414 let state: State = Arc::new(Mutex::new(Vec::new()));
1415 let scheduler = Scheduler::new(task_store);
1416 scheduler.start(state.clone());
1417 scheduler.register::<TestTask>().unwrap();
1418
1419 let task = TestTask::new(1);
1421 let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();
1422
1423 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1424
1425 let st = state.lock().unwrap();
1426 assert_eq!(st.len(), 1);
1427 assert_eq!(st[0], 1);
1428 }
1429
1430 #[tokio::test]
1431 pub async fn test_builder_with_delay() {
1432 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1433 let state: State = Arc::new(Mutex::new(Vec::new()));
1434 let scheduler = Scheduler::new(task_store);
1435 scheduler.start(state.clone());
1436 scheduler.register::<TestTask>().unwrap();
1437
1438 let task = TestTask::new(1);
1440 let _id = scheduler
1441 .task(task)
1442 .after(1) .await
1444 .unwrap();
1445
1446 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1448 {
1449 let st = state.lock().unwrap();
1450 assert_eq!(st.len(), 0, "Task should not execute yet");
1451 }
1452
1453 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1455
1456 {
1457 let st = state.lock().unwrap();
1458 assert_eq!(st.len(), 1, "Task should have executed");
1459 assert_eq!(st[0], 1);
1460 }
1461 }
1462
1463 #[tokio::test]
1464 pub async fn test_builder_with_dependencies() {
1465 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1466 let state: State = Arc::new(Mutex::new(Vec::new()));
1467 let scheduler = Scheduler::new(task_store);
1468 scheduler.start(state.clone());
1469 scheduler.register::<TestTask>().unwrap();
1470
1471 let task1 = TestTask::new(1);
1473 let id1 = scheduler.task(task1).now().await.unwrap();
1474
1475 let task2 = TestTask::new(1);
1477 let id2 = scheduler.task(task2).now().await.unwrap();
1478
1479 let task3 = TestTask::new(1);
1481 let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();
1482
1483 tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
1485
1486 let st = state.lock().unwrap();
1487 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1489 assert_eq!(str_vec.join(":"), "1:1:1");
1490 }
1491
1492 #[tokio::test]
1493 pub async fn test_builder_with_retry() {
1494 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1495 let state: State = Arc::new(Mutex::new(Vec::new()));
1496 let scheduler = Scheduler::new(task_store);
1497 scheduler.start(state.clone());
1498 scheduler.register::<FailingTask>().unwrap();
1499
1500 let failing_task = FailingTask::new(55, 1); let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1503
1504 let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1505
1506 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1508
1509 let st = state.lock().unwrap();
1510 assert_eq!(st.len(), 1);
1511 assert_eq!(st[0], 55);
1512 }
1513
1514 #[tokio::test]
1515 pub async fn test_builder_with_automatic_retry() {
1516 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1517 let state: State = Arc::new(Mutex::new(Vec::new()));
1518 let scheduler = Scheduler::new(task_store);
1519 scheduler.start(state.clone());
1520 scheduler.register::<FailingTask>().unwrap();
1521
1522 let failing_task = FailingTask::new(66, 1);
1524 let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();
1525
1526 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1529
1530 let st = state.lock().unwrap();
1532 let _ = st.len(); }
1536
1537 #[tokio::test]
1538 pub async fn test_builder_fluent_chaining() {
1539 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1540 let state: State = Arc::new(Mutex::new(Vec::new()));
1541 let scheduler = Scheduler::new(task_store);
1542 scheduler.start(state.clone());
1543 scheduler.register::<TestTask>().unwrap();
1544
1545 let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1547 let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1548
1549 let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1551
1552 let task = TestTask::new(1);
1553 let _id = scheduler
1554 .task(task)
1555 .key("complex-task")
1556 .schedule_after(0) .depend_on(vec![dep1, dep2])
1558 .with_retry(retry_policy)
1559 .schedule()
1560 .await
1561 .unwrap();
1562
1563 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1564
1565 let st = state.lock().unwrap();
1566 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1568 assert_eq!(str_vec.join(":"), "1:1:1");
1569 }
1570
1571 #[tokio::test]
1572 pub async fn test_builder_backward_compatibility() {
1573 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1574 let state: State = Arc::new(Mutex::new(Vec::new()));
1575 let scheduler = Scheduler::new(task_store);
1576 scheduler.start(state.clone());
1577 scheduler.register::<TestTask>().unwrap();
1578
1579 let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();
1581
1582 let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1584
1585 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1586
1587 let st = state.lock().unwrap();
1588 assert_eq!(st.len(), 2);
1590 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1591 assert_eq!(str_vec.join(":"), "1:1");
1592 }
1593
1594 #[tokio::test]
1597 pub async fn test_builder_pipeline_scenario() {
1598 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1600 let state: State = Arc::new(Mutex::new(Vec::new()));
1601 let scheduler = Scheduler::new(task_store);
1602 scheduler.start(state.clone());
1603 scheduler.register::<TestTask>().unwrap();
1604
1605 let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();
1607
1608 let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();
1610
1611 let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();
1613
1614 tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1616
1617 let st = state.lock().unwrap();
1618 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1620 assert_eq!(str_vec.join(":"), "1:1:1");
1621 }
1622
1623 #[tokio::test]
1624 pub async fn test_builder_multi_dependency_join() {
1625 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1627 let state: State = Arc::new(Mutex::new(Vec::new()));
1628 let scheduler = Scheduler::new(task_store);
1629 scheduler.start(state.clone());
1630 scheduler.register::<TestTask>().unwrap();
1631
1632 let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1634 let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1635
1636 let _id3 = scheduler
1638 .task(TestTask::new(1))
1639 .depend_on(vec![id1, id2])
1640 .schedule()
1641 .await
1642 .unwrap();
1643
1644 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1645
1646 let st = state.lock().unwrap();
1647 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1649 assert_eq!(str_vec.join(":"), "1:1:1");
1650 }
1651
1652 #[tokio::test]
1653 pub async fn test_builder_scheduled_task_with_dependencies() {
1654 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1656 let state: State = Arc::new(Mutex::new(Vec::new()));
1657 let scheduler = Scheduler::new(task_store);
1658 scheduler.start(state.clone());
1659 scheduler.register::<TestTask>().unwrap();
1660
1661 let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();
1663
1664 let ts = Timestamp::from_now(1);
1666 let _task_id = scheduler
1667 .task(TestTask::new(1))
1668 .schedule_at(ts)
1669 .depend_on(vec![dep_id])
1670 .schedule()
1671 .await
1672 .unwrap();
1673
1674 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
1676 {
1677 let st = state.lock().unwrap();
1678 assert_eq!(st.len(), 1); }
1680
1681 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1683
1684 {
1685 let st = state.lock().unwrap();
1686 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1687 assert_eq!(str_vec.join(":"), "1:1");
1688 }
1689 }
1690
1691 #[tokio::test]
1692 pub async fn test_builder_mixed_features() {
1693 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1695 let state: State = Arc::new(Mutex::new(Vec::new()));
1696 let scheduler = Scheduler::new(task_store);
1697 scheduler.start(state.clone());
1698 scheduler.register::<TestTask>().unwrap();
1699 scheduler.register::<FailingTask>().unwrap();
1700
1701 let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1703
1704 let _id2 = scheduler
1706 .task(TestTask::new(1))
1707 .key("critical-task")
1708 .schedule_after(0)
1709 .depend_on(vec![id1])
1710 .schedule()
1711 .await
1712 .unwrap();
1713
1714 let _id3 = scheduler
1716 .task(FailingTask::new(1, 0)) .key("retryable-task")
1718 .with_retry(RetryPolicy {
1719 wait_min_max: (1, 3600),
1720 times: 3,
1721 })
1722 .schedule()
1723 .await
1724 .unwrap();
1725
1726 tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1728
1729 let st = state.lock().unwrap();
1730 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1732 assert_eq!(str_vec.join(":"), "1:1:1");
1733 }
1734
1735 #[tokio::test]
1736 pub async fn test_builder_builder_reuse_not_possible() {
1737 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1739 let _state: State = Arc::new(Mutex::new(Vec::new()));
1740 let scheduler = Scheduler::new(task_store);
1741
1742 let task = TestTask::new(1);
1743 let builder = scheduler.task(task);
1744
1745 let _id = builder.now().await.unwrap();
1751 }
1755
1756 #[tokio::test]
1757 pub async fn test_builder_different_task_types() {
1758 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1760 let state: State = Arc::new(Mutex::new(Vec::new()));
1761 let scheduler = Scheduler::new(task_store);
1762 scheduler.start(state.clone());
1763 scheduler.register::<TestTask>().unwrap();
1764 scheduler.register::<FailingTask>().unwrap();
1765
1766 let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();
1768
1769 let _id2 = scheduler
1770 .task(FailingTask::new(1, 0)) .key("failing-task")
1772 .now()
1773 .await
1774 .unwrap();
1775
1776 let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1777
1778 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1779
1780 let st = state.lock().unwrap();
1781 assert_eq!(st.len(), 3);
1782 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1783 assert_eq!(str_vec.join(":"), "1:1:1");
1785 }
1786
1787 #[tokio::test]
1792 pub async fn test_builder_cron_placeholder_syntax() {
1793 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1795 let state: State = Arc::new(Mutex::new(Vec::new()));
1796 let scheduler = Scheduler::new(task_store);
1797 scheduler.start(state.clone());
1798 scheduler.register::<TestTask>().unwrap();
1799
1800 let task = TestTask::new(1);
1802 let _id = scheduler
1803 .task(task)
1804 .key("cron-task")
1805 .cron("0 9 * * *") .schedule()
1807 .await
1808 .unwrap();
1809
1810 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1814
1815 let st = state.lock().unwrap();
1816 assert_eq!(st.len(), 0); }
1820
1821 #[tokio::test]
1822 pub async fn test_builder_daily_at_placeholder() {
1823 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1825 let state: State = Arc::new(Mutex::new(Vec::new()));
1826 let scheduler = Scheduler::new(task_store);
1827 scheduler.start(state.clone());
1828 scheduler.register::<TestTask>().unwrap();
1829
1830 let task = TestTask::new(1);
1832 let _id = scheduler
1833 .task(task)
1834 .key("daily-task")
1835 .daily_at(14, 30) .schedule()
1837 .await
1838 .unwrap();
1839
1840 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1843
1844 let st = state.lock().unwrap();
1845 assert_eq!(st.len(), 0);
1848 }
1849
1850 #[tokio::test]
1851 pub async fn test_builder_weekly_at_placeholder() {
1852 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1854 let state: State = Arc::new(Mutex::new(Vec::new()));
1855 let scheduler = Scheduler::new(task_store);
1856 scheduler.start(state.clone());
1857 scheduler.register::<TestTask>().unwrap();
1858
1859 let task = TestTask::new(1);
1861 let _id = scheduler
1862 .task(task)
1863 .key("weekly-task")
1864 .weekly_at(1, 9, 0) .schedule()
1866 .await
1867 .unwrap();
1868
1869 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1872
1873 let st = state.lock().unwrap();
1874 assert_eq!(st.len(), 0);
1877 }
1878
1879 #[tokio::test]
1880 pub async fn test_builder_cron_with_retry() {
1881 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1883 let state: State = Arc::new(Mutex::new(Vec::new()));
1884 let scheduler = Scheduler::new(task_store);
1885 scheduler.start(state.clone());
1886 scheduler.register::<TestTask>().unwrap();
1887
1888 let task = TestTask::new(1);
1890 let _id = scheduler
1891 .task(task)
1892 .key("reliable-scheduled-task")
1893 .daily_at(2, 0) .with_retry(RetryPolicy {
1895 wait_min_max: (60, 3600),
1896 times: 5,
1897 })
1898 .schedule()
1899 .await
1900 .unwrap();
1901
1902 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1905
1906 let st = state.lock().unwrap();
1907 assert_eq!(st.len(), 0);
1910 }
1911
1912 #[test]
1915 fn test_cron_to_string() {
1916 let cron = CronSchedule::parse("*/5 * * * *").unwrap();
1918 assert_eq!(cron.to_cron_string(), "*/5 * * * *");
1919 }
1920
    /// Regression test: a task that is currently executing must not be
    /// re-inserted into the scheduled queue when `add_queue` is called with
    /// its id — otherwise it would run twice.
    #[tokio::test]
    pub async fn test_running_task_not_double_scheduled() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // num = 5 gives a 1s run (200ms * 5), long enough to observe the
        // task in the running queue after a short sleep.
        let task = TestTask::new(5);
        let task_id = scheduler.add(task.clone()).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        {
            let running = scheduler.tasks_running.lock().unwrap();
            assert!(running.contains_key(&task_id), "Task should be in running queue");
        }

        // Simulate a duplicate scheduling attempt while the task is running.
        let task_meta = TaskMeta {
            task: task.clone(),
            next_at: Some(Timestamp::now()),
            deps: vec![],
            retry_count: 0,
            retry: None,
            cron: None,
        };
        let result = scheduler.add_queue(task_id, task_meta).await;

        assert!(result.is_ok(), "add_queue should succeed");

        {
            let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
            let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
            assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
        }

        // Let the in-flight execution finish, then verify it ran only once.
        tokio::time::sleep(std::time::Duration::from_secs(2)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1, "Only one task execution should have occurred");
        assert_eq!(st[0], 5);
    }
1973
    /// When `add_queue` targets a task that is already running, its stored
    /// metadata (here: the cron schedule) is updated in place rather than
    /// the task being scheduled a second time.
    #[tokio::test]
    pub async fn test_running_task_metadata_updated() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // num = 5 gives a ~1s run, so the task is still running below.
        let task = TestTask::new(5);
        let task_id = scheduler.add(task.clone()).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        {
            let running = scheduler.tasks_running.lock().unwrap();
            let meta = running.get(&task_id).expect("Task should be running");
            assert!(meta.cron.is_none(), "Task should have no cron initially");
        }

        // Re-queue the same id with a cron schedule attached.
        let cron = CronSchedule::parse("*/5 * * * *").unwrap();
        let task_meta_with_cron = TaskMeta {
            task: task.clone(),
            next_at: Some(Timestamp::now()),
            deps: vec![],
            retry_count: 0,
            retry: None,
            cron: Some(cron.clone()),
        };
        let result = scheduler.add_queue(task_id, task_meta_with_cron).await;

        assert!(result.is_ok(), "add_queue should succeed");

        {
            let running = scheduler.tasks_running.lock().unwrap();
            let meta = running.get(&task_id).expect("Task should still be running");
            assert!(meta.cron.is_some(), "Task should now have cron after update");
        }

        // Let the in-flight execution finish before the runtime shuts down.
        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    }
2023}
2024
2025