Skip to main content

dbx_core/automation/
scheduler_thread.rs

1//! Background Scheduler Thread
2//!
3//! Weak 포인터를 사용하여 순환 참조 해결
4
5use crate::automation::Schedule;
6use crate::engine::Database;
7use crate::error::DbxResult;
8use std::collections::HashMap;
9use std::sync::mpsc;
10use std::sync::{Arc, RwLock, Weak};
11use std::thread::{self, JoinHandle};
12use std::time::Duration;
13
14/// 백그라운드 스케줄러 스레드
15pub struct SchedulerThread {
16    /// 스레드 핸들
17    handle: Option<JoinHandle<()>>,
18    /// Shutdown 신호 전송 채널
19    shutdown_tx: Option<mpsc::Sender<()>>,
20}
21
22impl SchedulerThread {
23    /// 새 SchedulerThread 생성
24    pub fn new() -> Self {
25        Self {
26            handle: None,
27            shutdown_tx: None,
28        }
29    }
30
31    /// 백그라운드 스케줄러 스레드 시작
32    ///
33    /// # 인자
34    ///
35    /// * `db_weak` - Database Weak 포인터 (순환 참조 방지)
36    /// * `schedules` - Schedule 목록 (Arc 공유)
37    pub fn start(
38        &mut self,
39        db_weak: Weak<Database>,
40        schedules: Arc<RwLock<HashMap<String, Schedule>>>,
41    ) -> DbxResult<()> {
42        if self.handle.is_some() {
43            return Ok(()); // 이미 실행 중
44        }
45
46        let (tx, rx) = mpsc::channel();
47        self.shutdown_tx = Some(tx);
48
49        let handle = thread::spawn(move || {
50            loop {
51                // Shutdown 신호 확인 (non-blocking)
52                if rx.try_recv().is_ok() {
53                    break;
54                }
55
56                // Database Arc 업그레이드
57                let db = match db_weak.upgrade() {
58                    Some(db) => db,
59                    None => {
60                        // Database가 drop되면 종료
61                        break;
62                    }
63                };
64
65                // 모든 스케줄 체크 및 실행
66                let schedule_list: Vec<(String, Schedule)> = {
67                    let sched = schedules.read().unwrap();
68                    sched.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
69                };
70
71                for (schedule_name, schedule) in schedule_list {
72                    // enabled 상태 확인
73                    if !schedule.enabled {
74                        continue;
75                    }
76
77                    // Cron 표현식 파싱 및 다음 실행 시간 계산
78                    use std::str::FromStr;
79                    match cron::Schedule::from_str(&schedule.cron_expr) {
80                        Ok(cron_schedule) => {
81                            use chrono::Utc;
82                            let now = Utc::now();
83
84                            // 다음 실행 시간 계산
85                            if let Some(next) = cron_schedule.upcoming(Utc).next() {
86                                // 현재 시간과 비교 (1초 오차 허용)
87                                let diff = (next - now).num_seconds();
88                                if diff.abs() <= 1 {
89                                    // 실행 시간 도달
90                                    let exec = db.schedule_executor.read().unwrap();
91                                    if let Err(e) = exec.execute(&db, &schedule_name) {
92                                        eprintln!(
93                                            "[Scheduler] Failed to execute schedule '{}': {}",
94                                            schedule_name, e
95                                        );
96                                    }
97                                }
98                            }
99                        }
100                        Err(e) => {
101                            eprintln!(
102                                "[Scheduler] Invalid cron expression for '{}': {}",
103                                schedule_name, e
104                            );
105                        }
106                    }
107                }
108
109                // 1초 대기
110                thread::sleep(Duration::from_secs(1));
111            }
112        });
113
114        self.handle = Some(handle);
115        Ok(())
116    }
117
118    /// 백그라운드 스케줄러 스레드 중지
119    pub fn stop(&mut self) -> DbxResult<()> {
120        if let Some(tx) = self.shutdown_tx.take() {
121            let _ = tx.send(());
122        }
123
124        if let Some(handle) = self.handle.take() {
125            let _ = handle.join();
126        }
127
128        Ok(())
129    }
130
131    /// 스레드가 실행 중인지 확인
132    pub fn is_running(&self) -> bool {
133        self.handle.is_some()
134    }
135}
136
137impl Default for SchedulerThread {
138    fn default() -> Self {
139        Self::new()
140    }
141}
142
143impl Drop for SchedulerThread {
144    fn drop(&mut self) {
145        let _ = self.stop();
146    }
147}