1use async_trait::async_trait;
4use flume;
5use itertools::Itertools;
6use std::{
7 collections::{BTreeMap, HashMap},
8 fmt::Debug,
9 sync::{Arc, Mutex, RwLock},
10};
11
12use chrono::{DateTime, Utc};
13use croner::Cron;
14use std::str::FromStr;
15
16use crate::prelude::*;
17use cloudillo_types::{lock, meta_adapter};
18
/// Identifier assigned to each scheduled task (allocated by the task store).
pub type TaskId = u64;

/// Whether a task repeats (cron-driven) or runs exactly once.
pub enum TaskType {
    Periodic,
    Once,
}
25
/// A validated cron schedule: the original expression text plus its parsed form.
#[derive(Debug, Clone)]
pub struct CronSchedule {
    /// Original expression text, kept for persistence and equality checks.
    expr: Box<str>,
    /// Parsed representation used to compute occurrences.
    cron: Cron,
}
35
36impl CronSchedule {
37 pub fn parse(expr: &str) -> ClResult<Self> {
39 let cron = Cron::from_str(expr)
40 .map_err(|e| Error::ValidationError(format!("invalid cron expression: {}", e)))?;
41 Ok(Self { expr: expr.into(), cron })
42 }
43
44 pub fn next_execution(&self, after: Timestamp) -> ClResult<Timestamp> {
49 let dt = DateTime::<Utc>::from_timestamp(after.0, 0).unwrap_or_else(Utc::now);
50
51 self.cron
52 .find_next_occurrence(&dt, false)
53 .map(|next| Timestamp(next.timestamp()))
54 .map_err(|e| {
55 tracing::error!("Failed to find next cron occurrence for '{}': {}", self.expr, e);
56 Error::ValidationError(format!("cron next_execution failed: {}", e))
57 })
58 }
59
60 pub fn to_cron_string(&self) -> String {
62 self.expr.to_string()
63 }
64}
65
66impl PartialEq for CronSchedule {
67 fn eq(&self, other: &Self) -> bool {
68 self.expr == other.expr
69 }
70}
71
72impl Eq for CronSchedule {}
73
/// A unit of schedulable work, generic over the shared application state `S`.
#[async_trait]
pub trait Task<S: Clone>: Send + Sync + Debug {
    /// Static type tag used to register and look up the task's builder.
    fn kind() -> &'static str
    where
        Self: Sized;
    /// Rebuild a task instance from its persisted serialized context.
    fn build(id: TaskId, context: &str) -> ClResult<Arc<dyn Task<S>>>
    where
        Self: Sized;
    /// Serialize the task's parameters for persistence (the input to `build`).
    fn serialize(&self) -> String;
    /// Execute the task against the shared state.
    async fn run(&self, state: &S) -> ClResult<()>;

    /// Instance-level counterpart of [`Task::kind`], usable on trait objects.
    fn kind_of(&self) -> &'static str;
}
87
/// Persisted lifecycle state of a task.
#[derive(Debug)]
pub enum TaskStatus {
    /// Not yet run (or waiting for a retry / next occurrence).
    Pending,
    /// Finished successfully.
    Completed,
    /// Finished with an error and no further retries.
    Failed,
}
94
/// Raw task record as loaded from a [`TaskStore`], before the task object is
/// rebuilt through its registered builder.
pub struct TaskData {
    id: TaskId,
    // Task type tag; must match a builder registered via `Scheduler::register`.
    kind: Box<str>,
    status: TaskStatus,
    // Serialized task parameters (see `Task::serialize`).
    input: Box<str>,
    // Ids of tasks this one waits on.
    deps: Box<[TaskId]>,
    // Serialized retry state: "count,wait_min,wait_max,times" (parsed in `Scheduler::load`).
    retry_data: Option<Box<str>>,
    // Cron expression for recurring tasks.
    cron_data: Option<Box<str>>,
    // Next scheduled execution time, if any.
    next_at: Option<Timestamp>,
}
105
/// Persistence backend for scheduled tasks.
#[async_trait]
pub trait TaskStore<S: Clone>: Send + Sync {
    /// Persist a new task (optionally under a unique `key`) and return its id.
    async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId>;
    /// Mark a task as successfully finished, storing its output.
    async fn finished(&self, id: TaskId, output: &str) -> ClResult<()>;
    /// Load all persisted tasks (used at startup to restore state).
    async fn load(&self) -> ClResult<Vec<TaskData>>;
    /// Record a failed run; `next_at` is the retry time, if any.
    async fn update_task_error(
        &self,
        task_id: TaskId,
        output: &str,
        next_at: Option<Timestamp>,
    ) -> ClResult<()>;
    /// Look up a task by its unique key.
    async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>>;
    /// Overwrite the stored metadata of an existing task.
    async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()>;
}
120
121pub struct InMemoryTaskStore {
124 last_id: Mutex<TaskId>,
125}
126
127impl InMemoryTaskStore {
128 pub fn new() -> Arc<Self> {
129 Arc::new(Self { last_id: Mutex::new(0) })
130 }
131}
132
133#[async_trait]
134impl<S: Clone> TaskStore<S> for InMemoryTaskStore {
135 async fn add(&self, _task: &TaskMeta<S>, _key: Option<&str>) -> ClResult<TaskId> {
136 let mut last_id = lock!(self.last_id)?;
137 *last_id += 1;
138 Ok(*last_id)
139 }
140
141 async fn finished(&self, _id: TaskId, _output: &str) -> ClResult<()> {
142 Ok(())
143 }
144
145 async fn load(&self) -> ClResult<Vec<TaskData>> {
146 Ok(vec![])
147 }
148
149 async fn update_task_error(
150 &self,
151 _task_id: TaskId,
152 _output: &str,
153 _next_at: Option<Timestamp>,
154 ) -> ClResult<()> {
155 Ok(())
156 }
157
158 async fn find_by_key(&self, _key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
159 Ok(None)
161 }
162
163 async fn update_task(&self, _id: TaskId, _task: &TaskMeta<S>) -> ClResult<()> {
164 Ok(())
166 }
167}
168
169pub struct MetaAdapterTaskStore {
172 meta_adapter: Arc<dyn meta_adapter::MetaAdapter>,
173}
174
175impl MetaAdapterTaskStore {
176 pub fn new(meta_adapter: Arc<dyn meta_adapter::MetaAdapter>) -> Arc<Self> {
177 Arc::new(Self { meta_adapter })
178 }
179}
180
181#[async_trait]
182impl<S: Clone> TaskStore<S> for MetaAdapterTaskStore {
183 async fn add(&self, task: &TaskMeta<S>, key: Option<&str>) -> ClResult<TaskId> {
184 let id = self
185 .meta_adapter
186 .create_task(task.task.kind_of(), key, &task.task.serialize(), &task.deps)
187 .await?;
188
189 if let Some(cron) = &task.cron {
191 self.meta_adapter
192 .update_task(
193 id,
194 &meta_adapter::TaskPatch {
195 cron: Patch::Value(cron.to_cron_string()),
196 ..Default::default()
197 },
198 )
199 .await?;
200 }
201
202 Ok(id)
203 }
204
205 async fn finished(&self, id: TaskId, output: &str) -> ClResult<()> {
206 self.meta_adapter.update_task_finished(id, output).await
207 }
208
209 async fn load(&self) -> ClResult<Vec<TaskData>> {
210 let tasks = self.meta_adapter.list_tasks(meta_adapter::ListTaskOptions::default()).await?;
211 let tasks = tasks
212 .into_iter()
213 .map(|t| TaskData {
214 id: t.task_id,
215 kind: t.kind,
216 status: match t.status {
217 'P' => TaskStatus::Pending,
218 'F' => TaskStatus::Completed,
219 'E' => TaskStatus::Failed,
220 _ => TaskStatus::Failed,
221 },
222 input: t.input,
223 deps: t.deps,
224 retry_data: t.retry,
225 cron_data: t.cron,
226 next_at: t.next_at,
227 })
228 .collect();
229 Ok(tasks)
230 }
231
232 async fn update_task_error(
233 &self,
234 task_id: TaskId,
235 output: &str,
236 next_at: Option<Timestamp>,
237 ) -> ClResult<()> {
238 self.meta_adapter.update_task_error(task_id, output, next_at).await
239 }
240
241 async fn find_by_key(&self, key: &str) -> ClResult<Option<(TaskId, TaskData)>> {
242 let task_opt = self.meta_adapter.find_task_by_key(key).await?;
243
244 match task_opt {
245 Some(t) => Ok(Some((
246 t.task_id,
247 TaskData {
248 id: t.task_id,
249 kind: t.kind,
250 status: match t.status {
251 'P' => TaskStatus::Pending,
252 'F' => TaskStatus::Completed,
253 'E' => TaskStatus::Failed,
254 _ => TaskStatus::Failed,
255 },
256 input: t.input,
257 deps: t.deps,
258 retry_data: t.retry,
259 cron_data: t.cron,
260 next_at: t.next_at,
261 },
262 ))),
263 None => Ok(None),
264 }
265 }
266
267 async fn update_task(&self, id: TaskId, task: &TaskMeta<S>) -> ClResult<()> {
268 use cloudillo_types::types::Patch;
269
270 let mut patch = meta_adapter::TaskPatch {
272 input: Patch::Value(task.task.serialize()),
273 next_at: match task.next_at {
274 Some(ts) => Patch::Value(ts),
275 None => Patch::Null,
276 },
277 ..Default::default()
278 };
279
280 if !task.deps.is_empty() {
282 patch.deps = Patch::Value(task.deps.clone());
283 }
284
285 if let Some(ref retry) = task.retry {
287 let retry_str = format!(
288 "{},{},{},{}",
289 task.retry_count, retry.wait_min_max.0, retry.wait_min_max.1, retry.times
290 );
291 patch.retry = Patch::Value(retry_str);
292 }
293
294 if let Some(ref cron) = task.cron {
296 patch.cron = Patch::Value(cron.to_cron_string());
297 }
298
299 self.meta_adapter.update_task(id, &patch).await
300 }
301}
302
/// Factory signature for rebuilding a task from its persisted id and serialized input.
type TaskBuilder<S> = dyn Fn(TaskId, &str) -> ClResult<Arc<dyn Task<S>>> + Send + Sync;
305
/// Exponential-backoff retry policy for failed tasks.
#[derive(Debug, Clone)]
pub struct RetryPolicy {
    /// Minimum and maximum backoff delay, in seconds.
    wait_min_max: (u64, u64),
    /// Maximum number of retry attempts before giving up.
    times: u16,
}

impl Default for RetryPolicy {
    /// Default policy: backoff between 60s and 1h, up to 10 attempts.
    fn default() -> Self {
        Self { wait_min_max: (60, 3600), times: 10 }
    }
}

impl RetryPolicy {
    /// Create a policy with an explicit backoff window (seconds) and attempt limit.
    pub fn new(wait_min_max: (u64, u64), times: u16) -> Self {
        Self { wait_min_max, times }
    }

    /// Exponential backoff (min * 2^attempt) clamped to the configured maximum.
    ///
    /// Uses saturating arithmetic: the previous `min * (1u64 << attempt)`
    /// panicked (debug) or produced a masked shift (release) for
    /// `attempt_count >= 64`, and could overflow the multiplication.
    pub fn calculate_backoff(&self, attempt_count: u16) -> u64 {
        let (min, max) = self.wait_min_max;
        let factor = 2u64.saturating_pow(u32::from(attempt_count));
        min.saturating_mul(factor).min(max)
    }

    /// Whether another retry is allowed after `attempt_count` failed attempts.
    pub fn should_retry(&self, attempt_count: u16) -> bool {
        attempt_count < self.times
    }
}
336
/// Fluent builder returned by `Scheduler::task`; configure the task and then
/// submit with `schedule()` (or one of the submitting shortcuts like `now()`).
pub struct TaskSchedulerBuilder<'a, S: Clone> {
    scheduler: &'a Scheduler<S>,
    task: Arc<dyn Task<S>>,
    // Optional unique key for idempotent (re)scheduling.
    key: Option<String>,
    // Absolute time of the (first) execution.
    next_at: Option<Timestamp>,
    // Tasks that must complete before this one runs.
    deps: Vec<TaskId>,
    retry: Option<RetryPolicy>,
    cron: Option<CronSchedule>,
}
348
349impl<'a, S: Clone + Send + Sync + 'static> TaskSchedulerBuilder<'a, S> {
350 fn new(scheduler: &'a Scheduler<S>, task: Arc<dyn Task<S>>) -> Self {
352 Self {
353 scheduler,
354 task,
355 key: None,
356 next_at: None,
357 deps: Vec::new(),
358 retry: None,
359 cron: None,
360 }
361 }
362
363 pub fn key(mut self, key: impl Into<String>) -> Self {
365 self.key = Some(key.into());
366 self
367 }
368
369 pub fn schedule_at(mut self, timestamp: Timestamp) -> Self {
371 self.next_at = Some(timestamp);
372 self
373 }
374
375 pub fn schedule_after(mut self, seconds: i64) -> Self {
377 self.next_at = Some(Timestamp::from_now(seconds));
378 self
379 }
380
381 pub fn depend_on(mut self, deps: Vec<TaskId>) -> Self {
383 self.deps = deps;
384 self
385 }
386
387 pub fn depends_on(mut self, dep: TaskId) -> Self {
389 self.deps.push(dep);
390 self
391 }
392
393 pub fn with_retry(mut self, policy: RetryPolicy) -> Self {
395 self.retry = Some(policy);
396 self
397 }
398
399 pub fn cron(mut self, expr: impl Into<String>) -> Self {
404 if let Ok(cron_schedule) = CronSchedule::parse(&expr.into()) {
405 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
408 self.cron = Some(cron_schedule);
409 }
410 self
411 }
412
413 pub fn daily_at(mut self, hour: u8, minute: u8) -> Self {
416 if hour <= 23 && minute <= 59 {
417 let expr = format!("{} {} * * *", minute, hour);
418 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
419 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
422 self.cron = Some(cron_schedule);
423 }
424 }
425 self
426 }
427
428 pub fn weekly_at(mut self, weekday: u8, hour: u8, minute: u8) -> Self {
432 if weekday <= 6 && hour <= 23 && minute <= 59 {
433 let expr = format!("{} {} * * {}", minute, hour, weekday);
434 if let Ok(cron_schedule) = CronSchedule::parse(&expr) {
435 self.next_at = cron_schedule.next_execution(Timestamp::now()).ok();
438 self.cron = Some(cron_schedule);
439 }
440 }
441 self
442 }
443
444 pub async fn now(self) -> ClResult<TaskId> {
446 self.schedule().await
447 }
448
449 pub async fn at(mut self, ts: Timestamp) -> ClResult<TaskId> {
451 self.next_at = Some(ts);
452 self.schedule().await
453 }
454
455 pub async fn after(mut self, seconds: i64) -> ClResult<TaskId> {
457 self.next_at = Some(Timestamp::from_now(seconds));
458 self.schedule().await
459 }
460
461 pub async fn after_task(mut self, dep: TaskId) -> ClResult<TaskId> {
463 self.deps.push(dep);
464 self.schedule().await
465 }
466
467 pub async fn with_automatic_retry(mut self) -> ClResult<TaskId> {
469 self.retry = Some(RetryPolicy::default());
470 self.schedule().await
471 }
472
473 pub async fn schedule(self) -> ClResult<TaskId> {
475 self.scheduler
476 ._schedule_task(
477 self.task,
478 self.key.as_deref(),
479 self.next_at,
480 if self.deps.is_empty() { None } else { Some(self.deps) },
481 self.retry,
482 self.cron,
483 )
484 .await
485 }
486}
487
/// Full in-memory record of a scheduled task.
#[derive(Debug, Clone)]
pub struct TaskMeta<S: Clone> {
    /// The task object itself.
    pub task: Arc<dyn Task<S>>,
    /// Next execution time, if scheduled.
    pub next_at: Option<Timestamp>,
    /// Ids of tasks that must complete before this one runs.
    pub deps: Vec<TaskId>,
    // Number of failed attempts so far (drives retry backoff).
    retry_count: u16,
    /// Retry policy applied on failure, if any.
    pub retry: Option<RetryPolicy>,
    /// Recurring schedule, if this task repeats.
    pub cron: Option<CronSchedule>,
}
497
/// Central task scheduler, generic over the shared application state `S`.
///
/// Tasks live in one of three in-memory queues: `tasks_scheduled`
/// (time-ordered, waiting for their `next_at`), `tasks_waiting` (blocked on
/// dependencies) and `tasks_running` (currently executing).
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct Scheduler<S: Clone> {
    // Registered task factories, keyed by task kind (see `register`).
    task_builders: Arc<RwLock<HashMap<&'static str, Box<TaskBuilder<S>>>>>,
    // Persistent backing store.
    store: Arc<dyn TaskStore<S>>,
    // Tasks currently executing.
    tasks_running: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
    // Tasks blocked on unfinished dependencies.
    tasks_waiting: Arc<Mutex<HashMap<TaskId, TaskMeta<S>>>>,
    // Reverse dependency index: dep id -> ids of tasks waiting on it.
    task_dependents: Arc<Mutex<HashMap<TaskId, Vec<TaskId>>>>,
    // Time-ordered queue of tasks waiting for their execution time.
    tasks_scheduled: Arc<Mutex<BTreeMap<(Timestamp, TaskId), TaskMeta<S>>>>,
    // Channel used by spawned task futures to report completion.
    tx_finish: flume::Sender<TaskId>,
    rx_finish: flume::Receiver<TaskId>,
    // Wakes the timer loop whenever the scheduled queue changes.
    notify_schedule: Arc<tokio::sync::Notify>,
}
512
513impl<S: Clone + Send + Sync + 'static> Scheduler<S> {
514 pub fn new(store: Arc<dyn TaskStore<S>>) -> Arc<Self> {
515 let (tx_finish, rx_finish) = flume::unbounded();
516
517 let scheduler = Self {
518 task_builders: Arc::new(RwLock::new(HashMap::new())),
519 store,
520 tasks_running: Arc::new(Mutex::new(HashMap::new())),
521 tasks_waiting: Arc::new(Mutex::new(HashMap::new())),
522 task_dependents: Arc::new(Mutex::new(HashMap::new())),
523 tasks_scheduled: Arc::new(Mutex::new(BTreeMap::new())),
524 tx_finish,
525 rx_finish,
526 notify_schedule: Arc::new(tokio::sync::Notify::new()),
527 };
528
529 Arc::new(scheduler)
532 }
533
534 pub fn start(&self, state: S) {
535 let schedule = self.clone();
537 let stat = state.clone();
538 let rx_finish = self.rx_finish.clone();
539
540 tokio::spawn(async move {
541 while let Ok(id) = rx_finish.recv_async().await {
542 debug!("Completed task {} (notified)", id);
543
544 let task_meta_opt = {
546 let tasks_running = match schedule.tasks_running.lock() {
547 Ok(guard) => guard,
548 Err(poisoned) => {
549 error!("Mutex poisoned: tasks_running (recovering)");
550 poisoned.into_inner()
551 }
552 };
553 tasks_running.get(&id).cloned()
554 };
555
556 if let Some(task_meta) = task_meta_opt {
557 let mut transition_ok = false;
559
560 if let Some(ref cron) = task_meta.cron {
562 let next_at = match cron.next_execution(Timestamp::now()) {
564 Ok(ts) => ts,
565 Err(e) => {
566 error!(
567 "Failed to calculate next execution for recurring task {}: {} - task will not reschedule",
568 id, e
569 );
570 if let Err(e) = schedule.store.finished(id, "").await {
572 error!("Failed to mark task {} as finished: {}", id, e);
573 }
574 continue;
575 }
576 };
577 info!(
578 "Recurring task {} completed, scheduling next execution at {}",
579 id, next_at
580 );
581
582 let mut updated_meta = task_meta.clone();
584 updated_meta.next_at = Some(next_at);
585
586 if let Err(e) = schedule.store.update_task(id, &updated_meta).await {
588 error!("Failed to update recurring task {} next_at: {}", id, e);
589 }
590
591 match schedule.tasks_running.lock() {
593 Ok(mut tasks_running) => {
594 tasks_running.remove(&id);
595 }
596 Err(poisoned) => {
597 error!("Mutex poisoned: tasks_running (recovering)");
598 poisoned.into_inner().remove(&id);
599 }
600 }
601
602 match schedule.add_queue(id, updated_meta).await {
604 Ok(_) => transition_ok = true,
605 Err(e) => {
606 error!(
607 "Failed to reschedule recurring task {}: {} - task lost!",
608 id, e
609 );
610 }
611 }
612 } else {
613 match schedule.store.finished(id, "").await {
615 Ok(_) => transition_ok = true,
616 Err(e) => {
617 error!(
618 "Failed to mark task {} as finished: {} - task remains in running queue",
619 id, e
620 );
621 }
622 }
623 }
624
625 if transition_ok {
627 match schedule.tasks_running.lock() {
628 Ok(mut tasks_running) => {
629 tasks_running.remove(&id);
630 }
631 Err(poisoned) => {
632 error!("Mutex poisoned: tasks_running (recovering)");
633 poisoned.into_inner().remove(&id);
634 }
635 }
636 }
637
638 match schedule.release_dependents(id) {
640 Ok(ready_to_spawn) => {
641 for (dep_id, dep_task_meta) in ready_to_spawn {
642 match schedule.tasks_running.lock() {
644 Ok(mut tasks_running) => {
645 tasks_running.insert(dep_id, dep_task_meta.clone());
646 }
647 Err(poisoned) => {
648 error!("Mutex poisoned: tasks_running (recovering)");
649 poisoned.into_inner().insert(dep_id, dep_task_meta.clone());
650 }
651 }
652 schedule.spawn_task(
653 stat.clone(),
654 dep_task_meta.task.clone(),
655 dep_id,
656 dep_task_meta,
657 );
658 }
659 }
660 Err(e) => {
661 error!("Failed to release dependents of task {}: {}", id, e);
662 }
663 }
664 } else {
665 warn!("Completed task {} not found in running queue", id);
666 }
667 }
668 });
669
670 let schedule = self.clone();
672 tokio::spawn(async move {
673 loop {
674 let is_empty = match schedule.tasks_scheduled.lock() {
675 Ok(guard) => guard.is_empty(),
676 Err(poisoned) => {
677 error!("Mutex poisoned: tasks_scheduled (recovering)");
678 poisoned.into_inner().is_empty()
679 }
680 };
681 if is_empty {
682 schedule.notify_schedule.notified().await;
683 }
684 let time = Timestamp::now();
685 if let Some((timestamp, _id)) = loop {
686 let mut tasks_scheduled = match schedule.tasks_scheduled.lock() {
687 Ok(guard) => guard,
688 Err(poisoned) => {
689 error!("Mutex poisoned: tasks_scheduled (recovering)");
690 poisoned.into_inner()
691 }
692 };
693 if let Some((&(timestamp, id), _)) = tasks_scheduled.first_key_value() {
694 let (timestamp, id) = (timestamp, id);
695 if timestamp <= Timestamp::now() {
696 debug!("Spawning task id {} (from schedule)", id);
697 if let Some(task) = tasks_scheduled.remove(&(timestamp, id)) {
698 let mut tasks_running = match schedule.tasks_running.lock() {
699 Ok(guard) => guard,
700 Err(poisoned) => {
701 error!("Mutex poisoned: tasks_running (recovering)");
702 poisoned.into_inner()
703 }
704 };
705 tasks_running.insert(id, task.clone());
706 schedule.spawn_task(state.clone(), task.task.clone(), id, task);
707 } else {
708 error!("Task disappeared while being removed from schedule");
709 break None;
710 }
711 } else {
712 break Some((timestamp, id));
713 }
714 } else {
715 break None;
716 }
717 } {
718 let wait = tokio::time::Duration::from_secs((timestamp.0 - time.0) as u64);
719 tokio::select! {
720 _ = tokio::time::sleep(wait) => (), _ = schedule.notify_schedule.notified() => ()
721 };
722 }
723 }
724 });
725
726 let schedule = self.clone();
727 tokio::spawn(async move {
728 let _ignore_err = schedule.load().await;
729 });
730 }
731
732 fn register_builder(
733 &self,
734 name: &'static str,
735 builder: &'static TaskBuilder<S>,
736 ) -> ClResult<&Self> {
737 let mut task_builders = self
738 .task_builders
739 .write()
740 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
741 task_builders.insert(name, Box::new(builder));
742 Ok(self)
743 }
744
745 pub fn register<T: Task<S>>(&self) -> ClResult<&Self> {
746 info!("Registering task type {}", T::kind());
747 self.register_builder(T::kind(), &|id: TaskId, params: &str| T::build(id, params))?;
748 Ok(self)
749 }
750
751 pub fn task(&self, task: Arc<dyn Task<S>>) -> TaskSchedulerBuilder<'_, S> {
753 TaskSchedulerBuilder::new(self, task)
754 }
755
756 async fn _schedule_task(
759 &self,
760 task: Arc<dyn Task<S>>,
761 key: Option<&str>,
762 next_at: Option<Timestamp>,
763 deps: Option<Vec<TaskId>>,
764 retry: Option<RetryPolicy>,
765 cron: Option<CronSchedule>,
766 ) -> ClResult<TaskId> {
767 let task_meta = TaskMeta {
768 task: task.clone(),
769 next_at,
770 deps: deps.clone().unwrap_or_default(),
771 retry_count: 0,
772 retry,
773 cron,
774 };
775
776 if let Some(key) = key {
778 if let Some((existing_id, existing_data)) = self.store.find_by_key(key).await? {
779 let new_serialized = task.serialize();
780 let existing_serialized = existing_data.input.as_ref();
781
782 if new_serialized == existing_serialized {
784 info!(
785 "Recurring task '{}' already exists with identical parameters (id={})",
786 key, existing_id
787 );
788 self.store.update_task(existing_id, &task_meta).await?;
790 self.add_queue(existing_id, task_meta).await?;
792 return Ok(existing_id);
793 } else {
794 info!(
795 "Updating recurring task '{}' (id={}) - parameters changed",
796 key, existing_id
797 );
798 info!(" Old params: {}", existing_serialized);
799 info!(" New params: {}", new_serialized);
800
801 self.remove_from_queues(existing_id)?;
803
804 self.store.update_task(existing_id, &task_meta).await?;
806
807 self.add_queue(existing_id, task_meta).await?;
809
810 return Ok(existing_id);
811 }
812 }
813 }
814
815 let id = self.store.add(&task_meta, key).await?;
817 self.add_queue(id, task_meta).await
818 }
819
820 pub async fn add(&self, task: Arc<dyn Task<S>>) -> ClResult<TaskId> {
821 self.task(task).now().await
822 }
823
824 pub async fn add_queue(&self, id: TaskId, task_meta: TaskMeta<S>) -> ClResult<TaskId> {
825 {
828 let mut running = lock!(self.tasks_running, "tasks_running")?;
829 if let Some(existing_meta) = running.get_mut(&id) {
830 debug!(
831 "Task {} is already running, updating metadata (will reschedule on completion)",
832 id
833 );
834 *existing_meta = task_meta;
836 return Ok(id);
837 }
838 }
839
840 {
842 let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
843 if let Some(key) = scheduled
844 .iter()
845 .find(|((_, tid), _)| *tid == id)
846 .map(|((ts, tid), _)| (*ts, *tid))
847 {
848 scheduled.remove(&key);
849 debug!("Removed existing scheduled entry for task {} before re-queueing", id);
850 }
851 }
852 {
853 let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
854 if waiting.remove(&id).is_some() {
855 debug!("Removed existing waiting entry for task {} before re-queueing", id);
856 }
857 }
858
859 let deps = task_meta.deps.clone();
860
861 if !deps.is_empty() && task_meta.next_at.is_some() {
863 warn!("Task {} has both dependencies and scheduled time - ignoring next_at, placing in waiting queue", id);
864 lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
866 debug!("Task {} is waiting for {:?}", id, &deps);
867 for dep in deps {
868 lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
869 }
870 return Ok(id);
871 }
872
873 if deps.is_empty() && task_meta.next_at.unwrap_or(Timestamp(0)) < Timestamp::now() {
874 debug!("Spawning task {}", id);
875 lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((Timestamp(0), id), task_meta);
876 self.notify_schedule.notify_one();
877 } else if let Some(next_at) = task_meta.next_at {
878 debug!("Scheduling task {} for {}", id, next_at);
879 lock!(self.tasks_scheduled, "tasks_scheduled")?.insert((next_at, id), task_meta);
880 self.notify_schedule.notify_one();
881 } else {
882 lock!(self.tasks_waiting, "tasks_waiting")?.insert(id, task_meta);
883 debug!("Task {} is waiting for {:?}", id, &deps);
884 for dep in deps {
885 lock!(self.task_dependents, "task_dependents")?.entry(dep).or_default().push(id);
886 }
887 }
888 Ok(id)
889 }
890
891 fn remove_from_queues(&self, task_id: TaskId) -> ClResult<Option<TaskMeta<S>>> {
894 if let Some(task_meta) = lock!(self.tasks_waiting, "tasks_waiting")?.remove(&task_id) {
896 debug!("Removed task {} from waiting queue for update", task_id);
897 return Ok(Some(task_meta));
898 }
899
900 {
902 let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
903 if let Some(key) = scheduled
904 .iter()
905 .find(|((_, id), _)| *id == task_id)
906 .map(|((ts, id), _)| (*ts, *id))
907 {
908 if let Some(task_meta) = scheduled.remove(&key) {
909 debug!("Removed task {} from scheduled queue for update", task_id);
910 return Ok(Some(task_meta));
911 }
912 }
913 }
914
915 if let Some(task_meta) = lock!(self.tasks_running, "tasks_running")?.remove(&task_id) {
917 warn!("Removed task {} from running queue during update", task_id);
918 return Ok(Some(task_meta));
919 }
920
921 Ok(None)
922 }
923
924 fn release_dependents(
927 &self,
928 completed_task_id: TaskId,
929 ) -> ClResult<Vec<(TaskId, TaskMeta<S>)>> {
930 let dependents = {
932 let mut deps_map = lock!(self.task_dependents, "task_dependents")?;
933 deps_map.remove(&completed_task_id).unwrap_or_default()
934 };
935
936 if dependents.is_empty() {
937 return Ok(Vec::new()); }
939
940 debug!("Releasing {} dependents of completed task {}", dependents.len(), completed_task_id);
941
942 let mut ready_to_spawn = Vec::new();
943
944 for dependent_id in dependents {
946 {
948 let mut waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
949 if let Some(task_meta) = waiting.get_mut(&dependent_id) {
950 task_meta.deps.retain(|x| *x != completed_task_id);
952
953 if task_meta.deps.is_empty() {
955 if let Some(task_to_spawn) = waiting.remove(&dependent_id) {
956 debug!(
957 "Dependent task {} ready to spawn (all dependencies cleared)",
958 dependent_id
959 );
960 ready_to_spawn.push((dependent_id, task_to_spawn));
961 }
962 } else {
963 debug!(
964 "Task {} still has {} remaining dependencies",
965 dependent_id,
966 task_meta.deps.len()
967 );
968 }
969 continue;
970 }
971 }
972
973 {
975 let mut scheduled = lock!(self.tasks_scheduled, "tasks_scheduled")?;
976 if let Some(scheduled_key) = scheduled
977 .iter()
978 .find(|((_, id), _)| *id == dependent_id)
979 .map(|((ts, id), _)| (*ts, *id))
980 {
981 if let Some(task_meta) = scheduled.get_mut(&scheduled_key) {
982 task_meta.deps.retain(|x| *x != completed_task_id);
983 let remaining = task_meta.deps.len();
984 if remaining == 0 {
985 debug!(
986 "Task {} in scheduled queue has no remaining dependencies",
987 dependent_id
988 );
989 } else {
990 debug!(
991 "Task {} in scheduled queue has {} remaining dependencies",
992 dependent_id, remaining
993 );
994 }
995 }
996 continue;
997 }
998 }
999
1000 warn!(
1002 "Dependent task {} of completed task {} not found in any queue",
1003 dependent_id, completed_task_id
1004 );
1005 }
1006
1007 Ok(ready_to_spawn)
1008 }
1009
1010 async fn load(&self) -> ClResult<()> {
1011 let tasks = self.store.load().await?;
1012 debug!("Loaded {} tasks from store", tasks.len());
1013 for t in tasks {
1014 if let TaskStatus::Pending = t.status {
1015 debug!("Loading task {} {}", t.id, t.kind);
1016 let task = {
1017 let builder_map = self
1018 .task_builders
1019 .read()
1020 .map_err(|_| Error::Internal("task_builders RwLock poisoned".into()))?;
1021 let builder = builder_map.get(t.kind.as_ref()).ok_or(Error::Internal(
1022 format!("task builder not registered: {}", t.kind),
1023 ))?;
1024 builder(t.id, &t.input)?
1025 };
1026 let (retry_count, retry) = match t.retry_data {
1027 Some(retry_str) => {
1028 let (retry_count, retry_min, retry_max, retry_times) = retry_str
1029 .split(',')
1030 .collect_tuple()
1031 .ok_or(Error::Internal("invalid retry policy format".into()))?;
1032 let retry_count: u16 = retry_count
1033 .parse()
1034 .map_err(|_| Error::Internal("retry count must be u16".into()))?;
1035 let retry = RetryPolicy {
1036 wait_min_max: (
1037 retry_min
1038 .parse()
1039 .map_err(|_| Error::Internal("retry_min must be u64".into()))?,
1040 retry_max
1041 .parse()
1042 .map_err(|_| Error::Internal("retry_max must be u64".into()))?,
1043 ),
1044 times: retry_times
1045 .parse()
1046 .map_err(|_| Error::Internal("retry times must be u64".into()))?,
1047 };
1048 debug!("Loaded retry policy: {:?}", retry);
1049 (retry_count, Some(retry))
1050 }
1051 _ => (0, None),
1052 };
1053 let cron =
1055 t.cron_data.as_ref().and_then(|cron_str| CronSchedule::parse(cron_str).ok());
1056
1057 let task_meta = TaskMeta {
1058 task,
1059 next_at: t.next_at,
1060 deps: t.deps.into(),
1061 retry_count,
1062 retry,
1063 cron,
1064 };
1065 self.add_queue(t.id, task_meta).await?;
1066 }
1067 }
1068 Ok(())
1069 }
1070
1071 fn spawn_task(&self, state: S, task: Arc<dyn Task<S>>, id: TaskId, task_meta: TaskMeta<S>) {
1072 let tx_finish = self.tx_finish.clone();
1073 let store = self.store.clone();
1074 let scheduler = self.clone();
1075 tokio::spawn(async move {
1077 match task.run(&state).await {
1078 Ok(()) => {
1079 debug!("Task {} completed successfully", id);
1080 tx_finish.send(id).unwrap_or(());
1081 }
1082 Err(e) => {
1083 if let Some(retry_policy) = &task_meta.retry {
1084 if retry_policy.should_retry(task_meta.retry_count) {
1085 let backoff = retry_policy.calculate_backoff(task_meta.retry_count);
1086 let next_at = Timestamp::from_now(backoff as i64);
1087
1088 info!(
1089 "Task {} failed (attempt {}/{}). Scheduling retry in {} seconds: {}",
1090 id, task_meta.retry_count + 1, retry_policy.times, backoff, e
1091 );
1092
1093 store
1095 .update_task_error(id, &e.to_string(), Some(next_at))
1096 .await
1097 .unwrap_or(());
1098
1099 match scheduler.tasks_running.lock() {
1101 Ok(mut tasks_running) => {
1102 tasks_running.remove(&id);
1103 }
1104 Err(poisoned) => {
1105 error!("Mutex poisoned: tasks_running (recovering)");
1106 poisoned.into_inner().remove(&id);
1107 }
1108 }
1109
1110 let mut retry_meta = task_meta.clone();
1112 retry_meta.retry_count += 1;
1113 retry_meta.next_at = Some(next_at);
1114 scheduler.add_queue(id, retry_meta).await.unwrap_or(id);
1115 } else {
1116 error!(
1118 "Task {} failed after {} retries: {}",
1119 id, task_meta.retry_count, e
1120 );
1121 store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1122 tx_finish.send(id).unwrap_or(());
1123 }
1124 } else {
1125 error!("Task {} failed: {}", id, e);
1127 store.update_task_error(id, &e.to_string(), None).await.unwrap_or(());
1128 tx_finish.send(id).unwrap_or(());
1129 }
1130 }
1131 }
1132 });
1133 }
1134
1135 pub async fn health_check(&self) -> ClResult<SchedulerHealth> {
1138 let waiting_count = lock!(self.tasks_waiting, "tasks_waiting")?.len();
1139 let scheduled_count = lock!(self.tasks_scheduled, "tasks_scheduled")?.len();
1140 let running_count = lock!(self.tasks_running, "tasks_running")?.len();
1141 let dependents_count = lock!(self.task_dependents, "task_dependents")?.len();
1142
1143 let mut stuck_tasks = Vec::new();
1145 let mut tasks_with_missing_deps = Vec::new();
1146
1147 {
1149 let waiting = lock!(self.tasks_waiting, "tasks_waiting")?;
1150 let _deps_map = lock!(self.task_dependents, "task_dependents")?;
1151
1152 for (id, task_meta) in waiting.iter() {
1153 if task_meta.deps.is_empty() {
1154 stuck_tasks.push(*id);
1155 warn!("SCHEDULER HEALTH: Task {} in waiting with no dependencies", id);
1156 } else {
1157 for dep in &task_meta.deps {
1159 let dep_exists = self
1161 .tasks_running
1162 .lock()
1163 .ok()
1164 .map(|r| r.contains_key(dep))
1165 .unwrap_or(false) || self
1166 .tasks_waiting
1167 .lock()
1168 .ok()
1169 .map(|w| w.contains_key(dep))
1170 .unwrap_or(false) || self
1171 .tasks_scheduled
1172 .lock()
1173 .ok()
1174 .map(|s| s.iter().any(|((_, task_id), _)| task_id == dep))
1175 .unwrap_or(false);
1176
1177 if !dep_exists {
1178 tasks_with_missing_deps.push((*id, *dep));
1179 warn!(
1180 "SCHEDULER HEALTH: Task {} depends on non-existent task {}",
1181 id, dep
1182 );
1183 }
1184 }
1185 }
1186 }
1187 }
1188
1189 Ok(SchedulerHealth {
1190 waiting: waiting_count,
1191 scheduled: scheduled_count,
1192 running: running_count,
1193 dependents: dependents_count,
1194 stuck_tasks,
1195 tasks_with_missing_deps,
1196 })
1197 }
1198}
1199
/// Snapshot of scheduler state returned by `Scheduler::health_check`.
#[derive(Debug, Clone)]
pub struct SchedulerHealth {
    /// Number of tasks blocked on dependencies.
    pub waiting: usize,
    /// Number of tasks queued for a future execution time.
    pub scheduled: usize,
    /// Number of tasks currently executing.
    pub running: usize,
    /// Number of entries in the reverse dependency index.
    pub dependents: usize,
    /// Tasks in the waiting queue with no dependencies (should not happen).
    pub stuck_tasks: Vec<TaskId>,
    /// Pairs (task, missing dependency) where the dependency exists in no queue.
    pub tasks_with_missing_deps: Vec<(TaskId, TaskId)>,
}
1216
1217#[cfg(test)]
1218mod tests {
1219 use super::*;
1220 use serde::{Deserialize, Serialize};
1221
1222 type State = Arc<Mutex<Vec<u8>>>;
1223
1224 #[derive(Debug, Serialize, Deserialize)]
1225 struct TestTask {
1226 num: u8,
1227 }
1228
1229 impl TestTask {
1230 pub fn new(num: u8) -> Arc<Self> {
1231 Arc::new(Self { num })
1232 }
1233 }
1234
1235 #[async_trait]
1236 impl Task<State> for TestTask {
1237 fn kind() -> &'static str {
1238 "test"
1239 }
1240
1241 fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1242 let num: u8 = ctx
1243 .parse()
1244 .map_err(|_| Error::Internal("test task context must be u8".into()))?;
1245 let task = TestTask::new(num);
1246 Ok(task)
1247 }
1248
1249 fn serialize(&self) -> String {
1250 self.num.to_string()
1251 }
1252
1253 fn kind_of(&self) -> &'static str {
1254 "test"
1255 }
1256
1257 async fn run(&self, state: &State) -> ClResult<()> {
1258 info!("Running task {}", self.num);
1259 tokio::time::sleep(std::time::Duration::from_millis(200 * self.num as u64)).await;
1260 info!("Completed task {}", self.num);
1261 state.lock().unwrap().push(self.num);
1262 Ok(())
1263 }
1264 }
1265
1266 #[derive(Debug, Serialize, Deserialize, Clone)]
1267 struct FailingTask {
1268 id: u8,
1269 fail_count: u8,
1270 attempt: Arc<Mutex<u8>>,
1271 }
1272
1273 impl FailingTask {
1274 pub fn new(id: u8, fail_count: u8) -> Arc<Self> {
1275 Arc::new(Self { id, fail_count, attempt: Arc::new(Mutex::new(0)) })
1276 }
1277 }
1278
1279 #[async_trait]
1280 impl Task<State> for FailingTask {
1281 fn kind() -> &'static str {
1282 "failing"
1283 }
1284
1285 fn build(_id: TaskId, ctx: &str) -> ClResult<Arc<dyn Task<State>>> {
1286 let parts: Vec<&str> = ctx.split(',').collect();
1287 if parts.len() != 2 {
1288 return Err(Error::Internal("failing task context must have 2 parts".into()));
1289 }
1290 let id: u8 = parts[0]
1291 .parse()
1292 .map_err(|_| Error::Internal("failing task id must be u8".into()))?;
1293 let fail_count: u8 = parts[1]
1294 .parse()
1295 .map_err(|_| Error::Internal("failing task fail_count must be u8".into()))?;
1296 Ok(FailingTask::new(id, fail_count))
1297 }
1298
1299 fn serialize(&self) -> String {
1300 format!("{},{}", self.id, self.fail_count)
1301 }
1302
1303 fn kind_of(&self) -> &'static str {
1304 "failing"
1305 }
1306
1307 async fn run(&self, state: &State) -> ClResult<()> {
1308 let mut attempt = self.attempt.lock().unwrap();
1309 *attempt += 1;
1310 let current_attempt = *attempt;
1311
1312 info!("FailingTask {} - attempt {}/{}", self.id, current_attempt, self.fail_count + 1);
1313
1314 if current_attempt <= self.fail_count {
1315 error!("FailingTask {} failed on attempt {}", self.id, current_attempt);
1316 return Err(Error::ServiceUnavailable(format!("Task {} failed", self.id)));
1317 }
1318
1319 info!("FailingTask {} succeeded on attempt {}", self.id, current_attempt);
1320 state.lock().unwrap().push(self.id);
1321 Ok(())
1322 }
1323 }
1324
1325 #[tokio::test]
1326 pub async fn test_scheduler() {
1327 let _ = tracing_subscriber::fmt().try_init();
1328
1329 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1330 let state: State = Arc::new(Mutex::new(Vec::new()));
1331 let scheduler = Scheduler::new(task_store);
1332 scheduler.start(state.clone());
1333 scheduler.register::<TestTask>().unwrap();
1334
1335 let _task1 = TestTask::new(1);
1336 let task2 = TestTask::new(1);
1337 let task3 = TestTask::new(1);
1338
1339 let task2_id = scheduler.task(task2).schedule_after(2).schedule().await.unwrap();
1340 let task3_id = scheduler.add(task3).await.unwrap();
1341 scheduler
1342 .task(TestTask::new(1))
1343 .depend_on(vec![task2_id, task3_id])
1344 .schedule()
1345 .await
1346 .unwrap();
1347
1348 tokio::time::sleep(std::time::Duration::from_secs(4)).await;
1349 let task4 = TestTask::new(1);
1350 let task5 = TestTask::new(1);
1351 scheduler.task(task4).schedule_after(2).schedule().await.unwrap();
1352 scheduler.task(task5).schedule_after(1).schedule().await.unwrap();
1353
1354 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1355
1356 let st = state.lock().unwrap();
1357 info!("res: {}", st.len());
1358 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1359 assert_eq!(str_vec.join(":"), "1:1:1:1:1");
1360 }
1361
1362 #[tokio::test]
1363 pub async fn test_retry_with_backoff() {
1364 let _ = tracing_subscriber::fmt().try_init();
1365
1366 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1367 let state: State = Arc::new(Mutex::new(Vec::new()));
1368 let scheduler = Scheduler::new(task_store);
1369 scheduler.start(state.clone());
1370 scheduler.register::<FailingTask>().unwrap();
1371
1372 let failing_task = FailingTask::new(42, 2);
1375 let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1376
1377 scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1378
1379 tokio::time::sleep(std::time::Duration::from_secs(6)).await;
1386
1387 let st = state.lock().unwrap();
1388 assert_eq!(st.len(), 1, "Task should have succeeded after retries");
1389 assert_eq!(st[0], 42);
1390 }
1391
1392 #[tokio::test]
1395 pub async fn test_builder_simple_schedule() {
1396 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1397 let state: State = Arc::new(Mutex::new(Vec::new()));
1398 let scheduler = Scheduler::new(task_store);
1399 scheduler.start(state.clone());
1400 scheduler.register::<TestTask>().unwrap();
1401
1402 let task = TestTask::new(1);
1404 let id = scheduler.task(task).now().await.unwrap();
1405
1406 assert!(id > 0, "Task ID should be positive");
1407
1408 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1409
1410 let st = state.lock().unwrap();
1411 assert_eq!(st.len(), 1, "Task should have executed");
1412 assert_eq!(st[0], 1);
1413 }
1414
1415 #[tokio::test]
1416 pub async fn test_builder_with_key() {
1417 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1418 let state: State = Arc::new(Mutex::new(Vec::new()));
1419 let scheduler = Scheduler::new(task_store);
1420 scheduler.start(state.clone());
1421 scheduler.register::<TestTask>().unwrap();
1422
1423 let task = TestTask::new(1);
1425 let _id = scheduler.task(task).key("my-task-key").now().await.unwrap();
1426
1427 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1428
1429 let st = state.lock().unwrap();
1430 assert_eq!(st.len(), 1);
1431 assert_eq!(st[0], 1);
1432 }
1433
1434 #[tokio::test]
1435 pub async fn test_builder_with_delay() {
1436 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1437 let state: State = Arc::new(Mutex::new(Vec::new()));
1438 let scheduler = Scheduler::new(task_store);
1439 scheduler.start(state.clone());
1440 scheduler.register::<TestTask>().unwrap();
1441
1442 let task = TestTask::new(1);
1444 let _id = scheduler
1445 .task(task)
1446 .after(1) .await
1448 .unwrap();
1449
1450 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1452 {
1453 let st = state.lock().unwrap();
1454 assert_eq!(st.len(), 0, "Task should not execute yet");
1455 }
1456
1457 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1459
1460 {
1461 let st = state.lock().unwrap();
1462 assert_eq!(st.len(), 1, "Task should have executed");
1463 assert_eq!(st[0], 1);
1464 }
1465 }
1466
1467 #[tokio::test]
1468 pub async fn test_builder_with_dependencies() {
1469 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1470 let state: State = Arc::new(Mutex::new(Vec::new()));
1471 let scheduler = Scheduler::new(task_store);
1472 scheduler.start(state.clone());
1473 scheduler.register::<TestTask>().unwrap();
1474
1475 let task1 = TestTask::new(1);
1477 let id1 = scheduler.task(task1).now().await.unwrap();
1478
1479 let task2 = TestTask::new(1);
1481 let id2 = scheduler.task(task2).now().await.unwrap();
1482
1483 let task3 = TestTask::new(1);
1485 let _id3 = scheduler.task(task3).depend_on(vec![id1, id2]).schedule().await.unwrap();
1486
1487 tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
1489
1490 let st = state.lock().unwrap();
1491 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1493 assert_eq!(str_vec.join(":"), "1:1:1");
1494 }
1495
1496 #[tokio::test]
1497 pub async fn test_builder_with_retry() {
1498 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1499 let state: State = Arc::new(Mutex::new(Vec::new()));
1500 let scheduler = Scheduler::new(task_store);
1501 scheduler.start(state.clone());
1502 scheduler.register::<FailingTask>().unwrap();
1503
1504 let failing_task = FailingTask::new(55, 1); let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1507
1508 let _id = scheduler.task(failing_task).with_retry(retry_policy).schedule().await.unwrap();
1509
1510 tokio::time::sleep(std::time::Duration::from_secs(3)).await;
1512
1513 let st = state.lock().unwrap();
1514 assert_eq!(st.len(), 1);
1515 assert_eq!(st[0], 55);
1516 }
1517
1518 #[tokio::test]
1519 pub async fn test_builder_with_automatic_retry() {
1520 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1521 let state: State = Arc::new(Mutex::new(Vec::new()));
1522 let scheduler = Scheduler::new(task_store);
1523 scheduler.start(state.clone());
1524 scheduler.register::<FailingTask>().unwrap();
1525
1526 let failing_task = FailingTask::new(66, 1);
1528 let _id = scheduler.task(failing_task).with_automatic_retry().await.unwrap();
1529
1530 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1533
1534 let st = state.lock().unwrap();
1536 let _ = st.len(); }
1540
1541 #[tokio::test]
1542 pub async fn test_builder_fluent_chaining() {
1543 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1544 let state: State = Arc::new(Mutex::new(Vec::new()));
1545 let scheduler = Scheduler::new(task_store);
1546 scheduler.start(state.clone());
1547 scheduler.register::<TestTask>().unwrap();
1548
1549 let dep1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1551 let dep2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1552
1553 let retry_policy = RetryPolicy { wait_min_max: (1, 3600), times: 3 };
1555
1556 let task = TestTask::new(1);
1557 let _id = scheduler
1558 .task(task)
1559 .key("complex-task")
1560 .schedule_after(0) .depend_on(vec![dep1, dep2])
1562 .with_retry(retry_policy)
1563 .schedule()
1564 .await
1565 .unwrap();
1566
1567 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1568
1569 let st = state.lock().unwrap();
1570 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1572 assert_eq!(str_vec.join(":"), "1:1:1");
1573 }
1574
1575 #[tokio::test]
1576 pub async fn test_builder_backward_compatibility() {
1577 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1578 let state: State = Arc::new(Mutex::new(Vec::new()));
1579 let scheduler = Scheduler::new(task_store);
1580 scheduler.start(state.clone());
1581 scheduler.register::<TestTask>().unwrap();
1582
1583 let _id1 = scheduler.add(TestTask::new(1)).await.unwrap();
1585
1586 let _id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1588
1589 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1590
1591 let st = state.lock().unwrap();
1592 assert_eq!(st.len(), 2);
1594 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1595 assert_eq!(str_vec.join(":"), "1:1");
1596 }
1597
1598 #[tokio::test]
1601 pub async fn test_builder_pipeline_scenario() {
1602 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1604 let state: State = Arc::new(Mutex::new(Vec::new()));
1605 let scheduler = Scheduler::new(task_store);
1606 scheduler.start(state.clone());
1607 scheduler.register::<TestTask>().unwrap();
1608
1609 let id1 = scheduler.task(TestTask::new(1)).key("stage-1").now().await.unwrap();
1611
1612 let id2 = scheduler.task(TestTask::new(1)).key("stage-2").after_task(id1).await.unwrap();
1614
1615 let _id3 = scheduler.task(TestTask::new(1)).key("stage-3").after_task(id2).await.unwrap();
1617
1618 tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1620
1621 let st = state.lock().unwrap();
1622 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1624 assert_eq!(str_vec.join(":"), "1:1:1");
1625 }
1626
1627 #[tokio::test]
1628 pub async fn test_builder_multi_dependency_join() {
1629 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1631 let state: State = Arc::new(Mutex::new(Vec::new()));
1632 let scheduler = Scheduler::new(task_store);
1633 scheduler.start(state.clone());
1634 scheduler.register::<TestTask>().unwrap();
1635
1636 let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1638 let id2 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1639
1640 let _id3 = scheduler
1642 .task(TestTask::new(1))
1643 .depend_on(vec![id1, id2])
1644 .schedule()
1645 .await
1646 .unwrap();
1647
1648 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1649
1650 let st = state.lock().unwrap();
1651 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1653 assert_eq!(str_vec.join(":"), "1:1:1");
1654 }
1655
1656 #[tokio::test]
1657 pub async fn test_builder_scheduled_task_with_dependencies() {
1658 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1660 let state: State = Arc::new(Mutex::new(Vec::new()));
1661 let scheduler = Scheduler::new(task_store);
1662 scheduler.start(state.clone());
1663 scheduler.register::<TestTask>().unwrap();
1664
1665 let dep_id = scheduler.task(TestTask::new(1)).now().await.unwrap();
1667
1668 let ts = Timestamp::from_now(1);
1670 let _task_id = scheduler
1671 .task(TestTask::new(1))
1672 .schedule_at(ts)
1673 .depend_on(vec![dep_id])
1674 .schedule()
1675 .await
1676 .unwrap();
1677
1678 tokio::time::sleep(std::time::Duration::from_millis(300)).await;
1680 {
1681 let st = state.lock().unwrap();
1682 assert_eq!(st.len(), 1); }
1684
1685 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
1687
1688 {
1689 let st = state.lock().unwrap();
1690 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1691 assert_eq!(str_vec.join(":"), "1:1");
1692 }
1693 }
1694
1695 #[tokio::test]
1696 pub async fn test_builder_mixed_features() {
1697 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1699 let state: State = Arc::new(Mutex::new(Vec::new()));
1700 let scheduler = Scheduler::new(task_store);
1701 scheduler.start(state.clone());
1702 scheduler.register::<TestTask>().unwrap();
1703 scheduler.register::<FailingTask>().unwrap();
1704
1705 let id1 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1707
1708 let _id2 = scheduler
1710 .task(TestTask::new(1))
1711 .key("critical-task")
1712 .schedule_after(0)
1713 .depend_on(vec![id1])
1714 .schedule()
1715 .await
1716 .unwrap();
1717
1718 let _id3 = scheduler
1720 .task(FailingTask::new(1, 0)) .key("retryable-task")
1722 .with_retry(RetryPolicy {
1723 wait_min_max: (1, 3600),
1724 times: 3,
1725 })
1726 .schedule()
1727 .await
1728 .unwrap();
1729
1730 tokio::time::sleep(std::time::Duration::from_millis(1200)).await;
1732
1733 let st = state.lock().unwrap();
1734 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1736 assert_eq!(str_vec.join(":"), "1:1:1");
1737 }
1738
1739 #[tokio::test]
1740 pub async fn test_builder_builder_reuse_not_possible() {
1741 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1743 let _state: State = Arc::new(Mutex::new(Vec::new()));
1744 let scheduler = Scheduler::new(task_store);
1745
1746 let task = TestTask::new(1);
1747 let builder = scheduler.task(task);
1748
1749 let _id = builder.now().await.unwrap();
1755 }
1759
1760 #[tokio::test]
1761 pub async fn test_builder_different_task_types() {
1762 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1764 let state: State = Arc::new(Mutex::new(Vec::new()));
1765 let scheduler = Scheduler::new(task_store);
1766 scheduler.start(state.clone());
1767 scheduler.register::<TestTask>().unwrap();
1768 scheduler.register::<FailingTask>().unwrap();
1769
1770 let _id1 = scheduler.task(TestTask::new(1)).key("test-task").now().await.unwrap();
1772
1773 let _id2 = scheduler
1774 .task(FailingTask::new(1, 0)) .key("failing-task")
1776 .now()
1777 .await
1778 .unwrap();
1779
1780 let _id3 = scheduler.task(TestTask::new(1)).now().await.unwrap();
1781
1782 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1783
1784 let st = state.lock().unwrap();
1785 assert_eq!(st.len(), 3);
1786 let str_vec = st.iter().map(|x| x.to_string()).collect::<Vec<String>>();
1787 assert_eq!(str_vec.join(":"), "1:1:1");
1789 }
1790
1791 #[tokio::test]
1796 pub async fn test_builder_cron_placeholder_syntax() {
1797 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1799 let state: State = Arc::new(Mutex::new(Vec::new()));
1800 let scheduler = Scheduler::new(task_store);
1801 scheduler.start(state.clone());
1802 scheduler.register::<TestTask>().unwrap();
1803
1804 let task = TestTask::new(1);
1806 let _id = scheduler
1807 .task(task)
1808 .key("cron-task")
1809 .cron("0 9 * * *") .schedule()
1811 .await
1812 .unwrap();
1813
1814 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1818
1819 let st = state.lock().unwrap();
1820 assert_eq!(st.len(), 0); }
1824
1825 #[tokio::test]
1826 pub async fn test_builder_daily_at_placeholder() {
1827 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1829 let state: State = Arc::new(Mutex::new(Vec::new()));
1830 let scheduler = Scheduler::new(task_store);
1831 scheduler.start(state.clone());
1832 scheduler.register::<TestTask>().unwrap();
1833
1834 let task = TestTask::new(1);
1836 let _id = scheduler
1837 .task(task)
1838 .key("daily-task")
1839 .daily_at(14, 30) .schedule()
1841 .await
1842 .unwrap();
1843
1844 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1847
1848 let st = state.lock().unwrap();
1849 assert_eq!(st.len(), 0);
1852 }
1853
1854 #[tokio::test]
1855 pub async fn test_builder_weekly_at_placeholder() {
1856 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1858 let state: State = Arc::new(Mutex::new(Vec::new()));
1859 let scheduler = Scheduler::new(task_store);
1860 scheduler.start(state.clone());
1861 scheduler.register::<TestTask>().unwrap();
1862
1863 let task = TestTask::new(1);
1865 let _id = scheduler
1866 .task(task)
1867 .key("weekly-task")
1868 .weekly_at(1, 9, 0) .schedule()
1870 .await
1871 .unwrap();
1872
1873 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1876
1877 let st = state.lock().unwrap();
1878 assert_eq!(st.len(), 0);
1881 }
1882
1883 #[tokio::test]
1884 pub async fn test_builder_cron_with_retry() {
1885 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1887 let state: State = Arc::new(Mutex::new(Vec::new()));
1888 let scheduler = Scheduler::new(task_store);
1889 scheduler.start(state.clone());
1890 scheduler.register::<TestTask>().unwrap();
1891
1892 let task = TestTask::new(1);
1894 let _id = scheduler
1895 .task(task)
1896 .key("reliable-scheduled-task")
1897 .daily_at(2, 0) .with_retry(RetryPolicy {
1899 wait_min_max: (60, 3600),
1900 times: 5,
1901 })
1902 .schedule()
1903 .await
1904 .unwrap();
1905
1906 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
1909
1910 let st = state.lock().unwrap();
1911 assert_eq!(st.len(), 0);
1914 }
1915
1916 #[test]
1919 fn test_cron_to_string() {
1920 let cron = CronSchedule::parse("*/5 * * * *").unwrap();
1922 assert_eq!(cron.to_cron_string(), "*/5 * * * *");
1923 }
1924
1925 #[tokio::test]
1926 pub async fn test_running_task_not_double_scheduled() {
1927 let _ = tracing_subscriber::fmt().try_init();
1928
1929 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1930 let state: State = Arc::new(Mutex::new(Vec::new()));
1931 let scheduler = Scheduler::new(task_store);
1932 scheduler.start(state.clone());
1933 scheduler.register::<TestTask>().unwrap();
1934
1935 let task = TestTask::new(5); let task_id = scheduler.add(task.clone()).await.unwrap();
1938
1939 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1941
1942 {
1944 let running = scheduler.tasks_running.lock().unwrap();
1945 assert!(running.contains_key(&task_id), "Task should be in running queue");
1946 }
1947
1948 let task_meta = TaskMeta {
1950 task: task.clone(),
1951 next_at: Some(Timestamp::now()),
1952 deps: vec![],
1953 retry_count: 0,
1954 retry: None,
1955 cron: None,
1956 };
1957 let result = scheduler.add_queue(task_id, task_meta).await;
1958
1959 assert!(result.is_ok(), "add_queue should succeed");
1961
1962 {
1964 let scheduled = scheduler.tasks_scheduled.lock().unwrap();
1965 let in_scheduled = scheduled.iter().any(|((_, id), _)| *id == task_id);
1966 assert!(!in_scheduled, "Task should NOT be in scheduled queue while running");
1967 }
1968
1969 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1971
1972 let st = state.lock().unwrap();
1974 assert_eq!(st.len(), 1, "Only one task execution should have occurred");
1975 assert_eq!(st[0], 5);
1976 }
1977
1978 #[tokio::test]
1979 pub async fn test_running_task_metadata_updated() {
1980 let _ = tracing_subscriber::fmt().try_init();
1981
1982 let task_store: Arc<dyn TaskStore<State>> = InMemoryTaskStore::new();
1983 let state: State = Arc::new(Mutex::new(Vec::new()));
1984 let scheduler = Scheduler::new(task_store);
1985 scheduler.start(state.clone());
1986 scheduler.register::<TestTask>().unwrap();
1987
1988 let task = TestTask::new(5); let task_id = scheduler.add(task.clone()).await.unwrap();
1991
1992 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1994
1995 {
1997 let running = scheduler.tasks_running.lock().unwrap();
1998 let meta = running.get(&task_id).expect("Task should be running");
1999 assert!(meta.cron.is_none(), "Task should have no cron initially");
2000 }
2001
2002 let cron = CronSchedule::parse("*/5 * * * *").unwrap();
2004 let task_meta_with_cron = TaskMeta {
2005 task: task.clone(),
2006 next_at: Some(Timestamp::now()),
2007 deps: vec![],
2008 retry_count: 0,
2009 retry: None,
2010 cron: Some(cron.clone()),
2011 };
2012 let result = scheduler.add_queue(task_id, task_meta_with_cron).await;
2013
2014 assert!(result.is_ok(), "add_queue should succeed");
2016
2017 {
2019 let running = scheduler.tasks_running.lock().unwrap();
2020 let meta = running.get(&task_id).expect("Task should still be running");
2021 assert!(meta.cron.is_some(), "Task should now have cron after update");
2022 }
2023
2024 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
2026 }
2027}
2028
2029