use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime};

use chrono::{DateTime, Utc};
use cron::Schedule;

use crate::core::{DataType, Row, Value};
use crate::executor::Executor;
use crate::storage::jobs::{SYS_CRON, SYS_CRON_RUNS};
use crate::storage::traits::Engine;
/// Background scheduler that periodically scans the `SYS_CRON` table and runs
/// the commands of the active jobs whose cron schedule matches.
///
/// Instances are created by `JobScheduler::start`, which moves the scheduler
/// onto its own thread.
pub struct JobScheduler {
/// Executor used both to reach the storage engine and to run job commands.
executor: Executor,
/// Polled by the scheduler loop; once it reads `true` the loop exits.
shutdown_flag: Arc<AtomicBool>,
}
impl JobScheduler {
pub fn start(executor: Executor) -> Arc<AtomicBool> {
let shutdown_flag = Arc::new(AtomicBool::new(false));
let flag_clone = Arc::clone(&shutdown_flag);
let scheduler = Self {
executor,
shutdown_flag: flag_clone,
};
thread::Builder::new()
.name("oxibase-job-scheduler".to_string())
.spawn(move || {
scheduler.run_loop();
})
.expect("Failed to spawn job scheduler thread");
shutdown_flag
}
fn run_loop(&self) {
tracing::info!("Job scheduler background thread started");
let interval = Duration::from_secs(1);
while !self.shutdown_flag.load(Ordering::Relaxed) {
let start = SystemTime::now();
if let Err(e) = self.evaluate_and_run_jobs() {
tracing::error!("Job scheduler error: {}", e);
}
if let Ok(elapsed) = start.elapsed() {
if elapsed < interval {
thread::sleep(interval - elapsed);
}
} else {
thread::sleep(interval);
}
}
tracing::info!("Job scheduler background thread shutting down");
}
fn evaluate_and_run_jobs(&self) -> crate::core::Result<()> {
let tx = self.executor.engine.begin_transaction()?;
if !self.executor.engine.table_exists(SYS_CRON)? {
return Ok(());
}
let table = tx.get_table(SYS_CRON)?;
let mut scanner = table.scan(&[], None)?;
let mut jobs_to_run = Vec::new();
let now = Utc::now();
while scanner.next() {
let row = scanner.row();
if let (
Some(Value::Integer(id)),
Some(Value::Text(name)),
Some(Value::Text(schedule_str)),
Some(Value::Text(command)),
Some(Value::Boolean(active)),
) = (row.get(0), row.get(1), row.get(2), row.get(3), row.get(4))
{
if !active {
continue;
}
if let Ok(schedule) = schedule_str.as_ref().parse::<Schedule>() {
if schedule.includes(now) {
jobs_to_run.push((*id, name.to_string(), command.to_string()));
}
}
}
}
drop(scanner);
drop(tx);
for (job_id, name, command) in jobs_to_run {
self.execute_job(job_id, &name, &command);
}
Ok(())
}
fn execute_job(&self, job_id: i64, name: &str, command: &str) {
let _span = tracing::info_span!("job.execute", job_id = job_id, job_name = name).entered();
tracing::debug!("Executing scheduled job '{}' (ID: {})", name, job_id);
let start_time = Utc::now();
let result = self.executor.execute_internal_sql(command);
let end_time = Utc::now();
let (status, message) = match result {
Ok(_) => ("SUCCESS", None),
Err(e) => ("FAILED", Some(e.to_string())),
};
if let Err(e) = self.log_run(job_id, status, message, start_time, end_time) {
tracing::error!("Failed to log job execution for '{}': {}", name, e);
}
}
fn log_run(
&self,
job_id: i64,
status: &str,
message: Option<String>,
start_time: DateTime<Utc>,
end_time: DateTime<Utc>,
) -> crate::core::Result<()> {
let mut tx = self.executor.engine.begin_transaction()?;
let mut table = tx.get_table(SYS_CRON_RUNS)?;
let values = vec![
Value::Null(DataType::Integer), Value::Integer(job_id),
Value::text(status),
if let Some(msg) = message {
Value::text(msg)
} else {
Value::Null(DataType::Text)
},
Value::Timestamp(start_time),
Value::Timestamp(end_time),
];
let row = Row::from_values(values);
table.insert(row)?;
tx.commit()?;
Ok(())
}
}