use super::job::ScheduledJob;
use crate::automation::ExecutionEngine;
use crate::automation::callable::ExecutionContext;
use crate::error::DbxResult;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
pub struct Scheduler {
jobs: RwLock<HashMap<String, ScheduledJob>>,
execution_engine: Arc<ExecutionEngine>,
}
impl Scheduler {
pub fn new(execution_engine: Arc<ExecutionEngine>) -> Self {
Self {
jobs: RwLock::new(HashMap::new()),
execution_engine,
}
}
pub fn register(&self, job: ScheduledJob) -> DbxResult<()> {
let name = job.name().to_string();
let mut jobs = self
.jobs
.write()
.map_err(|_| crate::error::DbxError::LockPoisoned)?;
if jobs.contains_key(&name) {
return Err(crate::error::DbxError::DuplicateCallable(name));
}
jobs.insert(name, job);
Ok(())
}
pub fn unregister(&self, name: &str) -> DbxResult<()> {
let mut jobs = self
.jobs
.write()
.map_err(|_| crate::error::DbxError::LockPoisoned)?;
jobs.remove(name)
.ok_or_else(|| crate::error::DbxError::CallableNotFound(name.to_string()))?;
Ok(())
}
pub fn tick(&self, ctx: &ExecutionContext) -> DbxResult<Vec<String>> {
let mut jobs = self
.jobs
.write()
.map_err(|_| crate::error::DbxError::LockPoisoned)?;
let mut executed = Vec::new();
for (name, job) in jobs.iter_mut() {
if job.schedule().is_ready() {
let result = self
.execution_engine
.execute(job.callable_name(), ctx, job.args());
if result.is_ok() {
executed.push(name.clone());
}
job.schedule_mut().update();
}
}
Ok(executed)
}
pub fn list(&self) -> DbxResult<Vec<String>> {
let jobs = self
.jobs
.read()
.map_err(|_| crate::error::DbxError::LockPoisoned)?;
Ok(jobs.keys().cloned().collect())
}
}
impl Default for Scheduler {
fn default() -> Self {
Self::new(Arc::new(ExecutionEngine::new()))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::automation::ScalarUDF;
use crate::automation::callable::{DataType, Signature, Value};
use crate::automation::scheduler::{Schedule, ScheduleType};
use crate::engine::Database;
use std::time::Duration;
#[test]
fn test_scheduler_register() {
let scheduler = Scheduler::default();
let schedule = Schedule::new(ScheduleType::Interval(Duration::from_secs(60)));
let job = ScheduledJob::new("test_job", schedule, "test_udf", vec![]);
scheduler.register(job).unwrap();
let jobs = scheduler.list().unwrap();
assert_eq!(jobs.len(), 1);
assert!(jobs.contains(&"test_job".to_string()));
}
#[test]
fn test_scheduler_duplicate() {
let scheduler = Scheduler::default();
let schedule1 = Schedule::new(ScheduleType::Interval(Duration::from_secs(60)));
let job1 = ScheduledJob::new("test_job", schedule1, "test_udf", vec![]);
let schedule2 = Schedule::new(ScheduleType::Interval(Duration::from_secs(60)));
let job2 = ScheduledJob::new("test_job", schedule2, "test_udf", vec![]);
scheduler.register(job1).unwrap();
let result = scheduler.register(job2);
assert!(result.is_err());
}
#[test]
fn test_scheduler_tick() {
let engine = Arc::new(ExecutionEngine::new());
let udf = Arc::new(ScalarUDF::new(
"test_udf",
Signature {
params: vec![],
return_type: DataType::Null,
is_variadic: false,
},
|_args| Ok(Value::Null),
));
engine.register(udf).unwrap();
let scheduler = Scheduler::new(engine);
let schedule = Schedule::new(ScheduleType::Once(Duration::from_secs(0)));
let job = ScheduledJob::new("test_job", schedule, "test_udf", vec![]);
scheduler.register(job).unwrap();
let db = Database::open_in_memory().unwrap();
let ctx = ExecutionContext::new(Arc::new(db));
std::thread::sleep(Duration::from_millis(100));
let executed = scheduler.tick(&ctx).unwrap();
assert_eq!(executed.len(), 1);
assert_eq!(executed[0], "test_job");
}
}