dbx_core/automation/
schedule_executor.rs1use crate::automation::{Schedule, SchedulerThread};
6use crate::engine::Database;
7use crate::error::DbxResult;
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock, Weak};
10
11pub struct ScheduleExecutor {
13 schedules: Arc<RwLock<HashMap<String, Schedule>>>,
15 scheduler_thread: RwLock<Option<SchedulerThread>>,
17}
18
19impl ScheduleExecutor {
20 pub fn new() -> Self {
22 Self {
23 schedules: Arc::new(RwLock::new(HashMap::new())),
24 scheduler_thread: RwLock::new(None),
25 }
26 }
27
28 pub fn register(&self, schedule: Schedule) -> DbxResult<()> {
30 let mut schedules = self.schedules.write().unwrap();
31 schedules.insert(schedule.name.clone(), schedule);
32 Ok(())
33 }
34
35 pub fn unregister(&self, name: &str) -> DbxResult<()> {
37 let mut schedules = self.schedules.write().unwrap();
38 schedules.remove(name);
39 Ok(())
40 }
41
42 pub fn list_schedules(&self) -> Vec<String> {
44 let schedules = self.schedules.read().unwrap();
45 schedules.keys().cloned().collect()
46 }
47
48 pub fn get_schedule(&self, name: &str) -> Option<Schedule> {
50 let schedules = self.schedules.read().unwrap();
51 schedules.get(name).cloned()
52 }
53
54 pub fn enable(&self, name: &str) -> DbxResult<()> {
56 let mut schedules = self.schedules.write().unwrap();
57 if let Some(schedule) = schedules.get_mut(name) {
58 schedule.enable();
59 }
60 Ok(())
61 }
62
63 pub fn disable(&self, name: &str) -> DbxResult<()> {
65 let mut schedules = self.schedules.write().unwrap();
66 if let Some(schedule) = schedules.get_mut(name) {
67 schedule.disable();
68 }
69 Ok(())
70 }
71
72 pub fn execute(&self, db: &crate::engine::Database, name: &str) -> DbxResult<()> {
87 let schedule = {
89 let schedules = self.schedules.read().unwrap();
90 schedules.get(name).cloned()
91 };
92
93 let schedule = schedule.ok_or_else(|| crate::error::DbxError::InvalidOperation {
94 message: format!("Schedule '{}' not found", name),
95 context: "EXECUTE SCHEDULE".to_string(),
96 })?;
97
98 if !schedule.enabled {
100 #[cfg(debug_assertions)]
101 println!("[Schedule] Skipping disabled schedule '{}'", name);
102 return Ok(());
103 }
104
105 for sql in &schedule.sql_body {
107 match db.execute_sql(sql) {
108 Ok(_) => {
109 #[cfg(debug_assertions)]
110 println!("[Schedule] Successfully executed '{}': {}", name, sql);
111 }
112 Err(e) => {
113 return Err(crate::error::DbxError::InvalidOperation {
115 message: format!("Failed to execute schedule '{}': {}", name, e),
116 context: sql.clone(),
117 });
118 }
119 }
120 }
121
122 Ok(())
123 }
124
125 pub fn start_scheduler(&self, db_weak: Weak<Database>) -> DbxResult<()> {
131 let mut thread = self.scheduler_thread.write().unwrap();
132
133 if thread.is_some() {
134 return Ok(()); }
136
137 let mut scheduler = SchedulerThread::new();
138 scheduler.start(db_weak, Arc::clone(&self.schedules))?;
139
140 *thread = Some(scheduler);
141 Ok(())
142 }
143
144 pub fn stop_scheduler(&self) -> DbxResult<()> {
146 let mut thread = self.scheduler_thread.write().unwrap();
147
148 if let Some(mut scheduler) = thread.take() {
149 scheduler.stop()?;
150 }
151
152 Ok(())
153 }
154
155 pub fn is_scheduler_running(&self) -> bool {
157 let thread = self.scheduler_thread.read().unwrap();
158 thread.as_ref().map(|t| t.is_running()).unwrap_or(false)
159 }
160}
161
162impl Default for ScheduleExecutor {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171
172 #[test]
173 fn test_register_and_list() {
174 let executor = ScheduleExecutor::new();
175 let schedule = Schedule::new("test", "0 0 * * *", vec![]);
176
177 executor.register(schedule).unwrap();
178
179 let schedules = executor.list_schedules();
180 assert_eq!(schedules.len(), 1);
181 assert!(schedules.contains(&"test".to_string()));
182 }
183
184 #[test]
185 fn test_unregister() {
186 let executor = ScheduleExecutor::new();
187 let schedule = Schedule::new("test", "0 0 * * *", vec![]);
188
189 executor.register(schedule).unwrap();
190 executor.unregister("test").unwrap();
191
192 let schedules = executor.list_schedules();
193 assert_eq!(schedules.len(), 0);
194 }
195
196 #[test]
197 fn test_enable_disable() {
198 let executor = ScheduleExecutor::new();
199 let schedule = Schedule::new("test", "0 0 * * *", vec![]);
200
201 executor.register(schedule).unwrap();
202
203 executor.disable("test").unwrap();
204 let schedule = executor.get_schedule("test").unwrap();
205 assert!(!schedule.enabled);
206
207 executor.enable("test").unwrap();
208 let schedule = executor.get_schedule("test").unwrap();
209 assert!(schedule.enabled);
210 }
211}