persistent_scheduler/nativedb/
mod.rs

1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use crate::core::task_kind::TaskKind;
4use native_db::*;
5use native_model::native_model;
6use native_model::Model;
7use serde::{Deserialize, Serialize};
8use std::sync::{LazyLock, OnceLock};
9use tracing::error;
10
11pub mod meta;
12#[cfg(test)]
13mod tests;
14
15static DB: OnceLock<Database> = OnceLock::new();
16
17pub static MODELS: LazyLock<Models> = LazyLock::new(|| {
18    let mut models = Models::new();
19    models
20        .define::<TaskMetaEntity>()
21        .expect("Error to define TaskMetaEntity");
22    models
23});
24
25pub fn init_nativedb(
26    db_path: Option<String>,
27    cache_size: Option<u64>,
28) -> Result<&'static Database<'static>, SchedulerError> {
29    let mut sys = sysinfo::System::new_all();
30    sys.refresh_memory();
31
32    let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
33
34    let db_file = db_path.unwrap_or_else(|| {
35        std::env::temp_dir()
36            .join("tasks.db")
37            .to_string_lossy()
38            .into_owned()
39    });
40
41    let database = Builder::new()
42        .set_cache_size(cache_size as usize)
43        .create(&MODELS, db_file.as_str());
44
45    if let Err(e) = database {
46        error!("Error init native db {:?}", e);
47        return Err(SchedulerError::StoreInit);
48    }
49
50    let _ = DB.set(database.unwrap());
51    get_database()
52}
53
54pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
55    DB.get().ok_or_else(|| SchedulerError::StoreInit)
56}
57
58#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
59#[native_model(id = 1, version = 1)]
60#[native_db(secondary_key(clean_up -> String), secondary_key(candidate_task -> String), secondary_key(queued_once_task -> String))]
61pub struct TaskMetaEntity {
62    #[primary_key]
63    pub id: String, // Unique identifier for the task
64    #[secondary_key]
65    pub task_key: String, // Key to identify the specific task
66    pub task_params: String, // Arguments for the task, stored as a JSON object
67    #[secondary_key]
68    pub queue_name: String, // Name of the queue for the task
69    pub updated_at: i64,     // Timestamp of the last update
70    pub status: TaskStatus,  // Current status of the task
71    pub stopped_reason: Option<String>, // Optional reason for why the task was stopped
72    pub last_error: Option<String>, // Error message from the last execution, if any
73    /// Duration of last run in milliseconds, including retries
74    pub last_duration_ms: Option<usize>,
75    /// Number of retries before last completion (success or final failure)
76    pub last_retry_count: Option<usize>,
77    pub last_run: i64,                 // Timestamp of the last run
78    pub next_run: i64,                 // Timestamp of the next scheduled run
79    pub kind: TaskKindEntity,          // Type of the task
80    pub success_count: u32,            // Count of successful runs
81    pub failure_count: u32,            // Count of failed runs
82    pub runner_id: Option<String>,     // The ID of the current task runner, may be None
83    pub retry_strategy: Retry,         // Retry strategy for handling failures
84    pub retry_interval: u32,           // Interval for retrying the task
85    pub base_interval: u32,            // Base interval for exponential backoff
86    pub delay_seconds: u32,            //Delay before executing a Once task, specified in seconds
87    pub max_retries: Option<u32>,      // Maximum number of retries allowed
88    pub cron_schedule: Option<String>, // Cron expression for scheduling
89    pub cron_timezone: Option<String>, // Timezone for the cron schedule (stored as a string)
90    pub is_repeating: bool,            // Indicates if the task is repeating
91    pub repeat_interval: u32,          // Interval for repeating task
92    pub heartbeat_at: i64,             // Timestamp of the last heartbeat in milliseconds
93    pub created_at: i64,               // Timestamp of the task creation
94}
95
96#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
97/// Defines the type of task to be executed.
98pub enum TaskKindEntity {
99    /// Represents a cron job, which is scheduled to run at specific intervals.
100    Cron,
101
102    /// Represents a repeated job that runs at a regular interval.
103    Repeat,
104
105    /// Represents a one-time job that runs once and then completes.
106    #[default]
107    Once,
108}
109
110impl TaskMetaEntity {
111    pub fn clean_up(&self) -> String {
112        let result = match self.kind {
113            TaskKindEntity::Cron | TaskKindEntity::Repeat => {
114                matches!(self.status, TaskStatus::Removed)
115            }
116            TaskKindEntity::Once => matches!(
117                self.status,
118                TaskStatus::Removed | TaskStatus::Success | TaskStatus::Failed
119            ),
120        };
121        result.to_string()
122    }
123
124    pub fn queued_once_task(&self) -> String {
125        let result = match self.kind {
126            TaskKindEntity::Cron | TaskKindEntity::Repeat => false,
127            TaskKindEntity::Once => matches!(
128                self.status,
129                TaskStatus::Scheduled | TaskStatus::Running | TaskStatus::Failed
130            ),
131        };
132        result.to_string()
133    }
134
135    pub fn candidate_task(&self) -> String {
136        let result = match self.kind {
137            TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(
138                self.status,
139                TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
140            ),
141            TaskKindEntity::Once => self.status == TaskStatus::Scheduled,
142        };
143        result.to_string()
144    }
145}
146
147impl From<TaskMetaEntity> for TaskMeta {
148    fn from(entity: TaskMetaEntity) -> Self {
149        TaskMeta {
150            id: entity.id,
151            task_key: entity.task_key,
152            task_params: entity.task_params,
153            queue_name: entity.queue_name,
154            updated_at: entity.updated_at,
155            created_at: entity.created_at,
156            status: entity.status,
157            stopped_reason: entity.stopped_reason,
158            last_error: entity.last_error,
159            last_duration_ms: entity.last_duration_ms,
160            last_retry_count: entity.last_retry_count,
161            last_run: entity.last_run,
162            next_run: entity.next_run,
163            kind: match entity.kind {
164                TaskKindEntity::Cron => TaskKind::Cron {
165                    schedule: entity
166                        .cron_schedule
167                        .expect("Cron schedule is required for cron kind!"),
168                    timezone: entity
169                        .cron_timezone
170                        .expect("Cron timezone is required for cron kind!"),
171                },
172                TaskKindEntity::Repeat => TaskKind::Repeat {
173                    interval_seconds: entity.repeat_interval,
174                },
175                TaskKindEntity::Once => TaskKind::Once,
176            },
177            success_count: entity.success_count,
178            failure_count: entity.failure_count,
179            runner_id: entity.runner_id,
180            retry_strategy: entity.retry_strategy,
181            retry_interval: entity.retry_interval,
182            base_interval: entity.base_interval,
183            delay_seconds: entity.delay_seconds,
184            max_retries: entity.max_retries,
185            is_repeating: entity.is_repeating,
186            heartbeat_at: entity.heartbeat_at,
187        }
188    }
189}
190
191impl From<TaskMeta> for TaskMetaEntity {
192    fn from(entity: TaskMeta) -> Self {
193        let kind;
194        let cron_schedule;
195        let cron_timezone;
196        let repeat_interval;
197
198        match entity.kind {
199            TaskKind::Cron { schedule, timezone } => {
200                kind = TaskKindEntity::Cron;
201                cron_schedule = Some(schedule);
202                cron_timezone = Some(timezone);
203                repeat_interval = 0;
204            }
205            TaskKind::Repeat { interval_seconds } => {
206                kind = TaskKindEntity::Repeat;
207                cron_schedule = None;
208                cron_timezone = None;
209                repeat_interval = interval_seconds;
210            }
211            TaskKind::Once => {
212                kind = TaskKindEntity::Once;
213                cron_schedule = None;
214                cron_timezone = None;
215                repeat_interval = 0;
216            }
217        }
218
219        TaskMetaEntity {
220            id: entity.id,
221            task_key: entity.task_key,
222            task_params: entity.task_params,
223            queue_name: entity.queue_name,
224            updated_at: entity.updated_at,
225            created_at: entity.created_at,
226            status: entity.status,
227            stopped_reason: entity.stopped_reason,
228            last_error: entity.last_error,
229            last_duration_ms: entity.last_duration_ms,
230            last_retry_count: entity.last_retry_count,
231            last_run: entity.last_run,
232            next_run: entity.next_run,
233            kind,
234            success_count: entity.success_count,
235            failure_count: entity.failure_count,
236            runner_id: entity.runner_id,
237            retry_strategy: entity.retry_strategy,
238            retry_interval: entity.retry_interval,
239            base_interval: entity.base_interval,
240            delay_seconds: entity.delay_seconds,
241            max_retries: entity.max_retries,
242            cron_schedule,
243            cron_timezone,
244            is_repeating: entity.is_repeating,
245            repeat_interval,
246            heartbeat_at: entity.heartbeat_at,
247        }
248    }
249}
250
251#[cfg(test)]
252mod test {
253    use std::{fs, path::Path, time::Duration};
254
255    use crate::nativedb::{init_nativedb, TaskMetaEntity};
256    use itertools::Itertools;
257
258    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
259        let temp_db_path = std::env::temp_dir().join("polly-scheduler.db");
260        if Path::new(&temp_db_path).exists() {
261            fs::remove_file(temp_db_path)?;
262            println!("File 'polly-scheduler.db' has been deleted.");
263        } else {
264            println!("File 'polly-scheduler.db' does not exist.");
265        }
266
267        Ok(())
268    }
269
270    #[tokio::test]
271    async fn delete_db_file() {
272        delete_temp_db().unwrap();
273        tokio::time::sleep(Duration::from_secs(3)).await;
274    }
275
276    #[tokio::test]
277    async fn test() {
278        let db = init_nativedb(None, None).unwrap();
279        let r = db.r_transaction().unwrap();
280
281        let list: Vec<TaskMetaEntity> = r
282            .scan()
283            .primary()
284            .unwrap()
285            .all()
286            .unwrap()
287            .try_collect()
288            .unwrap();
289
290        println!("{:#?}", list);
291    }
292}