persistent_scheduler/nativedb/
mod.rs1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use native_db::*;
4use native_model::native_model;
5use native_model::Model;
6use serde::{Deserialize, Serialize};
7use std::sync::{LazyLock, OnceLock};
8use tracing::error;
9use crate::core::task_kind::TaskKind;
10
11pub mod meta;
12#[cfg(test)]
13mod tests;
14
15static DB: OnceLock<Database> = OnceLock::new();
16
17pub static MODELS: LazyLock<Models> = LazyLock::new(|| {
18 let mut models = Models::new();
19 models
20 .define::<TaskMetaEntity>()
21 .expect("Error to define TaskMetaEntity");
22 models
23});
24
25pub fn init_nativedb(
26 db_path: Option<String>,
27 cache_size: Option<u64>,
28) -> Result<&'static Database<'static>, SchedulerError> {
29 let mut sys = sysinfo::System::new_all();
30 sys.refresh_memory();
31
32 let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
33
34 let db_file = db_path.unwrap_or_else(|| {
35 std::env::temp_dir()
36 .join("tasks.db")
37 .to_string_lossy()
38 .into_owned()
39 });
40
41 let database = Builder::new()
42 .set_cache_size(cache_size as usize)
43 .create(&MODELS, db_file.as_str());
44
45 if let Err(e) = database {
46 error!("Error init native db {:?}", e);
47 return Err(SchedulerError::StoreInit);
48 }
49
50 let _ = DB.set(database.unwrap());
51 get_database()
52}
53
54pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
55 DB.get().ok_or_else(|| SchedulerError::StoreInit)
56}
57
58#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
59#[native_model(id = 1, version = 1)]
60#[native_db(secondary_key(clean_up -> String), secondary_key(candidate_task -> String))]
61pub struct TaskMetaEntity {
62 #[primary_key]
63 pub id: String, #[secondary_key]
65 pub task_key: String, pub task_params: String, #[secondary_key]
68 pub queue_name: String, pub updated_at: i64, pub status: TaskStatus, pub stopped_reason: Option<String>, pub last_error: Option<String>, pub last_run: i64, pub next_run: i64, pub kind: TaskKindEntity, pub success_count: u32, pub failure_count: u32, pub runner_id: Option<String>, pub retry_strategy: Retry, pub retry_interval: u32, pub base_interval: u32, pub delay_seconds: u32, pub max_retries: Option<u32>, pub cron_schedule: Option<String>, pub cron_timezone: Option<String>, pub is_repeating: bool, pub repeat_interval: u32, pub heartbeat_at: i64, }
90
91#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
92pub enum TaskKindEntity {
94 Cron,
96
97 Repeat,
99
100 #[default]
102 Once,
103}
104
105impl TaskMetaEntity {
106
107 pub fn clean_up(&self) -> String {
108 let result = match self.kind {
109 TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(self.status, TaskStatus::Removed),
110 TaskKindEntity::Once => matches!(
111 self.status,
112 TaskStatus::Removed | TaskStatus::Success | TaskStatus::Failed
113 ),
114 };
115 result.to_string()
116 }
117
118 pub fn candidate_task(&self) -> String {
119 let result = match self.kind {
120 TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(
121 self.status,
122 TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
123 ),
124 TaskKindEntity::Once => self.status == TaskStatus::Scheduled,
125 };
126 result.to_string()
127 }
128}
129
130impl From<TaskMetaEntity> for TaskMeta {
131 fn from(entity: TaskMetaEntity) -> Self {
132 TaskMeta {
133 id: entity.id,
134 task_key: entity.task_key,
135 task_params: entity.task_params,
136 queue_name: entity.queue_name,
137 updated_at: entity.updated_at,
138 status: entity.status,
139 stopped_reason: entity.stopped_reason,
140 last_error: entity.last_error,
141 last_run: entity.last_run,
142 next_run: entity.next_run,
143 kind: match entity.kind {
144 TaskKindEntity::Cron => TaskKind::Cron {
145 schedule: entity.cron_schedule.expect("Cron schedule is required for cron kind!"),
146 timezone: entity.cron_timezone.expect("Cron timezone is required for cron kind!"),
147 },
148 TaskKindEntity::Repeat => TaskKind::Repeat {
149 interval_seconds: entity.repeat_interval,
150 },
151 TaskKindEntity::Once => TaskKind::Once
152 },
153 success_count: entity.success_count,
154 failure_count: entity.failure_count,
155 runner_id: entity.runner_id,
156 retry_strategy: entity.retry_strategy,
157 retry_interval: entity.retry_interval,
158 base_interval: entity.base_interval,
159 delay_seconds: entity.delay_seconds,
160 max_retries: entity.max_retries,
161 is_repeating: entity.is_repeating,
162 heartbeat_at: entity.heartbeat_at,
163 }
164 }
165}
166
167impl From<TaskMeta> for TaskMetaEntity {
168 fn from(entity: TaskMeta) -> Self {
169 let kind;
170 let cron_schedule;
171 let cron_timezone;
172 let repeat_interval;
173
174 match entity.kind {
175 TaskKind::Cron { schedule, timezone } => {
176 kind = TaskKindEntity::Cron;
177 cron_schedule = Some(schedule);
178 cron_timezone = Some(timezone);
179 repeat_interval = 0;
180 }
181 TaskKind::Repeat { interval_seconds } => {
182 kind = TaskKindEntity::Repeat;
183 cron_schedule = None;
184 cron_timezone = None;
185 repeat_interval = interval_seconds;
186 }
187 TaskKind::Once => {
188 kind = TaskKindEntity::Once;
189 cron_schedule = None;
190 cron_timezone = None;
191 repeat_interval = 0;
192 }
193 }
194
195 TaskMetaEntity {
196 id: entity.id,
197 task_key: entity.task_key,
198 task_params: entity.task_params,
199 queue_name: entity.queue_name,
200 updated_at: entity.updated_at,
201 status: entity.status,
202 stopped_reason: entity.stopped_reason,
203 last_error: entity.last_error,
204 last_run: entity.last_run,
205 next_run: entity.next_run,
206 kind,
207 success_count: entity.success_count,
208 failure_count: entity.failure_count,
209 runner_id: entity.runner_id,
210 retry_strategy: entity.retry_strategy,
211 retry_interval: entity.retry_interval,
212 base_interval: entity.base_interval,
213 delay_seconds: entity.delay_seconds,
214 max_retries: entity.max_retries,
215 cron_schedule,
216 cron_timezone,
217 is_repeating: entity.is_repeating,
218 repeat_interval,
219 heartbeat_at: entity.heartbeat_at,
220 }
221 }
222}
223
#[cfg(test)]
mod test {
    use std::{fs, path::Path, time::Duration};

    use crate::nativedb::{init_nativedb, TaskMetaEntity};
    use itertools::Itertools;

    /// Removes `polly-scheduler.db` from the system temp directory if present.
    ///
    /// NOTE(review): `init_nativedb` defaults to `tasks.db`, so this helper
    /// targets a different file than the one the `test` case below opens —
    /// confirm which name is intended.
    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
        let temp_db_path = std::env::temp_dir().join("polly-scheduler.db");

        if !Path::new(&temp_db_path).exists() {
            println!("File 'polly-scheduler.db' does not exist.");
            return Ok(());
        }

        fs::remove_file(temp_db_path)?;
        println!("File 'polly-scheduler.db' has been deleted.");
        Ok(())
    }

    /// Deletes the temp database file, then waits three seconds.
    #[tokio::test]
    async fn delete_db_file() {
        delete_temp_db().unwrap();
        tokio::time::sleep(Duration::from_secs(3)).await;
    }

    /// Opens the default database and dumps every stored task entity.
    #[tokio::test]
    async fn test() {
        let database = init_nativedb(None, None).unwrap();
        let read_txn = database.r_transaction().unwrap();

        // Kept as a single expression so the intermediate scan objects live
        // until the rows have been collected.
        let entities: Vec<TaskMetaEntity> = read_txn
            .scan()
            .primary()
            .unwrap()
            .all()
            .unwrap()
            .try_collect()
            .unwrap();

        println!("{:#?}", entities);
    }
}