persistent_scheduler/nativedb/
mod.rs1use crate::core::error::SchedulerError;
2use crate::core::model::{Retry, TaskMeta, TaskStatus};
3use crate::core::task_kind::TaskKind;
4use native_db::*;
5use native_model::native_model;
6use native_model::Model;
7use serde::{Deserialize, Serialize};
8use std::sync::{LazyLock, OnceLock};
9use tracing::error;
10
11pub mod meta;
12#[cfg(test)]
13mod tests;
14
15static DB: OnceLock<Database> = OnceLock::new();
16
17pub static MODELS: LazyLock<Models> = LazyLock::new(|| {
18 let mut models = Models::new();
19 models
20 .define::<TaskMetaEntity>()
21 .expect("Error to define TaskMetaEntity");
22 models
23});
24
25pub fn init_nativedb(
26 db_path: Option<String>,
27 cache_size: Option<u64>,
28) -> Result<&'static Database<'static>, SchedulerError> {
29 let mut sys = sysinfo::System::new_all();
30 sys.refresh_memory();
31
32 let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
33
34 let db_file = db_path.unwrap_or_else(|| {
35 std::env::temp_dir()
36 .join("tasks.db")
37 .to_string_lossy()
38 .into_owned()
39 });
40
41 let database = Builder::new()
42 .set_cache_size(cache_size as usize)
43 .create(&MODELS, db_file.as_str());
44
45 if let Err(e) = database {
46 error!("Error init native db {:?}", e);
47 return Err(SchedulerError::StoreInit);
48 }
49
50 let _ = DB.set(database.unwrap());
51 get_database()
52}
53
54pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
55 DB.get().ok_or_else(|| SchedulerError::StoreInit)
56}
57
58#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
59#[native_model(id = 1, version = 1)]
60#[native_db(secondary_key(clean_up -> String), secondary_key(candidate_task -> String))]
61pub struct TaskMetaEntity {
62 #[primary_key]
63 pub id: String, #[secondary_key]
65 pub task_key: String, pub task_params: String, #[secondary_key]
68 pub queue_name: String, pub updated_at: i64, pub status: TaskStatus, pub stopped_reason: Option<String>, pub last_error: Option<String>, pub last_run: i64, pub next_run: i64, pub kind: TaskKindEntity, pub success_count: u32, pub failure_count: u32, pub runner_id: Option<String>, pub retry_strategy: Retry, pub retry_interval: u32, pub base_interval: u32, pub delay_seconds: u32, pub max_retries: Option<u32>, pub cron_schedule: Option<String>, pub cron_timezone: Option<String>, pub is_repeating: bool, pub repeat_interval: u32, pub heartbeat_at: i64, pub created_at: i64, }
91
92#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
93pub enum TaskKindEntity {
95 Cron,
97
98 Repeat,
100
101 #[default]
103 Once,
104}
105
106impl TaskMetaEntity {
107 pub fn clean_up(&self) -> String {
108 let result = match self.kind {
109 TaskKindEntity::Cron | TaskKindEntity::Repeat => {
110 matches!(self.status, TaskStatus::Removed)
111 }
112 TaskKindEntity::Once => matches!(
113 self.status,
114 TaskStatus::Removed | TaskStatus::Success | TaskStatus::Failed
115 ),
116 };
117 result.to_string()
118 }
119
120 pub fn candidate_task(&self) -> String {
121 let result = match self.kind {
122 TaskKindEntity::Cron | TaskKindEntity::Repeat => matches!(
123 self.status,
124 TaskStatus::Scheduled | TaskStatus::Success | TaskStatus::Failed
125 ),
126 TaskKindEntity::Once => self.status == TaskStatus::Scheduled,
127 };
128 result.to_string()
129 }
130}
131
132impl From<TaskMetaEntity> for TaskMeta {
133 fn from(entity: TaskMetaEntity) -> Self {
134 TaskMeta {
135 id: entity.id,
136 task_key: entity.task_key,
137 task_params: entity.task_params,
138 queue_name: entity.queue_name,
139 updated_at: entity.updated_at,
140 created_at: entity.created_at,
141 status: entity.status,
142 stopped_reason: entity.stopped_reason,
143 last_error: entity.last_error,
144 last_run: entity.last_run,
145 next_run: entity.next_run,
146 kind: match entity.kind {
147 TaskKindEntity::Cron => TaskKind::Cron {
148 schedule: entity
149 .cron_schedule
150 .expect("Cron schedule is required for cron kind!"),
151 timezone: entity
152 .cron_timezone
153 .expect("Cron timezone is required for cron kind!"),
154 },
155 TaskKindEntity::Repeat => TaskKind::Repeat {
156 interval_seconds: entity.repeat_interval,
157 },
158 TaskKindEntity::Once => TaskKind::Once,
159 },
160 success_count: entity.success_count,
161 failure_count: entity.failure_count,
162 runner_id: entity.runner_id,
163 retry_strategy: entity.retry_strategy,
164 retry_interval: entity.retry_interval,
165 base_interval: entity.base_interval,
166 delay_seconds: entity.delay_seconds,
167 max_retries: entity.max_retries,
168 is_repeating: entity.is_repeating,
169 heartbeat_at: entity.heartbeat_at,
170 }
171 }
172}
173
174impl From<TaskMeta> for TaskMetaEntity {
175 fn from(entity: TaskMeta) -> Self {
176 let kind;
177 let cron_schedule;
178 let cron_timezone;
179 let repeat_interval;
180
181 match entity.kind {
182 TaskKind::Cron { schedule, timezone } => {
183 kind = TaskKindEntity::Cron;
184 cron_schedule = Some(schedule);
185 cron_timezone = Some(timezone);
186 repeat_interval = 0;
187 }
188 TaskKind::Repeat { interval_seconds } => {
189 kind = TaskKindEntity::Repeat;
190 cron_schedule = None;
191 cron_timezone = None;
192 repeat_interval = interval_seconds;
193 }
194 TaskKind::Once => {
195 kind = TaskKindEntity::Once;
196 cron_schedule = None;
197 cron_timezone = None;
198 repeat_interval = 0;
199 }
200 }
201
202 TaskMetaEntity {
203 id: entity.id,
204 task_key: entity.task_key,
205 task_params: entity.task_params,
206 queue_name: entity.queue_name,
207 updated_at: entity.updated_at,
208 created_at: entity.created_at,
209 status: entity.status,
210 stopped_reason: entity.stopped_reason,
211 last_error: entity.last_error,
212 last_run: entity.last_run,
213 next_run: entity.next_run,
214 kind,
215 success_count: entity.success_count,
216 failure_count: entity.failure_count,
217 runner_id: entity.runner_id,
218 retry_strategy: entity.retry_strategy,
219 retry_interval: entity.retry_interval,
220 base_interval: entity.base_interval,
221 delay_seconds: entity.delay_seconds,
222 max_retries: entity.max_retries,
223 cron_schedule,
224 cron_timezone,
225 is_repeating: entity.is_repeating,
226 repeat_interval,
227 heartbeat_at: entity.heartbeat_at,
228 }
229 }
230}
231
232#[cfg(test)]
233mod test {
234 use std::{fs, path::Path, time::Duration};
235
236 use crate::nativedb::{init_nativedb, TaskMetaEntity};
237 use itertools::Itertools;
238
239 fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
240 let temp_db_path = std::env::temp_dir().join("polly-scheduler.db");
241 if Path::new(&temp_db_path).exists() {
242 fs::remove_file(temp_db_path)?;
243 println!("File 'polly-scheduler.db' has been deleted.");
244 } else {
245 println!("File 'polly-scheduler.db' does not exist.");
246 }
247
248 Ok(())
249 }
250
251 #[tokio::test]
252 async fn delete_db_file() {
253 delete_temp_db().unwrap();
254 tokio::time::sleep(Duration::from_secs(3)).await;
255 }
256
257 #[tokio::test]
258 async fn test() {
259 let db = init_nativedb(None, None).unwrap();
260 let r = db.r_transaction().unwrap();
261
262 let list: Vec<TaskMetaEntity> = r
263 .scan()
264 .primary()
265 .unwrap()
266 .all()
267 .unwrap()
268 .try_collect()
269 .unwrap();
270
271 println!("{:#?}", list);
272 }
273}