persistent_scheduler/nativedb/
mod.rs1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use crate::core::task_kind::TaskKind;
4use native_db::*;
5use native_model::native_model;
6use native_model::Model;
7use serde::{Deserialize, Serialize};
8use std::sync::{LazyLock, OnceLock};
9use tracing::error;
10
11pub mod meta;
12#[cfg(test)]
13mod tests;
14
15static DB: OnceLock<Database> = OnceLock::new();
16
17pub static MODELS: LazyLock<Models> = LazyLock::new(|| {
18 let mut models = Models::new();
19 models
20 .define::<TaskMetaEntity>()
21 .expect("Error to define TaskMetaEntity");
22 models
23});
24
25pub fn init_nativedb(
26 db_path: Option<String>,
27 cache_size: Option<u64>,
28) -> Result<&'static Database<'static>, SchedulerError> {
29 let mut sys = sysinfo::System::new_all();
30 sys.refresh_memory();
31
32 let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
33
34 let db_file = db_path.unwrap_or_else(|| {
35 std::env::temp_dir()
36 .join("tasks.db")
37 .to_string_lossy()
38 .into_owned()
39 });
40
41 let database = Builder::new()
42 .set_cache_size(cache_size as usize)
43 .create(&MODELS, db_file.as_str());
44
45 if let Err(e) = database {
46 error!("Error init native db {:?}", e);
47 return Err(SchedulerError::StoreInit);
48 }
49
50 let _ = DB.set(database.unwrap());
51 get_database()
52}
53
54pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
55 DB.get().ok_or_else(|| SchedulerError::StoreInit)
56}
57
58#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
59#[native_model(id = 1, version = 1)]
60#[native_db(secondary_key(clean_up -> String), secondary_key(candidate_task -> String))]
61pub struct TaskMetaEntity {
62 #[primary_key]
63 pub id: String, #[secondary_key]
65 pub task_key: String, pub task_params: String, #[secondary_key]
68 pub queue_name: String, pub updated_at: i64, pub status: TaskStatus, pub stopped_reason: Option<String>, pub last_error: Option<String>, pub last_duration_ms: Option<usize>,
75 pub last_retry_count: Option<usize>,
77 pub last_run: i64, pub next_run: i64, pub kind: TaskKindEntity, pub success_count: u32, pub failure_count: u32, pub runner_id: Option<String>, pub retry_strategy: Retry, pub retry_interval: u32, pub base_interval: u32, pub delay_seconds: u32, pub max_retries: Option<u32>, pub cron_schedule: Option<String>, pub cron_timezone: Option<String>, pub is_repeating: bool, pub repeat_interval: u32, pub heartbeat_at: i64, pub created_at: i64, }
95
96#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
97pub enum TaskKindEntity {
99 Cron,
101
102 Repeat,
104
105 #[default]
107 Once,
108}
109
110impl TaskMetaEntity {
111 pub fn clean_up(&self) -> String {
112 let result = match self.kind {
113 TaskKindEntity::Cron | TaskKindEntity::Repeat => {
114 matches!(self.status, TaskStatus::Removed)
115 }
116 TaskKindEntity::Once => matches!(
117 self.status,
118 TaskStatus::Removed | TaskStatus::Success | TaskStatus::Failed
119 ),
120 };
121 result.to_string()
122 }
123
124 pub fn candidate_task(&self) -> String {
125 let result = match self.kind {
126 TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(
127 self.status,
128 TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
129 ),
130 TaskKindEntity::Once => self.status == TaskStatus::Scheduled,
131 };
132 result.to_string()
133 }
134}
135
136impl From<TaskMetaEntity> for TaskMeta {
137 fn from(entity: TaskMetaEntity) -> Self {
138 TaskMeta {
139 id: entity.id,
140 task_key: entity.task_key,
141 task_params: entity.task_params,
142 queue_name: entity.queue_name,
143 updated_at: entity.updated_at,
144 created_at: entity.created_at,
145 status: entity.status,
146 stopped_reason: entity.stopped_reason,
147 last_error: entity.last_error,
148 last_duration_ms: entity.last_duration_ms,
149 last_retry_count: entity.last_retry_count,
150 last_run: entity.last_run,
151 next_run: entity.next_run,
152 kind: match entity.kind {
153 TaskKindEntity::Cron => TaskKind::Cron {
154 schedule: entity
155 .cron_schedule
156 .expect("Cron schedule is required for cron kind!"),
157 timezone: entity
158 .cron_timezone
159 .expect("Cron timezone is required for cron kind!"),
160 },
161 TaskKindEntity::Repeat => TaskKind::Repeat {
162 interval_seconds: entity.repeat_interval,
163 },
164 TaskKindEntity::Once => TaskKind::Once,
165 },
166 success_count: entity.success_count,
167 failure_count: entity.failure_count,
168 runner_id: entity.runner_id,
169 retry_strategy: entity.retry_strategy,
170 retry_interval: entity.retry_interval,
171 base_interval: entity.base_interval,
172 delay_seconds: entity.delay_seconds,
173 max_retries: entity.max_retries,
174 is_repeating: entity.is_repeating,
175 heartbeat_at: entity.heartbeat_at,
176 }
177 }
178}
179
180impl From<TaskMeta> for TaskMetaEntity {
181 fn from(entity: TaskMeta) -> Self {
182 let kind;
183 let cron_schedule;
184 let cron_timezone;
185 let repeat_interval;
186
187 match entity.kind {
188 TaskKind::Cron { schedule, timezone } => {
189 kind = TaskKindEntity::Cron;
190 cron_schedule = Some(schedule);
191 cron_timezone = Some(timezone);
192 repeat_interval = 0;
193 }
194 TaskKind::Repeat { interval_seconds } => {
195 kind = TaskKindEntity::Repeat;
196 cron_schedule = None;
197 cron_timezone = None;
198 repeat_interval = interval_seconds;
199 }
200 TaskKind::Once => {
201 kind = TaskKindEntity::Once;
202 cron_schedule = None;
203 cron_timezone = None;
204 repeat_interval = 0;
205 }
206 }
207
208 TaskMetaEntity {
209 id: entity.id,
210 task_key: entity.task_key,
211 task_params: entity.task_params,
212 queue_name: entity.queue_name,
213 updated_at: entity.updated_at,
214 created_at: entity.created_at,
215 status: entity.status,
216 stopped_reason: entity.stopped_reason,
217 last_error: entity.last_error,
218 last_duration_ms: entity.last_duration_ms,
219 last_retry_count: entity.last_retry_count,
220 last_run: entity.last_run,
221 next_run: entity.next_run,
222 kind,
223 success_count: entity.success_count,
224 failure_count: entity.failure_count,
225 runner_id: entity.runner_id,
226 retry_strategy: entity.retry_strategy,
227 retry_interval: entity.retry_interval,
228 base_interval: entity.base_interval,
229 delay_seconds: entity.delay_seconds,
230 max_retries: entity.max_retries,
231 cron_schedule,
232 cron_timezone,
233 is_repeating: entity.is_repeating,
234 repeat_interval,
235 heartbeat_at: entity.heartbeat_at,
236 }
237 }
238}
239
#[cfg(test)]
mod test {
    use std::{fs, time::Duration};

    use crate::nativedb::{init_nativedb, TaskMetaEntity};
    use itertools::Itertools;

    /// Removes `polly-scheduler.db` from the system temp directory if present.
    ///
    /// NOTE(review): `init_nativedb` defaults to `tasks.db`, so this does not
    /// remove the file the other test in this module opens — confirm which
    /// name is intended.
    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
        let db_file = std::env::temp_dir().join("polly-scheduler.db");

        if !db_file.exists() {
            println!("File 'polly-scheduler.db' does not exist.");
            return Ok(());
        }

        fs::remove_file(db_file)?;
        println!("File 'polly-scheduler.db' has been deleted.");
        Ok(())
    }

    /// Runs the cleanup helper, then waits three seconds.
    #[tokio::test]
    async fn delete_db_file() {
        delete_temp_db().unwrap();
        let pause = Duration::from_secs(3);
        tokio::time::sleep(pause).await;
    }

    /// Opens the database with default settings and prints every stored entity.
    #[tokio::test]
    async fn test() {
        let db = init_nativedb(None, None).unwrap();
        let txn = db.r_transaction().unwrap();

        let rows = txn.scan().primary().unwrap().all().unwrap();
        let list: Vec<TaskMetaEntity> = rows.try_collect().unwrap();

        println!("{:#?}", list);
    }
}