persistent_scheduler/nativedb/
mod.rsuse crate::core::error::SchedulerError;
use crate::core::model::{Retry, TaskMeta, TaskStatus};
use crate::core::task_kind::TaskKind;
use native_db::*;
use native_model::{native_model, Model};
use once_cell::sync::{Lazy, OnceCell};
use serde::{Deserialize, Serialize};
use tracing::error;
pub mod meta;
static DB: OnceCell<Database> = OnceCell::new();
pub static MODELS: Lazy<Models> = Lazy::new(|| {
let mut models = Models::new();
models
.define::<TaskMetaEntity>()
.expect("Error to define TaskMetaEntity");
models
});
pub fn init_nativedb(
db_path: Option<String>,
cache_size: Option<u64>,
) -> Result<&'static Database<'static>, SchedulerError> {
let mut sys = sysinfo::System::new_all();
sys.refresh_memory();
let cache_size = cache_size.unwrap_or_else(|| sys.free_memory() / 4);
let db_file = db_path.unwrap_or_else(|| {
std::env::temp_dir()
.join("polly-scheduler.db")
.to_string_lossy()
.into_owned()
});
let database = Builder::new()
.set_cache_size(cache_size as usize)
.create(&MODELS, db_file.as_str());
if let Err(e) = database {
error!("Error init native db {:?}", e);
return Err(SchedulerError::StoreInit);
}
let _ = DB.set(database.unwrap());
get_database()
}
pub fn get_database() -> Result<&'static Database<'static>, SchedulerError> {
DB.get().ok_or_else(|| SchedulerError::StoreInit)
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[native_model(id = 2, version = 1)]
#[native_db]
pub struct TaskMetaEntity {
#[primary_key]
pub id: String, #[secondary_key]
pub task_key: String, pub task_params: String, #[secondary_key]
pub queue_name: String, pub updated_at: i64, #[secondary_key]
pub status: TaskStatus, pub stopped_reason: Option<String>, pub last_error: Option<String>, pub last_run: i64, pub next_run: i64, pub kind: TaskKind, pub success_count: u32, pub failure_count: u32, pub runner_id: Option<String>, pub retry_strategy: Retry, pub retry_interval: u32, pub base_interval: u32, pub delay_seconds: u32, pub max_retries: Option<u32>, pub cron_schedule: Option<String>, pub cron_timezone: Option<String>, pub is_repeating: bool, pub repeat_interval: u32, pub heartbeat_at: i64, }
impl ToKey for TaskStatus {
fn to_key(&self) -> Key {
match self {
TaskStatus::Scheduled => Key::new(vec![0]),
TaskStatus::Running => Key::new(vec![1]),
TaskStatus::Success => Key::new(vec![2]),
TaskStatus::Failed => Key::new(vec![3]),
TaskStatus::Removed => Key::new(vec![4]),
TaskStatus::Stopped => Key::new(vec![5]),
}
}
fn key_names() -> Vec<String> {
vec!["String".to_string(), "&str".to_string()]
}
}
impl From<TaskMetaEntity> for TaskMeta {
fn from(entity: TaskMetaEntity) -> Self {
TaskMeta {
id: entity.id,
task_key: entity.task_key,
task_params: entity.task_params,
queue_name: entity.queue_name,
updated_at: entity.updated_at,
status: entity.status,
stopped_reason: entity.stopped_reason,
last_error: entity.last_error,
last_run: entity.last_run,
next_run: entity.next_run,
kind: entity.kind,
success_count: entity.success_count,
failure_count: entity.failure_count,
runner_id: entity.runner_id,
retry_strategy: entity.retry_strategy,
retry_interval: entity.retry_interval,
base_interval: entity.base_interval,
delay_seconds: entity.delay_seconds,
max_retries: entity.max_retries,
cron_schedule: entity.cron_schedule,
cron_timezone: entity.cron_timezone,
is_repeating: entity.is_repeating,
repeat_interval: entity.repeat_interval,
heartbeat_at: entity.heartbeat_at,
}
}
}
impl From<TaskMeta> for TaskMetaEntity {
fn from(entity: TaskMeta) -> Self {
TaskMetaEntity {
id: entity.id,
task_key: entity.task_key,
task_params: entity.task_params,
queue_name: entity.queue_name,
updated_at: entity.updated_at,
status: entity.status,
stopped_reason: entity.stopped_reason,
last_error: entity.last_error,
last_run: entity.last_run,
next_run: entity.next_run,
kind: entity.kind,
success_count: entity.success_count,
failure_count: entity.failure_count,
runner_id: entity.runner_id,
retry_strategy: entity.retry_strategy,
retry_interval: entity.retry_interval,
base_interval: entity.base_interval,
delay_seconds: entity.delay_seconds,
max_retries: entity.max_retries,
cron_schedule: entity.cron_schedule,
cron_timezone: entity.cron_timezone,
is_repeating: entity.is_repeating,
repeat_interval: entity.repeat_interval,
heartbeat_at: entity.heartbeat_at,
}
}
}
#[cfg(test)]
mod test {
use std::{fs, path::Path, time::Duration};
use crate::nativedb::{init_nativedb, TaskMetaEntity};
use itertools::Itertools;
fn delete_temp_db() -> Result<(), Box<dyn std::error::Error>> {
let temp_db_path = std::env::temp_dir().join("polly-scheduler.db");
if Path::new(&temp_db_path).exists() {
fs::remove_file(temp_db_path)?;
println!("File 'polly-scheduler.db' has been deleted.");
} else {
println!("File 'polly-scheduler.db' does not exist.");
}
Ok(())
}
#[tokio::test]
async fn delete_db_file() {
delete_temp_db().unwrap();
tokio::time::sleep(Duration::from_secs(3)).await;
}
#[tokio::test]
async fn test() {
let db = init_nativedb(None, None).unwrap();
let r = db.r_transaction().unwrap();
let list: Vec<TaskMetaEntity> = r
.scan()
.primary()
.unwrap()
.all()
.unwrap()
.try_collect()
.unwrap();
println!("{:#?}", list);
}
}