// polly_scheduler/nativedb/mod.rs

1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use crate::core::task_kind::TaskKind;
4use native_db::*;
5use native_model::{native_model, Model};
6use once_cell::sync::{Lazy, OnceCell};
7use serde::{Deserialize, Serialize};
8use tracing::error;
9
pub mod meta;

/// Process-wide database handle; set exactly once by `init_nativedb` and
/// read through `get_database`.
static DB: OnceCell<Database> = OnceCell::new();
13
14pub static MODELS: Lazy<Models> = Lazy::new(|| {
15    let mut models = Models::new();
16    models
17        .define::<TaskMetaEntity>()
18        .expect("Error to define TaskMetaEntity");
19    models
20});
21
22pub fn init_nativedb(
23    db_path: Option<String>,
24    cache_size: Option<u64>,
25) -> Result<&'static Database<'static>, SchedulerError> {
26    let mut sys = sysinfo::System::new_all();
27    sys.refresh_memory();
28
29    let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
30
31    let db_file = db_path.unwrap_or_else(|| {
32        std::env::temp_dir()
33            .join("polly-scheduler.db")
34            .to_string_lossy()
35            .into_owned()
36    });
37
38    let database = Builder::new()
39        .set_cache_size(cache_size as usize)
40        .create(&MODELS, db_file.as_str());
41
42    if let Err(e) = database {
43        error!("Error init native db {:?}", e);
44        return Err(SchedulerError::StoreInit);
45    }
46
47    let _ = DB.set(database.unwrap());
48    get_database()
49}
50
51pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
52    DB.get().ok_or_else(|| SchedulerError::StoreInit)
53}
54
/// Persistence entity for task metadata, stored in native_db.
///
/// Mirrors `TaskMeta` field-for-field (see the `From` conversions in this
/// module). Indexed by `id` (primary) and by `task_key`, `queue_name`, and
/// `status` (secondary) — only those fields can be queried directly.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[native_model(id = 2, version = 1)]
#[native_db]
pub struct TaskMetaEntity {
    #[primary_key]
    pub id: String, // Unique identifier for the task
    #[secondary_key]
    pub task_key: String, // Key to identify the specific task
    pub task_params: String, // Arguments for the task, stored as a JSON object
    #[secondary_key]
    pub queue_name: String, // Name of the queue for the task
    pub updated_at: i64,     // Timestamp of the last update
    #[secondary_key]
    pub status: TaskStatus, // Current status of the task
    pub stopped_reason: Option<String>, // Optional reason for why the task was stopped
    pub last_error: Option<String>, // Error message from the last execution, if any
    pub last_run: i64,       // Timestamp of the last run
    pub next_run: i64,       // Timestamp of the next scheduled run
    pub kind: TaskKind,      // Type of the task
    pub success_count: u32,  // Count of successful runs
    pub failure_count: u32,  // Count of failed runs
    pub runner_id: Option<String>, // The ID of the current task runner, may be None
    pub retry_strategy: Retry, // Retry strategy for handling failures
    pub retry_interval: u32, // Interval for retrying the task
    pub base_interval: u32, // Base interval for exponential backoff
    pub delay_seconds: u32,  //Delay before executing a Once task, specified in seconds
    pub max_retries: Option<u32>, // Maximum number of retries allowed
    pub cron_schedule: Option<String>, // Cron expression for scheduling
    pub cron_timezone: Option<String>, // Timezone for the cron schedule (stored as a string)
    pub is_repeating: bool,  // Indicates if the task is repeating
    pub repeat_interval: u32, // Interval for repeating task
    pub heartbeat_at: i64,   // Timestamp of the last heartbeat in milliseconds
}
88
89impl ToKey for TaskStatus {
90    fn to_key(&self) -> Key {
91        match self {
92            TaskStatus::Scheduled => Key::new(vec![0]),
93            TaskStatus::Running => Key::new(vec![1]),
94            TaskStatus::Success => Key::new(vec![2]),
95            TaskStatus::Failed => Key::new(vec![3]),
96            TaskStatus::Removed => Key::new(vec![4]),
97            TaskStatus::Stopped => Key::new(vec![5]),
98        }
99    }
100
101    fn key_names() -> Vec<String> {
102        vec!["String".to_string(), "&str".to_string()]
103    }
104}
105
106impl From<TaskMetaEntity> for TaskMeta {
107    fn from(entity: TaskMetaEntity) -> Self {
108        TaskMeta {
109            id: entity.id,
110            task_key: entity.task_key,
111            task_params: entity.task_params,
112            queue_name: entity.queue_name,
113            updated_at: entity.updated_at,
114            status: entity.status,
115            stopped_reason: entity.stopped_reason,
116            last_error: entity.last_error,
117            last_run: entity.last_run,
118            next_run: entity.next_run,
119            kind: entity.kind,
120            success_count: entity.success_count,
121            failure_count: entity.failure_count,
122            runner_id: entity.runner_id,
123            retry_strategy: entity.retry_strategy,
124            retry_interval: entity.retry_interval,
125            base_interval: entity.base_interval,
126            delay_seconds: entity.delay_seconds,
127            max_retries: entity.max_retries,
128            cron_schedule: entity.cron_schedule,
129            cron_timezone: entity.cron_timezone,
130            is_repeating: entity.is_repeating,
131            repeat_interval: entity.repeat_interval,
132            heartbeat_at: entity.heartbeat_at,
133        }
134    }
135}
136
137impl From<TaskMeta> for TaskMetaEntity {
138    fn from(entity: TaskMeta) -> Self {
139        TaskMetaEntity {
140            id: entity.id,
141            task_key: entity.task_key,
142            task_params: entity.task_params,
143            queue_name: entity.queue_name,
144            updated_at: entity.updated_at,
145            status: entity.status,
146            stopped_reason: entity.stopped_reason,
147            last_error: entity.last_error,
148            last_run: entity.last_run,
149            next_run: entity.next_run,
150            kind: entity.kind,
151            success_count: entity.success_count,
152            failure_count: entity.failure_count,
153            runner_id: entity.runner_id,
154            retry_strategy: entity.retry_strategy,
155            retry_interval: entity.retry_interval,
156            base_interval: entity.base_interval,
157            delay_seconds: entity.delay_seconds,
158            max_retries: entity.max_retries,
159            cron_schedule: entity.cron_schedule,
160            cron_timezone: entity.cron_timezone,
161            is_repeating: entity.is_repeating,
162            repeat_interval: entity.repeat_interval,
163            heartbeat_at: entity.heartbeat_at,
164        }
165    }
166}
167
#[cfg(test)]
mod test {
    use std::{fs, path::Path, time::Duration};

    use crate::nativedb::{init_nativedb, TaskMetaEntity};
    use itertools::Itertools;

    /// Removes the temp-dir database file created by `init_nativedb(None, _)`
    /// so a subsequent run starts from an empty store. Missing file is not an
    /// error.
    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
        let temp_db_path = std::env::temp_dir().join("polly-scheduler.db");
        if Path::new(&temp_db_path).exists() {
            fs::remove_file(temp_db_path)?;
            println!("File 'polly-scheduler.db' has been deleted.");
        } else {
            println!("File 'polly-scheduler.db' does not exist.");
        }

        Ok(())
    }

    // NOTE(review): the 3-second sleep appears to be an attempt to order this
    // test relative to `test` below (cargo runs tests in parallel threads).
    // That is not a reliable ordering mechanism — confirm intent before
    // removing; serializing the tests or using distinct DB files would be
    // deterministic.
    #[tokio::test]
    async fn delete_db_file() {
        delete_temp_db().unwrap();
        tokio::time::sleep(Duration::from_secs(3)).await;
    }

    // Smoke test: initialize the DB with defaults, open a read transaction,
    // and dump every stored TaskMetaEntity via a primary-key scan.
    #[tokio::test]
    async fn test() {
        let db = init_nativedb(None, None).unwrap();
        let r = db.r_transaction().unwrap();

        let list: Vec<TaskMetaEntity> = r
            .scan()
            .primary()
            .unwrap()
            .all()
            .unwrap()
            .try_collect()
            .unwrap();

        println!("{:#?}", list);
    }
}