polly_scheduler/nativedb/
mod.rs1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use crate::core::task_kind::TaskKind;
4use native_db::*;
5use native_model::{native_model, Model};
6use once_cell::sync::{Lazy, OnceCell};
7use serde::{Deserialize, Serialize};
8use tracing::error;
9
10pub mod meta;
11
12static DB: OnceCell<Database> = OnceCell::new();
13
14pub static MODELS: Lazy<Models> = Lazy::new(|| {
15 let mut models = Models::new();
16 models
17 .define::<TaskMetaEntity>()
18 .expect("Error to define TaskMetaEntity");
19 models
20});
21
22pub fn init_nativedb(
23 db_path: Option<String>,
24 cache_size: Option<u64>,
25) -> Result<&'static Database<'static>, SchedulerError> {
26 let mut sys = sysinfo::System::new_all();
27 sys.refresh_memory();
28
29 let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
30
31 let db_file = db_path.unwrap_or_else(|| {
32 std::env::temp_dir()
33 .join("polly-scheduler.db")
34 .to_string_lossy()
35 .into_owned()
36 });
37
38 let database = Builder::new()
39 .set_cache_size(cache_size as usize)
40 .create(&MODELS, db_file.as_str());
41
42 if let Err(e) = database {
43 error!("Error init native db {:?}", e);
44 return Err(SchedulerError::StoreInit);
45 }
46
47 let _ = DB.set(database.unwrap());
48 get_database()
49}
50
51pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
52 DB.get().ok_or_else(|| SchedulerError::StoreInit)
53}
54
55#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
56#[native_model(id = 2, version = 1)]
57#[native_db]
58pub struct TaskMetaEntity {
59 #[primary_key]
60 pub id: String, #[secondary_key]
62 pub task_key: String, pub task_params: String, #[secondary_key]
65 pub queue_name: String, pub updated_at: i64, #[secondary_key]
68 pub status: TaskStatus, pub stopped_reason: Option<String>, pub last_error: Option<String>, pub last_run: i64, pub next_run: i64, pub kind: TaskKind, pub success_count: u32, pub failure_count: u32, pub runner_id: Option<String>, pub retry_strategy: Retry, pub retry_interval: u32, pub base_interval: u32, pub delay_seconds: u32, pub max_retries: Option<u32>, pub cron_schedule: Option<String>, pub cron_timezone: Option<String>, pub is_repeating: bool, pub repeat_interval: u32, pub heartbeat_at: i64, }
88
89impl ToKey for TaskStatus {
90 fn to_key(&self) -> Key {
91 match self {
92 TaskStatus::Scheduled => Key::new(vec![0]),
93 TaskStatus::Running => Key::new(vec![1]),
94 TaskStatus::Success => Key::new(vec![2]),
95 TaskStatus::Failed => Key::new(vec![3]),
96 TaskStatus::Removed => Key::new(vec![4]),
97 TaskStatus::Stopped => Key::new(vec![5]),
98 }
99 }
100
101 fn key_names() -> Vec<String> {
102 vec!["String".to_string(), "&str".to_string()]
103 }
104}
105
106impl From<TaskMetaEntity> for TaskMeta {
107 fn from(entity: TaskMetaEntity) -> Self {
108 TaskMeta {
109 id: entity.id,
110 task_key: entity.task_key,
111 task_params: entity.task_params,
112 queue_name: entity.queue_name,
113 updated_at: entity.updated_at,
114 status: entity.status,
115 stopped_reason: entity.stopped_reason,
116 last_error: entity.last_error,
117 last_run: entity.last_run,
118 next_run: entity.next_run,
119 kind: entity.kind,
120 success_count: entity.success_count,
121 failure_count: entity.failure_count,
122 runner_id: entity.runner_id,
123 retry_strategy: entity.retry_strategy,
124 retry_interval: entity.retry_interval,
125 base_interval: entity.base_interval,
126 delay_seconds: entity.delay_seconds,
127 max_retries: entity.max_retries,
128 cron_schedule: entity.cron_schedule,
129 cron_timezone: entity.cron_timezone,
130 is_repeating: entity.is_repeating,
131 repeat_interval: entity.repeat_interval,
132 heartbeat_at: entity.heartbeat_at,
133 }
134 }
135}
136
137impl From<TaskMeta> for TaskMetaEntity {
138 fn from(entity: TaskMeta) -> Self {
139 TaskMetaEntity {
140 id: entity.id,
141 task_key: entity.task_key,
142 task_params: entity.task_params,
143 queue_name: entity.queue_name,
144 updated_at: entity.updated_at,
145 status: entity.status,
146 stopped_reason: entity.stopped_reason,
147 last_error: entity.last_error,
148 last_run: entity.last_run,
149 next_run: entity.next_run,
150 kind: entity.kind,
151 success_count: entity.success_count,
152 failure_count: entity.failure_count,
153 runner_id: entity.runner_id,
154 retry_strategy: entity.retry_strategy,
155 retry_interval: entity.retry_interval,
156 base_interval: entity.base_interval,
157 delay_seconds: entity.delay_seconds,
158 max_retries: entity.max_retries,
159 cron_schedule: entity.cron_schedule,
160 cron_timezone: entity.cron_timezone,
161 is_repeating: entity.is_repeating,
162 repeat_interval: entity.repeat_interval,
163 heartbeat_at: entity.heartbeat_at,
164 }
165 }
166}
167
#[cfg(test)]
mod test {
    use std::{fs, time::Duration};

    use crate::nativedb::{init_nativedb, TaskMetaEntity};
    use itertools::Itertools;

    /// Removes `polly-scheduler.db` from the OS temp directory if it is present and
    /// prints which of the two cases happened.
    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
        let db_file = std::env::temp_dir().join("polly-scheduler.db");

        if !db_file.exists() {
            println!("File 'polly-scheduler.db' does not exist.");
            return Ok(());
        }

        fs::remove_file(db_file)?;
        println!("File 'polly-scheduler.db' has been deleted.");
        Ok(())
    }

    /// Deletes the default database file, then waits three seconds.
    ///
    /// NOTE(review): this touches the same default file that `test` opens, and the
    /// purpose of the sleep is not visible here. Confirm both are intentional.
    #[tokio::test]
    async fn delete_db_file() {
        delete_temp_db().unwrap();
        tokio::time::sleep(Duration::from_secs(3)).await;
    }

    /// Opens the default database and prints every stored `TaskMetaEntity`.
    #[tokio::test]
    async fn test() {
        let database = init_nativedb(None, None).unwrap();
        let read_txn = database.r_transaction().unwrap();

        // Kept as a single chain so temporaries live for the whole expression.
        let entities: Vec<TaskMetaEntity> = read_txn
            .scan()
            .primary()
            .unwrap()
            .all()
            .unwrap()
            .try_collect()
            .unwrap();

        println!("{:#?}", entities);
    }
}