persistent_scheduler/core/
store.rs1use crate::{
2 core::model::{TaskMeta, TaskStatus},
3 utc_now,
4};
5use ahash::AHashMap;
6use async_trait::async_trait;
7use std::{error::Error, sync::Arc};
8use thiserror::Error;
9use tokio::sync::RwLock;
10use crate::core::task_kind::TaskKind;
11
12#[async_trait::async_trait]
13pub trait TaskStore: Clone + Send {
14 type Error: Error + Send + Sync;
15
16 async fn restore_tasks(&self) -> Result<(), Self::Error>;
33
34 async fn get(&self, task_id: &str) -> Result<Option<TaskMeta>, Self::Error>;
44
45 async fn list(&self) -> Result<Vec<TaskMeta>, Self::Error>;
51
52 async fn store_task(&self, task: TaskMeta) -> Result<(), Self::Error>;
62
63 async fn store_tasks(&self, tasks: Vec<TaskMeta>) -> Result<(), Self::Error>;
73
74 async fn fetch_pending_tasks(&self) -> Result<Vec<TaskMeta>, Self::Error>;
86
87 async fn update_task_execution_status(
100 &self,
101 task_id: &str,
102 is_success: bool,
103 last_error: Option<String>,
104 next_run: Option<i64>,
105 ) -> Result<(), Self::Error>;
106
107 async fn heartbeat(&self, task_id: &str, runner_id: &str) -> Result<(), Self::Error>;
118
119 async fn set_task_stopped(&self, task_id: &str) -> Result<(), Self::Error>;
129
130 async fn set_task_removed(&self, task_id: &str) -> Result<(), Self::Error>;
140
141 async fn cleanup(&self) -> Result<(), Self::Error>;
147}
148
149#[derive(Error, Debug)]
150pub enum InMemoryTaskStoreError {
151 #[error("Task not found")]
152 TaskNotFound,
153 #[error("Task ID conflict: The task with ID '{0}' already exists.")]
154 TaskIdConflict(String),
155}
156
157#[derive(Clone, Default)]
158pub struct InMemoryTaskStore {
159 tasks: Arc<RwLock<AHashMap<String, TaskMeta>>>,
160}
161
162impl InMemoryTaskStore {
163 pub fn new() -> Self {
165 Self {
166 tasks: Arc::new(RwLock::new(AHashMap::new())),
167 }
168 }
169}
170
171pub fn is_candidate_task(kind: &TaskKind, status: &TaskStatus) -> bool {
173 match kind {
174 TaskKind::Cron { .. } | TaskKind::Repeat { .. } => matches!(
175 status,
176 TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
177 ),
178 TaskKind::Once => *status == TaskStatus::Scheduled,
179 }
180}
181
182#[async_trait]
183impl TaskStore for InMemoryTaskStore {
184 type Error = InMemoryTaskStoreError;
185
186 async fn restore_tasks(&self) -> Result<(), Self::Error> {
187 Ok(())
188 }
189
190 async fn get(&self, task_id: &str) -> Result<Option<TaskMeta>, Self::Error> {
191 let tasks = self.tasks.read().await;
192 Ok(tasks.get(task_id).cloned())
193 }
194
195 async fn list(&self) -> Result<Vec<TaskMeta>, Self::Error> {
196 let tasks = self.tasks.read().await;
197 Ok(tasks.values().cloned().collect())
198 }
199
200 async fn store_task(&self, task: TaskMeta) -> Result<(), Self::Error> {
201 let mut tasks = self.tasks.write().await;
202 if tasks.contains_key(&task.id) {
203 return Err(InMemoryTaskStoreError::TaskIdConflict(task.id.clone()));
204 }
205 tasks.insert(task.id.clone(), task);
206 Ok(())
207 }
208
209 async fn store_tasks(&self, tasks: Vec<TaskMeta>) -> Result<(), Self::Error> {
210 let mut w_tasks = self.tasks.write().await;
211 for task in tasks {
212 if w_tasks.contains_key(&task.id) {
213 return Err(InMemoryTaskStoreError::TaskIdConflict(task.id.clone()));
214 }
215 w_tasks.insert(task.id.clone(), task);
216 }
217 Ok(())
218 }
219
220 async fn fetch_pending_tasks(&self) -> Result<Vec<TaskMeta>, Self::Error> {
221 let mut tasks = self.tasks.write().await;
222 let mut result = Vec::new();
223 for task in tasks.values_mut() {
224 if is_candidate_task(&task.kind, &task.status) && task.next_run <= utc_now!() {
225 let t = task.clone();
226 task.status = TaskStatus::Running;
227 task.updated_at = utc_now!();
228 result.push(t);
229 }
230 }
231 Ok(result)
232 }
233
234 async fn update_task_execution_status(
235 &self,
236 task_id: &str,
237 is_success: bool,
238 last_error: Option<String>,
239 next_run: Option<i64>, ) -> Result<(), Self::Error> {
241 let mut tasks = self.tasks.write().await;
242
243 let task = tasks
244 .get_mut(task_id)
245 .ok_or(InMemoryTaskStoreError::TaskNotFound)?;
246
247 if task.status == TaskStatus::Stopped || task.status == TaskStatus::Removed {
248 return Ok(());
249 }
250
251 if is_success {
252 task.success_count += 1;
253 task.status = TaskStatus::Success;
254 } else {
255 task.failure_count += 1;
256 task.status = TaskStatus::Failed;
257 task.last_error = last_error;
258 }
259
260 if let Some(next_run_time) = next_run {
261 println!("now to set next_run={}", next_run_time);
262 task.last_run = task.next_run;
263 task.next_run = next_run_time;
264 }
265
266 task.updated_at = utc_now!();
267
268 Ok(())
269 }
270
271 async fn heartbeat(&self, task_id: &str, runner_id: &str) -> Result<(), Self::Error> {
272 let mut tasks = self.tasks.write().await;
273 if let Some(task) = tasks.get_mut(task_id) {
274 task.heartbeat_at = utc_now!();
275 task.runner_id = Some(runner_id.to_string());
276 Ok(())
277 } else {
278 Err(InMemoryTaskStoreError::TaskNotFound)
279 }
280 }
281
282 async fn set_task_stopped(&self, task_id: &str) -> Result<(), Self::Error> {
283 let mut tasks = self.tasks.write().await;
284 if let Some(task) = tasks.get_mut(task_id) {
285 task.updated_at = utc_now!();
286 task.status = TaskStatus::Stopped;
287 Ok(())
288 } else {
289 Err(InMemoryTaskStoreError::TaskNotFound)
290 }
291 }
292
293 async fn set_task_removed(&self, task_id: &str) -> Result<(), Self::Error> {
294 let mut tasks = self.tasks.write().await;
295 if let Some(task) = tasks.get_mut(task_id) {
296 task.updated_at = utc_now!();
297 task.status = TaskStatus::Removed;
298 Ok(())
299 } else {
300 Err(InMemoryTaskStoreError::TaskNotFound)
301 }
302 }
303
304 async fn cleanup(&self) -> Result<(), Self::Error> {
305 let mut tasks = self.tasks.write().await;
306 tasks.retain(|_, task| task.status != TaskStatus::Removed);
307 Ok(())
308 }
309}