persistent_scheduler/nativedb/
mod.rs

1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use crate::core::task_kind::TaskKind;
4use native_db::*;
5use native_model::native_model;
6use native_model::Model;
7use serde::{Deserialize, Serialize};
8use std::sync::{LazyLock, OnceLock};
9use tracing::error;
10
11pub mod meta;
12#[cfg(test)]
13mod tests;
14
15static DB: OnceLock<Database> = OnceLock::new();
16
17pub static MODELS: LazyLock<Models> = LazyLock::new(|| {
18    let mut models = Models::new();
19    models
20        .define::<TaskMetaEntity>()
21        .expect("Error to define TaskMetaEntity");
22    models
23});
24
25pub fn init_nativedb(
26    db_path: Option<String>,
27    cache_size: Option<u64>,
28) -> Result<&'static Database<'static>, SchedulerError> {
29    let mut sys = sysinfo::System::new_all();
30    sys.refresh_memory();
31
32    let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
33
34    let db_file = db_path.unwrap_or_else(|| {
35        std::env::temp_dir()
36            .join("tasks.db")
37            .to_string_lossy()
38            .into_owned()
39    });
40
41    let database = Builder::new()
42        .set_cache_size(cache_size as usize)
43        .create(&MODELS, db_file.as_str());
44
45    if let Err(e) = database {
46        error!("Error init native db {:?}", e);
47        return Err(SchedulerError::StoreInit);
48    }
49
50    let _ = DB.set(database.unwrap());
51    get_database()
52}
53
54pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
55    DB.get().ok_or_else(|| SchedulerError::StoreInit)
56}
57
58#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
59#[native_model(id = 1, version = 1)]
60#[native_db(secondary_key(clean_up -> String), secondary_key(candidate_task -> String))]
61pub struct TaskMetaEntity {
62    #[primary_key]
63    pub id: String, // Unique identifier for the task
64    #[secondary_key]
65    pub task_key: String, // Key to identify the specific task
66    pub task_params: String, // Arguments for the task, stored as a JSON object
67    #[secondary_key]
68    pub queue_name: String, // Name of the queue for the task
69    pub updated_at: i64,     // Timestamp of the last update
70    pub status: TaskStatus,  // Current status of the task
71    pub stopped_reason: Option<String>, // Optional reason for why the task was stopped
72    pub last_error: Option<String>, // Error message from the last execution, if any
73    /// Duration of last run in milliseconds, including retries
74    pub last_duration_ms: Option<usize>,
75    /// Number of retries before last completion (success or final failure)
76    pub last_retry_count: Option<usize>,
77    pub last_run: i64,                 // Timestamp of the last run
78    pub next_run: i64,                 // Timestamp of the next scheduled run
79    pub kind: TaskKindEntity,          // Type of the task
80    pub success_count: u32,            // Count of successful runs
81    pub failure_count: u32,            // Count of failed runs
82    pub runner_id: Option<String>,     // The ID of the current task runner, may be None
83    pub retry_strategy: Retry,         // Retry strategy for handling failures
84    pub retry_interval: u32,           // Interval for retrying the task
85    pub base_interval: u32,            // Base interval for exponential backoff
86    pub delay_seconds: u32,            //Delay before executing a Once task, specified in seconds
87    pub max_retries: Option<u32>,      // Maximum number of retries allowed
88    pub cron_schedule: Option<String>, // Cron expression for scheduling
89    pub cron_timezone: Option<String>, // Timezone for the cron schedule (stored as a string)
90    pub is_repeating: bool,            // Indicates if the task is repeating
91    pub repeat_interval: u32,          // Interval for repeating task
92    pub heartbeat_at: i64,             // Timestamp of the last heartbeat in milliseconds
93    pub created_at: i64,               // Timestamp of the task creation
94}
95
96#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
97/// Defines the type of task to be executed.
98pub enum TaskKindEntity {
99    /// Represents a cron job, which is scheduled to run at specific intervals.
100    Cron,
101
102    /// Represents a repeated job that runs at a regular interval.
103    Repeat,
104
105    /// Represents a one-time job that runs once and then completes.
106    #[default]
107    Once,
108}
109
110impl TaskMetaEntity {
111    pub fn clean_up(&self) -> String {
112        let result = match self.kind {
113            TaskKindEntity::Cron | TaskKindEntity::Repeat => {
114                matches!(self.status, TaskStatus::Removed)
115            }
116            TaskKindEntity::Once => matches!(
117                self.status,
118                TaskStatus::Removed | TaskStatus::Success | TaskStatus::Failed
119            ),
120        };
121        result.to_string()
122    }
123
124    pub fn candidate_task(&self) -> String {
125        let result = match self.kind {
126            TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(
127                self.status,
128                TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
129            ),
130            TaskKindEntity::Once => self.status == TaskStatus::Scheduled,
131        };
132        result.to_string()
133    }
134}
135
136impl From<TaskMetaEntity> for TaskMeta {
137    fn from(entity: TaskMetaEntity) -> Self {
138        TaskMeta {
139            id: entity.id,
140            task_key: entity.task_key,
141            task_params: entity.task_params,
142            queue_name: entity.queue_name,
143            updated_at: entity.updated_at,
144            created_at: entity.created_at,
145            status: entity.status,
146            stopped_reason: entity.stopped_reason,
147            last_error: entity.last_error,
148            last_duration_ms: entity.last_duration_ms,
149            last_retry_count: entity.last_retry_count,
150            last_run: entity.last_run,
151            next_run: entity.next_run,
152            kind: match entity.kind {
153                TaskKindEntity::Cron => TaskKind::Cron {
154                    schedule: entity
155                        .cron_schedule
156                        .expect("Cron schedule is required for cron kind!"),
157                    timezone: entity
158                        .cron_timezone
159                        .expect("Cron timezone is required for cron kind!"),
160                },
161                TaskKindEntity::Repeat => TaskKind::Repeat {
162                    interval_seconds: entity.repeat_interval,
163                },
164                TaskKindEntity::Once => TaskKind::Once,
165            },
166            success_count: entity.success_count,
167            failure_count: entity.failure_count,
168            runner_id: entity.runner_id,
169            retry_strategy: entity.retry_strategy,
170            retry_interval: entity.retry_interval,
171            base_interval: entity.base_interval,
172            delay_seconds: entity.delay_seconds,
173            max_retries: entity.max_retries,
174            is_repeating: entity.is_repeating,
175            heartbeat_at: entity.heartbeat_at,
176        }
177    }
178}
179
180impl From<TaskMeta> for TaskMetaEntity {
181    fn from(entity: TaskMeta) -> Self {
182        let kind;
183        let cron_schedule;
184        let cron_timezone;
185        let repeat_interval;
186
187        match entity.kind {
188            TaskKind::Cron { schedule, timezone } => {
189                kind = TaskKindEntity::Cron;
190                cron_schedule = Some(schedule);
191                cron_timezone = Some(timezone);
192                repeat_interval = 0;
193            }
194            TaskKind::Repeat { interval_seconds } => {
195                kind = TaskKindEntity::Repeat;
196                cron_schedule = None;
197                cron_timezone = None;
198                repeat_interval = interval_seconds;
199            }
200            TaskKind::Once => {
201                kind = TaskKindEntity::Once;
202                cron_schedule = None;
203                cron_timezone = None;
204                repeat_interval = 0;
205            }
206        }
207
208        TaskMetaEntity {
209            id: entity.id,
210            task_key: entity.task_key,
211            task_params: entity.task_params,
212            queue_name: entity.queue_name,
213            updated_at: entity.updated_at,
214            created_at: entity.created_at,
215            status: entity.status,
216            stopped_reason: entity.stopped_reason,
217            last_error: entity.last_error,
218            last_duration_ms: entity.last_duration_ms,
219            last_retry_count: entity.last_retry_count,
220            last_run: entity.last_run,
221            next_run: entity.next_run,
222            kind,
223            success_count: entity.success_count,
224            failure_count: entity.failure_count,
225            runner_id: entity.runner_id,
226            retry_strategy: entity.retry_strategy,
227            retry_interval: entity.retry_interval,
228            base_interval: entity.base_interval,
229            delay_seconds: entity.delay_seconds,
230            max_retries: entity.max_retries,
231            cron_schedule,
232            cron_timezone,
233            is_repeating: entity.is_repeating,
234            repeat_interval,
235            heartbeat_at: entity.heartbeat_at,
236        }
237    }
238}
239
#[cfg(test)]
mod test {
    use std::{fs, time::Duration};

    use super::{init_nativedb, TaskMetaEntity};
    use itertools::Itertools;

    /// Removes `polly-scheduler.db` from the system temp directory if present.
    ///
    /// NOTE(review): `init_nativedb(None, ..)` defaults to `tasks.db`, so this
    /// helper does not touch the database opened by `test` below — confirm
    /// which file is intended.
    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
        let db_file = std::env::temp_dir().join("polly-scheduler.db");
        if !db_file.exists() {
            println!("File 'polly-scheduler.db' does not exist.");
            return Ok(());
        }

        fs::remove_file(db_file)?;
        println!("File 'polly-scheduler.db' has been deleted.");
        Ok(())
    }

    /// Deletes the temporary database file, then waits three seconds.
    #[tokio::test]
    async fn delete_db_file() {
        delete_temp_db().unwrap();
        tokio::time::sleep(Duration::from_secs(3)).await;
    }

    /// Opens the default database and dumps every stored task entity.
    #[tokio::test]
    async fn test() {
        let database = init_nativedb(None, None).unwrap();
        let read_txn = database.r_transaction().unwrap();

        // Kept as one chain so the intermediate scan values live until the
        // rows have been collected.
        let entities: Vec<TaskMetaEntity> = read_txn
            .scan()
            .primary()
            .unwrap()
            .all()
            .unwrap()
            .try_collect()
            .unwrap();

        println!("{:#?}", entities);
    }
}