persistent_scheduler/nativedb/
mod.rs

1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use native_db::*;
4use native_model::native_model;
5use native_model::Model;
6use serde::{Deserialize, Serialize};
7use std::sync::{LazyLock, OnceLock};
8use tracing::error;
9use crate::core::task_kind::TaskKind;
10
11pub mod meta;
12#[cfg(test)]
13mod tests;
14
15static DB: OnceLock<Database> = OnceLock::new();
16
17pub static MODELS: LazyLock<Models> = LazyLock::new(|| {
18    let mut models = Models::new();
19    models
20        .define::<TaskMetaEntity>()
21        .expect("Error to define TaskMetaEntity");
22    models
23});
24
25pub fn init_nativedb(
26    db_path: Option<String>,
27    cache_size: Option<u64>,
28) -> Result<&'static Database<'static>, SchedulerError> {
29    let mut sys = sysinfo::System::new_all();
30    sys.refresh_memory();
31
32    let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
33
34    let db_file = db_path.unwrap_or_else(|| {
35        std::env::temp_dir()
36            .join("tasks.db")
37            .to_string_lossy()
38            .into_owned()
39    });
40
41    let database = Builder::new()
42        .set_cache_size(cache_size as usize)
43        .create(&MODELS, db_file.as_str());
44
45    if let Err(e) = database {
46        error!("Error init native db {:?}", e);
47        return Err(SchedulerError::StoreInit);
48    }
49
50    let _ = DB.set(database.unwrap());
51    get_database()
52}
53
54pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
55    DB.get().ok_or_else(|| SchedulerError::StoreInit)
56}
57
/// Storage model for a task's metadata, persisted through `native_db`.
///
/// `id` is the primary key; `task_key` and `queue_name` are field-backed
/// secondary keys. Two further secondary keys, `clean_up` and
/// `candidate_task`, are declared on the `native_db` attribute below and are
/// produced by the methods of the same name, both returning `String`.
///
/// Converts to and from `TaskMeta` through the `From` impls in this module.
/// The `kind`, `cron_schedule`, `cron_timezone` and `repeat_interval` fields
/// together hold what `TaskKind` carries in a single enum.
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[native_model(id = 1, version = 1)]
#[native_db(secondary_key(clean_up -> String), secondary_key(candidate_task -> String))]
pub struct TaskMetaEntity {
    #[primary_key]
    pub id: String, // Unique identifier for the task (primary key)
    #[secondary_key]
    pub task_key: String, // Key to identify the specific task (secondary key)
    pub task_params: String, // Arguments for the task, stored as a JSON object
    #[secondary_key]
    pub queue_name: String, // Name of the queue for the task (secondary key)
    pub updated_at: i64,     // Timestamp of the last update
    pub status: TaskStatus,  // Current status of the task
    pub stopped_reason: Option<String>, // Optional reason for why the task was stopped
    pub last_error: Option<String>, // Error message from the last execution, if any
    pub last_run: i64,       // Timestamp of the last run
    pub next_run: i64,       // Timestamp of the next scheduled run
    pub kind: TaskKindEntity,      // Type of the task (Cron, Repeat or Once)
    pub success_count: u32,  // Count of successful runs
    pub failure_count: u32,  // Count of failed runs
    pub runner_id: Option<String>, // The ID of the current task runner, may be None
    pub retry_strategy: Retry, // Retry strategy for handling failures
    pub retry_interval: u32, // Interval for retrying the task
    pub base_interval: u32,  // Base interval for exponential backoff
    pub delay_seconds: u32,  // Delay before executing a Once task, specified in seconds
    pub max_retries: Option<u32>, // Maximum number of retries allowed
    pub cron_schedule: Option<String>, // Cron expression for scheduling; required when kind is Cron
    pub cron_timezone: Option<String>, // Timezone for the cron schedule (stored as a string); required when kind is Cron
    pub is_repeating: bool,  // Indicates if the task is repeating
    pub repeat_interval: u32, // Interval for repeating task; the From<TaskMeta> conversion writes 0 unless kind is Repeat
    pub heartbeat_at: i64,   // Timestamp of the last heartbeat in milliseconds
}
90
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
/// Defines the type of task to be executed.
///
/// This is the payload-free form stored inside `TaskMetaEntity`; the data that
/// `TaskKind` attaches to its variants is kept in the entity's
/// `cron_schedule`, `cron_timezone` and `repeat_interval` fields.
pub enum TaskKindEntity {
    /// Represents a cron job, which is scheduled to run at specific intervals.
    Cron,

    /// Represents a repeated job that runs at a regular interval.
    Repeat,

    /// Represents a one-time job that runs once and then completes.
    /// This is the `Default` variant.
    #[default]
    Once,
}
104
105impl TaskMetaEntity {
106    
107    pub fn clean_up(&self) -> String {
108        let result = match self.kind {
109            TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(self.status, TaskStatus::Removed),
110            TaskKindEntity::Once => matches!(
111                self.status,
112                TaskStatus::Removed | TaskStatus::Success | TaskStatus::Failed
113            ),
114        };
115        result.to_string()
116    }
117
118    pub fn candidate_task(&self) -> String {
119        let result = match self.kind {
120            TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(
121                self.status,
122                TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
123            ),
124            TaskKindEntity::Once => self.status == TaskStatus::Scheduled,
125        };
126        result.to_string()
127    }
128}
129
130impl From<TaskMetaEntity> for TaskMeta {
131    fn from(entity: TaskMetaEntity) -> Self {
132        TaskMeta {
133            id: entity.id,
134            task_key: entity.task_key,
135            task_params: entity.task_params,
136            queue_name: entity.queue_name,
137            updated_at: entity.updated_at,
138            status: entity.status,
139            stopped_reason: entity.stopped_reason,
140            last_error: entity.last_error,
141            last_run: entity.last_run,
142            next_run: entity.next_run,
143            kind: match entity.kind {
144                TaskKindEntity::Cron => TaskKind::Cron {
145                    schedule: entity.cron_schedule.expect("Cron schedule is required for cron kind!"),
146                    timezone: entity.cron_timezone.expect("Cron timezone is required for cron kind!"),
147                },
148                TaskKindEntity::Repeat => TaskKind::Repeat {
149                    interval_seconds: entity.repeat_interval,
150                },
151                TaskKindEntity::Once => TaskKind::Once
152            },
153            success_count: entity.success_count,
154            failure_count: entity.failure_count,
155            runner_id: entity.runner_id,
156            retry_strategy: entity.retry_strategy,
157            retry_interval: entity.retry_interval,
158            base_interval: entity.base_interval,
159            delay_seconds: entity.delay_seconds,
160            max_retries: entity.max_retries,
161            is_repeating: entity.is_repeating,
162            heartbeat_at: entity.heartbeat_at,
163        }
164    }
165}
166
167impl From<TaskMeta> for TaskMetaEntity {
168    fn from(entity: TaskMeta) -> Self {
169        let kind;
170        let cron_schedule;
171        let cron_timezone;
172        let repeat_interval;
173
174        match entity.kind {
175            TaskKind::Cron { schedule, timezone } => {
176                kind = TaskKindEntity::Cron;
177                cron_schedule = Some(schedule);
178                cron_timezone = Some(timezone);
179                repeat_interval = 0;
180            }
181            TaskKind::Repeat { interval_seconds } => {
182                kind = TaskKindEntity::Repeat;
183                cron_schedule = None;
184                cron_timezone = None;
185                repeat_interval = interval_seconds;
186            }
187            TaskKind::Once => {
188                kind = TaskKindEntity::Once;
189                cron_schedule = None;
190                cron_timezone = None;
191                repeat_interval = 0;
192            }
193        }
194
195        TaskMetaEntity {
196            id: entity.id,
197            task_key: entity.task_key,
198            task_params: entity.task_params,
199            queue_name: entity.queue_name,
200            updated_at: entity.updated_at,
201            status: entity.status,
202            stopped_reason: entity.stopped_reason,
203            last_error: entity.last_error,
204            last_run: entity.last_run,
205            next_run: entity.next_run,
206            kind,
207            success_count: entity.success_count,
208            failure_count: entity.failure_count,
209            runner_id: entity.runner_id,
210            retry_strategy: entity.retry_strategy,
211            retry_interval: entity.retry_interval,
212            base_interval: entity.base_interval,
213            delay_seconds: entity.delay_seconds,
214            max_retries: entity.max_retries,
215            cron_schedule,
216            cron_timezone,
217            is_repeating: entity.is_repeating,
218            repeat_interval,
219            heartbeat_at: entity.heartbeat_at,
220        }
221    }
222}
223
#[cfg(test)]
mod test {
    use std::{fs, path::Path, time::Duration};

    use crate::nativedb::{init_nativedb, TaskMetaEntity};
    use itertools::Itertools;

    /// Deletes `polly-scheduler.db` from the system temporary directory when it
    /// exists, printing which of the two cases applied.
    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
        // NOTE(review): `init_nativedb(None, None)` defaults to `tasks.db` in this
        // same directory, so this helper does not remove the file the `test`
        // case below opens - confirm which file name is intended.
        let target = std::env::temp_dir().join("polly-scheduler.db");

        if !Path::new(&target).exists() {
            println!("File 'polly-scheduler.db' does not exist.");
            return Ok(());
        }

        fs::remove_file(target)?;
        println!("File 'polly-scheduler.db' has been deleted.");
        Ok(())
    }

    /// Runs the clean-up helper, then waits three seconds.
    #[tokio::test]
    async fn delete_db_file() {
        delete_temp_db().unwrap();
        // Purpose of the pause is not visible here - presumably it gives the
        // file system time to settle; verify before removing.
        tokio::time::sleep(Duration::from_secs(3)).await;
    }

    /// Opens the default database and prints every stored `TaskMetaEntity`.
    #[tokio::test]
    async fn test() {
        let database = init_nativedb(None, None).unwrap();
        let read_txn = database.r_transaction().unwrap();

        // Kept as one expression so the intermediate scan values live until
        // the rows have been collected.
        let rows: Vec<TaskMetaEntity> = read_txn
            .scan()
            .primary()
            .unwrap()
            .all()
            .unwrap()
            .try_collect()
            .unwrap();

        println!("{:#?}", rows);
    }
}
263        println!("{:#?}", list);
264    }
265}