1use async_trait::async_trait;
7use itertools::Itertools;
8use std::{
9 collections::{BTreeMap, HashMap},
10 fmt::Debug,
11 sync::{Arc, Mutex, RwLock},
12};
13
14use chrono::{DateTime, Utc};
15use croner::Cron;
16use std::str::FromStr;
17
18use crate::prelude::*;
19use cloudillo_types::{lock, meta_adapter};
20
/// Unique identifier of a scheduled task.
pub type TaskId = u64;
22
/// How a task recurs.
///
/// NOTE(review): not referenced anywhere in this file — confirm it is used
/// elsewhere or consider removing it.
pub enum TaskType {
    /// Runs repeatedly (e.g. on a cron schedule).
    Periodic,
    /// Runs a single time.
    Once,
}
27
/// A cron-based schedule: the original expression text plus its parsed form.
#[derive(Debug, Clone)]
pub struct CronSchedule {
    // Original expression text, kept for persistence/display and equality.
    expr: Box<str>,
    // Parsed expression used to compute occurrences.
    cron: Cron,
}
37
38impl CronSchedule {
39 pub fn parse(expr: &str) -> ClResult<Self> {
41 let cron = Cron::from_str(expr)
42 .map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
43 Ok(Self { expr: expr.into(), cron })
44 }
45
46 pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
51 let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
52
53 self.cron
54 .find_next_occurrence(&dt, false)
55 .map(|next| Timestamp(next.timestamp()))
56 .map_err(|e| {
57 tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
58 Error::ValidationError(format!("cron next_execution failed: {}", e))
59 })
60 }
61
62 pub fn to_cron_string(&self) -> String {
64 self.expr.to_string()
65 }
66}
67
impl PartialEq for CronSchedule {
    /// Two schedules are equal iff their expression strings match exactly.
    /// Semantically equivalent but textually different expressions compare
    /// unequal; the parsed `cron` field is intentionally ignored.
    fn eq(&self, other: &Self) -> bool {
        self.expr == other.expr
    }
}

impl Eq for CronSchedule {}
75
/// A schedulable unit of work, parameterized over shared application state `S`.
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
    /// Static type tag used to register and look up builders for this kind.
    fn kind() -> &'static str
    where
        Self: Sized;
    /// Reconstructs a task from its persisted `context` string (the output of
    /// [`Task::serialize`]).
    fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
    where
        Self: Sized;
    /// Serializes the task's parameters for persistence (input to `build`).
    fn serialize(&self) -> String;
    /// Executes the task against the shared state.
    async fn run(&self, state: &S) -> ClResult<()>;

    /// Instance-level counterpart of `kind()` (callable through `dyn Task`).
    fn kind_of(&self) -> &'static str;
}
89
/// Persisted lifecycle state of a task.
#[derive(Debug)]
pub enum TaskStatus {
    /// Not yet completed; re-queued by `Scheduler::load` at startup.
    Pending,
    Completed,
    Failed,
}
96
/// Persisted representation of a task, as returned by a [`TaskStore`].
pub struct TaskData {
    id: TaskId,
    // Task kind tag; must match a registered builder (see `Task::kind`).
    kind: Box<str>,
    status: TaskStatus,
    // Serialized task parameters (see `Task::serialize`).
    input: Box<str>,
    // Ids of tasks this task waits on.
    deps: Box<[TaskId]>,
    // Retry state encoded as "count,wait_min,wait_max,times"
    // (parsed back in `Scheduler::load`).
    retry_data: Option<Box<str>>,
    // Cron expression text for recurring tasks.
    cron_data: Option<Box<str>>,
    // Next scheduled execution time, if any.
    next_at: Option<Timestamp>,
}
107
/// Persistence backend for scheduled tasks.
#[async_trait]
pub trait TaskStore<S: Clone>: Send + Sync {
    /// Persists a new task (optionally under a unique `key`) and returns its id.
    async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
    /// Marks a task as finished, recording its output.
    async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
    /// Loads all persisted tasks (used at startup).
    async fn load(&self) -> ClResult<Vec<TaskData>>;
    /// Records a task failure; `next_at` schedules a retry, `None` means none.
    async fn update_task_error(
        &self,
        task_id: TaskId,
        output: &str,
        next_at: Option<Timestamp>,
    ) -> ClResult<()>;
    /// Looks up a task by its unique key.
    async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
    /// Updates a persisted task's metadata.
    async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
}
122
123pub struct InMemoryTaskStore {
126 last_id: Mutex<TaskId>,
127}
128
129impl InMemoryTaskStore {
130 pub fn new() -> Arc<Self> {
131 Arc::new(Self { last_id: Mutex::new(0) })
132 }
133}
134
#[async_trait]
impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
    /// Allocates the next id from the in-memory counter; nothing is persisted.
    async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
        let mut last_id = lock!(self.last_id)?;
        *last_id += 1;
        Ok(*last_id)
    }

    /// No-op: this store keeps no task state.
    async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
        Ok(())
    }

    /// Always empty: nothing survives a restart.
    async fn load(&self) -> ClResult<Vec<TaskData>> {
        Ok(vec![])
    }

    /// No-op: errors are not recorded.
    async fn update_task_error(
        &self,
        _task_id: TaskId,
        _output: &str,
        _next_at: Option<Timestamp>,
    ) -> ClResult<()> {
        Ok(())
    }

    /// Always `None`: keys are not tracked.
    async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
        Ok(None)
    }

    /// No-op: updates are not recorded.
    async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
        Ok(())
    }
}
170
/// Task store backed by the project's `MetaAdapter` persistence layer.
pub struct MetaAdapterTaskStore {
    meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
}

impl MetaAdapterTaskStore {
    /// Wraps the given meta adapter in a new store.
    pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
        Arc::new(Self { meta_adapter })
    }
}
182
#[async_trait]
impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
    /// Persists a new task. For recurring tasks the cron expression is stored
    /// via a follow-up patch, since `create_task` does not take it directly.
    async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
        let id = self
            .meta_adapter
            .create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
            .await?;

        if let Some(cron) = &task.cron {
            self.meta_adapter
                .update_task(
                    id,
                    &meta_adapter::TaskPatch {
                        cron: Patch::Value(cron.to_cron_string()),
                        ..Default::default()
                    },
                )
                .await?;
        }

        Ok(id)
    }

    async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
        self.meta_adapter.update_task_finished(id, output).await
    }

    /// Loads all tasks, mapping the adapter's status characters to
    /// [`TaskStatus`].
    ///
    /// NOTE(review): 'P' maps to Pending and 'F' to Completed (presumably
    /// "finished"); every other character is treated as Failed — confirm this
    /// matches the adapter's status encoding.
    async fn load(&self) -> ClResult<Vec<TaskData>> {
        let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
        let tasks = tasks
            .into_iter()
            .map(|t| TaskData {
                id: t.task_id,
                kind: t.kind,
                status: match t.status {
                    'P' => TaskStatus::Pending,
                    'F' => TaskStatus::Completed,
                    _ => TaskStatus::Failed,
                },
                input: t.input,
                deps: t.deps,
                retry_data: t.retry,
                cron_data: t.cron,
                next_at: t.next_at,
            })
            .collect();
        Ok(tasks)
    }

    async fn update_task_error(
        &self,
        task_id: TaskId,
        output: &str,
        next_at: Option<Timestamp>,
    ) -> ClResult<()> {
        self.meta_adapter.update_task_error(task_id, output, next_at).await
    }

    /// Looks up a task by unique key, applying the same status mapping as
    /// `load` (see the note there).
    async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
        let task_opt = self.meta_adapter.find_task_by_key(key).await?;

        match task_opt {
            Some(t) => Ok(Some((
                t.task_id,
                TaskData {
                    id: t.task_id,
                    kind: t.kind,
                    status: match t.status {
                        'P' => TaskStatus::Pending,
                        'F' => TaskStatus::Completed,
                        _ => TaskStatus::Failed,
                    },
                    input: t.input,
                    deps: t.deps,
                    retry_data: t.retry,
                    cron_data: t.cron,
                    next_at: t.next_at,
                },
            ))),
            None => Ok(None),
        }
    }

    /// Rebuilds a full patch from the in-memory metadata and applies it.
    async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
        // NOTE(review): `Patch` already appears in scope in `add` above without
        // this local import — it may be redundant; verify against the prelude.
        use cloudillo_types::types::Patch;

        let mut patch = meta_adapter::TaskPatch {
            input: Patch::Value(task.task.serialize()),
            next_at: match task.next_at {
                Some(ts) => Patch::Value(ts),
                None => Patch::Null,
            },
            ..Default::default()
        };

        if !task.deps.is_empty() {
            patch.deps = Patch::Value(task.deps.clone());
        }

        // Retry state is persisted as "count,wait_min,wait_max,times"
        // (parsed back in `Scheduler::load`).
        if let Some(ref retry) = task.retry {
            let retry_str = format!(
                "{},{},{},{}",
                task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
            );
            patch.retry = Patch::Value(retry_str);
        }

        if let Some(ref cron) = task.cron {
            patch.cron = Patch::Value(cron.to_cron_string());
        }

        self.meta_adapter.update_task(id, &patch).await
    }
}
304
/// Factory signature that reconstructs a task from its persisted input string.
type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
307
/// Exponential-backoff retry policy for failed tasks.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    // Minimum and maximum wait between retries, in seconds.
    wait_min_max: (u64, u64),
    // Maximum number of retry attempts.
    times: u16,
}

impl Default for RetryPolicy {
    /// 1 minute initial backoff, capped at 1 hour, up to 10 attempts.
    fn default() -> Self {
        Self { wait_min_max: (60, 3600), times: 10 }
    }
}

impl RetryPolicy {
    /// Creates a policy with the given `(min, max)` wait bounds (seconds) and
    /// maximum attempt count.
    pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
        Self { wait_min_max, times }
    }

    /// Returns the backoff delay in seconds before the given attempt:
    /// `min * 2^attempt_count`, clamped to `max`.
    ///
    /// Uses checked/saturating arithmetic: the previous
    /// `min * (1u64 << attempt_count)` panicked in debug builds (and wrapped
    /// in release) for `attempt_count >= 64`, and the multiply could overflow.
    pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
        let (min, max) = self.wait_min_max;
        let backoff = 1u64
            .checked_shl(u32::from(attempt_count))
            .map_or(u64::MAX, |factor| min.saturating_mul(factor));
        backoff.min(max)
    }

    /// Whether another retry is allowed after `attempt_count` attempts.
    pub fn should_retry(&self, attempt_count: u16) -> bool {
        attempt_count < self.times
    }
}
338
/// Fluent builder for scheduling a task; created via `Scheduler::task`.
pub struct TaskSchedulerBuilder<'a, S: Clone> {
    scheduler: &'a Scheduler<S>,
    task: Arc<dyn Task<S>>,
    // Optional unique key for upsert-style scheduling.
    key: Option<String>,
    // Absolute time of the (first) execution, if any.
    next_at: Option<Timestamp>,
    // Ids of tasks that must finish before this one runs.
    deps: Vec<TaskId>,
    retry: Option<RetryPolicy>,
    // Recurring schedule, if any.
    cron: Option<CronSchedule>,
}
350
351impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
352 fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
354 Self {
355 scheduler,
356 task,
357 key: None,
358 next_at: None,
359 deps: Vec::new(),
360 retry: None,
361 cron: None,
362 }
363 }
364
365 pub fn key(mut self, key: impl Into<String>) -> Self {
367 self.key = Some(key.into());
368 self
369 }
370
371 pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
373 self.next_at = Some(timestamp);
374 self
375 }
376
377 pub fn schedule_after(mut self, seconds: i64) -> Self {
379 self.next_at = Some(Timestamp::from_now(seconds));
380 self
381 }
382
383 pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
385 self.deps = deps;
386 self
387 }
388
389 pub fn depends_on(mut self, dep: TaskId) -> Self {
391 self.deps.push(dep);
392 self
393 }
394
395 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
397 self.retry = Some(policy);
398 self
399 }
400
401 pub fn cron(mut self, expr: impl Into<String>) -> Self {
406 if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
407 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
410 self.cron = Some(cron_schedule);
411 }
412 self
413 }
414
415 pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
418 if hour <= 23 && minute <= 59 {
419 let expr = format!("{} {} * * *", minute, hour);
420 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
421 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
424 self.cron = Some(cron_schedule);
425 }
426 }
427 self
428 }
429
430 pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
434 if weekday <= 6 && hour <= 23 && minute <= 59 {
435 let expr = format!("{} {} * * {}", minute, hour, weekday);
436 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
437 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
440 self.cron = Some(cron_schedule);
441 }
442 }
443 self
444 }
445
446 pub async fn now(self) -> ClResult<TaskId> {
448 self.schedule().await
449 }
450
451 pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
453 self.next_at = Some(ts);
454 self.schedule().await
455 }
456
457 pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
459 self.next_at = Some(Timestamp::from_now(seconds));
460 self.schedule().await
461 }
462
463 pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
465 self.deps.push(dep);
466 self.schedule().await
467 }
468
469 pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
471 self.retry = Some(RetryPolicy::default());
472 self.schedule().await
473 }
474
475 pub async fn schedule(self) -> ClResult<TaskId> {
477 self.scheduler
478 .schedule_task_impl(
479 self.task,
480 self.key.as_deref(),
481 self.next_at,
482 if self.deps.is_empty() { None } else { Some(self.deps) },
483 self.retry,
484 self.cron,
485 )
486 .await
487 }
488}
489
/// In-memory metadata for a queued task: the task object plus its scheduling
/// state (time, dependencies, retry and cron configuration).
#[derive(Debug, Clone)]
pub struct TaskMeta<S: Clone> {
    pub task: Arc<dyn Task<S>>,
    /// When the task should next run; `None` for dependency-gated tasks.
    pub next_at: Option<Timestamp>,
    /// Ids of tasks that must complete before this one runs.
    pub deps: Vec<TaskId>,
    // Number of failed attempts so far (drives backoff calculation).
    retry_count: u16,
    pub retry: Option<RetryPolicy>,
    /// Recurring schedule; when set, the task is re-queued after each run.
    pub cron: Option<CronSchedule>,
}
499
/// Registered task builders, keyed by task kind (see `Task::kind`).
type TaskBuilderRegistry<S> = HashMap<&'static str, Box<TaskBuilder<S>>>;
/// Time-ordered queue of scheduled tasks, keyed by (run time, task id).
type ScheduledTaskMap<S> = BTreeMap<(Timestamp, TaskId), TaskMeta<S>>;
502
/// Task scheduler: maintains waiting / scheduled / running queues and drives
/// task execution on tokio. Cloning is cheap — all state is behind `Arc`s, so
/// clones share the same queues.
#[derive(Clone)]
pub struct Scheduler<S: Clone> {
    // Builders for reconstructing persisted tasks, keyed by kind.
    task_builders: Arc<RwLock<TaskBuilderRegistry<S>>>,
    // Persistence backend.
    store: Arc<dyn TaskStore<S>>,
    // Tasks currently executing.
    tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
    // Tasks blocked on dependencies.
    tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
    // Reverse dependency index: task id -> ids of tasks waiting on it.
    task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
    // Time-ordered queue of tasks due to run.
    tasks_scheduled: Arc<Mutex<ScheduledTaskMap<S>>>,
    // Completion notifications sent from `spawn_task`, consumed in `start`.
    tx_finish: flume::Sender<TaskId>,
    rx_finish: flume::Receiver<TaskId>,
    // Wakes the scheduling loop when the scheduled queue changes.
    notify_schedule: Arc<tokio::sync::Notify>,
}
516
517impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
518 pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
519 let (tx_finish, rx_finish) = flume::unbounded();
520
521 let scheduler = Self {
522 task_builders: Arc::new(RwLock::new(HashMap::new())),
523 store,
524 tasks_running: Arc::new(Mutex::new(HashMap::new())),
525 tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
526 task_dependents: Arc::new(Mutex::new(HashMap::new())),
527 tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
528 tx_finish,
529 rx_finish,
530 notify_schedule: Arc::new(tokio::sync::Notify::new()),
531 };
532
533 Arc::new(scheduler)
536 }
537
    /// Launches the scheduler's background workers on the tokio runtime:
    /// 1. a completion worker that reacts to finished tasks (rescheduling
    ///    recurring ones and releasing their dependents),
    /// 2. a scheduling loop that spawns tasks whose due time has arrived,
    /// 3. a one-shot loader that re-queues persisted tasks from the store.
    pub fn start(&self, state: S) {
        let schedule = self.clone();
        // State clone for the completion worker.
        let stat = state.clone();
        let rx_finish = self.rx_finish.clone();

        // --- Worker 1: completion handling -------------------------------
        tokio::spawn(async move {
            while let Ok(id) = rx_finish.recv_async().await {
                debug!("Completed task {} (notified)", id);

                // Clone the metadata out of the running queue so no guard is
                // held across the awaits below.
                let task_meta_opt = {
                    let tasks_running = match schedule.tasks_running.lock() {
                        Ok(guard) => guard,
                        Err(poisoned) => {
                            error!("Mutex poisoned: tasks_running (recovering)");
                            poisoned.into_inner()
                        }
                    };
                    tasks_running.get(&id).cloned()
                };

                if let Some(task_meta) = task_meta_opt {
                    // Set only once the task has safely transitioned out of
                    // the running queue; controls the removal below.
                    let mut transition_ok = false;

                    if let Some(ref cron) = task_meta.cron {
                        // Recurring task: compute the next occurrence and
                        // re-queue under the same id.
                        let next_at = match cron.next_execution(Timestamp::now()) {
                            Ok(ts) => ts,
                            Err(e) => {
                                error!(
                                    "Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
                                    id, e
                                );
                                if let Err(e) = schedule.store.finished(id, "").await {
                                    error!("Failed to mark task {} as finished: {}", id, e);
                                }
                                continue;
                            }
                        };
                        info!(
                            "Recurring task {} completed, scheduling next execution at {}",
                            id, next_at
                        );

                        let mut updated_meta = task_meta.clone();
                        updated_meta.next_at = Some(next_at);

                        // Persist the new due time (best effort).
                        if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
                            error!("Failed to update recurring task {} next_at: {}", id, e);
                        }

                        // Remove from running *before* re-queueing, otherwise
                        // add_queue would treat it as still running.
                        match schedule.tasks_running.lock() {
                            Ok(mut tasks_running) => {
                                tasks_running.remove(&id);
                            }
                            Err(poisoned) => {
                                error!("Mutex poisoned: tasks_running (recovering)");
                                poisoned.into_inner().remove(&id);
                            }
                        }

                        match schedule.add_queue(id, updated_meta).await {
                            Ok(_) => transition_ok = true,
                            Err(e) => {
                                error!(
                                    "Failed to reschedule recurring task {}: {} - task lost!",
                                    id, e
                                );
                            }
                        }
                    } else {
                        // One-shot task: persist completion.
                        match schedule.store.finished(id, "").await {
                            Ok(()) => transition_ok = true,
                            Err(e) => {
                                error!(
                                    "Failed to mark task {} as finished: {} - task remains in running queue",
                                    id, e
                                );
                            }
                        }
                    }

                    // For the recurring path this removal is a no-op (already
                    // removed above); for one-shots it drops the entry.
                    if transition_ok {
                        match schedule.tasks_running.lock() {
                            Ok(mut tasks_running) => {
                                tasks_running.remove(&id);
                            }
                            Err(poisoned) => {
                                error!("Mutex poisoned: tasks_running (recovering)");
                                poisoned.into_inner().remove(&id);
                            }
                        }
                    }

                    // Wake any tasks that were waiting on this one.
                    match schedule.release_dependents(id) {
                        Ok(ready_to_spawn) => {
                            for (dep_id, dep_task_meta) in ready_to_spawn {
                                match schedule.tasks_running.lock() {
                                    Ok(mut tasks_running) => {
                                        tasks_running.insert(dep_id, dep_task_meta.clone());
                                    }
                                    Err(poisoned) => {
                                        error!("Mutex poisoned: tasks_running (recovering)");
                                        poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
                                    }
                                }
                                schedule.spawn_task(
                                    stat.clone(),
                                    dep_task_meta.task.clone(),
                                    dep_id,
                                    dep_task_meta,
                                );
                            }
                        }
                        Err(e) => {
                            error!("Failed to release dependents of task {}: {}", id, e);
                        }
                    }
                } else {
                    warn!("Completed task {} not found in running queue", id);
                }
            }
        });

        // --- Worker 2: time-based scheduling loop ------------------------
        let schedule = self.clone();
        tokio::spawn(async move {
            loop {
                // Nothing scheduled: sleep until someone queues a task.
                let is_empty = match schedule.tasks_scheduled.lock() {
                    Ok(guard) => guard.is_empty(),
                    Err(poisoned) => {
                        error!("Mutex poisoned: tasks_scheduled (recovering)");
                        poisoned.into_inner().is_empty()
                    }
                };
                if is_empty {
                    schedule.notify_schedule.notified().await;
                }
                let time = Timestamp::now();
                // Inner loop: drain all due tasks; breaks with the next
                // not-yet-due entry (to compute the sleep) or None.
                if let Some((timestamp, _id)) = loop {
                    let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
                        Ok(guard) => guard,
                        Err(poisoned) => {
                            error!("Mutex poisoned: tasks_scheduled (recovering)");
                            poisoned.into_inner()
                        }
                    };
                    // BTreeMap keyed by (time, id): first entry is the soonest.
                    if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
                        let (timestamp, id) = (timestamp, id);
                        if timestamp <= Timestamp::now() {
                            debug!("Spawning task id {} (from schedule)", id);
                            if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
                                let mut tasks_running = match schedule.tasks_running.lock() {
                                    Ok(guard) => guard,
                                    Err(poisoned) => {
                                        error!("Mutex poisoned: tasks_running (recovering)");
                                        poisoned.into_inner()
                                    }
                                };
                                tasks_running.insert(id, task.clone());
                                schedule.spawn_task(state.clone(), task.task.clone(), id, task);
                            } else {
                                error!("Task disappeared while being removed from schedule");
                                break None;
                            }
                        } else {
                            break Some((timestamp, id));
                        }
                    } else {
                        break None;
                    }
                } {
                    // Sleep until the next due time, or until the queue
                    // changes (whichever comes first).
                    let diff = timestamp.0 - time.0;
                    let wait =
                        tokio::time::Duration::from_secs(u64::try_from(diff).unwrap_or_default());
                    tokio::select! {
                        () = tokio::time::sleep(wait) => (), () = schedule.notify_schedule.notified() => ()
                    };
                }
            }
        });

        // --- Worker 3: load persisted tasks (best effort) ----------------
        let schedule = self.clone();
        tokio::spawn(async move {
            let _ignore_err = schedule.load().await;
        });
    }
737
738 fn register_builder(
739 &self,
740 name: &'static str,
741 builder: &'static TaskBuilder<S>,
742 ) -> ClResult<&Self> {
743 let mut task_builders = self
744 .task_builders
745 .write()
746 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
747 task_builders.insert(name, Box::new(builder));
748 Ok(self)
749 }
750
751 pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
752 info!("Registering task type {}", T::kind());
753 self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
754 Ok(self)
755 }
756
    /// Starts a fluent builder for scheduling `task`; finish with
    /// `.schedule()`, `.now()`, `.at(..)`, etc.
    pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
        TaskSchedulerBuilder::new(self, task)
    }
761
    /// Core scheduling entry point used by [`TaskSchedulerBuilder`].
    ///
    /// When `key` is given and a task with that key already exists, the
    /// existing task is updated in place (keeping its id) instead of creating
    /// a new one; otherwise a new task is persisted and queued.
    async fn schedule_task_impl(
        &self,
        task: Arc<dyn Task<S>>,
        key: Option<&str>,
        next_at: Option<Timestamp>,
        deps: Option<Vec<TaskId>>,
        retry: Option<RetryPolicy>,
        cron: Option<CronSchedule>,
    ) -> ClResult<TaskId> {
        let task_meta = TaskMeta {
            task: task.clone(),
            next_at,
            deps: deps.clone().unwrap_or_default(),
            retry_count: 0,
            retry,
            cron,
        };

        if let Some(key) = key {
            if let Some((existing_id, existing_data)) = self.store.find_by_key(key).await? {
                let new_serialized = task.serialize();
                let existing_serialized = existing_data.input.as_ref();

                if new_serialized == existing_serialized {
                    // Same parameters: refresh scheduling metadata and re-queue.
                    info!(
                        "Recurring task '{}' already exists with identical parameters (id={})",
                        key, existing_id
                    );
                    self.store.update_task(existing_id, &task_meta).await?;
                    self.add_queue(existing_id, task_meta).await?;
                    return Ok(existing_id);
                }
                info!(
                    "Updating recurring task '{}' (id={}) - parameters changed",
                    key, existing_id
                );
                info!(" Old params: {}", existing_serialized);
                info!(" New params: {}", new_serialized);

                // Parameters changed: drop any queued copy, persist the new
                // definition, then queue it under the same id.
                self.remove_from_queues(existing_id)?;

                self.store.update_task(existing_id, &task_meta).await?;

                self.add_queue(existing_id, task_meta).await?;

                return Ok(existing_id);
            }
        }

        // No key (or key unknown): persist as a brand new task.
        let id = self.store.add(&task_meta, key).await?;
        self.add_queue(id, task_meta).await
    }
824
    /// Convenience wrapper: schedules `task` to run immediately.
    pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
        self.task(task).now().await
    }
828
    /// Places a task into the appropriate queue (running metadata update,
    /// scheduled, or waiting) and wakes the scheduling loop when needed.
    pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
        {
            // If the task is currently executing, just swap in the new
            // metadata; the completion handler will reschedule it.
            let mut running = lock!(self.tasks_running, "tasks_running")?;
            if let Some(existing_meta) = running.get_mut(&id) {
                debug!(
                    "Task {} is already running, updating metadata (will reschedule on completion)",
                    id
                );
                *existing_meta = task_meta;
                return Ok(id);
            }
        }

        {
            // Drop any stale entry for this id from the scheduled queue
            // (the map is keyed by (time, id), so scan for the id).
            let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
            if let Some(key) = scheduled
                .iter()
                .find(|((_, tid), _)| *tid == id)
                .map(|((ts, tid), _)| (*ts, *tid))
            {
                scheduled.remove(&key);
                debug!("Removed existing scheduled entry for task {} before re-queueing", id);
            }
        }
        {
            // Likewise for the waiting queue.
            let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
            if waiting.remove(&id).is_some() {
                debug!("Removed existing waiting entry for task {} before re-queueing", id);
            }
        }

        let deps = task_meta.deps.clone();

        // Dependencies take precedence over a scheduled time.
        if !deps.is_empty() && task_meta.next_at.is_some() {
            warn!("Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue", id);
            lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
            debug!("Task {} is waiting for {:?}", id, &deps);
            for dep in deps {
                lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
            }
            return Ok(id);
        }

        if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
            // Ready now (no deps; due time absent or already past): queue at
            // Timestamp(0) so the scheduling loop picks it up immediately.
            debug!("Spawning task {}", id);
            lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
            self.notify_schedule.notify_one();
        } else if let Some(next_at) = task_meta.next_at {
            // Due in the future: queue at its due time.
            debug!("Scheduling task {} for {}", id, next_at);
            lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
            self.notify_schedule.notify_one();
        } else {
            // No due time, has dependencies: park in the waiting queue and
            // index it under each dependency.
            lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
            debug!("Task {} is waiting for {:?}", id, &deps);
            for dep in deps {
                lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
            }
        }
        Ok(id)
    }
895
    /// Removes a task from whichever queue holds it, returning its metadata.
    /// Checks waiting, then scheduled, then running.
    fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
        if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
            debug!("Removed task {} from waiting queue for update", task_id);
            return Ok(Some(task_meta));
        }

        {
            // The scheduled map is keyed by (time, id); scan for the id.
            let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
            if let Some(key) = scheduled
                .iter()
                .find(|((_, id), _)| *id == task_id)
                .map(|((ts, id), _)| (*ts, *id))
            {
                if let Some(task_meta) = scheduled.remove(&key) {
                    debug!("Removed task {} from scheduled queue for update", task_id);
                    return Ok(Some(task_meta));
                }
            }
        }

        // Removing a running task does not cancel its execution — it only
        // drops the bookkeeping entry, hence the warning.
        if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
            warn!("Removed task {} from running queue during update", task_id);
            return Ok(Some(task_meta));
        }

        Ok(None)
    }
928
    /// Clears `completed_task_id` from its dependents' dependency lists and
    /// returns the waiting tasks that thereby became runnable. The caller is
    /// responsible for moving them into the running queue and spawning them.
    fn release_dependents(
        &self,
        completed_task_id: TaskId,
    ) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
        // Take (and drop) the dependent list registered for this task.
        let dependents = {
            let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
            deps_map.remove(&completed_task_id).unwrap_or_default()
        };

        if dependents.is_empty() {
            return Ok(Vec::new());
        }

        debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);

        let mut ready_to_spawn = Vec::new();

        for dependent_id in dependents {
            {
                let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
                if let Some(task_meta) = waiting.get_mut(&dependent_id) {
                    task_meta.deps.retain(|x| *x != completed_task_id);

                    if task_meta.deps.is_empty() {
                        // Last dependency cleared: hand the task to the caller.
                        if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
                            debug!(
                                "Dependent task {} ready to spawn (all dependencies cleared)",
                                dependent_id
                            );
                            ready_to_spawn.push((dependent_id, task_to_spawn));
                        }
                    } else {
                        debug!(
                            "Task {} still has {} remaining dependencies",
                            dependent_id,
                            task_meta.deps.len()
                        );
                    }
                    continue;
                }
            }

            {
                // The dependent may live in the scheduled queue: prune the
                // dependency but leave it time-scheduled.
                let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
                if let Some(scheduled_key) = scheduled
                    .iter()
                    .find(|((_, id), _)| *id == dependent_id)
                    .map(|((ts, id), _)| (*ts, *id))
                {
                    if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
                        task_meta.deps.retain(|x| *x != completed_task_id);
                        let remaining = task_meta.deps.len();
                        if remaining == 0 {
                            debug!(
                                "Task {} in scheduled queue has no remaining dependencies",
                                dependent_id
                            );
                        } else {
                            debug!(
                                "Task {} in scheduled queue has {} remaining dependencies",
                                dependent_id, remaining
                            );
                        }
                    }
                    continue;
                }
            }

            warn!(
                "Dependent task {} of completed task {} not found in any queue",
                dependent_id, completed_task_id
            );
        }

        Ok(ready_to_spawn)
    }
1014
1015 async fn load(&self) -> ClResult<()> {
1016 let tasks = self.store.load().await?;
1017 debug!("Loaded {} tasks from store", tasks.len());
1018 for t in tasks {
1019 if let TaskStatus::Pending = t.status {
1020 debug!("Loading task {} {}", t.id, t.kind);
1021 let task = {
1022 let builder_map = self
1023 .task_builders
1024 .read()
1025 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1026 let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1027 format!("task builder not registered: {}", t.kind),
1028 ))?;
1029 builder(t.id, &t.input)?
1030 };
1031 let (retry_count, retry) = match t.retry_data {
1032 Some(retry_str) => {
1033 let (retry_count, retry_min, retry_max, retry_times) = retry_str
1034 .split(',')
1035 .collect_tuple()
1036 .ok_or(Error::Internal("invalid retry policy format".into()))?;
1037 let retry_count: u16 = retry_count
1038 .parse()
1039 .map_err(|_| Error::Internal("retry count must be u16".into()))?;
1040 let retry = RetryPolicy {
1041 wait_min_max: (
1042 retry_min
1043 .parse()
1044 .map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1045 retry_max
1046 .parse()
1047 .map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1048 ),
1049 times: retry_times
1050 .parse()
1051 .map_err(|_| Error::Internal("retry times must be u64".into()))?,
1052 };
1053 debug!("Loaded retry policy: {:?}", retry);
1054 (retry_count, Some(retry))
1055 }
1056 _ => (0, None),
1057 };
1058 let cron =
1060 t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1061
1062 let task_meta = TaskMeta {
1063 task,
1064 next_at: t.next_at,
1065 deps: t.deps.into(),
1066 retry_count,
1067 retry,
1068 cron,
1069 };
1070 self.add_queue(t.id, task_meta).await?;
1071 }
1072 }
1073 Ok(())
1074 }
1075
    /// Runs `task` on a fresh tokio task. On success it notifies the
    /// completion channel (handled by the worker in `start`); on failure it
    /// applies the retry policy, or records the terminal error when retries
    /// are exhausted or absent.
    fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
        let tx_finish = self.tx_finish.clone();
        let store = self.store.clone();
        let scheduler = self.clone();
        tokio::spawn(async move {
            match task.run(&state).await {
                Ok(()) => {
                    debug!("Task {} completed successfully", id);
                    // Queue transitions happen in the completion worker.
                    tx_finish.send(id).unwrap_or(());
                }
                Err(e) => {
                    if let Some(retry_policy) = &task_meta.retry {
                        if retry_policy.should_retry(task_meta.retry_count) {
                            let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
                            let next_at = Timestamp::from_now(backoff.cast_signed());

                            info!(
                                "Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
                                id, task_meta.retry_count + 1, retry_policy.times, backoff, e
                            );

                            // Persist the failure and retry time (best effort).
                            store
                                .update_task_error(id, &e.to_string(), Some(next_at))
                                .await
                                .unwrap_or(());

                            // Drop from the running queue before re-queueing,
                            // otherwise add_queue would only update metadata.
                            match scheduler.tasks_running.lock() {
                                Ok(mut tasks_running) => {
                                    tasks_running.remove(&id);
                                }
                                Err(poisoned) => {
                                    error!("Mutex poisoned: tasks_running (recovering)");
                                    poisoned.into_inner().remove(&id);
                                }
                            }

                            // Re-queue with the bumped attempt count.
                            let mut retry_meta = task_meta.clone();
                            retry_meta.retry_count += 1;
                            retry_meta.next_at = Some(next_at);
                            scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
                        } else {
                            // Retries exhausted: record the terminal failure.
                            error!(
                                "Task {} failed after {} retries: {}",
                                id, task_meta.retry_count, e
                            );
                            store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
                            tx_finish.send(id).unwrap_or(());
                        }
                    } else {
                        // No retry policy: record the failure and finish.
                        error!("Task {} failed: {}", id, e);
                        store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
                        tx_finish.send(id).unwrap_or(());
                    }
                }
            }
        });
    }
1139
1140 pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1143 let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1144 let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1145 let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1146 let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1147
1148 let mut stuck_tasks = Vec::new();
1150 let mut tasks_with_missing_deps = Vec::new();
1151
1152 {
1154 let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1155 let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1156
1157 for (id, task_meta) in waiting.iter() {
1158 if task_meta.deps.is_empty() {
1159 stuck_tasks.push(*id);
1160 warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1161 } else {
1162 for dep in &task_meta.deps {
1164 let dep_exists =
1166 self.tasks_running.lock().ok().is_some_and(|r| r.contains_key(dep))
1167 || self
1168 .tasks_waiting
1169 .lock()
1170 .ok()
1171 .is_some_and(|w| w.contains_key(dep))
1172 || self.tasks_scheduled.lock().ok().is_some_and(|s| {
1173 s.iter().any(|((_, task_id), _)| task_id == dep)
1174 });
1175
1176 if !dep_exists {
1177 tasks_with_missing_deps.push((*id, *dep));
1178 warn!(
1179 "SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1180 id, dep
1181 );
1182 }
1183 }
1184 }
1185 }
1186 }
1187
1188 Ok(SchedulerHealth {
1189 waiting: waiting_count,
1190 scheduled: scheduled_count,
1191 running: running_count,
1192 dependents: dependents_count,
1193 stuck_tasks,
1194 tasks_with_missing_deps,
1195 })
1196 }
1197}
1198
/// Snapshot of scheduler queue sizes and detected anomalies
/// (produced by `Scheduler::health_check`).
#[derive(Debug, Clone)]
pub struct SchedulerHealth {
    /// Number of tasks waiting on dependencies.
    pub waiting: usize,
    /// Number of tasks queued for a (possibly future) time.
    pub scheduled: usize,
    /// Number of tasks currently executing.
    pub running: usize,
    /// Number of tasks that have registered dependents.
    pub dependents: usize,
    /// Waiting tasks that have no dependencies (should not happen).
    pub stuck_tasks: Vec<TaskId>,
    /// (task, dependency) pairs where the dependency exists in no queue.
    pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
}
1215
1216#[cfg(test)]
1217mod tests {
1218 use super::*;
1219 use serde::{Deserialize, Serialize};
1220
1221 type State = Arc<Mutex<Vec<u8>>>;
1222
1223 #[derive(Debug, Serialize, Deserialize)]
1224 struct TestTask {
1225 num: u8,
1226 }
1227
1228 impl TestTask {
1229 pub fn new(num: u8) -> Arc<Self> {
1230 Arc::new(Self { num })
1231 }
1232 }
1233
1234 #[async_trait]
1235 impl Task<State> for TestTask {
1236 fn kind() -> &'static str {
1237 "test"
1238 }
1239
1240 fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1241 let num: u8 = ctx
1242 .parse()
1243 .map_err(|_| Error::Internal("test task context must be u8".into()))?;
1244 let task = TestTask::new(num);
1245 Ok(task)
1246 }
1247
1248 fn serialize(&self) -> String {
1249 self.num.to_string()
1250 }
1251
1252 fn kind_of(&self) -> &'static str {
1253 "test"
1254 }
1255
1256 async fn run(&self, state: &State) -> ClResult<()> {
1257 info!("Running task {}", self.num);
1258 tokio::time::sleep(std::time::Duration::from_millis(200 * u64::from(self.num))).await;
1259 info!("Completed task {}", self.num);
1260 state.lock().unwrap().push(self.num);
1261 Ok(())
1262 }
1263 }
1264
1265 #[derive(Debug, Clone)]
1266 struct FailingTask {
1267 id: u8,
1268 fail_count: u8,
1269 attempt: Arc<Mutex<u8>>,
1270 }
1271
1272 impl FailingTask {
1273 pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
1274 Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
1275 }
1276 }
1277
1278 #[async_trait]
1279 impl Task<State> for FailingTask {
1280 fn kind() -> &'static str {
1281 "failing"
1282 }
1283
1284 fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1285 let parts: Vec<&str> = ctx.split(',').collect();
1286 if parts.len() != 2 {
1287 return Err(Error::Internal("failing task context must have 2 parts".into()));
1288 }
1289 let id: u8 = parts[0]
1290 .parse()
1291 .map_err(|_| Error::Internal("failing task id must be u8".into()))?;
1292 let fail_count: u8 = parts[1]
1293 .parse()
1294 .map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
1295 Ok(FailingTask::new(id, fail_count))
1296 }
1297
1298 fn serialize(&self) -> String {
1299 format!("{},{}", self.id, self.fail_count)
1300 }
1301
1302 fn kind_of(&self) -> &'static str {
1303 "failing"
1304 }
1305
1306 async fn run(&self, state: &State) -> ClResult<()> {
1307 let mut attempt = self.attempt.lock().unwrap();
1308 *attempt += 1;
1309 let current_attempt = *attempt;
1310
1311 info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);
1312
1313 if current_attempt <= self.fail_count {
1314 error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
1315 return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
1316 }
1317
1318 info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
1319 state.lock().unwrap().push(self.id);
1320 Ok(())
1321 }
1322 }
1323
1324 #[tokio::test]
1325 pub async fn test_scheduler() {
1326 let _ = tracing_subscriber::fmt().try_init();
1327
1328 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1329 let state: State = Arc::new(Mutex::new(Vec::new()));
1330 let scheduler = Scheduler::new(task_store);
1331 scheduler.start(state.clone());
1332 scheduler.register::<TestTask>().unwrap();
1333
1334 let _task1 = TestTask::new(1);
1335 let task2 = TestTask::new(1);
1336 let task3 = TestTask::new(1);
1337
1338 let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
1339 let task3_id = scheduler.add(task3).await.unwrap();
1340 scheduler
1341 .task(TestTask::new(1))
1342 .depend_on(vec![task2_id, task3_id])
1343 .schedule()
1344 .await
1345 .unwrap();
1346
1347 tokio::time::sleep(std::time::Duration::from_secs(4)).await;
1348 let task4 = TestTask::new(1);
1349 let task5 = TestTask::new(1);
1350 scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
1351 scheduler.task(task5).schedule_after(1).schedule().await.unwrap();
1352
1353 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1354
1355 let st = state.lock().unwrap();
1356 info!("res: {}", st.len());
1357 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1358 assert_eq!(str_vec.join(":"), "1:1:1:1:1");
1359 }
1360
1361 #[tokio::test]
1362 pub async fn test_retry_with_backoff() {
1363 let _ = tracing_subscriber::fmt().try_init();
1364
1365 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1366 let state: State = Arc::new(Mutex::new(Vec::new()));
1367 let scheduler = Scheduler::new(task_store);
1368 scheduler.start(state.clone());
1369 scheduler.register::<FailingTask>().unwrap();
1370
1371 let failing_task = FailingTask::new(42, 2);
1374 let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1375
1376 scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1377
1378 tokio::time::sleep(std::time::Duration::from_secs(6)).await;
1385
1386 let st = state.lock().unwrap();
1387 assert_eq!(st.len(), 1, "Task should have succeeded after retries");
1388 assert_eq!(st[0], 42);
1389 }
1390
1391 #[tokio::test]
1394 pub async fn test_builder_simple_schedule() {
1395 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1396 let state: State = Arc::new(Mutex::new(Vec::new()));
1397 let scheduler = Scheduler::new(task_store);
1398 scheduler.start(state.clone());
1399 scheduler.register::<TestTask>().unwrap();
1400
1401 let task = TestTask::new(1);
1403 let id = scheduler.task(task).now().await.unwrap();
1404
1405 assert!(id > 0, "Task ID should be positive");
1406
1407 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1408
1409 let st = state.lock().unwrap();
1410 assert_eq!(st.len(), 1, "Task should have executed");
1411 assert_eq!(st[0], 1);
1412 }
1413
1414 #[tokio::test]
1415 pub async fn test_builder_with_key() {
1416 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1417 let state: State = Arc::new(Mutex::new(Vec::new()));
1418 let scheduler = Scheduler::new(task_store);
1419 scheduler.start(state.clone());
1420 scheduler.register::<TestTask>().unwrap();
1421
1422 let task = TestTask::new(1);
1424 let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();
1425
1426 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1427
1428 let st = state.lock().unwrap();
1429 assert_eq!(st.len(), 1);
1430 assert_eq!(st[0], 1);
1431 }
1432
1433 #[tokio::test]
1434 pub async fn test_builder_with_delay() {
1435 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1436 let state: State = Arc::new(Mutex::new(Vec::new()));
1437 let scheduler = Scheduler::new(task_store);
1438 scheduler.start(state.clone());
1439 scheduler.register::<TestTask>().unwrap();
1440
1441 let task = TestTask::new(1);
1443 let _id = scheduler
1444 .task(task)
1445 .after(1) .await
1447 .unwrap();
1448
1449 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1451 {
1452 let st = state.lock().unwrap();
1453 assert_eq!(st.len(), 0, "Task should not execute yet");
1454 }
1455
1456 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1458
1459 {
1460 let st = state.lock().unwrap();
1461 assert_eq!(st.len(), 1, "Task should have executed");
1462 assert_eq!(st[0], 1);
1463 }
1464 }
1465
1466 #[tokio::test]
1467 pub async fn test_builder_with_dependencies() {
1468 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1469 let state: State = Arc::new(Mutex::new(Vec::new()));
1470 let scheduler = Scheduler::new(task_store);
1471 scheduler.start(state.clone());
1472 scheduler.register::<TestTask>().unwrap();
1473
1474 let task1 = TestTask::new(1);
1476 let id1 = scheduler.task(task1).now().await.unwrap();
1477
1478 let task2 = TestTask::new(1);
1480 let id2 = scheduler.task(task2).now().await.unwrap();
1481
1482 let task3 = TestTask::new(1);
1484 let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();
1485
1486 tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
1488
1489 let st = state.lock().unwrap();
1490 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1492 assert_eq!(str_vec.join(":"), "1:1:1");
1493 }
1494
1495 #[tokio::test]
1496 pub async fn test_builder_with_retry() {
1497 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1498 let state: State = Arc::new(Mutex::new(Vec::new()));
1499 let scheduler = Scheduler::new(task_store);
1500 scheduler.start(state.clone());
1501 scheduler.register::<FailingTask>().unwrap();
1502
1503 let failing_task = FailingTask::new(55, 1); let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1506
1507 let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1508
1509 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1511
1512 let st = state.lock().unwrap();
1513 assert_eq!(st.len(), 1);
1514 assert_eq!(st[0], 55);
1515 }
1516
    /// Schedules a once-failing task with the automatic retry policy.
    /// NOTE(review): this test makes no assertion about the retry outcome —
    /// after 500ms the failed first attempt may or may not have been retried,
    /// so it only verifies that `with_automatic_retry` schedules successfully.
    #[tokio::test]
    pub async fn test_builder_with_automatic_retry() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<FailingTask>().unwrap();

        // Fails exactly once before succeeding.
        let failing_task = FailingTask::new(66, 1);
        let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(500)).await;

        // Deliberately no assertion on contents (timing-dependent, see above).
        let st = state.lock().unwrap();
        let _ = st.len(); }
1539
1540 #[tokio::test]
1541 pub async fn test_builder_fluent_chaining() {
1542 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1543 let state: State = Arc::new(Mutex::new(Vec::new()));
1544 let scheduler = Scheduler::new(task_store);
1545 scheduler.start(state.clone());
1546 scheduler.register::<TestTask>().unwrap();
1547
1548 let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1550 let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1551
1552 let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1554
1555 let task = TestTask::new(1);
1556 let _id = scheduler
1557 .task(task)
1558 .key("complex-task")
1559 .schedule_after(0) .depend_on(vec![dep1, dep2])
1561 .with_retry(retry_policy)
1562 .schedule()
1563 .await
1564 .unwrap();
1565
1566 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1567
1568 let st = state.lock().unwrap();
1569 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1571 assert_eq!(str_vec.join(":"), "1:1:1");
1572 }
1573
1574 #[tokio::test]
1575 pub async fn test_builder_backward_compatibility() {
1576 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1577 let state: State = Arc::new(Mutex::new(Vec::new()));
1578 let scheduler = Scheduler::new(task_store);
1579 scheduler.start(state.clone());
1580 scheduler.register::<TestTask>().unwrap();
1581
1582 let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();
1584
1585 let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1587
1588 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1589
1590 let st = state.lock().unwrap();
1591 assert_eq!(st.len(), 2);
1593 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1594 assert_eq!(str_vec.join(":"), "1:1");
1595 }
1596
1597 #[tokio::test]
1600 pub async fn test_builder_pipeline_scenario() {
1601 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1603 let state: State = Arc::new(Mutex::new(Vec::new()));
1604 let scheduler = Scheduler::new(task_store);
1605 scheduler.start(state.clone());
1606 scheduler.register::<TestTask>().unwrap();
1607
1608 let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();
1610
1611 let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();
1613
1614 let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();
1616
1617 tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1619
1620 let st = state.lock().unwrap();
1621 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1623 assert_eq!(str_vec.join(":"), "1:1:1");
1624 }
1625
1626 #[tokio::test]
1627 pub async fn test_builder_multi_dependency_join() {
1628 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1630 let state: State = Arc::new(Mutex::new(Vec::new()));
1631 let scheduler = Scheduler::new(task_store);
1632 scheduler.start(state.clone());
1633 scheduler.register::<TestTask>().unwrap();
1634
1635 let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1637 let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1638
1639 let _id3 = scheduler
1641 .task(TestTask::new(1))
1642 .depend_on(vec![id1, id2])
1643 .schedule()
1644 .await
1645 .unwrap();
1646
1647 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1648
1649 let st = state.lock().unwrap();
1650 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1652 assert_eq!(str_vec.join(":"), "1:1:1");
1653 }
1654
    /// A task scheduled one second out with a dependency: at 300ms only the
    /// dependency has run; after the timestamp passes, both have run.
    #[tokio::test]
    pub async fn test_builder_scheduled_task_with_dependencies() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // The dependency runs immediately.
        let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();

        // The dependent task is both time-gated (now + 1s) and dep-gated.
        let ts = Timestamp::from_now(1);
        let _task_id = scheduler
            .task(TestTask::new(1))
            .schedule_at(ts)
            .depend_on(vec![dep_id])
            .schedule()
            .await
            .unwrap();

        // Before the scheduled time: only the dependency has completed.
        tokio::time::sleep(std::time::Duration::from_millis(300)).await;
        {
            let st = state.lock().unwrap();
            assert_eq!(st.len(), 1); }

        // After the scheduled time: both tasks have completed.
        tokio::time::sleep(std::time::Duration::from_millis(800)).await;

        {
            let st = state.lock().unwrap();
            let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
            assert_eq!(str_vec.join(":"), "1:1");
        }
    }
1693
1694 #[tokio::test]
1695 pub async fn test_builder_mixed_features() {
1696 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1698 let state: State = Arc::new(Mutex::new(Vec::new()));
1699 let scheduler = Scheduler::new(task_store);
1700 scheduler.start(state.clone());
1701 scheduler.register::<TestTask>().unwrap();
1702 scheduler.register::<FailingTask>().unwrap();
1703
1704 let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1706
1707 let _id2 = scheduler
1709 .task(TestTask::new(1))
1710 .key("critical-task")
1711 .schedule_after(0)
1712 .depend_on(vec![id1])
1713 .schedule()
1714 .await
1715 .unwrap();
1716
1717 let _id3 = scheduler
1719 .task(FailingTask::new(1, 0)) .key("retryable-task")
1721 .with_retry(RetryPolicy {
1722 wait_min_max: (1, 3600),
1723 times: 3,
1724 })
1725 .schedule()
1726 .await
1727 .unwrap();
1728
1729 tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1731
1732 let st = state.lock().unwrap();
1733 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1735 assert_eq!(str_vec.join(":"), "1:1:1");
1736 }
1737
    /// Compile-time check that the builder is consumed by `now()` — reusing
    /// `builder` after this call would not compile. Note the scheduler is
    /// never started here, so the task is only persisted, not executed.
    #[tokio::test]
    pub async fn test_builder_builder_reuse_not_possible() {
        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let _state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);

        let task = TestTask::new(1);
        let builder = scheduler.task(task);

        // `now()` takes the builder by value; no further use is possible.
        let _id = builder.now().await.unwrap();
    }
1758
1759 #[tokio::test]
1760 pub async fn test_builder_different_task_types() {
1761 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1763 let state: State = Arc::new(Mutex::new(Vec::new()));
1764 let scheduler = Scheduler::new(task_store);
1765 scheduler.start(state.clone());
1766 scheduler.register::<TestTask>().unwrap();
1767 scheduler.register::<FailingTask>().unwrap();
1768
1769 let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();
1771
1772 let _id2 = scheduler
1773 .task(FailingTask::new(1, 0)) .key("failing-task")
1775 .now()
1776 .await
1777 .unwrap();
1778
1779 let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1780
1781 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1782
1783 let st = state.lock().unwrap();
1784 assert_eq!(st.len(), 3);
1785 let str_vec = st.iter().map(std::string::ToString::to_string).collect::<Vec<String>>();
1786 assert_eq!(str_vec.join(":"), "1:1:1");
1788 }
1789
1790 #[tokio::test]
1795 pub async fn test_builder_cron_placeholder_syntax() {
1796 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1798 let state: State = Arc::new(Mutex::new(Vec::new()));
1799 let scheduler = Scheduler::new(task_store);
1800 scheduler.start(state.clone());
1801 scheduler.register::<TestTask>().unwrap();
1802
1803 let task = TestTask::new(1);
1805 let _id = scheduler
1806 .task(task)
1807 .key("cron-task")
1808 .cron("0 9 * * *") .schedule()
1810 .await
1811 .unwrap();
1812
1813 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1817
1818 let st = state.lock().unwrap();
1819 assert_eq!(st.len(), 0); }
1823
1824 #[tokio::test]
1825 pub async fn test_builder_daily_at_placeholder() {
1826 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1828 let state: State = Arc::new(Mutex::new(Vec::new()));
1829 let scheduler = Scheduler::new(task_store);
1830 scheduler.start(state.clone());
1831 scheduler.register::<TestTask>().unwrap();
1832
1833 let task = TestTask::new(1);
1835 let _id = scheduler
1836 .task(task)
1837 .key("daily-task")
1838 .daily_at(14, 30) .schedule()
1840 .await
1841 .unwrap();
1842
1843 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1846
1847 let st = state.lock().unwrap();
1848 assert_eq!(st.len(), 0);
1851 }
1852
1853 #[tokio::test]
1854 pub async fn test_builder_weekly_at_placeholder() {
1855 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1857 let state: State = Arc::new(Mutex::new(Vec::new()));
1858 let scheduler = Scheduler::new(task_store);
1859 scheduler.start(state.clone());
1860 scheduler.register::<TestTask>().unwrap();
1861
1862 let task = TestTask::new(1);
1864 let _id = scheduler
1865 .task(task)
1866 .key("weekly-task")
1867 .weekly_at(1, 9, 0) .schedule()
1869 .await
1870 .unwrap();
1871
1872 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1875
1876 let st = state.lock().unwrap();
1877 assert_eq!(st.len(), 0);
1880 }
1881
1882 #[tokio::test]
1883 pub async fn test_builder_cron_with_retry() {
1884 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1886 let state: State = Arc::new(Mutex::new(Vec::new()));
1887 let scheduler = Scheduler::new(task_store);
1888 scheduler.start(state.clone());
1889 scheduler.register::<TestTask>().unwrap();
1890
1891 let task = TestTask::new(1);
1893 let _id = scheduler
1894 .task(task)
1895 .key("reliable-scheduled-task")
1896 .daily_at(2, 0) .with_retry(RetryPolicy {
1898 wait_min_max: (60, 3600),
1899 times: 5,
1900 })
1901 .schedule()
1902 .await
1903 .unwrap();
1904
1905 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1908
1909 let st = state.lock().unwrap();
1910 assert_eq!(st.len(), 0);
1913 }
1914
1915 #[test]
1918 fn test_cron_to_string() {
1919 let cron = CronSchedule::parse("*/5 * * * *").unwrap();
1921 assert_eq!(cron.to_cron_string(), "*/5 * * * *");
1922 }
1923
    /// Re-queuing a task id that is currently executing must not add it to the
    /// scheduled queue — otherwise the same task would run twice.
    #[tokio::test]
    pub async fn test_running_task_not_double_scheduled() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // num = 5 -> the task sleeps ~1s (200ms * 5), keeping it "running"
        // long enough for the assertions below.
        let task = TestTask::new(5); let task_id = scheduler.add(task.clone()).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Sanity check: the task is in the running queue.
        {
            let running = scheduler.tasks_running.lock().unwrap();
            assert!(running.contains_key(&task_id), "Task should be in running queue");
        }

        // Attempt to queue the same task id again while it is still running.
        let task_meta = TaskMeta {
            task: task.clone(),
            next_at: Some(Timestamp::now()),
            deps: vec![],
            retry_count: 0,
            retry: None,
            cron: None,
        };
        let result = scheduler.add_queue(task_id, task_meta).await;

        assert!(result.is_ok(), "add_queue should succeed");

        // The id must not show up in the scheduled queue while running.
        {
            let sched_queue = scheduler.tasks_scheduled.lock().unwrap();
            let in_scheduled = sched_queue.iter().any(|((_, id), _)| *id == task_id);
            assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
        }

        tokio::time::sleep(std::time::Duration::from_secs(2)).await;

        // Exactly one execution recorded despite the double submission.
        let st = state.lock().unwrap();
        assert_eq!(st.len(), 1, "Only one task execution should have occurred");
        assert_eq!(st[0], 5);
    }
1976
    /// `add_queue` on a currently-running task must update its stored metadata
    /// (here: attach a cron schedule) rather than re-queuing it.
    #[tokio::test]
    pub async fn test_running_task_metadata_updated() {
        let _ = tracing_subscriber::fmt().try_init();

        let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
        let state: State = Arc::new(Mutex::new(Vec::new()));
        let scheduler = Scheduler::new(task_store);
        scheduler.start(state.clone());
        scheduler.register::<TestTask>().unwrap();

        // num = 5 -> the task sleeps ~1s (200ms * 5), keeping it "running"
        // long enough for the assertions below.
        let task = TestTask::new(5); let task_id = scheduler.add(task.clone()).await.unwrap();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        // Initially the running entry has no cron schedule.
        {
            let running = scheduler.tasks_running.lock().unwrap();
            let meta = running.get(&task_id).expect("Task should be running");
            assert!(meta.cron.is_none(), "Task should have no cron initially");
        }

        // Re-submit the same task id, this time with a cron schedule attached.
        let cron = CronSchedule::parse("*/5 * * * *").unwrap();
        let task_meta_with_cron = TaskMeta {
            task: task.clone(),
            next_at: Some(Timestamp::now()),
            deps: vec![],
            retry_count: 0,
            retry: None,
            cron: Some(cron.clone()),
        };
        let result = scheduler.add_queue(task_id, task_meta_with_cron).await;

        assert!(result.is_ok(), "add_queue should succeed");

        // The running entry's metadata now carries the cron schedule.
        {
            let running = scheduler.tasks_running.lock().unwrap();
            let meta = running.get(&task_id).expect("Task should still be running");
            assert!(meta.cron.is_some(), "Task should now have cron after update");
        }

        // Let the in-flight execution finish before the test tears down.
        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
    }
2026}
2027
2028