persistent_scheduler/core/
context.rs1use crate::core::cleaner::TaskCleaner;
2use crate::core::cron::next_run;
3use crate::core::handlers::TaskHandlers;
4use crate::core::model::TaskMeta;
5use crate::core::status_updater::TaskStatusUpdater;
6use crate::core::store::TaskStore;
7use crate::core::task::Task;
8use crate::core::task_kind::TaskKind;
9use crate::utc_now;
10use ahash::AHashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use super::flow::TaskFlow;
15
16pub struct TaskContext<S>
17where
18 S: TaskStore + Send + Sync + Clone + 'static, {
20 queue_concurrency: AHashMap<String, usize>, handlers: TaskHandlers, store: Arc<S>, }
24
25impl<S> TaskContext<S>
26where
27 S: TaskStore + Send + Sync + Clone + 'static, {
29 pub fn new(store: S) -> Self {
31 let store = Arc::new(store);
32 Self {
33 queue_concurrency: AHashMap::new(), handlers: TaskHandlers::new(), store: store.clone(), }
37 }
38
39 pub fn with_arc_store(store: Arc<S>) -> Self {
41 Self {
42 queue_concurrency: AHashMap::new(), handlers: TaskHandlers::new(), store, }
46 }
47
48 pub fn register<T>(mut self) -> Self
50 where
51 T: Task, {
53 self.handlers.register::<T>(); self.queue_concurrency.insert(T::TASK_QUEUE.to_owned(), 4); self
56 }
57
58 pub fn set_concurrency(mut self, queue: &str, count: usize) -> Self {
60 self.queue_concurrency.insert(queue.to_owned(), count); self
62 }
63
64 fn start_task_cleaner(&self) {
66 let cleaner = Arc::new(TaskCleaner::new(self.store.clone())); cleaner.start(Duration::from_secs(60 * 10)); }
69
70 async fn start_flow(&self) {
72 let status_updater = Arc::new(TaskStatusUpdater::new(
73 self.store.clone(),
74 self.queue_concurrency.len(),
75 ));
76
77 let flow = Arc::new(TaskFlow::new(
78 self.store.clone(),
79 &self.queue_concurrency,
80 Arc::new(self.handlers.clone()),
81 status_updater,
82 ));
83
84 flow.start().await;
85 }
86
87 pub async fn start(self) -> Self {
89 self.start_flow().await; self
91 }
92
93 pub async fn start_with_cleaner(self) -> Self {
95 self.start_flow().await; self.start_task_cleaner(); self
98 }
99
100 pub async fn add_task<T>(
102 &self,
103 task: T,
104 kind: TaskKind,
105 delay_seconds: Option<u32>,
106 ) -> Result<(), String>
107 where
108 T: Task + Send + Sync + 'static, {
110 let mut task_meta = task.new_meta(kind); let next_run = match &task_meta.kind {
112 TaskKind::Once | TaskKind::Repeat { .. } => {
113 let delay_seconds = delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
114 utc_now!() + delay_seconds as i64
115 } TaskKind::Cron { schedule, timezone } => {
117 next_run(&*schedule, &*timezone, 0).ok_or_else(|| {
119 format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
120 })?
121 }
122 };
123
124 task_meta.next_run = next_run;
125 task_meta.last_run = next_run;
126 self.store
127 .store_task(task_meta) .await
129 .map_err(|e| e.to_string()) }
131
132 pub async fn add_tasks<T>(&self, tasks: Vec<TaskConfiguration<T>>) -> Result<(), String>
134 where
135 T: Task + Send + Sync + 'static, {
137 let mut batch: Vec<TaskMeta> = Vec::new();
138
139 for task in tasks {
140 let mut task_meta = task.inner.new_meta(task.kind); let next_run = match &task_meta.kind {
142 TaskKind::Once | TaskKind::Repeat { .. } => {
143 let delay_seconds =
144 task.delay_seconds.unwrap_or(task_meta.delay_seconds) * 1000;
145 utc_now!() + delay_seconds as i64
146 } TaskKind::Cron { schedule, timezone } => {
148 next_run(&*schedule, &*timezone, 0).ok_or_else(|| {
150 format!("Failed to calculate next run for cron task '{}': invalid schedule or timezone", T::TASK_KEY)
151 })?
152 }
153 };
154
155 task_meta.next_run = next_run;
156 task_meta.last_run = next_run;
157 batch.push(task_meta);
158 }
159
160 self.store
161 .store_tasks(batch) .await
163 .map_err(|e| e.to_string()) }
165}
166
167pub struct TaskConfiguration<T: Task> {
168 pub inner: T,
169 pub kind: TaskKind,
170 pub delay_seconds: Option<u32>,
171}