dbx_core/automation/
scheduler_thread.rs1use crate::automation::Schedule;
6use crate::engine::Database;
7use crate::error::DbxResult;
8use std::collections::HashMap;
9use std::sync::mpsc;
10use std::sync::{Arc, RwLock, Weak};
11use std::thread::{self, JoinHandle};
12use std::time::Duration;
13
/// Owner-side handle for the background scheduler worker thread.
///
/// Both fields are `None` while no worker has been started (or after it has
/// been stopped) and are populated together by [`SchedulerThread::start`].
#[derive(Debug)]
pub struct SchedulerThread {
    /// Join handle of the spawned worker; taken and joined by `stop`.
    handle: Option<JoinHandle<()>>,
    /// Sending half of the shutdown channel; one `()` message asks the worker to exit.
    shutdown_tx: Option<mpsc::Sender<()>>,
}
21
22impl SchedulerThread {
23 pub fn new() -> Self {
25 Self {
26 handle: None,
27 shutdown_tx: None,
28 }
29 }
30
31 pub fn start(
38 &mut self,
39 db_weak: Weak<Database>,
40 schedules: Arc<RwLock<HashMap<String, Schedule>>>,
41 ) -> DbxResult<()> {
42 if self.handle.is_some() {
43 return Ok(()); }
45
46 let (tx, rx) = mpsc::channel();
47 self.shutdown_tx = Some(tx);
48
49 let handle = thread::spawn(move || {
50 loop {
51 if rx.try_recv().is_ok() {
53 break;
54 }
55
56 let db = match db_weak.upgrade() {
58 Some(db) => db,
59 None => {
60 break;
62 }
63 };
64
65 let schedule_list: Vec<(String, Schedule)> = {
67 let sched = schedules.read().unwrap();
68 sched.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
69 };
70
71 for (schedule_name, schedule) in schedule_list {
72 if !schedule.enabled {
74 continue;
75 }
76
77 use std::str::FromStr;
79 match cron::Schedule::from_str(&schedule.cron_expr) {
80 Ok(cron_schedule) => {
81 use chrono::Utc;
82 let now = Utc::now();
83
84 if let Some(next) = cron_schedule.upcoming(Utc).next() {
86 let diff = (next - now).num_seconds();
88 if diff.abs() <= 1 {
89 let exec = db.schedule_executor.read().unwrap();
91 if let Err(e) = exec.execute(&db, &schedule_name) {
92 eprintln!(
93 "[Scheduler] Failed to execute schedule '{}': {}",
94 schedule_name, e
95 );
96 }
97 }
98 }
99 }
100 Err(e) => {
101 eprintln!(
102 "[Scheduler] Invalid cron expression for '{}': {}",
103 schedule_name, e
104 );
105 }
106 }
107 }
108
109 thread::sleep(Duration::from_secs(1));
111 }
112 });
113
114 self.handle = Some(handle);
115 Ok(())
116 }
117
118 pub fn stop(&mut self) -> DbxResult<()> {
120 if let Some(tx) = self.shutdown_tx.take() {
121 let _ = tx.send(());
122 }
123
124 if let Some(handle) = self.handle.take() {
125 let _ = handle.join();
126 }
127
128 Ok(())
129 }
130
131 pub fn is_running(&self) -> bool {
133 self.handle.is_some()
134 }
135}
136
137impl Default for SchedulerThread {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
143impl Drop for SchedulerThread {
144 fn drop(&mut self) {
145 let _ = self.stop();
146 }
147}