1use async_trait::async_trait;
7use itertools::Itertools;
8use std::{
9 collections::{BTreeMap, HashMap},
10 fmt::Debug,
11 sync::{Arc, Mutex, RwLock},
12};
13
14use chrono::{DateTime, Utc};
15use croner::Cron;
16use std::str::FromStr;
17
18use crate::prelude::*;
19use cloudillo_types::{lock, meta_adapter};
20
/// Identifier of a task, as returned by [`TaskStore::add`] when the task is first stored.
pub type TaskId = u64;
22
/// Whether a task runs once or repeatedly.
///
/// NOTE(review): not referenced by the visible scheduler code; recurrence is actually driven by
/// `TaskMeta::cron`. Confirm whether this type is still needed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskType {
    /// Runs repeatedly.
    Periodic,
    /// Runs a single time.
    Once,
}
27
/// A parsed cron expression together with its original source text.
///
/// Equality compares only the source text (see the `PartialEq` impl), so two expressions that
/// fire at the same times but are written differently are not considered equal.
#[derive(Debug, Clone)]
pub struct CronSchedule {
    /// The expression exactly as passed to `CronSchedule::parse`; returned by `to_cron_string`.
    expr: Box<str>,
    /// The croner representation used to compute occurrences.
    cron: Cron,
}
37
38impl CronSchedule {
39 pub fn parse(expr: &str) -> ClResult<Self> {
41 let cron = Cron::from_str(expr)
42 .map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
43 Ok(Self { expr: expr.into(), cron })
44 }
45
46 pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
51 let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
52
53 self.cron
54 .find_next_occurrence(&dt, false)
55 .map(|next| Timestamp(next.timestamp()))
56 .map_err(|e| {
57 tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
58 Error::ValidationError(format!("cron next_execution failed: {}", e))
59 })
60 }
61
62 pub fn to_cron_string(&self) -> String {
64 self.expr.to_string()
65 }
66}
67
68impl PartialEq for CronSchedule {
69 fn eq(&self, other: &Self) -> bool {
70 self.expr == other.expr
71 }
72}
73
74impl Eq for CronSchedule {}
75
/// A unit of work executed by the [`Scheduler`].
///
/// A task is persisted as its kind plus the string returned by [`Task::serialize`]. After a
/// restart it is rebuilt through the builder registered with `Scheduler::register`.
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
    /// Identifier of this task type; the key under which its builder is registered.
    fn kind() -> &'static str
    where
        Self: Sized;
    /// Rebuilds a task from its id and the string previously produced by [`Task::serialize`].
    fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
    where
        Self: Sized;
    /// Serializes the task parameters. The result is stored as the task input and is also
    /// compared when a keyed task is rescheduled, to detect changed parameters.
    fn serialize(&self) -> String;
    /// Executes the task with the shared scheduler state. An `Err` triggers the retry policy
    /// if one is configured, otherwise the final-failure path.
    async fn run(&self, state: &S) -> ClResult<()>;

    /// Instance-level counterpart of [`Task::kind`]; used as the stored kind when the task is
    /// first persisted.
    // NOTE(review): should equal `kind()`, otherwise the task cannot be rebuilt on load.
    fn kind_of(&self) -> &'static str;

    /// Called once after the task has failed for the last time (no retry policy, or retries
    /// exhausted), with the retry count reached and the last error message. Default: no-op.
    async fn on_failed(&self, _state: &S, _attempts: u16, _last_error: &str) {}
}
96
/// Persisted lifecycle state of a task as reported by a [`TaskStore`].
///
/// Plain `Copy` value type so callers can compare states directly (`==`) instead of matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskStatus {
    /// Not yet finished; restored into the queues when the scheduler loads.
    Pending,
    /// Finished (status code `'F'` in `MetaAdapterTaskStore`).
    Completed,
    /// Any other stored status code.
    Failed,
}
103
104pub struct TaskData {
105 id: TaskId,
106 kind: Box<str>,
107 status: TaskStatus,
108 input: Box<str>,
109 deps: Box<[TaskId]>,
110 retry_data: Option<Box<str>>,
111 cron_data: Option<Box<str>>,
112 next_at: Option<Timestamp>,
113}
114
/// Persistence backend for scheduler tasks.
#[async_trait]
pub trait TaskStore<S: Clone>: Send + Sync {
    /// Stores a new task (optionally under a deduplication `key`) and returns its id.
    async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
    /// Marks task `id` as finished; the scheduler passes an empty `output`.
    async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
    /// Returns all stored tasks; the scheduler restores only those with `TaskStatus::Pending`.
    async fn load(&self) -> ClResult<Vec<TaskData>>;
    /// Records a failure. `next_at` is `Some` when a retry has been scheduled for that time and
    /// `None` on a final failure.
    async fn update_task_error(
        &self,
        task_id: TaskId,
        output: &str,
        next_at: Option<Timestamp>,
    ) -> ClResult<()>;
    /// Looks up a task by its deduplication key, returning its id and stored data.
    async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
    /// Overwrites the stored definition of task `id` (input, schedule, retry and cron data)
    /// from `task`.
    async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
    /// Returns the subset of `deps` that the store already knows to be completed.
    async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>>;
}
130
131pub struct InMemoryTaskStore {
134 last_id: Mutex<TaskId>,
135}
136
137impl InMemoryTaskStore {
138 pub fn new() -> Arc<Self> {
139 Arc::new(Self { last_id: Mutex::new(0) })
140 }
141}
142
143#[async_trait]
144impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
145 async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
146 let mut last_id = lock!(self.last_id)?;
147 *last_id += 1;
148 Ok(*last_id)
149 }
150
151 async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
152 Ok(())
153 }
154
155 async fn load(&self) -> ClResult<Vec<TaskData>> {
156 Ok(vec![])
157 }
158
159 async fn update_task_error(
160 &self,
161 _task_id: TaskId,
162 _output: &str,
163 _next_at: Option<Timestamp>,
164 ) -> ClResult<()> {
165 Ok(())
166 }
167
168 async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
169 Ok(None)
171 }
172
173 async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
174 Ok(())
176 }
177
178 async fn find_completed_deps(&self, _deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
179 Ok(vec![])
180 }
181}
182
183pub struct MetaAdapterTaskStore {
186 meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
187}
188
189impl MetaAdapterTaskStore {
190 pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
191 Arc::new(Self { meta_adapter })
192 }
193}
194
195#[async_trait]
196impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
197 async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
198 let id = self
199 .meta_adapter
200 .create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
201 .await?;
202
203 if let Some(cron) = &task.cron {
205 self.meta_adapter
206 .update_task(
207 id,
208 &meta_adapter::TaskPatch {
209 cron: Patch::Value(cron.to_cron_string()),
210 ..Default::default()
211 },
212 )
213 .await?;
214 }
215
216 Ok(id)
217 }
218
219 async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
220 self.meta_adapter.update_task_finished(id, output).await
221 }
222
223 async fn load(&self) -> ClResult<Vec<TaskData>> {
224 let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
225 let tasks = tasks
226 .into_iter()
227 .map(|t| TaskData {
228 id: t.task_id,
229 kind: t.kind,
230 status: match t.status {
231 'P' => TaskStatus::Pending,
232 'F' => TaskStatus::Completed,
233 _ => TaskStatus::Failed,
235 },
236 input: t.input,
237 deps: t.deps,
238 retry_data: t.retry,
239 cron_data: t.cron,
240 next_at: t.next_at,
241 })
242 .collect();
243 Ok(tasks)
244 }
245
246 async fn update_task_error(
247 &self,
248 task_id: TaskId,
249 output: &str,
250 next_at: Option<Timestamp>,
251 ) -> ClResult<()> {
252 self.meta_adapter.update_task_error(task_id, output, next_at).await
253 }
254
255 async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
256 let task_opt = self.meta_adapter.find_task_by_key(key).await?;
257
258 match task_opt {
259 Some(t) => Ok(Some((
260 t.task_id,
261 TaskData {
262 id: t.task_id,
263 kind: t.kind,
264 status: match t.status {
265 'P' => TaskStatus::Pending,
266 'F' => TaskStatus::Completed,
267 _ => TaskStatus::Failed,
269 },
270 input: t.input,
271 deps: t.deps,
272 retry_data: t.retry,
273 cron_data: t.cron,
274 next_at: t.next_at,
275 },
276 ))),
277 None => Ok(None),
278 }
279 }
280
281 async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
282 use cloudillo_types::types::Patch;
283
284 let mut patch = meta_adapter::TaskPatch {
286 input: Patch::Value(task.task.serialize()),
287 next_at: match task.next_at {
288 Some(ts) => Patch::Value(ts),
289 None => Patch::Null,
290 },
291 ..Default::default()
292 };
293
294 if !task.deps.is_empty() {
296 patch.deps = Patch::Value(task.deps.clone());
297 }
298
299 if let Some(ref retry) = task.retry {
301 let retry_str = format!(
302 "{},{},{},{}",
303 task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
304 );
305 patch.retry = Patch::Value(retry_str);
306 }
307
308 if let Some(ref cron) = task.cron {
310 patch.cron = Patch::Value(cron.to_cron_string());
311 }
312
313 self.meta_adapter.update_task(id, &patch).await
314 }
315
316 async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
317 self.meta_adapter.find_completed_deps(deps).await
318 }
319}
320
/// Rebuilds a task from its id and serialized input (the string produced by `Task::serialize`).
type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
323
/// Exponential-backoff retry policy for failing tasks.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Lower and upper bound, in seconds, of the wait before a retry.
    wait_min_max: (u64, u64),
    /// Maximum number of retries.
    times: u16,
}

impl Default for RetryPolicy {
    /// First retry after 60 s, waits capped at 3600 s, at most 10 retries.
    fn default() -> Self {
        Self { wait_min_max: (60, 3600), times: 10 }
    }
}

impl RetryPolicy {
    /// Creates a policy that waits between `wait_min_max.0` and `wait_min_max.1` seconds and
    /// allows at most `times` retries.
    pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
        Self { wait_min_max, times }
    }

    /// Seconds to wait after `attempt_count` previous retries: `min * 2^attempt_count`, capped
    /// at `max`.
    ///
    /// The computation saturates instead of overflowing. Previously attempt counts of 64 or more
    /// (or a large `min`) overflowed, which panics in debug builds and wraps in release builds.
    pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
        let (min, max) = self.wait_min_max;
        // 2^attempt_count; a shift of 64 bits or more would overflow, so saturate instead.
        let factor = 1u64.checked_shl(u32::from(attempt_count)).unwrap_or(u64::MAX);
        min.saturating_mul(factor).min(max)
    }

    /// Whether another retry is allowed after `attempt_count` retries.
    pub fn should_retry(&self, attempt_count: u16) -> bool {
        attempt_count < self.times
    }
}
354
355pub struct TaskSchedulerBuilder<'a, S: Clone> {
358 scheduler: &'a Scheduler<S>,
359 task: Arc<dyn Task<S>>,
360 key: Option<String>,
361 next_at: Option<Timestamp>,
362 deps: Vec<TaskId>,
363 retry: Option<RetryPolicy>,
364 cron: Option<CronSchedule>,
365}
366
367impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
368 fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
370 Self {
371 scheduler,
372 task,
373 key: None,
374 next_at: None,
375 deps: Vec::new(),
376 retry: None,
377 cron: None,
378 }
379 }
380
381 pub fn key(mut self, key: impl Into<String>) -> Self {
383 self.key = Some(key.into());
384 self
385 }
386
387 pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
389 self.next_at = Some(timestamp);
390 self
391 }
392
393 pub fn schedule_after(mut self, seconds: i64) -> Self {
395 self.next_at = Some(Timestamp::from_now(seconds));
396 self
397 }
398
399 pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
401 self.deps = deps;
402 self
403 }
404
405 pub fn depends_on(mut self, dep: TaskId) -> Self {
407 self.deps.push(dep);
408 self
409 }
410
411 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
413 self.retry = Some(policy);
414 self
415 }
416
417 pub fn cron(mut self, expr: impl Into<String>) -> Self {
422 if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
423 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
426 self.cron = Some(cron_schedule);
427 }
428 self
429 }
430
431 pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
434 if hour <= 23 && minute <= 59 {
435 let expr = format!("{} {} * * *", minute, hour);
436 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
437 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
440 self.cron = Some(cron_schedule);
441 }
442 }
443 self
444 }
445
446 pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
450 if weekday <= 6 && hour <= 23 && minute <= 59 {
451 let expr = format!("{} {} * * {}", minute, hour, weekday);
452 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
453 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
456 self.cron = Some(cron_schedule);
457 }
458 }
459 self
460 }
461
462 pub async fn now(self) -> ClResult<TaskId> {
464 self.schedule().await
465 }
466
467 pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
469 self.next_at = Some(ts);
470 self.schedule().await
471 }
472
473 pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
475 self.next_at = Some(Timestamp::from_now(seconds));
476 self.schedule().await
477 }
478
479 pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
481 self.deps.push(dep);
482 self.schedule().await
483 }
484
485 pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
487 self.retry = Some(RetryPolicy::default());
488 self.schedule().await
489 }
490
491 pub async fn schedule(self) -> ClResult<TaskId> {
493 self.scheduler
494 .schedule_task_impl(
495 self.task,
496 self.key.as_deref(),
497 self.next_at,
498 if self.deps.is_empty() { None } else { Some(self.deps) },
499 self.retry,
500 self.cron,
501 )
502 .await
503 }
504}
505
/// In-memory scheduling state for one task.
#[derive(Debug, Clone)]
pub struct TaskMeta<S: Clone> {
    /// The task object that is executed.
    pub task: Arc<dyn Task<S>>,
    /// Requested execution time; `None` when no explicit time was requested.
    pub next_at: Option<Timestamp>,
    /// Ids of tasks that must complete before this one is released.
    pub deps: Vec<TaskId>,
    /// Number of retries already scheduled after failures (starts at 0).
    retry_count: u16,
    /// Retry policy applied when `run` fails; `None` disables retries.
    pub retry: Option<RetryPolicy>,
    /// Recurrence schedule; when set, the task is re-queued after each completion.
    pub cron: Option<CronSchedule>,
}
515
/// Registered task builders keyed by task kind (`Task::kind`).
type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
/// Time-ordered queue of scheduled tasks keyed by `(run_at, id)`, so `first_key_value` yields
/// the earliest due entry; `Timestamp(0)` is used for tasks that should start immediately.
type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
518
/// Task scheduler with delayed, dependent, retrying and recurring (cron) tasks.
///
/// Cloning is cheap: all state is behind `Arc`s and shared between clones.
#[derive(Clone)]
pub struct Scheduler<S: Clone> {
    /// Builders used to rebuild persisted tasks by kind.
    task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
    /// Persistence backend.
    store: Arc<dyn TaskStore<S>>,
    /// Tasks currently executing (inserted right before they are spawned).
    tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
    /// Tasks blocked on unfinished dependencies.
    tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
    /// Dependency id -> ids of waiting tasks that depend on it.
    task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
    /// Time-ordered queue consumed by the dispatcher loop.
    tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
    /// Sender used by spawned tasks to report that they are done.
    tx_finish: flume::Sender<TaskId>,
    /// Receiver read by the completion handler.
    rx_finish: flume::Receiver<TaskId>,
    /// Wakes the dispatcher loop when the scheduled queue changes.
    notify_schedule: Arc<tokio::sync::Notify>,
}
532
533impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
534 pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
535 let (tx_finish, rx_finish) = flume::unbounded();
536
537 let scheduler = Self {
538 task_builders: Arc::new(RwLock::new(HashMap::new())),
539 store,
540 tasks_running: Arc::new(Mutex::new(HashMap::new())),
541 tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
542 task_dependents: Arc::new(Mutex::new(HashMap::new())),
543 tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
544 tx_finish,
545 rx_finish,
546 notify_schedule: Arc::new(tokio::sync::Notify::new()),
547 };
548
549 Arc::new(scheduler)
552 }
553
554 pub fn start(&self, state: S) {
555 let schedule = self.clone();
557 let stat = state.clone();
558 let rx_finish = self.rx_finish.clone();
559
560 tokio::spawn(async move {
561 while let Ok(id) = rx_finish.recv_async().await {
562 debug!("Completed task {} (notified)", id);
563
564 let task_meta_opt = {
566 let tasks_running = match schedule.tasks_running.lock() {
567 Ok(guard) => guard,
568 Err(poisoned) => {
569 error!("Mutex poisoned: tasks_running (recovering)");
570 poisoned.into_inner()
571 }
572 };
573 tasks_running.get(&id).cloned()
574 };
575
576 if let Some(task_meta) = task_meta_opt {
577 let mut transition_ok = false;
579
580 if let Some(ref cron) = task_meta.cron {
582 let next_at = match cron.next_execution(Timestamp::now()) {
584 Ok(ts) => ts,
585 Err(e) => {
586 error!(
587 "Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
588 id, e
589 );
590 if let Err(e) = schedule.store.finished(id, "").await {
592 error!("Failed to mark task {} as finished: {}", id, e);
593 }
594 continue;
595 }
596 };
597 info!(
598 "Recurring task {} completed, scheduling next execution at {}",
599 id, next_at
600 );
601
602 let mut updated_meta = task_meta.clone();
604 updated_meta.next_at = Some(next_at);
605
606 if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
608 error!("Failed to update recurring task {} next_at: {}", id, e);
609 }
610
611 match schedule.tasks_running.lock() {
613 Ok(mut tasks_running) => {
614 tasks_running.remove(&id);
615 }
616 Err(poisoned) => {
617 error!("Mutex poisoned: tasks_running (recovering)");
618 poisoned.into_inner().remove(&id);
619 }
620 }
621
622 match schedule.add_queue(id, updated_meta).await {
624 Ok(_) => transition_ok = true,
625 Err(e) => {
626 error!(
627 "Failed to reschedule recurring task {}: {} - task lost!",
628 id, e
629 );
630 }
631 }
632 } else {
633 match schedule.store.finished(id, "").await {
635 Ok(()) => transition_ok = true,
636 Err(e) => {
637 error!(
638 "Failed to mark task {} as finished: {} - task remains in running queue",
639 id, e
640 );
641 }
642 }
643 }
644
645 if transition_ok {
647 match schedule.tasks_running.lock() {
648 Ok(mut tasks_running) => {
649 tasks_running.remove(&id);
650 }
651 Err(poisoned) => {
652 error!("Mutex poisoned: tasks_running (recovering)");
653 poisoned.into_inner().remove(&id);
654 }
655 }
656 }
657
658 match schedule.release_dependents(id) {
660 Ok(ready_to_spawn) => {
661 for (dep_id, dep_task_meta) in ready_to_spawn {
662 match schedule.tasks_running.lock() {
664 Ok(mut tasks_running) => {
665 tasks_running.insert(dep_id, dep_task_meta.clone());
666 }
667 Err(poisoned) => {
668 error!("Mutex poisoned: tasks_running (recovering)");
669 poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
670 }
671 }
672 schedule.spawn_task(
673 stat.clone(),
674 dep_task_meta.task.clone(),
675 dep_id,
676 dep_task_meta,
677 );
678 }
679 }
680 Err(e) => {
681 error!("Failed to release dependents of task {}: {}", id, e);
682 }
683 }
684 } else {
685 warn!("Completed task {} not found in running queue", id);
686 }
687 }
688 });
689
690 let schedule = self.clone();
692 tokio::spawn(async move {
693 loop {
694 let is_empty = match schedule.tasks_scheduled.lock() {
695 Ok(guard) => guard.is_empty(),
696 Err(poisoned) => {
697 error!("Mutex poisoned: tasks_scheduled (recovering)");
698 poisoned.into_inner().is_empty()
699 }
700 };
701 if is_empty {
702 schedule.notify_schedule.notified().await;
703 }
704 let time = Timestamp::now();
705 if let Some((timestamp, _id)) = loop {
706 let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
707 Ok(guard) => guard,
708 Err(poisoned) => {
709 error!("Mutex poisoned: tasks_scheduled (recovering)");
710 poisoned.into_inner()
711 }
712 };
713 if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
714 let (timestamp, id) = (timestamp, id);
715 if timestamp <= Timestamp::now() {
716 debug!("Spawning task id {} (from schedule)", id);
717 if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
718 let mut tasks_running = match schedule.tasks_running.lock() {
719 Ok(guard) => guard,
720 Err(poisoned) => {
721 error!("Mutex poisoned: tasks_running (recovering)");
722 poisoned.into_inner()
723 }
724 };
725 tasks_running.insert(id, task.clone());
726 schedule.spawn_task(state.clone(), task.task.clone(), id, task);
727 } else {
728 error!("Task disappeared while being removed from schedule");
729 break None;
730 }
731 } else {
732 break Some((timestamp, id));
733 }
734 } else {
735 break None;
736 }
737 } {
738 let diff = timestamp.0 - time.0;
739 let wait =
740 tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
741 tokio::select! {
742 () = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
743 };
744 }
745 }
746 });
747
748 let schedule = self.clone();
749 tokio::spawn(async move {
750 let _ignore_err = schedule.load().await;
751 });
752 }
753
754 fn register_builder(
755 &self,
756 name: &'static str,
757 builder: &'static TaskBuilder<S>,
758 ) -> ClResult<&Self> {
759 let mut task_builders = self
760 .task_builders
761 .write()
762 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
763 task_builders.insert(name, Box::new(builder));
764 Ok(self)
765 }
766
767 pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
768 info!("Registering task type {}", T::kind());
769 self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
770 Ok(self)
771 }
772
773 pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
775 TaskSchedulerBuilder::new(self, task)
776 }
777
778 async fn schedule_task_impl(
781 &self,
782 task: Arc<dyn Task<S>>,
783 key: Option<&str>,
784 next_at: Option<Timestamp>,
785 deps: Option<Vec<TaskId>>,
786 retry: Option<RetryPolicy>,
787 cron: Option<CronSchedule>,
788 ) -> ClResult<TaskId> {
789 let task_meta = TaskMeta {
790 task: task.clone(),
791 next_at,
792 deps: deps.clone().unwrap_or_default(),
793 retry_count: 0,
794 retry,
795 cron,
796 };
797
798 if let Some(key) = key
800 && let Some((existing_id, existing_data)) = self.store.find_by_key(key).await?
801 {
802 let new_serialized = task.serialize();
803 let existing_serialized = existing_data.input.as_ref();
804
805 if new_serialized == existing_serialized {
807 info!(
808 "Recurring task '{}' already exists with identical parameters (id={})",
809 key, existing_id
810 );
811 self.store.update_task(existing_id, &task_meta).await?;
813 self.add_queue(existing_id, task_meta).await?;
815 return Ok(existing_id);
816 }
817 info!("Updating recurring task '{}' (id={}) - parameters changed", key, existing_id);
818 debug!(" Old params: {}", existing_serialized);
819 debug!(" New params: {}", new_serialized);
820
821 self.remove_from_queues(existing_id)?;
823
824 self.store.update_task(existing_id, &task_meta).await?;
826
827 self.add_queue(existing_id, task_meta).await?;
829
830 return Ok(existing_id);
831 }
832
833 let id = self.store.add(&task_meta, key).await?;
835 self.add_queue(id, task_meta).await
836 }
837
838 pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
839 self.task(task).now().await
840 }
841
    /// Places a task into the appropriate in-memory queue and returns `id`.
    ///
    /// * If `id` is currently running, only its metadata is replaced. The completion handler in
    ///   `start` reads that metadata when the run finishes, e.g. to reschedule a cron task.
    /// * Otherwise any existing scheduled or waiting entry for `id` is dropped first, then:
    ///   - a task with dependencies goes to `tasks_waiting` and is registered in
    ///     `task_dependents`. A `next_at` on such a task is ignored with a warning. Dependencies
    ///     the store already reports as completed are resolved immediately.
    ///   - a task without dependencies whose `next_at` is missing or earlier than now is queued
    ///     under `Timestamp(0)`, so the dispatcher starts it right away;
    ///   - a task without dependencies with a later `next_at` is queued under that time.
    pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
        {
            let mut running = lock!(self.tasks_running, "tasks_running")?;
            if let Some(existing_meta) = running.get_mut(&id) {
                debug!(
                    "Task {} is already running, updating metadata (will reschedule on completion)",
                    id
                );
                *existing_meta = task_meta;
                return Ok(id);
            }
        }

        // Drop stale queue entries so the task is never queued twice.
        {
            let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
            if let Some(key) = scheduled
                .iter()
                .find(|((_, tid), _)| *tid == id)
                .map(|((ts, tid), _)| (*ts, *tid))
            {
                scheduled.remove(&key);
                debug!("Removed existing scheduled entry for task {} before re-queueing", id);
            }
        }
        {
            let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
            if waiting.remove(&id).is_some() {
                debug!("Removed existing waiting entry for task {} before re-queueing", id);
            }
        }

        let deps = task_meta.deps.clone();

        // Dependencies take precedence over an explicit execution time.
        if !deps.is_empty() && task_meta.next_at.is_some() {
            warn!(
                "Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue",
                id
            );
            lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
            debug!("Task {} is waiting for {:?}", id, &deps);
            for dep in &deps {
                lock!(self.task_dependents, "task_dependents")?
                    .entry(*dep)
                    .or_default()
                    .push(id);
            }

            self.check_and_resolve_completed_deps(id, &deps).await?;
            return Ok(id);
        }

        if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
            // Due now: key 0 sorts first in the scheduled map.
            debug!("Spawning task {}", id);
            lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
            self.notify_schedule.notify_one();
        } else if let Some(next_at) = task_meta.next_at {
            debug!("Scheduling task {} for {}", id, next_at);
            lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
            self.notify_schedule.notify_one();
        } else {
            // Dependencies present and no explicit time: wait for them.
            lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
            debug!("Task {} is waiting for {:?}", id, &deps);
            for dep in &deps {
                lock!(self.task_dependents, "task_dependents")?
                    .entry(*dep)
                    .or_default()
                    .push(id);
            }

            self.check_and_resolve_completed_deps(id, &deps).await?;
        }
        Ok(id)
    }
921
    /// Handles dependencies that completed before task `id` was queued.
    ///
    /// Asks the store which of `deps` are already completed and removes them from the waiting
    /// task's dependency list. If none remain, the task leaves `tasks_waiting`, its entries are
    /// removed from `task_dependents`, and it is queued to run immediately.
    async fn check_and_resolve_completed_deps(&self, id: TaskId, deps: &[TaskId]) -> ClResult<()> {
        let completed_deps = self.store.find_completed_deps(deps).await?;
        if completed_deps.is_empty() {
            return Ok(());
        }
        let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
        if let Some(task_meta) = waiting.get_mut(&id) {
            for dep in &completed_deps {
                task_meta.deps.retain(|d| *d != *dep);
            }
            if task_meta.deps.is_empty()
                && let Some(ready_task) = waiting.remove(&id)
            {
                // Release the waiting lock before taking the next one.
                drop(waiting);
                let mut dependents = lock!(self.task_dependents, "task_dependents")?;
                for dep in deps {
                    if let Some(dep_list) = dependents.get_mut(dep) {
                        dep_list.retain(|d| *d != id);
                        if dep_list.is_empty() {
                            dependents.remove(dep);
                        }
                    }
                }
                drop(dependents);
                debug!("Task {} deps already completed, scheduling immediately", id);
                lock!(self.tasks_scheduled, "tasks_scheduled")?
                    .insert((Timestamp(0), id), ready_task);
                self.notify_schedule.notify_one();
            }
        }
        Ok(())
    }
956
957 fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
960 if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
962 debug!("Removed task {} from waiting queue for update", task_id);
963 return Ok(Some(task_meta));
964 }
965
966 {
968 let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
969 if let Some(key) = scheduled
970 .iter()
971 .find(|((_, id), _)| *id == task_id)
972 .map(|((ts, id), _)| (*ts, *id))
973 && let Some(task_meta) = scheduled.remove(&key)
974 {
975 debug!("Removed task {} from scheduled queue for update", task_id);
976 return Ok(Some(task_meta));
977 }
978 }
979
980 if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
982 warn!("Removed task {} from running queue during update", task_id);
983 return Ok(Some(task_meta));
984 }
985
986 Ok(None)
987 }
988
    /// Removes `completed_task_id` from the dependency lists of its dependents.
    ///
    /// Dependents in the waiting queue whose dependency list becomes empty are removed from
    /// `tasks_waiting` and returned. The caller is responsible for marking them running and
    /// spawning them. Dependents found in the scheduled queue only have the dependency removed,
    /// since the dispatcher starts them anyway. The dependents entry for `completed_task_id` is
    /// removed from `task_dependents`.
    fn release_dependents(
        &self,
        completed_task_id: TaskId,
    ) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
        let dependents = {
            let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
            deps_map.remove(&completed_task_id).unwrap_or_default()
        };

        if dependents.is_empty() {
            return Ok(Vec::new());
        }

        debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);

        let mut ready_to_spawn = Vec::new();

        for dependent_id in dependents {
            // Most dependents sit in the waiting queue.
            {
                let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
                if let Some(task_meta) = waiting.get_mut(&dependent_id) {
                    task_meta.deps.retain(|x| *x != completed_task_id);

                    if task_meta.deps.is_empty() {
                        if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
                            debug!(
                                "Dependent task {} ready to spawn (all dependencies cleared)",
                                dependent_id
                            );
                            ready_to_spawn.push((dependent_id, task_to_spawn));
                        }
                    } else {
                        debug!(
                            "Task {} still has {} remaining dependencies",
                            dependent_id,
                            task_meta.deps.len()
                        );
                    }
                    continue;
                }
            }

            // Otherwise it may already be in the scheduled queue; just keep its deps accurate.
            {
                let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
                if let Some(scheduled_key) = scheduled
                    .iter()
                    .find(|((_, id), _)| *id == dependent_id)
                    .map(|((ts, id), _)| (*ts, *id))
                {
                    if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
                        task_meta.deps.retain(|x| *x != completed_task_id);
                        let remaining = task_meta.deps.len();
                        if remaining == 0 {
                            debug!(
                                "Task {} in scheduled queue has no remaining dependencies",
                                dependent_id
                            );
                        } else {
                            debug!(
                                "Task {} in scheduled queue has {} remaining dependencies",
                                dependent_id, remaining
                            );
                        }
                    }
                    continue;
                }
            }

            warn!(
                "Dependent task {} of completed task {} not found in any queue",
                dependent_id, completed_task_id
            );
        }

        Ok(ready_to_spawn)
    }
1074
1075 async fn load(&self) -> ClResult<()> {
1076 let tasks = self.store.load().await?;
1077 debug!("Loaded {} tasks from store", tasks.len());
1078 for t in tasks {
1079 if let TaskStatus::Pending = t.status {
1080 debug!("Loading task {} {}", t.id, t.kind);
1081 let task = {
1082 let builder_map = self
1083 .task_builders
1084 .read()
1085 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1086 let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1087 format!("task builder not registered: {}", t.kind),
1088 ))?;
1089 builder(t.id, &t.input)?
1090 };
1091 let (retry_count, retry) = match t.retry_data {
1092 Some(retry_str) => {
1093 let (retry_count, retry_min, retry_max, retry_times) = retry_str
1094 .split(',')
1095 .collect_tuple()
1096 .ok_or(Error::Internal("invalid retry policy format".into()))?;
1097 let retry_count: u16 = retry_count
1098 .parse()
1099 .map_err(|_| Error::Internal("retry count must be u16".into()))?;
1100 let retry = RetryPolicy {
1101 wait_min_max: (
1102 retry_min
1103 .parse()
1104 .map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1105 retry_max
1106 .parse()
1107 .map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1108 ),
1109 times: retry_times
1110 .parse()
1111 .map_err(|_| Error::Internal("retry times must be u64".into()))?,
1112 };
1113 debug!("Loaded retry policy: {:?}", retry);
1114 (retry_count, Some(retry))
1115 }
1116 _ => (0, None),
1117 };
1118 let cron =
1120 t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1121
1122 let task_meta = TaskMeta {
1123 task,
1124 next_at: t.next_at,
1125 deps: t.deps.into(),
1126 retry_count,
1127 retry,
1128 cron,
1129 };
1130 self.add_queue(t.id, task_meta).await?;
1131 }
1132 }
1133 Ok(())
1134 }
1135
1136 fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
1137 let tx_finish = self.tx_finish.clone();
1138 let store = self.store.clone();
1139 let scheduler = self.clone();
1140 tokio::spawn(async move {
1142 match task.run(&state).await {
1143 Ok(()) => {
1144 debug!("Task {} completed successfully", id);
1145 tx_finish.send(id).unwrap_or(());
1146 }
1147 Err(e) => {
1148 if let Some(retry_policy) = &task_meta.retry {
1149 if retry_policy.should_retry(task_meta.retry_count) {
1150 let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
1151 let next_at = Timestamp::from_now(backoff.cast_signed());
1152
1153 info!(
1154 "Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
1155 id,
1156 task_meta.retry_count + 1,
1157 retry_policy.times,
1158 backoff,
1159 e
1160 );
1161
1162 store
1164 .update_task_error(id, &e.to_string(), Some(next_at))
1165 .await
1166 .unwrap_or(());
1167
1168 match scheduler.tasks_running.lock() {
1170 Ok(mut tasks_running) => {
1171 tasks_running.remove(&id);
1172 }
1173 Err(poisoned) => {
1174 error!("Mutex poisoned: tasks_running (recovering)");
1175 poisoned.into_inner().remove(&id);
1176 }
1177 }
1178
1179 let mut retry_meta = task_meta.clone();
1181 retry_meta.retry_count += 1;
1182 retry_meta.next_at = Some(next_at);
1183 scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
1184 } else {
1185 error!(
1187 "Task {} failed after {} retries: {}",
1188 id, task_meta.retry_count, e
1189 );
1190 store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1191 task.on_failed(&state, task_meta.retry_count, &e.to_string()).await;
1192 tx_finish.send(id).unwrap_or(());
1193 }
1194 } else {
1195 error!("Task {} failed: {}", id, e);
1197 store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1198 task.on_failed(&state, 0, &e.to_string()).await;
1199 tx_finish.send(id).unwrap_or(());
1200 }
1201 }
1202 }
1203 });
1204 }
1205
1206 pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1209 let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1210 let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1211 let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1212 let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1213
1214 let mut stuck_tasks = Vec::new();
1216 let mut tasks_with_missing_deps = Vec::new();
1217
1218 {
1220 let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1221 let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1222
1223 for (id, task_meta) in waiting.iter() {
1224 if task_meta.deps.is_empty() {
1225 stuck_tasks.push(*id);
1226 warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1227 } else {
1228 for dep in &task_meta.deps {
1230 let dep_exists = waiting.contains_key(dep)
1231 || self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
1232 || self
1233 .tasks_scheduled
1234 .lock()
1235 .ok()
1236 .is_some_and(|s| s.iter().any(|((_, task_id), _)| task_id == dep));
1237
1238 if !dep_exists {
1239 tasks_with_missing_deps.push((*id, *dep));
1240 warn!(
1241 "SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1242 id, dep
1243 );
1244 }
1245 }
1246 }
1247 }
1248 }
1249
1250 Ok(SchedulerHealth {
1251 waiting: waiting_count,
1252 scheduled: scheduled_count,
1253 running: running_count,
1254 dependents: dependents_count,
1255 stuck_tasks,
1256 tasks_with_missing_deps,
1257 })
1258 }
1259}
1260
/// Result of `Scheduler::health_check`.
#[derive(Debug, Clone)]
pub struct SchedulerHealth {
    /// Number of tasks in the waiting (dependency-blocked) queue.
    pub waiting: usize,
    /// Number of entries in the time-ordered scheduled queue.
    pub scheduled: usize,
    /// Number of tasks currently marked as running.
    pub running: usize,
    /// Number of entries in the dependency -> dependents map.
    pub dependents: usize,
    /// Waiting tasks whose dependency list is empty.
    pub stuck_tasks: Vec<TaskId>,
    /// `(task, dependency)` pairs whose dependency was not found in any in-memory queue.
    pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
}
1277
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Shared test state: every task that completes successfully appends its number here.
    type State = Arc<Mutex<Vec<u8>>>;

    /// Task that sleeps `200ms * num` and then records `num` in the state.
    #[derive(Debug, Serialize, Deserialize)]
    struct TestTask {
        num: u8,
    }

    impl TestTask {
        pub fn new(num: u8) -> Arc<Self> {
            Arc::new(Self { num })
        }
    }

    #[async_trait]
    impl Task<State> for TestTask {
        fn kind() -> &'static str {
            "test"
        }

        fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
            let num: u8 = ctx
                .parse()
                .map_err(|_| Error::Internal("test task context must be u8".into()))?;
            let task = TestTask::new(num);
            Ok(task)
        }

        fn serialize(&self) -> String {
            self.num.to_string()
        }

        fn kind_of(&self) -> &'static str {
            "test"
        }

        async fn run(&self, state: &State) -> ClResult<()> {
            info!("Running task {}", self.num);
            tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
            info!("Completed task {}", self.num);
            state.lock().unwrap().push(self.num);
            Ok(())
        }
    }

    /// Task that fails its first `fail_count` attempts and then succeeds,
    /// recording `id` in the state on success.
    #[derive(Debug, Clone)]
    struct FailingTask {
        id: u8,
        fail_count: u8,
        // Attempt counter shared between clones of one instance; `build` starts a fresh one.
        attempt: Arc<Mutex<u8>>,
    }

    impl FailingTask {
        pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
            Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
        }
    }

    #[async_trait]
    impl Task<State> for FailingTask {
        fn kind() -> &'static str {
            "failing"
        }

        fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
            let parts: Vec<&str> = ctx.split(',').collect();
            if parts.len() != 2 {
                return Err(Error::Internal("failing task context must have 2 parts".into()));
            }
            let id: u8 = parts[0]
                .parse()
                .map_err(|_| Error::Internal("failing task id must be u8".into()))?;
            let fail_count: u8 = parts[1]
                .parse()
                .map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
            Ok(FailingTask::new(id, fail_count))
        }

        fn serialize(&self) -> String {
            format!("{},{}", self.id, self.fail_count)
        }

        fn kind_of(&self) -> &'static str {
            "failing"
        }

        async fn run(&self, state: &State) -> ClResult<()> {
            // Bump the counter in its own scope. The attempt guard is then released
            // before the state lock is taken below.
            let current_attempt = {
                let mut attempt = self.attempt.lock().unwrap();
                *attempt += 1;
                *attempt
            };

            // Widen before adding so `fail_count == u8::MAX` cannot overflow.
            info!(
                "FailingTask {} - attempt {}/{}",
                self.id,
                current_attempt,
                u16::from(self.fail_count) + 1
            );

            if current_attempt <= self.fail_count {
                error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
                return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
            }

            info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
            state.lock().unwrap().push(self.id);
            Ok(())
        }
    }

    #[tokio::test]
    pub async fn test_scheduler() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task2 = TestTask::new(1);
        let task3 = TestTask::new(1);

        let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
        let task3_id = scheduler.add(task3).await.unwrap();
        scheduler
            .task(TestTask::new(1))
            .depend_on(vec![task2_id, task3_id])
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(4)).await;
        let task4 = TestTask::new(1);
        let task5 = TestTask::new(1);
        scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
        scheduler.task(task5).schedule_after(1).schedule().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(3)).await;

        let st = state.lock().unwrap();
        info!("res: {}", st.len());
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1:1:1");
    }

    #[tokio::test]
    pub async fn test_retry_with_backoff() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<FailingTask>().unwrap();

        // Fails twice, succeeds on the third attempt.
        let failing_task = FailingTask::new(42, 2);
        let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

        scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(6)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1, "Task should have succeeded after retries");
        assert_eq!(st[0], 42);
    }

    #[tokio::test]
    pub async fn test_builder_simple_schedule() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let id = scheduler.task(task).now().await.unwrap();

        assert!(id > 0, "Task ID should be positive");

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1, "Task should have executed");
        assert_eq!(st[0], 1);
    }

    #[tokio::test]
    pub async fn test_builder_with_key() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1);
        assert_eq!(st[0], 1);
    }

    #[tokio::test]
    pub async fn test_builder_with_delay() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .after(1)
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        {
            let st = state.lock().unwrap();
            assert_eq!(st.len(), 0, "Task should not execute yet");
        }

        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        {
            let st = state.lock().unwrap();
            assert_eq!(st.len(), 1, "Task should have executed");
            assert_eq!(st[0], 1);
        }
    }

    #[tokio::test]
    pub async fn test_builder_with_dependencies() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task1 = TestTask::new(1);
        let id1 = scheduler.task(task1).now().await.unwrap();

        let task2 = TestTask::new(1);
        let id2 = scheduler.task(task2).now().await.unwrap();

        let task3 = TestTask::new(1);
        let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(1500)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    #[tokio::test]
    pub async fn test_builder_with_retry() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<FailingTask>().unwrap();

        // Fails once, succeeds on the second attempt.
        let failing_task = FailingTask::new(55, 1);
        let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

        let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(3)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1);
        assert_eq!(st[0], 55);
    }

    #[tokio::test]
    pub async fn test_builder_with_automatic_retry() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<FailingTask>().unwrap();

        let failing_task = FailingTask::new(66, 1);
        let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // The delay chosen by `with_automatic_retry` is not pinned here, so the task may
        // or may not have succeeded yet. In either case, it can only ever have recorded
        // its own id, and at most once.
        let st = state.lock().unwrap();
        assert!(st.len() <= 1, "Task should record its result at most once");
        assert!(st.iter().all(|&v| v == 66), "Only the retried task may record a result");
    }

    #[tokio::test]
    pub async fn test_builder_fluent_chaining() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
        let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("complex-task")
            .schedule_after(0)
            .depend_on(vec![dep1, dep2])
            .with_retry(retry_policy)
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    #[tokio::test]
    pub async fn test_builder_backward_compatibility() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // Legacy `add` API and the builder API must both keep working.
        let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();

        let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 2);
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1");
    }

    #[tokio::test]
    pub async fn test_builder_pipeline_scenario() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();

        let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();

        let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(1200)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    #[tokio::test]
    pub async fn test_builder_multi_dependency_join() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
        let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        let _id3 = scheduler
            .task(TestTask::new(1))
            .depend_on(vec![id1, id2])
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    #[tokio::test]
    pub async fn test_builder_scheduled_task_with_dependencies() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();

        let ts = Timestamp::from_now(1);
        let _task_id = scheduler
            .task(TestTask::new(1))
            .schedule_at(ts)
            .depend_on(vec![dep_id])
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
        {
            // Only the dependency has run; the dependent is still waiting for its time.
            let st = state.lock().unwrap();
            assert_eq!(st.len(), 1);
        }

        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        {
            let st = state.lock().unwrap();
            let str_vec =
                st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
            assert_eq!(str_vec.join(":"), "1:1");
        }
    }

    #[tokio::test]
    pub async fn test_builder_mixed_features() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();
        scheduler.register::<FailingTask>().unwrap();

        let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        let _id2 = scheduler
            .task(TestTask::new(1))
            .key("critical-task")
            .schedule_after(0)
            .depend_on(vec![id1])
            .schedule()
            .await
            .unwrap();

        // `fail_count == 0`: succeeds on the first attempt.
        let _id3 = scheduler
            .task(FailingTask::new(1, 0))
            .key("retryable-task")
            .with_retry(RetryPolicy {
                wait_min_max: (1, 3600),
                times: 3,
            })
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(1200)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    #[tokio::test]
    pub async fn test_builder_builder_reuse_not_possible() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let scheduler = Scheduler::new(task_store);

        let task = TestTask::new(1);
        let builder = scheduler.task(task);

        // `now()` presumably consumes the builder, so calling it a second time
        // would be rejected at compile time rather than at runtime.
        let _id = builder.now().await.unwrap();
    }

    #[tokio::test]
    pub async fn test_builder_different_task_types() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();
        scheduler.register::<FailingTask>().unwrap();

        let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();

        let _id2 = scheduler
            .task(FailingTask::new(1, 0))
            .key("failing-task")
            .now()
            .await
            .unwrap();

        let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 3);
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    #[tokio::test]
    pub async fn test_builder_cron_placeholder_syntax() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("cron-task")
            .cron("0 9 * * *")
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // The next 09:00 occurrence has not been reached yet.
        let st = state.lock().unwrap();
        assert_eq!(st.len(), 0);
    }

    #[tokio::test]
    pub async fn test_builder_daily_at_placeholder() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("daily-task")
            .daily_at(14, 30)
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 0);
    }

    #[tokio::test]
    pub async fn test_builder_weekly_at_placeholder() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("weekly-task")
            .weekly_at(1, 9, 0)
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 0);
    }

    #[tokio::test]
    pub async fn test_builder_cron_with_retry() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("reliable-scheduled-task")
            .daily_at(2, 0)
            .with_retry(RetryPolicy {
                wait_min_max: (60, 3600),
                times: 5,
            })
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 0);
    }

    #[test]
    fn test_cron_to_string() {
        let cron = CronSchedule::parse("*/5 * * * *").unwrap();
        assert_eq!(cron.to_cron_string(), "*/5 * * * *");
    }

    #[tokio::test]
    pub async fn test_running_task_not_double_scheduled() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // num = 5 => the task sleeps ~1s, long enough to observe it while running.
        let task = TestTask::new(5);
        let task_id = scheduler.add(task.clone()).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        {
            let running = scheduler.tasks_running.lock().unwrap();
            assert!(running.contains_key(&task_id), "Task should be in running queue");
        }

        // Re-queue the same id while it is still running.
        let task_meta = TaskMeta {
            task: task.clone(),
            next_at: Some(Timestamp::now()),
            deps: vec![],
            retry_count: 0,
            retry: None,
            cron: None,
        };
        let result = scheduler.add_queue(task_id, task_meta).await;

        assert!(result.is_ok(), "add_queue should succeed");

        {
            let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
            let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
            assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
        }

        tokio::time::sleep(std::time::Duration::from_secs(2)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1, "Only one task execution should have occurred");
        assert_eq!(st[0], 5);
    }

    #[tokio::test]
    pub async fn test_running_task_metadata_updated() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(5);
        let task_id = scheduler.add(task.clone()).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        {
            let running = scheduler.tasks_running.lock().unwrap();
            let meta = running.get(&task_id).expect("Task should be running");
            assert!(meta.cron.is_none(), "Task should have no cron initially");
        }

        // Re-queueing a running task with a cron schedule should update its metadata.
        let cron = CronSchedule::parse("*/5 * * * *").unwrap();
        let task_meta_with_cron = TaskMeta {
            task: task.clone(),
            next_at: Some(Timestamp::now()),
            deps: vec![],
            retry_count: 0,
            retry: None,
            cron: Some(cron),
        };
        let result = scheduler.add_queue(task_id, task_meta_with_cron).await;

        assert!(result.is_ok(), "add_queue should succeed");

        {
            let running = scheduler.tasks_running.lock().unwrap();
            let meta = running.get(&task_id).expect("Task should still be running");
            assert!(meta.cron.is_some(), "Task should now have cron after update");
        }

        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    }

    #[tokio::test]
    pub async fn test_health_check_idle_scheduler() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let scheduler = Scheduler::new(task_store);

        let health = scheduler.health_check().await.unwrap();

        assert_eq!(health.waiting, 0);
        assert_eq!(health.scheduled, 0);
        assert_eq!(health.running, 0);
        assert_eq!(health.dependents, 0);
        assert!(health.stuck_tasks.is_empty());
        assert!(health.tasks_with_missing_deps.is_empty());
    }

    #[tokio::test]
    pub async fn test_health_check_pending_dependency_is_not_missing() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let scheduler = Scheduler::new(task_store);
        scheduler.register::<TestTask>().unwrap();

        // The scheduler is not started, so nothing executes. The dependency stays in
        // the scheduled queue and the dependent task waits on it.
        let dep_id = scheduler.task(TestTask::new(1)).schedule_after(60).schedule().await.unwrap();
        let _id = scheduler
            .task(TestTask::new(1))
            .depend_on(vec![dep_id])
            .schedule()
            .await
            .unwrap();

        let health = scheduler.health_check().await.unwrap();

        assert_eq!(health.waiting, 1, "Dependent task should be waiting");
        assert!(health.stuck_tasks.is_empty());
        assert!(
            health.tasks_with_missing_deps.is_empty(),
            "A scheduled dependency must not be reported as missing"
        );
    }
}
2089
2090