1use async_trait::async_trait;
7use itertools::Itertools;
8use std::{
9 collections::{BTreeMap, HashMap},
10 fmt::Debug,
11 sync::{Arc, Mutex, RwLock},
12};
13
14use chrono::{DateTime, Utc};
15use croner::Cron;
16use std::str::FromStr;
17
18use crate::prelude::*;
19use cloudillo_types::{lock, meta_adapter};
20
/// Identifier assigned to a task by its [`TaskStore`] when the task is first persisted.
pub type TaskId = u64;
22
/// Coarse classification of a task's lifecycle.
///
/// NOTE(review): not referenced by the scheduler code visible in this file; recurring
/// tasks are recognised there by `TaskMeta::cron` being `Some`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TaskType {
    /// Presumably a task that runs repeatedly — confirm against callers.
    Periodic,
    /// Presumably a task that runs a single time — confirm against callers.
    Once,
}
27
28#[derive(Debug, Clone)]
31pub struct CronSchedule {
32 expr: Box<str>,
34 cron: Cron,
36}
37
38impl CronSchedule {
39 pub fn parse(expr: &str) -> ClResult<Self> {
41 let cron = Cron::from_str(expr)
42 .map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
43 Ok(Self { expr: expr.into(), cron })
44 }
45
46 pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
51 let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
52
53 self.cron
54 .find_next_occurrence(&dt, false)
55 .map(|next| Timestamp(next.timestamp()))
56 .map_err(|e| {
57 tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
58 Error::ValidationError(format!("cron next_execution failed: {}", e))
59 })
60 }
61
62 pub fn to_cron_string(&self) -> String {
64 self.expr.to_string()
65 }
66}
67
68impl PartialEq for CronSchedule {
69 fn eq(&self, other: &Self) -> bool {
70 self.expr == other.expr
71 }
72}
73
74impl Eq for CronSchedule {}
75
76#[async_trait]
77pub trait Task<S: Clone>: Send + Sync + Debug {
78 fn kind() -> &'static str
79 where
80 Self: Sized;
81 fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
82 where
83 Self: Sized;
84 fn serialize(&self) -> String;
85 async fn run(&self, state: &S) -> ClResult<()>;
86
87 fn kind_of(&self) -> &'static str;
88
89 async fn on_failed(&self, _state: &S, _attempts: u16, _last_error: &str) {}
95}
96
/// Persisted state of a task as reported by [`TaskStore::load`].
#[derive(Debug)]
pub enum TaskStatus {
    /// Not yet finished; only these tasks are re-queued when the scheduler loads.
    Pending,
    /// Finished (status code `'F'` in the meta-adapter store).
    Completed,
    /// Any other stored status code.
    Failed,
}
103
104pub struct TaskData {
105 id: TaskId,
106 kind: Box<str>,
107 status: TaskStatus,
108 input: Box<str>,
109 deps: Box<[TaskId]>,
110 retry_data: Option<Box<str>>,
111 cron_data: Option<Box<str>>,
112 next_at: Option<Timestamp>,
113}
114
115#[async_trait]
116pub trait TaskStore<S: Clone>: Send + Sync {
117 async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
118 async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
119 async fn load(&self) -> ClResult<Vec<TaskData>>;
120 async fn update_task_error(
121 &self,
122 task_id: TaskId,
123 output: &str,
124 next_at: Option<Timestamp>,
125 ) -> ClResult<()>;
126 async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
127 async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
128 async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>>;
129}
130
131pub struct InMemoryTaskStore {
134 last_id: Mutex<TaskId>,
135}
136
137impl InMemoryTaskStore {
138 pub fn new() -> Arc<Self> {
139 Arc::new(Self { last_id: Mutex::new(0) })
140 }
141}
142
143#[async_trait]
144impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
145 async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
146 let mut last_id = lock!(self.last_id)?;
147 *last_id += 1;
148 Ok(*last_id)
149 }
150
151 async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
152 Ok(())
153 }
154
155 async fn load(&self) -> ClResult<Vec<TaskData>> {
156 Ok(vec![])
157 }
158
159 async fn update_task_error(
160 &self,
161 _task_id: TaskId,
162 _output: &str,
163 _next_at: Option<Timestamp>,
164 ) -> ClResult<()> {
165 Ok(())
166 }
167
168 async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
169 Ok(None)
171 }
172
173 async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
174 Ok(())
176 }
177
178 async fn find_completed_deps(&self, _deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
179 Ok(vec![])
180 }
181}
182
183pub struct MetaAdapterTaskStore {
186 meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
187}
188
189impl MetaAdapterTaskStore {
190 pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
191 Arc::new(Self { meta_adapter })
192 }
193}
194
195#[async_trait]
196impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
197 async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
198 let id = self
199 .meta_adapter
200 .create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
201 .await?;
202
203 if let Some(cron) = &task.cron {
205 self.meta_adapter
206 .update_task(
207 id,
208 &meta_adapter::TaskPatch {
209 cron: Patch::Value(cron.to_cron_string()),
210 ..Default::default()
211 },
212 )
213 .await?;
214 }
215
216 Ok(id)
217 }
218
219 async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
220 self.meta_adapter.update_task_finished(id, output).await
221 }
222
223 async fn load(&self) -> ClResult<Vec<TaskData>> {
224 let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
225 let tasks = tasks
226 .into_iter()
227 .map(|t| TaskData {
228 id: t.task_id,
229 kind: t.kind,
230 status: match t.status {
231 'P' => TaskStatus::Pending,
232 'F' => TaskStatus::Completed,
233 _ => TaskStatus::Failed,
235 },
236 input: t.input,
237 deps: t.deps,
238 retry_data: t.retry,
239 cron_data: t.cron,
240 next_at: t.next_at,
241 })
242 .collect();
243 Ok(tasks)
244 }
245
246 async fn update_task_error(
247 &self,
248 task_id: TaskId,
249 output: &str,
250 next_at: Option<Timestamp>,
251 ) -> ClResult<()> {
252 self.meta_adapter.update_task_error(task_id, output, next_at).await
253 }
254
255 async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
256 let task_opt = self.meta_adapter.find_task_by_key(key).await?;
257
258 match task_opt {
259 Some(t) => Ok(Some((
260 t.task_id,
261 TaskData {
262 id: t.task_id,
263 kind: t.kind,
264 status: match t.status {
265 'P' => TaskStatus::Pending,
266 'F' => TaskStatus::Completed,
267 _ => TaskStatus::Failed,
269 },
270 input: t.input,
271 deps: t.deps,
272 retry_data: t.retry,
273 cron_data: t.cron,
274 next_at: t.next_at,
275 },
276 ))),
277 None => Ok(None),
278 }
279 }
280
281 async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
282 use cloudillo_types::types::Patch;
283
284 let mut patch = meta_adapter::TaskPatch {
286 input: Patch::Value(task.task.serialize()),
287 next_at: match task.next_at {
288 Some(ts) => Patch::Value(ts),
289 None => Patch::Null,
290 },
291 ..Default::default()
292 };
293
294 if !task.deps.is_empty() {
296 patch.deps = Patch::Value(task.deps.clone());
297 }
298
299 if let Some(ref retry) = task.retry {
301 let retry_str = format!(
302 "{},{},{},{}",
303 task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
304 );
305 patch.retry = Patch::Value(retry_str);
306 }
307
308 if let Some(ref cron) = task.cron {
310 patch.cron = Patch::Value(cron.to_cron_string());
311 }
312
313 self.meta_adapter.update_task(id, &patch).await
314 }
315
316 async fn find_completed_deps(&self, deps: &[TaskId]) -> ClResult<Vec<TaskId>> {
317 self.meta_adapter.find_completed_deps(deps).await
318 }
319}
320
321type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
323
/// Exponential-backoff retry policy for failing tasks.
///
/// The delay before the retry that follows `n` earlier retries is `wait_min * 2^n` seconds,
/// capped at `wait_max`.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// `(minimum, maximum)` delay between attempts, in seconds.
    wait_min_max: (u64, u64),
    /// Number of retries allowed before the task is treated as permanently failed.
    times: u16,
}

impl Default for RetryPolicy {
    /// 60 s initial delay, capped at 3600 s, with up to 10 retries.
    fn default() -> Self {
        Self { wait_min_max: (60, 3600), times: 10 }
    }
}

impl RetryPolicy {
    /// Creates a policy with a `(min, max)` backoff in seconds and a retry limit.
    pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
        Self { wait_min_max, times }
    }

    /// Seconds to wait before retrying after `attempt_count` earlier retries.
    ///
    /// Computes `min * 2^attempt_count`, capped at `max`. The arithmetic saturates, so a
    /// shift of 64+ bits or a multiplication overflow yields `max`. The previous unchecked
    /// form panicked in debug builds and wrapped to a tiny delay in release builds.
    pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
        let (min, max) = self.wait_min_max;
        let factor = 1u64.checked_shl(u32::from(attempt_count)).unwrap_or(u64::MAX);
        min.saturating_mul(factor).min(max)
    }

    /// Whether another retry is allowed after `attempt_count` earlier retries.
    pub fn should_retry(&self, attempt_count: u16) -> bool {
        attempt_count < self.times
    }
}
354
355pub struct TaskSchedulerBuilder<'a, S: Clone> {
358 scheduler: &'a Scheduler<S>,
359 task: Arc<dyn Task<S>>,
360 key: Option<String>,
361 next_at: Option<Timestamp>,
362 deps: Vec<TaskId>,
363 retry: Option<RetryPolicy>,
364 cron: Option<CronSchedule>,
365 run_on_startup: bool,
366}
367
368impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
369 fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
371 Self {
372 scheduler,
373 task,
374 key: None,
375 next_at: None,
376 deps: Vec::new(),
377 retry: None,
378 cron: None,
379 run_on_startup: false,
380 }
381 }
382
383 pub fn key(mut self, key: impl Into<String>) -> Self {
385 self.key = Some(key.into());
386 self
387 }
388
389 pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
391 self.next_at = Some(timestamp);
392 self
393 }
394
395 pub fn schedule_after(mut self, seconds: i64) -> Self {
397 self.next_at = Some(Timestamp::from_now(seconds));
398 self
399 }
400
401 pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
403 self.deps = deps;
404 self
405 }
406
407 pub fn depends_on(mut self, dep: TaskId) -> Self {
409 self.deps.push(dep);
410 self
411 }
412
413 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
415 self.retry = Some(policy);
416 self
417 }
418
419 pub fn cron(mut self, expr: impl Into<String>) -> Self {
424 if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
425 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
428 self.cron = Some(cron_schedule);
429 }
430 self
431 }
432
433 pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
436 if hour <= 23 && minute <= 59 {
437 let expr = format!("{} {} * * *", minute, hour);
438 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
439 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
442 self.cron = Some(cron_schedule);
443 }
444 }
445 self
446 }
447
448 pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
452 if weekday <= 6 && hour <= 23 && minute <= 59 {
453 let expr = format!("{} {} * * {}", minute, hour, weekday);
454 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
455 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
458 self.cron = Some(cron_schedule);
459 }
460 }
461 self
462 }
463
464 pub fn run_on_startup(mut self) -> Self {
469 self.run_on_startup = true;
470 self
471 }
472
473 pub async fn now(self) -> ClResult<TaskId> {
475 self.schedule().await
476 }
477
478 pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
480 self.next_at = Some(ts);
481 self.schedule().await
482 }
483
484 pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
486 self.next_at = Some(Timestamp::from_now(seconds));
487 self.schedule().await
488 }
489
490 pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
492 self.deps.push(dep);
493 self.schedule().await
494 }
495
496 pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
498 self.retry = Some(RetryPolicy::default());
499 self.schedule().await
500 }
501
502 pub async fn schedule(self) -> ClResult<TaskId> {
504 self.scheduler
505 .schedule_task_impl(
506 self.task,
507 self.key.as_deref(),
508 self.next_at,
509 if self.deps.is_empty() { None } else { Some(self.deps) },
510 self.retry,
511 self.cron,
512 self.run_on_startup,
513 )
514 .await
515 }
516}
517
518#[derive(Debug, Clone)]
519pub struct TaskMeta<S: Clone> {
520 pub task: Arc<dyn Task<S>>,
521 pub next_at: Option<Timestamp>,
522 pub deps: Vec<TaskId>,
523 retry_count: u16,
524 pub retry: Option<RetryPolicy>,
525 pub cron: Option<CronSchedule>,
526}
527
528type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
529type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
530
531#[derive(Clone)]
533pub struct Scheduler<S: Clone> {
534 task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
535 store: Arc<dyn TaskStore<S>>,
536 tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
537 tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
538 task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
539 tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
540 tx_finish: flume::Sender<TaskId>,
541 rx_finish: flume::Receiver<TaskId>,
542 notify_schedule: Arc<tokio::sync::Notify>,
543}
544
545impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
546 pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
547 let (tx_finish, rx_finish) = flume::unbounded();
548
549 let scheduler = Self {
550 task_builders: Arc::new(RwLock::new(HashMap::new())),
551 store,
552 tasks_running: Arc::new(Mutex::new(HashMap::new())),
553 tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
554 task_dependents: Arc::new(Mutex::new(HashMap::new())),
555 tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
556 tx_finish,
557 rx_finish,
558 notify_schedule: Arc::new(tokio::sync::Notify::new()),
559 };
560
561 Arc::new(scheduler)
564 }
565
566 pub fn start(&self, state: S) {
567 let schedule = self.clone();
569 let stat = state.clone();
570 let rx_finish = self.rx_finish.clone();
571
572 tokio::spawn(async move {
573 while let Ok(id) = rx_finish.recv_async().await {
574 debug!("Completed task {} (notified)", id);
575
576 let task_meta_opt = {
578 let tasks_running = match schedule.tasks_running.lock() {
579 Ok(guard) => guard,
580 Err(poisoned) => {
581 error!("Mutex poisoned: tasks_running (recovering)");
582 poisoned.into_inner()
583 }
584 };
585 tasks_running.get(&id).cloned()
586 };
587
588 if let Some(task_meta) = task_meta_opt {
589 let mut transition_ok = false;
591
592 if let Some(ref cron) = task_meta.cron {
594 let next_at = match cron.next_execution(Timestamp::now()) {
596 Ok(ts) => ts,
597 Err(e) => {
598 error!(
599 "Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
600 id, e
601 );
602 if let Err(e) = schedule.store.finished(id, "").await {
604 error!("Failed to mark task {} as finished: {}", id, e);
605 }
606 continue;
607 }
608 };
609 info!(
610 "Recurring task {} completed, scheduling next execution at {}",
611 id, next_at
612 );
613
614 let mut updated_meta = task_meta.clone();
616 updated_meta.next_at = Some(next_at);
617
618 if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
620 error!("Failed to update recurring task {} next_at: {}", id, e);
621 }
622
623 match schedule.tasks_running.lock() {
625 Ok(mut tasks_running) => {
626 tasks_running.remove(&id);
627 }
628 Err(poisoned) => {
629 error!("Mutex poisoned: tasks_running (recovering)");
630 poisoned.into_inner().remove(&id);
631 }
632 }
633
634 match schedule.add_queue(id, updated_meta).await {
636 Ok(_) => transition_ok = true,
637 Err(e) => {
638 error!(
639 "Failed to reschedule recurring task {}: {} - task lost!",
640 id, e
641 );
642 }
643 }
644 } else {
645 match schedule.store.finished(id, "").await {
647 Ok(()) => transition_ok = true,
648 Err(e) => {
649 error!(
650 "Failed to mark task {} as finished: {} - task remains in running queue",
651 id, e
652 );
653 }
654 }
655 }
656
657 if transition_ok {
659 match schedule.tasks_running.lock() {
660 Ok(mut tasks_running) => {
661 tasks_running.remove(&id);
662 }
663 Err(poisoned) => {
664 error!("Mutex poisoned: tasks_running (recovering)");
665 poisoned.into_inner().remove(&id);
666 }
667 }
668 }
669
670 match schedule.release_dependents(id) {
672 Ok(ready_to_spawn) => {
673 for (dep_id, dep_task_meta) in ready_to_spawn {
674 match schedule.tasks_running.lock() {
676 Ok(mut tasks_running) => {
677 tasks_running.insert(dep_id, dep_task_meta.clone());
678 }
679 Err(poisoned) => {
680 error!("Mutex poisoned: tasks_running (recovering)");
681 poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
682 }
683 }
684 schedule.spawn_task(
685 stat.clone(),
686 dep_task_meta.task.clone(),
687 dep_id,
688 dep_task_meta,
689 );
690 }
691 }
692 Err(e) => {
693 error!("Failed to release dependents of task {}: {}", id, e);
694 }
695 }
696 } else {
697 warn!("Completed task {} not found in running queue", id);
698 }
699 }
700 });
701
702 let schedule = self.clone();
704 tokio::spawn(async move {
705 loop {
706 let is_empty = match schedule.tasks_scheduled.lock() {
707 Ok(guard) => guard.is_empty(),
708 Err(poisoned) => {
709 error!("Mutex poisoned: tasks_scheduled (recovering)");
710 poisoned.into_inner().is_empty()
711 }
712 };
713 if is_empty {
714 schedule.notify_schedule.notified().await;
715 }
716 let time = Timestamp::now();
717 if let Some((timestamp, _id)) = loop {
718 let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
719 Ok(guard) => guard,
720 Err(poisoned) => {
721 error!("Mutex poisoned: tasks_scheduled (recovering)");
722 poisoned.into_inner()
723 }
724 };
725 if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
726 let (timestamp, id) = (timestamp, id);
727 if timestamp <= Timestamp::now() {
728 debug!("Spawning task id {} (from schedule)", id);
729 if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
730 let mut tasks_running = match schedule.tasks_running.lock() {
731 Ok(guard) => guard,
732 Err(poisoned) => {
733 error!("Mutex poisoned: tasks_running (recovering)");
734 poisoned.into_inner()
735 }
736 };
737 tasks_running.insert(id, task.clone());
738 schedule.spawn_task(state.clone(), task.task.clone(), id, task);
739 } else {
740 error!("Task disappeared while being removed from schedule");
741 break None;
742 }
743 } else {
744 break Some((timestamp, id));
745 }
746 } else {
747 break None;
748 }
749 } {
750 let diff = timestamp.0 - time.0;
751 let wait =
752 tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
753 tokio::select! {
754 () = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
755 };
756 }
757 }
758 });
759
760 let schedule = self.clone();
761 tokio::spawn(async move {
762 let _ignore_err = schedule.load().await;
763 });
764 }
765
766 fn register_builder(
767 &self,
768 name: &'static str,
769 builder: &'static TaskBuilder<S>,
770 ) -> ClResult<&Self> {
771 let mut task_builders = self
772 .task_builders
773 .write()
774 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
775 task_builders.insert(name, Box::new(builder));
776 Ok(self)
777 }
778
779 pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
780 info!("Registering task type {}", T::kind());
781 self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
782 Ok(self)
783 }
784
785 pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
787 TaskSchedulerBuilder::new(self, task)
788 }
789
790 #[allow(clippy::too_many_arguments)]
793 async fn schedule_task_impl(
794 &self,
795 task: Arc<dyn Task<S>>,
796 key: Option<&str>,
797 next_at: Option<Timestamp>,
798 deps: Option<Vec<TaskId>>,
799 retry: Option<RetryPolicy>,
800 cron: Option<CronSchedule>,
801 run_on_startup: bool,
802 ) -> ClResult<TaskId> {
803 let existing = if let Some(k) = key { self.store.find_by_key(k).await? } else { None };
806
807 let effective_next_at = if run_on_startup && cron.is_some() {
809 match &existing {
810 Some((_existing_id, existing_data)) => {
811 match existing_data.next_at {
816 Some(persisted) if persisted > Timestamp::now() => next_at,
817 _ => Some(Timestamp::now()),
818 }
819 }
820 None => Some(Timestamp::now()), }
822 } else {
823 next_at
824 };
825
826 let task_meta = TaskMeta {
827 task: task.clone(),
828 next_at: effective_next_at,
829 deps: deps.clone().unwrap_or_default(),
830 retry_count: 0,
831 retry,
832 cron,
833 };
834
835 if let Some(key) = key
837 && let Some((existing_id, existing_data)) = existing
838 {
839 let new_serialized = task.serialize();
840 let existing_serialized = existing_data.input.as_ref();
841
842 if new_serialized == existing_serialized {
844 info!(
845 "Recurring task '{}' already exists with identical parameters (id={})",
846 key, existing_id
847 );
848 self.store.update_task(existing_id, &task_meta).await?;
850 self.add_queue(existing_id, task_meta).await?;
852 return Ok(existing_id);
853 }
854 info!("Updating recurring task '{}' (id={}) - parameters changed", key, existing_id);
855 debug!(" Old params: {}", existing_serialized);
856 debug!(" New params: {}", new_serialized);
857
858 self.remove_from_queues(existing_id)?;
860
861 self.store.update_task(existing_id, &task_meta).await?;
863
864 self.add_queue(existing_id, task_meta).await?;
866
867 return Ok(existing_id);
868 }
869
870 let id = self.store.add(&task_meta, key).await?;
872 self.add_queue(id, task_meta).await
873 }
874
875 pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
876 self.task(task).now().await
877 }
878
879 pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
880 {
883 let mut running = lock!(self.tasks_running, "tasks_running")?;
884 if let Some(existing_meta) = running.get_mut(&id) {
885 debug!(
886 "Task {} is already running, updating metadata (will reschedule on completion)",
887 id
888 );
889 *existing_meta = task_meta;
891 return Ok(id);
892 }
893 }
894
895 {
897 let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
898 if let Some(key) = scheduled
899 .iter()
900 .find(|((_, tid), _)| *tid == id)
901 .map(|((ts, tid), _)| (*ts, *tid))
902 {
903 scheduled.remove(&key);
904 debug!("Removed existing scheduled entry for task {} before re-queueing", id);
905 }
906 }
907 {
908 let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
909 if waiting.remove(&id).is_some() {
910 debug!("Removed existing waiting entry for task {} before re-queueing", id);
911 }
912 }
913
914 let deps = task_meta.deps.clone();
915
916 if !deps.is_empty() && task_meta.next_at.is_some() {
918 warn!(
919 "Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue",
920 id
921 );
922 lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
924 debug!("Task {} is waiting for {:?}", id, &deps);
925 for dep in &deps {
926 lock!(self.task_dependents, "task_dependents")?
927 .entry(*dep)
928 .or_default()
929 .push(id);
930 }
931
932 self.check_and_resolve_completed_deps(id, &deps).await?;
933 return Ok(id);
934 }
935
936 if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
937 debug!("Spawning task {}", id);
938 lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
939 self.notify_schedule.notify_one();
940 } else if let Some(next_at) = task_meta.next_at {
941 debug!("Scheduling task {} for {}", id, next_at);
942 lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
943 self.notify_schedule.notify_one();
944 } else {
945 lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
946 debug!("Task {} is waiting for {:?}", id, &deps);
947 for dep in &deps {
948 lock!(self.task_dependents, "task_dependents")?
949 .entry(*dep)
950 .or_default()
951 .push(id);
952 }
953
954 self.check_and_resolve_completed_deps(id, &deps).await?;
955 }
956 Ok(id)
957 }
958
959 async fn check_and_resolve_completed_deps(&self, id: TaskId, deps: &[TaskId]) -> ClResult<()> {
962 let completed_deps = self.store.find_completed_deps(deps).await?;
963 if completed_deps.is_empty() {
964 return Ok(());
965 }
966 let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
967 if let Some(task_meta) = waiting.get_mut(&id) {
968 for dep in &completed_deps {
969 task_meta.deps.retain(|d| *d != *dep);
970 }
971 if task_meta.deps.is_empty()
972 && let Some(ready_task) = waiting.remove(&id)
973 {
974 drop(waiting);
975 let mut dependents = lock!(self.task_dependents, "task_dependents")?;
976 for dep in deps {
977 if let Some(dep_list) = dependents.get_mut(dep) {
978 dep_list.retain(|d| *d != id);
979 if dep_list.is_empty() {
980 dependents.remove(dep);
981 }
982 }
983 }
984 drop(dependents);
985 debug!("Task {} deps already completed, scheduling immediately", id);
986 lock!(self.tasks_scheduled, "tasks_scheduled")?
987 .insert((Timestamp(0), id), ready_task);
988 self.notify_schedule.notify_one();
989 }
990 }
991 Ok(())
992 }
993
994 fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
997 if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
999 debug!("Removed task {} from waiting queue for update", task_id);
1000 return Ok(Some(task_meta));
1001 }
1002
1003 {
1005 let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
1006 if let Some(key) = scheduled
1007 .iter()
1008 .find(|((_, id), _)| *id == task_id)
1009 .map(|((ts, id), _)| (*ts, *id))
1010 && let Some(task_meta) = scheduled.remove(&key)
1011 {
1012 debug!("Removed task {} from scheduled queue for update", task_id);
1013 return Ok(Some(task_meta));
1014 }
1015 }
1016
1017 if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
1019 warn!("Removed task {} from running queue during update", task_id);
1020 return Ok(Some(task_meta));
1021 }
1022
1023 Ok(None)
1024 }
1025
1026 fn release_dependents(
1029 &self,
1030 completed_task_id: TaskId,
1031 ) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
1032 let dependents = {
1034 let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
1035 deps_map.remove(&completed_task_id).unwrap_or_default()
1036 };
1037
1038 if dependents.is_empty() {
1039 return Ok(Vec::new()); }
1041
1042 debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);
1043
1044 let mut ready_to_spawn = Vec::new();
1045
1046 for dependent_id in dependents {
1048 {
1050 let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1051 if let Some(task_meta) = waiting.get_mut(&dependent_id) {
1052 task_meta.deps.retain(|x| *x != completed_task_id);
1054
1055 if task_meta.deps.is_empty() {
1057 if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
1058 debug!(
1059 "Dependent task {} ready to spawn (all dependencies cleared)",
1060 dependent_id
1061 );
1062 ready_to_spawn.push((dependent_id, task_to_spawn));
1063 }
1064 } else {
1065 debug!(
1066 "Task {} still has {} remaining dependencies",
1067 dependent_id,
1068 task_meta.deps.len()
1069 );
1070 }
1071 continue;
1072 }
1073 }
1074
1075 {
1077 let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
1078 if let Some(scheduled_key) = scheduled
1079 .iter()
1080 .find(|((_, id), _)| *id == dependent_id)
1081 .map(|((ts, id), _)| (*ts, *id))
1082 {
1083 if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
1084 task_meta.deps.retain(|x| *x != completed_task_id);
1085 let remaining = task_meta.deps.len();
1086 if remaining == 0 {
1087 debug!(
1088 "Task {} in scheduled queue has no remaining dependencies",
1089 dependent_id
1090 );
1091 } else {
1092 debug!(
1093 "Task {} in scheduled queue has {} remaining dependencies",
1094 dependent_id, remaining
1095 );
1096 }
1097 }
1098 continue;
1099 }
1100 }
1101
1102 warn!(
1104 "Dependent task {} of completed task {} not found in any queue",
1105 dependent_id, completed_task_id
1106 );
1107 }
1108
1109 Ok(ready_to_spawn)
1110 }
1111
1112 async fn load(&self) -> ClResult<()> {
1113 let tasks = self.store.load().await?;
1114 debug!("Loaded {} tasks from store", tasks.len());
1115 for t in tasks {
1116 if let TaskStatus::Pending = t.status {
1117 debug!("Loading task {} {}", t.id, t.kind);
1118 let task = {
1119 let builder_map = self
1120 .task_builders
1121 .read()
1122 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1123 let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1124 format!("task builder not registered: {}", t.kind),
1125 ))?;
1126 builder(t.id, &t.input)?
1127 };
1128 let (retry_count, retry) = match t.retry_data {
1129 Some(retry_str) => {
1130 let (retry_count, retry_min, retry_max, retry_times) = retry_str
1131 .split(',')
1132 .collect_tuple()
1133 .ok_or(Error::Internal("invalid retry policy format".into()))?;
1134 let retry_count: u16 = retry_count
1135 .parse()
1136 .map_err(|_| Error::Internal("retry count must be u16".into()))?;
1137 let retry = RetryPolicy {
1138 wait_min_max: (
1139 retry_min
1140 .parse()
1141 .map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1142 retry_max
1143 .parse()
1144 .map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1145 ),
1146 times: retry_times
1147 .parse()
1148 .map_err(|_| Error::Internal("retry times must be u64".into()))?,
1149 };
1150 debug!("Loaded retry policy: {:?}", retry);
1151 (retry_count, Some(retry))
1152 }
1153 _ => (0, None),
1154 };
1155 let cron =
1157 t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1158
1159 let task_meta = TaskMeta {
1160 task,
1161 next_at: t.next_at,
1162 deps: t.deps.into(),
1163 retry_count,
1164 retry,
1165 cron,
1166 };
1167 self.add_queue(t.id, task_meta).await?;
1168 }
1169 }
1170 Ok(())
1171 }
1172
1173 fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
1174 let tx_finish = self.tx_finish.clone();
1175 let store = self.store.clone();
1176 let scheduler = self.clone();
1177 tokio::spawn(async move {
1179 match task.run(&state).await {
1180 Ok(()) => {
1181 debug!("Task {} completed successfully", id);
1182 tx_finish.send(id).unwrap_or(());
1183 }
1184 Err(e) => {
1185 let is_retryable = e.is_retryable();
1186 if let Some(retry_policy) = &task_meta.retry {
1187 if is_retryable && retry_policy.should_retry(task_meta.retry_count) {
1188 let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
1189 let next_at = Timestamp::from_now(backoff.cast_signed());
1190
1191 info!(
1192 "Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
1193 id,
1194 task_meta.retry_count + 1,
1195 retry_policy.times,
1196 backoff,
1197 e
1198 );
1199
1200 store
1202 .update_task_error(id, &e.to_string(), Some(next_at))
1203 .await
1204 .unwrap_or(());
1205
1206 match scheduler.tasks_running.lock() {
1208 Ok(mut tasks_running) => {
1209 tasks_running.remove(&id);
1210 }
1211 Err(poisoned) => {
1212 error!("Mutex poisoned: tasks_running (recovering)");
1213 poisoned.into_inner().remove(&id);
1214 }
1215 }
1216
1217 let mut retry_meta = task_meta.clone();
1219 retry_meta.retry_count += 1;
1220 retry_meta.next_at = Some(next_at);
1221 scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
1222 } else {
1223 if is_retryable {
1225 error!(
1226 "Task {} failed after {} retries: {}",
1227 id, task_meta.retry_count, e
1228 );
1229 } else {
1230 error!("Task {} failed permanently (non-retryable): {}", id, e);
1231 }
1232 store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1233 task.on_failed(&state, task_meta.retry_count, &e.to_string()).await;
1234 tx_finish.send(id).unwrap_or(());
1235 }
1236 } else {
1237 error!("Task {} failed: {}", id, e);
1239 store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1240 task.on_failed(&state, 0, &e.to_string()).await;
1241 tx_finish.send(id).unwrap_or(());
1242 }
1243 }
1244 }
1245 });
1246 }
1247
1248 pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1251 let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1252 let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1253 let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1254 let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1255
1256 let mut stuck_tasks = Vec::new();
1258 let mut tasks_with_missing_deps = Vec::new();
1259
1260 {
1262 let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1263 let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1264
1265 for (id, task_meta) in waiting.iter() {
1266 if task_meta.deps.is_empty() {
1267 stuck_tasks.push(*id);
1268 warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1269 } else {
1270 for dep in &task_meta.deps {
1272 let dep_exists = waiting.contains_key(dep)
1273 || self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
1274 || self
1275 .tasks_scheduled
1276 .lock()
1277 .ok()
1278 .is_some_and(|s| s.iter().any(|((_, task_id), _)| task_id == dep));
1279
1280 if !dep_exists {
1281 tasks_with_missing_deps.push((*id, *dep));
1282 warn!(
1283 "SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1284 id, dep
1285 );
1286 }
1287 }
1288 }
1289 }
1290 }
1291
1292 Ok(SchedulerHealth {
1293 waiting: waiting_count,
1294 scheduled: scheduled_count,
1295 running: running_count,
1296 dependents: dependents_count,
1297 stuck_tasks,
1298 tasks_with_missing_deps,
1299 })
1300 }
1301}
1302
1303#[derive(Debug, Clone)]
1305pub struct SchedulerHealth {
1306 pub waiting: usize,
1308 pub scheduled: usize,
1310 pub running: usize,
1312 pub dependents: usize,
1314 pub stuck_tasks: Vec<TaskId>,
1316 pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
1318}
1319
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    /// Shared test state: each task that completes successfully appends its number here,
    /// so tests can assert on which tasks ran and in what order.
    type State = Arc<Mutex<Vec<u8>>>;

    /// Task that sleeps `200ms * num`, then pushes `num` into the shared state.
    ///
    /// Its serialized context is `num` as a decimal string.
    #[derive(Debug, Serialize, Deserialize)]
    struct TestTask {
        num: u8,
    }

    impl TestTask {
        pub fn new(num: u8) -> Arc<Self> {
            Arc::new(Self { num })
        }
    }

    #[async_trait]
    impl Task<State> for TestTask {
        fn kind() -> &'static str {
            "test"
        }

        fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
            let num: u8 = ctx
                .parse()
                .map_err(|_| Error::Internal("test task context must be u8".into()))?;
            let task = TestTask::new(num);
            Ok(task)
        }

        fn serialize(&self) -> String {
            self.num.to_string()
        }

        fn kind_of(&self) -> &'static str {
            "test"
        }

        async fn run(&self, state: &State) -> ClResult<()> {
            info!("Running task {}", self.num);
            tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
            info!("Completed task {}", self.num);
            state.lock().unwrap().push(self.num);
            Ok(())
        }
    }

    /// Task whose first `fail_count` runs fail with `Error::ServiceUnavailable`. The next
    /// run succeeds and pushes `id` into the shared state.
    ///
    /// The attempt counter sits behind an `Arc<Mutex<_>>`, so it is shared by every run of
    /// the same instance (and by clones of it). Serialized context: `"<id>,<fail_count>"`.
    #[derive(Debug, Clone)]
    struct FailingTask {
        id: u8,
        fail_count: u8,
        attempt: Arc<Mutex<u8>>,
    }

    impl FailingTask {
        pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
            Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
        }
    }

    #[async_trait]
    impl Task<State> for FailingTask {
        fn kind() -> &'static str {
            "failing"
        }

        fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
            let parts: Vec<&str> = ctx.split(',').collect();
            if parts.len() != 2 {
                return Err(Error::Internal("failing task context must have 2 parts".into()));
            }
            let id: u8 = parts[0]
                .parse()
                .map_err(|_| Error::Internal("failing task id must be u8".into()))?;
            let fail_count: u8 = parts[1]
                .parse()
                .map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
            Ok(FailingTask::new(id, fail_count))
        }

        fn serialize(&self) -> String {
            format!("{},{}", self.id, self.fail_count)
        }

        fn kind_of(&self) -> &'static str {
            "failing"
        }

        async fn run(&self, state: &State) -> ClResult<()> {
            // Bump the counter in a tight scope so the std (non-async) guard is released
            // right away instead of living for the whole future.
            let current_attempt = {
                let mut attempt = self.attempt.lock().unwrap();
                *attempt += 1;
                *attempt
            };

            info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);

            if current_attempt <= self.fail_count {
                error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
                return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
            }

            info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
            state.lock().unwrap().push(self.id);
            Ok(())
        }
    }

    /// End-to-end mix of immediate, delayed and dependent tasks.
    ///
    /// Phase 1: one task is delayed by 2, one is added immediately, and a third depends on
    /// both. Phase 2: two more delayed tasks (2 and 1). All five `TestTask(1)` runs must have
    /// pushed `1` by the end.
    #[tokio::test]
    pub async fn test_scheduler() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task2 = TestTask::new(1);
        let task3 = TestTask::new(1);

        let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
        let task3_id = scheduler.add(task3).await.unwrap();
        scheduler
            .task(TestTask::new(1))
            .depend_on(vec![task2_id, task3_id])
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(4)).await;
        let task4 = TestTask::new(1);
        let task5 = TestTask::new(1);
        scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
        scheduler.task(task5).schedule_after(1).schedule().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(3)).await;

        let st = state.lock().unwrap();
        info!("res: {}", st.len());
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1:1:1");
    }

    /// A task that fails twice succeeds on its third attempt within a 3-retry budget.
    #[tokio::test]
    pub async fn test_retry_with_backoff() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<FailingTask>().unwrap();

        let failing_task = FailingTask::new(42, 2);
        let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

        scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(6)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1, "Task should have succeeded after retries");
        assert_eq!(st[0], 42);
    }

    /// `task(..).now()` runs a task immediately and returns a positive id.
    #[tokio::test]
    pub async fn test_builder_simple_schedule() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let id = scheduler.task(task).now().await.unwrap();

        assert!(id > 0, "Task ID should be positive");

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1, "Task should have executed");
        assert_eq!(st[0], 1);
    }

    /// Attaching a key does not prevent immediate execution.
    #[tokio::test]
    pub async fn test_builder_with_key() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1);
        assert_eq!(st[0], 1);
    }

    /// `after(1)` holds the task back: nothing at 500ms, done by ~1.3s.
    #[tokio::test]
    pub async fn test_builder_with_delay() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler.task(task).after(1).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
        // Scoped so the std guard is dropped before the next `.await`.
        {
            let st = state.lock().unwrap();
            assert_eq!(st.len(), 0, "Task should not execute yet");
        }

        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        {
            let st = state.lock().unwrap();
            assert_eq!(st.len(), 1, "Task should have executed");
            assert_eq!(st[0], 1);
        }
    }

    /// A task depending on two immediate tasks runs after both of them.
    #[tokio::test]
    pub async fn test_builder_with_dependencies() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task1 = TestTask::new(1);
        let id1 = scheduler.task(task1).now().await.unwrap();

        let task2 = TestTask::new(1);
        let id2 = scheduler.task(task2).now().await.unwrap();

        let task3 = TestTask::new(1);
        let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(1500)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    /// A task that fails once is retried via an explicit policy and then succeeds.
    #[tokio::test]
    pub async fn test_builder_with_retry() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<FailingTask>().unwrap();

        let failing_task = FailingTask::new(55, 1);
        let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

        let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(3)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1);
        assert_eq!(st[0], 55);
    }

    /// `with_automatic_retry()` accepts and schedules a failing task.
    #[tokio::test]
    pub async fn test_builder_with_automatic_retry() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<FailingTask>().unwrap();

        let failing_task = FailingTask::new(66, 1);
        let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // The automatic policy's back-off is not pinned down here, so the single retry may or
        // may not have happened yet. Assert only what must hold either way.
        let st = state.lock().unwrap();
        assert!(st.len() <= 1, "FailingTask must record at most one success");
        assert!(st.iter().all(|&n| n == 66), "only FailingTask 66 may have pushed to state");
    }

    /// Key, zero delay, dependencies and a retry policy combined on one builder.
    #[tokio::test]
    pub async fn test_builder_fluent_chaining() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
        let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("complex-task")
            .schedule_after(0)
            .depend_on(vec![dep1, dep2])
            .with_retry(retry_policy)
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    /// The legacy `add()` entry point and the builder's `now()` both execute tasks.
    #[tokio::test]
    pub async fn test_builder_backward_compatibility() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();
        let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 2);
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1");
    }

    /// A three-stage chain built with `after_task`: each stage waits for the previous one.
    #[tokio::test]
    pub async fn test_builder_pipeline_scenario() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();
        let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();
        let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(1200)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    /// A join task waits for two independent parents.
    #[tokio::test]
    pub async fn test_builder_multi_dependency_join() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
        let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        let _id3 = scheduler
            .task(TestTask::new(1))
            .depend_on(vec![id1, id2])
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    /// A dependent task that also has a scheduled time waits for that time even after its
    /// dependency has finished.
    #[tokio::test]
    pub async fn test_builder_scheduled_task_with_dependencies() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();

        let ts = Timestamp::from_now(1);
        let _task_id = scheduler
            .task(TestTask::new(1))
            .schedule_at(ts)
            .depend_on(vec![dep_id])
            .schedule()
            .await
            .unwrap();

        // At 300ms only the dependency (a 200ms run) has completed.
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
        {
            let st = state.lock().unwrap();
            assert_eq!(st.len(), 1);
        }

        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        {
            let st = state.lock().unwrap();
            let str_vec =
                st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
            assert_eq!(str_vec.join(":"), "1:1");
        }
    }

    /// Keyed/dependent tasks and a retry-enabled task, run side by side.
    #[tokio::test]
    pub async fn test_builder_mixed_features() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();
        scheduler.register::<FailingTask>().unwrap();

        let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        let _id2 = scheduler
            .task(TestTask::new(1))
            .key("critical-task")
            .schedule_after(0)
            .depend_on(vec![id1])
            .schedule()
            .await
            .unwrap();

        // fail_count = 0: this FailingTask succeeds on its first attempt.
        let _id3 = scheduler
            .task(FailingTask::new(1, 0))
            .key("retryable-task")
            .with_retry(RetryPolicy { wait_min_max: (1, 3600), times: 3 })
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(1200)).await;

        let st = state.lock().unwrap();
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    /// A builder can be used once. Reuse is meant to be rejected at compile time, so this
    /// test only covers the single permitted use. The scheduler is never started here.
    #[tokio::test]
    pub async fn test_builder_builder_reuse_not_possible() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let _state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);

        let task = TestTask::new(1);
        let builder = scheduler.task(task);

        let _id = builder.now().await.unwrap();
    }

    /// Different registered task kinds run through the same builder API.
    #[tokio::test]
    pub async fn test_builder_different_task_types() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();
        scheduler.register::<FailingTask>().unwrap();

        let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();

        // fail_count = 0: succeeds on its first attempt.
        let _id2 = scheduler
            .task(FailingTask::new(1, 0))
            .key("failing-task")
            .now()
            .await
            .unwrap();

        let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 3);
        let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
        assert_eq!(str_vec.join(":"), "1:1:1");
    }

    /// A raw cron expression is accepted and does not fire within the first 500ms.
    #[tokio::test]
    pub async fn test_builder_cron_placeholder_syntax() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("cron-task")
            .cron("0 9 * * *")
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 0);
    }

    /// `daily_at` is accepted and does not fire within the first 500ms.
    #[tokio::test]
    pub async fn test_builder_daily_at_placeholder() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("daily-task")
            .daily_at(14, 30)
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 0);
    }

    /// `weekly_at` is accepted and does not fire within the first 500ms.
    #[tokio::test]
    pub async fn test_builder_weekly_at_placeholder() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("weekly-task")
            .weekly_at(1, 9, 0)
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 0);
    }

    /// A cron-style schedule combined with a retry policy is accepted and does not fire early.
    #[tokio::test]
    pub async fn test_builder_cron_with_retry() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        let task = TestTask::new(1);
        let _id = scheduler
            .task(task)
            .key("reliable-scheduled-task")
            .daily_at(2, 0)
            .with_retry(RetryPolicy { wait_min_max: (60, 3600), times: 5 })
            .schedule()
            .await
            .unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 0);
    }

    /// `CronSchedule` preserves the original expression text.
    #[test]
    fn test_cron_to_string() {
        let cron = CronSchedule::parse("*/5 * * * *").unwrap();
        assert_eq!(cron.to_cron_string(), "*/5 * * * *");
    }

    /// Re-queueing a task that is currently running must not put a second copy into the
    /// scheduled queue, so it executes exactly once.
    #[tokio::test]
    pub async fn test_running_task_not_double_scheduled() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // num = 5 -> runs for ~1s, long enough to observe it in the running queue.
        let task = TestTask::new(5);
        let task_id = scheduler.add(task.clone()).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        {
            let running = scheduler.tasks_running.lock().unwrap();
            assert!(running.contains_key(&task_id), "Task should be in running queue");
        }

        let task_meta = TaskMeta {
            task: task.clone(),
            next_at: Some(Timestamp::now()),
            deps: vec![],
            retry_count: 0,
            retry: None,
            cron: None,
        };
        let result = scheduler.add_queue(task_id, task_meta).await;

        assert!(result.is_ok(), "add_queue should succeed");

        {
            let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
            let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
            assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
        }

        tokio::time::sleep(std::time::Duration::from_secs(2)).await;

        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1, "Only one task execution should have occurred");
        assert_eq!(st[0], 5);
    }

    /// Re-queueing a running task updates the metadata held in the running queue
    /// (here: attaches a cron schedule).
    #[tokio::test]
    pub async fn test_running_task_metadata_updated() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // num = 5 -> runs for ~1s, long enough to modify it while running.
        let task = TestTask::new(5);
        let task_id = scheduler.add(task.clone()).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        {
            let running = scheduler.tasks_running.lock().unwrap();
            let meta = running.get(&task_id).expect("Task should be running");
            assert!(meta.cron.is_none(), "Task should have no cron initially");
        }

        let cron = CronSchedule::parse("*/5 * * * *").unwrap();
        let task_meta_with_cron = TaskMeta {
            task: task.clone(),
            next_at: Some(Timestamp::now()),
            deps: vec![],
            retry_count: 0,
            retry: None,
            cron: Some(cron.clone()),
        };
        let result = scheduler.add_queue(task_id, task_meta_with_cron).await;

        assert!(result.is_ok(), "add_queue should succeed");

        {
            let running = scheduler.tasks_running.lock().unwrap();
            let meta = running.get(&task_id).expect("Task should still be running");
            assert!(meta.cron.is_some(), "Task should now have cron after update");
        }

        tokio::time::sleep(std::time::Duration::from_secs(2)).await;

        // The in-flight run must still have completed. Use `first()` because a cron boundary
        // landing right after completion could in principle start a second run.
        let st = state.lock().unwrap();
        assert_eq!(st.first(), Some(&5), "Running task should have completed its execution");
    }
}
2131
2132