Skip to main content

dbx_core/automation/
schedule_executor.rs

1//! SQL Schedule Executor
2//!
3//! SQL 스케줄 실행 엔진
4
5use crate::automation::{Schedule, SchedulerThread};
6use crate::engine::Database;
7use crate::error::DbxResult;
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock, Weak};
10
11/// SQL 스케줄 실행 엔진
12pub struct ScheduleExecutor {
13    /// 등록된 스케줄들
14    schedules: Arc<RwLock<HashMap<String, Schedule>>>,
15    /// 백그라운드 스케줄러 스레드
16    scheduler_thread: RwLock<Option<SchedulerThread>>,
17}
18
19impl ScheduleExecutor {
20    /// 새 ScheduleExecutor 생성
21    pub fn new() -> Self {
22        Self {
23            schedules: Arc::new(RwLock::new(HashMap::new())),
24            scheduler_thread: RwLock::new(None),
25        }
26    }
27
28    /// 스케줄 등록
29    pub fn register(&self, schedule: Schedule) -> DbxResult<()> {
30        let mut schedules = self.schedules.write().unwrap();
31        schedules.insert(schedule.name.clone(), schedule);
32        Ok(())
33    }
34
35    /// 스케줄 해제
36    pub fn unregister(&self, name: &str) -> DbxResult<()> {
37        let mut schedules = self.schedules.write().unwrap();
38        schedules.remove(name);
39        Ok(())
40    }
41
42    /// 등록된 모든 스케줄 목록
43    pub fn list_schedules(&self) -> Vec<String> {
44        let schedules = self.schedules.read().unwrap();
45        schedules.keys().cloned().collect()
46    }
47
48    /// 특정 스케줄 조회
49    pub fn get_schedule(&self, name: &str) -> Option<Schedule> {
50        let schedules = self.schedules.read().unwrap();
51        schedules.get(name).cloned()
52    }
53
54    /// 스케줄 활성화
55    pub fn enable(&self, name: &str) -> DbxResult<()> {
56        let mut schedules = self.schedules.write().unwrap();
57        if let Some(schedule) = schedules.get_mut(name) {
58            schedule.enable();
59        }
60        Ok(())
61    }
62
63    /// 스케줄 비활성화
64    pub fn disable(&self, name: &str) -> DbxResult<()> {
65        let mut schedules = self.schedules.write().unwrap();
66        if let Some(schedule) = schedules.get_mut(name) {
67            schedule.disable();
68        }
69        Ok(())
70    }
71
72    /// 스케줄 실행
73    ///
74    /// 지정된 스케줄의 SQL body를 실행합니다.
75    ///
76    /// # 인자
77    ///
78    /// * `db` - Database 인스턴스
79    /// * `name` - 실행할 스케줄 이름
80    ///
81    /// # 에러
82    ///
83    /// - 스케줄이 존재하지 않으면 에러 반환
84    /// - 스케줄이 비활성화 상태면 실행하지 않음
85    /// - SQL 실행 실패 시 에러 반환
86    pub fn execute(&self, db: &crate::engine::Database, name: &str) -> DbxResult<()> {
87        // 스케줄 조회
88        let schedule = {
89            let schedules = self.schedules.read().unwrap();
90            schedules.get(name).cloned()
91        };
92
93        let schedule = schedule.ok_or_else(|| crate::error::DbxError::InvalidOperation {
94            message: format!("Schedule '{}' not found", name),
95            context: "EXECUTE SCHEDULE".to_string(),
96        })?;
97
98        // 비활성화된 스케줄은 실행하지 않음
99        if !schedule.enabled {
100            #[cfg(debug_assertions)]
101            println!("[Schedule] Skipping disabled schedule '{}'", name);
102            return Ok(());
103        }
104
105        // Schedule body의 각 SQL 문장 실행
106        for sql in &schedule.sql_body {
107            match db.execute_sql(sql) {
108                Ok(_) => {
109                    #[cfg(debug_assertions)]
110                    println!("[Schedule] Successfully executed '{}': {}", name, sql);
111                }
112                Err(e) => {
113                    // Schedule 실행 실패 시 에러 반환
114                    return Err(crate::error::DbxError::InvalidOperation {
115                        message: format!("Failed to execute schedule '{}': {}", name, e),
116                        context: sql.clone(),
117                    });
118                }
119            }
120        }
121
122        Ok(())
123    }
124
125    /// 백그라운드 스케줄러 시작
126    ///
127    /// # 인자
128    ///
129    /// * `db_weak` - Database Weak 포인터 (순환 참조 방지)
130    pub fn start_scheduler(&self, db_weak: Weak<Database>) -> DbxResult<()> {
131        let mut thread = self.scheduler_thread.write().unwrap();
132
133        if thread.is_some() {
134            return Ok(()); // 이미 실행 중
135        }
136
137        let mut scheduler = SchedulerThread::new();
138        scheduler.start(db_weak, Arc::clone(&self.schedules))?;
139
140        *thread = Some(scheduler);
141        Ok(())
142    }
143
144    /// 백그라운드 스케줄러 중지
145    pub fn stop_scheduler(&self) -> DbxResult<()> {
146        let mut thread = self.scheduler_thread.write().unwrap();
147
148        if let Some(mut scheduler) = thread.take() {
149            scheduler.stop()?;
150        }
151
152        Ok(())
153    }
154
155    /// 스케줄러가 실행 중인지 확인
156    pub fn is_scheduler_running(&self) -> bool {
157        let thread = self.scheduler_thread.read().unwrap();
158        thread.as_ref().map(|t| t.is_running()).unwrap_or(false)
159    }
160}
161
162impl Default for ScheduleExecutor {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168#[cfg(test)]
169mod tests {
170    use super::*;
171
172    #[test]
173    fn test_register_and_list() {
174        let executor = ScheduleExecutor::new();
175        let schedule = Schedule::new("test", "0 0 * * *", vec![]);
176
177        executor.register(schedule).unwrap();
178
179        let schedules = executor.list_schedules();
180        assert_eq!(schedules.len(), 1);
181        assert!(schedules.contains(&"test".to_string()));
182    }
183
184    #[test]
185    fn test_unregister() {
186        let executor = ScheduleExecutor::new();
187        let schedule = Schedule::new("test", "0 0 * * *", vec![]);
188
189        executor.register(schedule).unwrap();
190        executor.unregister("test").unwrap();
191
192        let schedules = executor.list_schedules();
193        assert_eq!(schedules.len(), 0);
194    }
195
196    #[test]
197    fn test_enable_disable() {
198        let executor = ScheduleExecutor::new();
199        let schedule = Schedule::new("test", "0 0 * * *", vec![]);
200
201        executor.register(schedule).unwrap();
202
203        executor.disable("test").unwrap();
204        let schedule = executor.get_schedule("test").unwrap();
205        assert!(!schedule.enabled);
206
207        executor.enable("test").unwrap();
208        let schedule = executor.get_schedule("test").unwrap();
209        assert!(schedule.enabled);
210    }
211}