persistent_scheduler/nativedb/
mod.rs1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use crate::core::task_kind::TaskKind;
4use native_db::*;
5use native_model::native_model;
6use native_model::Model;
7use serde::{Deserialize, Serialize};
8use std::sync::{LazyLock, OnceLock};
9use tracing::error;
10
11pub mod meta;
12#[cfg(test)]
13mod tests;
14
15static DB: OnceLock<Database> = OnceLock::new();
16
17pub static MODELS: LazyLock<Models> = LazyLock::new(|| {
18 let mut models = Models::new();
19 models
20 .define::<TaskMetaEntity>()
21 .expect("Error to define TaskMetaEntity");
22 models
23});
24
25pub fn init_nativedb(
26 db_path: Option<String>,
27 cache_size: Option<u64>,
28) -> Result<&'static Database<'static>, SchedulerError> {
29 let mut sys = sysinfo::System::new_all();
30 sys.refresh_memory();
31
32 let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
33
34 let db_file = db_path.unwrap_or_else(|| {
35 std::env::temp_dir()
36 .join("tasks.db")
37 .to_string_lossy()
38 .into_owned()
39 });
40
41 let database = Builder::new()
42 .set_cache_size(cache_size as usize)
43 .create(&MODELS, db_file.as_str());
44
45 if let Err(e) = database {
46 error!("Error init native db {:?}", e);
47 return Err(SchedulerError::StoreInit);
48 }
49
50 let _ = DB.set(database.unwrap());
51 get_database()
52}
53
54pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
55 DB.get().ok_or_else(|| SchedulerError::StoreInit)
56}
57
58#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
59#[native_model(id = 1, version = 1)]
60#[native_db(secondary_key(clean_up -> String), secondary_key(candidate_task -> String), secondary_key(queued_once_task -> String))]
61pub struct TaskMetaEntity {
62 #[primary_key]
63 pub id: String, #[secondary_key]
65 pub task_key: String, pub task_params: String, #[secondary_key]
68 pub queue_name: String, pub updated_at: i64, pub status: TaskStatus, pub stopped_reason: Option<String>, pub last_error: Option<String>, pub last_duration_ms: Option<usize>,
75 pub last_retry_count: Option<usize>,
77 pub last_run: i64, pub next_run: i64, pub kind: TaskKindEntity, pub success_count: u32, pub failure_count: u32, pub runner_id: Option<String>, pub retry_strategy: Retry, pub retry_interval: u32, pub base_interval: u32, pub delay_seconds: u32, pub max_retries: Option<u32>, pub cron_schedule: Option<String>, pub cron_timezone: Option<String>, pub is_repeating: bool, pub repeat_interval: u32, pub heartbeat_at: i64, pub created_at: i64, }
95
96#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
97pub enum TaskKindEntity {
99 Cron,
101
102 Repeat,
104
105 #[default]
107 Once,
108}
109
110impl TaskMetaEntity {
111 pub fn clean_up(&self) -> String {
112 let result = match self.kind {
113 TaskKindEntity::Cron | TaskKindEntity::Repeat => {
114 matches!(self.status, TaskStatus::Removed)
115 }
116 TaskKindEntity::Once => matches!(
117 self.status,
118 TaskStatus::Removed | TaskStatus::Success | TaskStatus::Failed
119 ),
120 };
121 result.to_string()
122 }
123
124 pub fn queued_once_task(&self) -> String {
125 let result = match self.kind {
126 TaskKindEntity::Cron | TaskKindEntity::Repeat => false,
127 TaskKindEntity::Once => matches!(
128 self.status,
129 TaskStatus::Scheduled | TaskStatus::Running | TaskStatus::Failed
130 ),
131 };
132 result.to_string()
133 }
134
135 pub fn candidate_task(&self) -> String {
136 let result = match self.kind {
137 TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(
138 self.status,
139 TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
140 ),
141 TaskKindEntity::Once => self.status == TaskStatus::Scheduled,
142 };
143 result.to_string()
144 }
145}
146
147impl From<TaskMetaEntity> for TaskMeta {
148 fn from(entity: TaskMetaEntity) -> Self {
149 TaskMeta {
150 id: entity.id,
151 task_key: entity.task_key,
152 task_params: entity.task_params,
153 queue_name: entity.queue_name,
154 updated_at: entity.updated_at,
155 created_at: entity.created_at,
156 status: entity.status,
157 stopped_reason: entity.stopped_reason,
158 last_error: entity.last_error,
159 last_duration_ms: entity.last_duration_ms,
160 last_retry_count: entity.last_retry_count,
161 last_run: entity.last_run,
162 next_run: entity.next_run,
163 kind: match entity.kind {
164 TaskKindEntity::Cron => TaskKind::Cron {
165 schedule: entity
166 .cron_schedule
167 .expect("Cron schedule is required for cron kind!"),
168 timezone: entity
169 .cron_timezone
170 .expect("Cron timezone is required for cron kind!"),
171 },
172 TaskKindEntity::Repeat => TaskKind::Repeat {
173 interval_seconds: entity.repeat_interval,
174 },
175 TaskKindEntity::Once => TaskKind::Once,
176 },
177 success_count: entity.success_count,
178 failure_count: entity.failure_count,
179 runner_id: entity.runner_id,
180 retry_strategy: entity.retry_strategy,
181 retry_interval: entity.retry_interval,
182 base_interval: entity.base_interval,
183 delay_seconds: entity.delay_seconds,
184 max_retries: entity.max_retries,
185 is_repeating: entity.is_repeating,
186 heartbeat_at: entity.heartbeat_at,
187 }
188 }
189}
190
191impl From<TaskMeta> for TaskMetaEntity {
192 fn from(entity: TaskMeta) -> Self {
193 let kind;
194 let cron_schedule;
195 let cron_timezone;
196 let repeat_interval;
197
198 match entity.kind {
199 TaskKind::Cron { schedule, timezone } => {
200 kind = TaskKindEntity::Cron;
201 cron_schedule = Some(schedule);
202 cron_timezone = Some(timezone);
203 repeat_interval = 0;
204 }
205 TaskKind::Repeat { interval_seconds } => {
206 kind = TaskKindEntity::Repeat;
207 cron_schedule = None;
208 cron_timezone = None;
209 repeat_interval = interval_seconds;
210 }
211 TaskKind::Once => {
212 kind = TaskKindEntity::Once;
213 cron_schedule = None;
214 cron_timezone = None;
215 repeat_interval = 0;
216 }
217 }
218
219 TaskMetaEntity {
220 id: entity.id,
221 task_key: entity.task_key,
222 task_params: entity.task_params,
223 queue_name: entity.queue_name,
224 updated_at: entity.updated_at,
225 created_at: entity.created_at,
226 status: entity.status,
227 stopped_reason: entity.stopped_reason,
228 last_error: entity.last_error,
229 last_duration_ms: entity.last_duration_ms,
230 last_retry_count: entity.last_retry_count,
231 last_run: entity.last_run,
232 next_run: entity.next_run,
233 kind,
234 success_count: entity.success_count,
235 failure_count: entity.failure_count,
236 runner_id: entity.runner_id,
237 retry_strategy: entity.retry_strategy,
238 retry_interval: entity.retry_interval,
239 base_interval: entity.base_interval,
240 delay_seconds: entity.delay_seconds,
241 max_retries: entity.max_retries,
242 cron_schedule,
243 cron_timezone,
244 is_repeating: entity.is_repeating,
245 repeat_interval,
246 heartbeat_at: entity.heartbeat_at,
247 }
248 }
249}
250
#[cfg(test)]
mod test {
    use std::{fs, path::Path, time::Duration};

    use crate::nativedb::{init_nativedb, TaskMetaEntity};
    use itertools::Itertools;

    /// Removes `polly-scheduler.db` from the temp directory when it exists and prints
    /// what happened either way.
    ///
    /// NOTE(review): `init_nativedb(None, ..)` defaults to `tasks.db`, not
    /// `polly-scheduler.db` — confirm which file this helper is meant to clean up.
    fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
        let temp_db_path = std::env::temp_dir().join("polly-scheduler.db");

        if !Path::new(&temp_db_path).exists() {
            println!("File 'polly-scheduler.db' does not exist.");
            return Ok(());
        }

        fs::remove_file(temp_db_path)?;
        println!("File 'polly-scheduler.db' has been deleted.");
        Ok(())
    }

    /// Deletes the temp database file, then waits three seconds.
    #[tokio::test]
    async fn delete_db_file() {
        delete_temp_db().unwrap();
        tokio::time::sleep(Duration::from_secs(3)).await;
    }

    /// Opens the database with default settings and dumps every stored entity.
    #[tokio::test]
    async fn test() {
        let database = init_nativedb(None, None).unwrap();
        let read_txn = database.r_transaction().unwrap();

        let entities: Vec<TaskMetaEntity> = read_txn
            .scan()
            .primary()
            .unwrap()
            .all()
            .unwrap()
            .try_collect()
            .unwrap();

        println!("{:#?}", entities);
    }
}