persistent_scheduler/core/
store.rs

1use crate::{
2    core::model::{TaskMeta, TaskStatus},
3    utc_now,
4};
5use ahash::AHashMap;
6use async_trait::async_trait;
7use std::{error::Error, sync::Arc};
8use thiserror::Error;
9use tokio::sync::RwLock;
10use crate::core::task_kind::TaskKind;
11
12#[async_trait::async_trait]
13pub trait TaskStore: Clone + Send {
14    type Error: Error + Send + Sync;
15
16    /// Restores task states by cleaning up all tasks in a running state and handling their next run times.
17    ///
18    /// This method performs the following actions:
19    /// - Cleans up all tasks that are currently in the `Running` state and may handle their `next_run` fields.
20    /// - Additional restoration logic can be added within this method.
21    ///
22    /// # Returns
23    /// Returns a `Result`, which is `Ok(())` if the operation succeeds; otherwise, it returns the appropriate error.
24    ///
25    /// # Examples
26    /// ```
27    /// # async fn example() {
28    /// #     let store = TaskStore::new();
29    /// #     store.restore_tasks().await.unwrap();
30    /// # }
31    /// ```
32    async fn restore_tasks(&self) -> Result<(), Self::Error>;
33
34    /// Retrieves task metadata based on the task ID.
35    ///
36    /// # Arguments
37    ///
38    /// * `task_id`: A unique identifier for the task.
39    ///
40    /// # Returns
41    ///
42    /// Returns an `Option<TaskMetaEntity>`. If the task is found, it returns `Some(TaskMetaEntity)`, otherwise it returns `None`.
43    async fn get(&self, task_id: &str) -> Result<Option<TaskMeta>, Self::Error>;
44
45    /// Lists all task metadata.
46    ///
47    /// # Returns
48    ///
49    /// Returns a vector containing all task metadata.
50    async fn list(&self) -> Result<Vec<TaskMeta>, Self::Error>;
51
52    /// Stores task metadata.
53    ///
54    /// # Arguments
55    ///
56    /// * `task`: The task metadata to be stored.
57    ///
58    /// # Returns
59    ///
60    /// Returns `Ok(())` if the task is successfully stored; returns an error if the task ID already exists.
61    async fn store_task(&self, task: TaskMeta) -> Result<(), Self::Error>;
62
63    /// Stores tasks metadata.
64    ///
65    /// # Arguments
66    ///
67    /// * `tasks`: The task metadata to be stored.
68    ///
69    /// # Returns
70    ///
71    /// Returns `Ok(())` if the tasks is successfully stored; returns an error if any task ID already exists.
72    async fn store_tasks(&self, tasks: Vec<TaskMeta>) -> Result<(), Self::Error>;
73
74    /// Fetches all pending tasks from the store.
75    ///
76    /// # Returns
77    ///
78    /// Returns a `Result` containing a `Vec<TaskMeta>` if successful, or an error of type `Self::Error` if fetching tasks fails.
79    ///
80    /// The returned `Vec<TaskMeta>` contains all tasks that are currently in a pending state, ready for processing.
81    ///
82    /// # Errors
83    ///
84    /// This function will return an error of type `Self::Error` if there is an issue querying the task store.
85    async fn fetch_pending_tasks(&self) -> Result<Vec<TaskMeta>, Self::Error>;
86
87    /// Updates the execution status of a task.
88    ///
89    /// # Arguments
90    ///
91    /// * `task_id`: The ID of the task to update.
92    /// * `is_success`: A boolean indicating whether the task succeeded.
93    /// * `last_error`: An optional string containing the last error message (if applicable).
94    /// * `next_run`: An optional timestamp for the next scheduled run of the task.
95    ///
96    /// # Returns
97    ///
98    /// Returns `Ok(())` if the update is successful; returns an error if the task is not found or if it is stopped or removed.
99    async fn update_task_execution_status(
100        &self,
101        task_id: &str,
102        is_success: bool,
103        last_error: Option<String>,
104        next_run: Option<i64>,
105    ) -> Result<(), Self::Error>;
106
107    /// Updates the heartbeat for a task.
108    ///
109    /// # Arguments
110    ///
111    /// * `task_id`: The ID of the task to update.
112    /// * `runner_id`: The ID of the runner that is currently executing the task.
113    ///
114    /// # Returns
115    ///
116    /// Returns `Ok(())` if the update is successful; returns an error if the task is not found.
117    async fn heartbeat(&self, task_id: &str, runner_id: &str) -> Result<(), Self::Error>;
118
119    /// Marks a task as stopped.
120    ///
121    /// # Arguments
122    ///
123    /// * `task_id`: The ID of the task to mark as stopped.
124    ///
125    /// # Returns
126    ///
127    /// Returns `Ok(())` if the task is successfully marked; returns an error if the task is not found.
128    async fn set_task_stopped(&self, task_id: &str) -> Result<(), Self::Error>;
129
130    /// Marks a task as removed.
131    ///
132    /// # Arguments
133    ///
134    /// * `task_id`: The ID of the task to mark as removed.
135    ///
136    /// # Returns
137    ///
138    /// Returns `Ok(())` if the task is successfully marked; returns an error if the task is not found.
139    async fn set_task_removed(&self, task_id: &str) -> Result<(), Self::Error>;
140
141    /// Cleans up the task store by removing tasks marked as removed.
142    ///
143    /// # Returns
144    ///
145    /// Returns `Ok(())` if the cleanup is successful.
146    async fn cleanup(&self) -> Result<(), Self::Error>;
147}
148
149#[derive(Error, Debug)]
150pub enum InMemoryTaskStoreError {
151    #[error("Task not found")]
152    TaskNotFound,
153    #[error("Task ID conflict: The task with ID '{0}' already exists.")]
154    TaskIdConflict(String),
155}
156
157#[derive(Clone, Default)]
158pub struct InMemoryTaskStore {
159    tasks: Arc<RwLock<AHashMap<String, TaskMeta>>>,
160}
161
162impl InMemoryTaskStore {
163    /// Creates a new instance of `InMemoryTaskStore`.
164    pub fn new() -> Self {
165        Self {
166            tasks: Arc::new(RwLock::new(AHashMap::new())),
167        }
168    }
169}
170
171/// Determines if a task can be executed based on its kind and status.
172pub fn is_candidate_task(kind: &TaskKind, status: &TaskStatus) -> bool {
173    match kind {
174        TaskKind::Cron { .. } | TaskKind::Repeat { .. } => matches!(
175            status,
176            TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
177        ),
178        TaskKind::Once => *status == TaskStatus::Scheduled,
179    }
180}
181
182#[async_trait]
183impl TaskStore for InMemoryTaskStore {
184    type Error = InMemoryTaskStoreError;
185
186    async fn restore_tasks(&self) -> Result<(), Self::Error> {
187        Ok(())
188    }
189
190    async fn get(&self, task_id: &str) -> Result<Option<TaskMeta>, Self::Error> {
191        let tasks = self.tasks.read().await;
192        Ok(tasks.get(task_id).cloned())
193    }
194
195    async fn list(&self) -> Result<Vec<TaskMeta>, Self::Error> {
196        let tasks = self.tasks.read().await;
197        Ok(tasks.values().cloned().collect())
198    }
199
200    async fn store_task(&self, task: TaskMeta) -> Result<(), Self::Error> {
201        let mut tasks = self.tasks.write().await;
202        if tasks.contains_key(&task.id) {
203            return Err(InMemoryTaskStoreError::TaskIdConflict(task.id.clone()));
204        }
205        tasks.insert(task.id.clone(), task);
206        Ok(())
207    }
208
209    async fn store_tasks(&self, tasks: Vec<TaskMeta>) -> Result<(), Self::Error> {
210        let mut w_tasks = self.tasks.write().await;
211        for task in tasks {
212            if w_tasks.contains_key(&task.id) {
213                return Err(InMemoryTaskStoreError::TaskIdConflict(task.id.clone()));
214            }
215            w_tasks.insert(task.id.clone(), task);
216        }
217        Ok(())
218    }
219
220    async fn fetch_pending_tasks(&self) -> Result<Vec<TaskMeta>, Self::Error> {
221        let mut tasks = self.tasks.write().await;
222        let mut result = Vec::new();
223        for task in tasks.values_mut() {
224            if is_candidate_task(&task.kind, &task.status) && task.next_run <= utc_now!() {
225                let t = task.clone();
226                task.status = TaskStatus::Running;
227                task.updated_at = utc_now!();
228                result.push(t);
229            }
230        }
231        Ok(result)
232    }
233
234    async fn update_task_execution_status(
235        &self,
236        task_id: &str,
237        is_success: bool,
238        last_error: Option<String>,
239        next_run: Option<i64>, // when is None?
240    ) -> Result<(), Self::Error> {
241        let mut tasks = self.tasks.write().await;
242
243        let task = tasks
244            .get_mut(task_id)
245            .ok_or(InMemoryTaskStoreError::TaskNotFound)?;
246
247        if task.status == TaskStatus::Stopped || task.status == TaskStatus::Removed {
248            return Ok(());
249        }
250
251        if is_success {
252            task.success_count += 1;
253            task.status = TaskStatus::Success;
254        } else {
255            task.failure_count += 1;
256            task.status = TaskStatus::Failed;
257            task.last_error = last_error;
258        }
259
260        if let Some(next_run_time) = next_run {
261            println!("now to set next_run={}", next_run_time);
262            task.last_run = task.next_run;
263            task.next_run = next_run_time;
264        }
265
266        task.updated_at = utc_now!();
267
268        Ok(())
269    }
270
271    async fn heartbeat(&self, task_id: &str, runner_id: &str) -> Result<(), Self::Error> {
272        let mut tasks = self.tasks.write().await;
273        if let Some(task) = tasks.get_mut(task_id) {
274            task.heartbeat_at = utc_now!();
275            task.runner_id = Some(runner_id.to_string());
276            Ok(())
277        } else {
278            Err(InMemoryTaskStoreError::TaskNotFound)
279        }
280    }
281
282    async fn set_task_stopped(&self, task_id: &str) -> Result<(), Self::Error> {
283        let mut tasks = self.tasks.write().await;
284        if let Some(task) = tasks.get_mut(task_id) {
285            task.updated_at = utc_now!();
286            task.status = TaskStatus::Stopped;
287            Ok(())
288        } else {
289            Err(InMemoryTaskStoreError::TaskNotFound)
290        }
291    }
292
293    async fn set_task_removed(&self, task_id: &str) -> Result<(), Self::Error> {
294        let mut tasks = self.tasks.write().await;
295        if let Some(task) = tasks.get_mut(task_id) {
296            task.updated_at = utc_now!();
297            task.status = TaskStatus::Removed;
298            Ok(())
299        } else {
300            Err(InMemoryTaskStoreError::TaskNotFound)
301        }
302    }
303
304    async fn cleanup(&self) -> Result<(), Self::Error> {
305        let mut tasks = self.tasks.write().await;
306        tasks.retain(|_, task| task.status != TaskStatus::Removed);
307        Ok(())
308    }
309}