use crate::automation::{Schedule, SchedulerThread};
use crate::engine::Database;
use crate::error::DbxResult;
use std::collections::HashMap;
use std::sync::{Arc, RwLock, Weak};
pub struct ScheduleExecutor {
schedules: Arc<RwLock<HashMap<String, Schedule>>>,
scheduler_thread: RwLock<Option<SchedulerThread>>,
}
impl ScheduleExecutor {
pub fn new() -> Self {
Self {
schedules: Arc::new(RwLock::new(HashMap::new())),
scheduler_thread: RwLock::new(None),
}
}
pub fn register(&self, schedule: Schedule) -> DbxResult<()> {
let mut schedules = self.schedules.write().unwrap();
schedules.insert(schedule.name.clone(), schedule);
Ok(())
}
pub fn unregister(&self, name: &str) -> DbxResult<()> {
let mut schedules = self.schedules.write().unwrap();
schedules.remove(name);
Ok(())
}
pub fn list_schedules(&self) -> Vec<String> {
let schedules = self.schedules.read().unwrap();
schedules.keys().cloned().collect()
}
pub fn get_schedule(&self, name: &str) -> Option<Schedule> {
let schedules = self.schedules.read().unwrap();
schedules.get(name).cloned()
}
pub fn enable(&self, name: &str) -> DbxResult<()> {
let mut schedules = self.schedules.write().unwrap();
if let Some(schedule) = schedules.get_mut(name) {
schedule.enable();
}
Ok(())
}
pub fn disable(&self, name: &str) -> DbxResult<()> {
let mut schedules = self.schedules.write().unwrap();
if let Some(schedule) = schedules.get_mut(name) {
schedule.disable();
}
Ok(())
}
pub fn execute(&self, db: &crate::engine::Database, name: &str) -> DbxResult<()> {
let schedule = {
let schedules = self.schedules.read().unwrap();
schedules.get(name).cloned()
};
let schedule = schedule.ok_or_else(|| crate::error::DbxError::InvalidOperation {
message: format!("Schedule '{}' not found", name),
context: "EXECUTE SCHEDULE".to_string(),
})?;
if !schedule.enabled {
#[cfg(debug_assertions)]
println!("[Schedule] Skipping disabled schedule '{}'", name);
return Ok(());
}
for sql in &schedule.sql_body {
match db.execute_sql(sql) {
Ok(_) => {
#[cfg(debug_assertions)]
println!("[Schedule] Successfully executed '{}': {}", name, sql);
}
Err(e) => {
return Err(crate::error::DbxError::InvalidOperation {
message: format!("Failed to execute schedule '{}': {}", name, e),
context: sql.clone(),
});
}
}
}
Ok(())
}
pub fn start_scheduler(&self, db_weak: Weak<Database>) -> DbxResult<()> {
let mut thread = self.scheduler_thread.write().unwrap();
if thread.is_some() {
return Ok(()); }
let mut scheduler = SchedulerThread::new();
scheduler.start(db_weak, Arc::clone(&self.schedules))?;
*thread = Some(scheduler);
Ok(())
}
pub fn stop_scheduler(&self) -> DbxResult<()> {
let mut thread = self.scheduler_thread.write().unwrap();
if let Some(mut scheduler) = thread.take() {
scheduler.stop()?;
}
Ok(())
}
pub fn is_scheduler_running(&self) -> bool {
let thread = self.scheduler_thread.read().unwrap();
thread.as_ref().map(|t| t.is_running()).unwrap_or(false)
}
}
impl Default for ScheduleExecutor {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_register_and_list() {
let executor = ScheduleExecutor::new();
let schedule = Schedule::new("test", "0 0 * * *", vec![]);
executor.register(schedule).unwrap();
let schedules = executor.list_schedules();
assert_eq!(schedules.len(), 1);
assert!(schedules.contains(&"test".to_string()));
}
#[test]
fn test_unregister() {
let executor = ScheduleExecutor::new();
let schedule = Schedule::new("test", "0 0 * * *", vec![]);
executor.register(schedule).unwrap();
executor.unregister("test").unwrap();
let schedules = executor.list_schedules();
assert_eq!(schedules.len(), 0);
}
#[test]
fn test_enable_disable() {
let executor = ScheduleExecutor::new();
let schedule = Schedule::new("test", "0 0 * * *", vec![]);
executor.register(schedule).unwrap();
executor.disable("test").unwrap();
let schedule = executor.get_schedule("test").unwrap();
assert!(!schedule.enabled);
executor.enable("test").unwrap();
let schedule = executor.get_schedule("test").unwrap();
assert!(schedule.enabled);
}
}