persistent_scheduler/nativedb/
mod.rs

1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use crate::core::task_kind::TaskKind;
4use native_db::*;
5use native_model::native_model;
6use native_model::Model;
7use serde::{Deserialize, Serialize};
8use std::sync::{LazyLock, OnceLock};
9use tracing::error;
10
11pub mod meta;
12#[cfg(test)]
13mod tests;
14
15static DB: OnceLock<Database> = OnceLock::new();
16
17pub static MODELS: LazyLock<Models> = LazyLock::new(|| {
18    let mut models = Models::new();
19    models
20        .define::<TaskMetaEntity>()
21        .expect("Error to define TaskMetaEntity");
22    models
23});
24
25pub fn init_nativedb(
26    db_path: Option<String>,
27    cache_size: Option<u64>,
28) -> Result<&'static Database<'static>, SchedulerError> {
29    let mut sys = sysinfo::System::new_all();
30    sys.refresh_memory();
31
32    let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
33
34    let db_file = db_path.unwrap_or_else(|| {
35        std::env::temp_dir()
36            .join("tasks.db")
37            .to_string_lossy()
38            .into_owned()
39    });
40
41    let database = Builder::new()
42        .set_cache_size(cache_size as usize)
43        .create(&MODELS, db_file.as_str());
44
45    if let Err(e) = database {
46        error!("Error init native db {:?}", e);
47        return Err(SchedulerError::StoreInit);
48    }
49
50    let _ = DB.set(database.unwrap());
51    get_database()
52}
53
54pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
55    DB.get().ok_or_else(|| SchedulerError::StoreInit)
56}
57
58#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
59#[native_model(id = 1, version = 1)]
60#[native_db(secondary_key(clean_up -> String), secondary_key(candidate_task -> String))]
61pub struct TaskMetaEntity {
62    #[primary_key]
63    pub id: String, // Unique identifier for the task
64    #[secondary_key]
65    pub task_key: String, // Key to identify the specific task
66    pub task_params: String, // Arguments for the task, stored as a JSON object
67    #[secondary_key]
68    pub queue_name: String, // Name of the queue for the task
69    pub updated_at: i64,     // Timestamp of the last update
70    pub status: TaskStatus,  // Current status of the task
71    pub stopped_reason: Option<String>, // Optional reason for why the task was stopped
72    pub last_error: Option<String>, // Error message from the last execution, if any
73    pub last_run: i64,       // Timestamp of the last run
74    pub next_run: i64,       // Timestamp of the next scheduled run
75    pub kind: TaskKindEntity, // Type of the task
76    pub success_count: u32,  // Count of successful runs
77    pub failure_count: u32,  // Count of failed runs
78    pub runner_id: Option<String>, // The ID of the current task runner, may be None
79    pub retry_strategy: Retry, // Retry strategy for handling failures
80    pub retry_interval: u32, // Interval for retrying the task
81    pub base_interval: u32,  // Base interval for exponential backoff
82    pub delay_seconds: u32,  //Delay before executing a Once task, specified in seconds
83    pub max_retries: Option<u32>, // Maximum number of retries allowed
84    pub cron_schedule: Option<String>, // Cron expression for scheduling
85    pub cron_timezone: Option<String>, // Timezone for the cron schedule (stored as a string)
86    pub is_repeating: bool,  // Indicates if the task is repeating
87    pub repeat_interval: u32, // Interval for repeating task
88    pub heartbeat_at: i64,   // Timestamp of the last heartbeat in milliseconds
89    pub created_at: i64,     // Timestamp of the task creation
90}
91
92#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
93/// Defines the type of task to be executed.
94pub enum TaskKindEntity {
95    /// Represents a cron job, which is scheduled to run at specific intervals.
96    Cron,
97
98    /// Represents a repeated job that runs at a regular interval.
99    Repeat,
100
101    /// Represents a one-time job that runs once and then completes.
102    #[default]
103    Once,
104}
105
106impl TaskMetaEntity {
107    pub fn clean_up(&self) -> String {
108        let result = match self.kind {
109            TaskKindEntity::Cron | TaskKindEntity::Repeat => {
110                matches!(self.status, TaskStatus::Removed)
111            }
112            TaskKindEntity::Once => matches!(
113                self.status,
114                TaskStatus::Removed | TaskStatus::Success | TaskStatus::Failed
115            ),
116        };
117        result.to_string()
118    }
119
120    pub fn candidate_task(&self) -> String {
121        let result = match self.kind {
122            TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(
123                self.status,
124                TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
125            ),
126            TaskKindEntity::Once => self.status == TaskStatus::Scheduled,
127        };
128        result.to_string()
129    }
130}
131
132impl From<TaskMetaEntity> for TaskMeta {
133    fn from(entity: TaskMetaEntity) -> Self {
134        TaskMeta {
135            id: entity.id,
136            task_key: entity.task_key,
137            task_params: entity.task_params,
138            queue_name: entity.queue_name,
139            updated_at: entity.updated_at,
140            created_at: entity.created_at,
141            status: entity.status,
142            stopped_reason: entity.stopped_reason,
143            last_error: entity.last_error,
144            last_run: entity.last_run,
145            next_run: entity.next_run,
146            kind: match entity.kind {
147                TaskKindEntity::Cron => TaskKind::Cron {
148                    schedule: entity
149                        .cron_schedule
150                        .expect("Cron schedule is required for cron kind!"),
151                    timezone: entity
152                        .cron_timezone
153                        .expect("Cron timezone is required for cron kind!"),
154                },
155                TaskKindEntity::Repeat => TaskKind::Repeat {
156                    interval_seconds: entity.repeat_interval,
157                },
158                TaskKindEntity::Once => TaskKind::Once,
159            },
160            success_count: entity.success_count,
161            failure_count: entity.failure_count,
162            runner_id: entity.runner_id,
163            retry_strategy: entity.retry_strategy,
164            retry_interval: entity.retry_interval,
165            base_interval: entity.base_interval,
166            delay_seconds: entity.delay_seconds,
167            max_retries: entity.max_retries,
168            is_repeating: entity.is_repeating,
169            heartbeat_at: entity.heartbeat_at,
170        }
171    }
172}
173
174impl From<TaskMeta> for TaskMetaEntity {
175    fn from(entity: TaskMeta) -> Self {
176        let kind;
177        let cron_schedule;
178        let cron_timezone;
179        let repeat_interval;
180
181        match entity.kind {
182            TaskKind::Cron { schedule, timezone } => {
183                kind = TaskKindEntity::Cron;
184                cron_schedule = Some(schedule);
185                cron_timezone = Some(timezone);
186                repeat_interval = 0;
187            }
188            TaskKind::Repeat { interval_seconds } => {
189                kind = TaskKindEntity::Repeat;
190                cron_schedule = None;
191                cron_timezone = None;
192                repeat_interval = interval_seconds;
193            }
194            TaskKind::Once => {
195                kind = TaskKindEntity::Once;
196                cron_schedule = None;
197                cron_timezone = None;
198                repeat_interval = 0;
199            }
200        }
201
202        TaskMetaEntity {
203            id: entity.id,
204            task_key: entity.task_key,
205            task_params: entity.task_params,
206            queue_name: entity.queue_name,
207            updated_at: entity.updated_at,
208            created_at: entity.created_at,
209            status: entity.status,
210            stopped_reason: entity.stopped_reason,
211            last_error: entity.last_error,
212            last_run: entity.last_run,
213            next_run: entity.next_run,
214            kind,
215            success_count: entity.success_count,
216            failure_count: entity.failure_count,
217            runner_id: entity.runner_id,
218            retry_strategy: entity.retry_strategy,
219            retry_interval: entity.retry_interval,
220            base_interval: entity.base_interval,
221            delay_seconds: entity.delay_seconds,
222            max_retries: entity.max_retries,
223            cron_schedule,
224            cron_timezone,
225            is_repeating: entity.is_repeating,
226            repeat_interval,
227            heartbeat_at: entity.heartbeat_at,
228        }
229    }
230}
231
232#[cfg(test)]
233mod test {
234    use std::{fs, path::Path, time::Duration};
235
236    use crate::nativedb::{init_nativedb, TaskMetaEntity};
237    use itertools::Itertools;
238
239    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
240        let temp_db_path = std::env::temp_dir().join("polly-scheduler.db");
241        if Path::new(&temp_db_path).exists() {
242            fs::remove_file(temp_db_path)?;
243            println!("File 'polly-scheduler.db' has been deleted.");
244        } else {
245            println!("File 'polly-scheduler.db' does not exist.");
246        }
247
248        Ok(())
249    }
250
251    #[tokio::test]
252    async fn delete_db_file() {
253        delete_temp_db().unwrap();
254        tokio::time::sleep(Duration::from_secs(3)).await;
255    }
256
257    #[tokio::test]
258    async fn test() {
259        let db = init_nativedb(None, None).unwrap();
260        let r = db.r_transaction().unwrap();
261
262        let list: Vec<TaskMetaEntity> = r
263            .scan()
264            .primary()
265            .unwrap()
266            .all()
267            .unwrap()
268            .try_collect()
269            .unwrap();
270
271        println!("{:#?}", list);
272    }
273}