1use crate::Session;
6use anyhow::Result;
7use chrono::Utc;
8use std::sync::Arc;
9use std::time::Duration as StdDuration;
10use tokio::sync::Mutex;
11use tracing::{error, info, warn};
12use vibesql_parser::Parser;
13
14use super::storage::{ExecutionHistoryRecord, ScheduleRecord, ScheduleStatus};
15
/// Retry and backoff settings for [`ScheduleExecutor`].
#[derive(Debug, Clone)]
pub struct ScheduleExecutorConfig {
    /// Number of execution attempts after which the executor gives up.
    ///
    /// The executor always makes at least one attempt, even when this is `0`.
    pub max_retries: u32,
    /// Delay applied after the first failed attempt.
    pub initial_backoff: StdDuration,
    /// Upper bound for the delay between attempts.
    pub max_backoff: StdDuration,
    /// Factor by which the delay grows with each further failed attempt.
    pub backoff_multiplier: f64,
}

impl Default for ScheduleExecutorConfig {
    /// Three attempts, starting at a 5 second delay that doubles up to a 5 minute cap.
    fn default() -> Self {
        let initial_backoff = StdDuration::from_secs(5);
        let max_backoff = StdDuration::from_secs(5 * 60);

        ScheduleExecutorConfig {
            max_retries: 3,
            initial_backoff,
            max_backoff,
            backoff_multiplier: 2.0,
        }
    }
}
39
/// Runs the SQL attached to a schedule record against a shared session,
/// using its configuration to decide how often to retry and how long to wait.
pub struct ScheduleExecutor {
    /// Retry limit and backoff parameters used by this executor.
    config: ScheduleExecutorConfig,
}
44
45impl ScheduleExecutor {
46 pub fn new(config: ScheduleExecutorConfig) -> Self {
47 Self { config }
48 }
49
50 pub async fn execute_schedule(
52 &self,
53 schedule: &ScheduleRecord,
54 session: Arc<Mutex<Session>>,
55 ) -> Result<ExecutionHistoryRecord> {
56 let started_at = Utc::now();
57
58 if let Err(e) = Parser::parse_sql(&schedule.sql) {
60 return Ok(ExecutionHistoryRecord {
61 id: None,
62 schedule_id: Some(schedule.id.clone()),
63 cron_name: None,
64 started_at,
65 completed_at: Some(Utc::now()),
66 status: ScheduleStatus::Failed,
67 error: Some(e.to_string()),
68 rows_affected: None,
69 });
70 }
71
72 #[allow(unused_assignments)] let mut last_error: Option<String> = None;
75 let mut attempt = 0;
76
77 loop {
78 attempt += 1;
79 let backoff = self.calculate_backoff(attempt - 1);
80
81 match self.execute_statement(&schedule.sql, &session).await {
82 Ok(rows_affected) => {
83 info!(
84 schedule_id = %schedule.id,
85 rows_affected = rows_affected,
86 attempts = attempt,
87 "Schedule executed successfully"
88 );
89 return Ok(ExecutionHistoryRecord {
90 id: None,
91 schedule_id: Some(schedule.id.clone()),
92 cron_name: None,
93 started_at,
94 completed_at: Some(Utc::now()),
95 status: ScheduleStatus::Completed,
96 error: None,
97 rows_affected: Some(rows_affected as i64),
98 });
99 }
100 Err(e) => {
101 last_error = Some(e.to_string());
102 warn!(
103 schedule_id = %schedule.id,
104 attempt = attempt,
105 error = %e,
106 "Schedule execution failed, will retry"
107 );
108
109 if attempt >= self.config.max_retries {
110 error!(
111 schedule_id = %schedule.id,
112 attempts = attempt,
113 error = %last_error.as_ref().unwrap(),
114 "Schedule execution failed after all retries"
115 );
116 return Ok(ExecutionHistoryRecord {
117 id: None,
118 schedule_id: Some(schedule.id.clone()),
119 cron_name: None,
120 started_at,
121 completed_at: Some(Utc::now()),
122 status: ScheduleStatus::Failed,
123 error: last_error,
124 rows_affected: None,
125 });
126 }
127
128 tokio::time::sleep(backoff).await;
129 }
130 }
131 }
132 }
133
134 fn calculate_backoff(&self, attempt: u32) -> StdDuration {
136 let backoff_secs = self.config.initial_backoff.as_secs_f64()
137 * self.config.backoff_multiplier.powi(attempt as i32);
138
139 let max_secs = self.config.max_backoff.as_secs_f64();
140 let capped_secs = backoff_secs.min(max_secs);
141
142 StdDuration::from_secs_f64(capped_secs)
143 }
144
145 async fn execute_statement(&self, sql: &str, session: &Arc<Mutex<Session>>) -> Result<usize> {
147 let mut session_guard = session.lock().await;
148 let result = session_guard.execute(sql).await?;
149 Ok(result.rows_affected() as usize)
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a standalone test session and runs every `setup` statement on it.
    async fn session_with(setup: &[&str]) -> Arc<Mutex<Session>> {
        let mut session = Session::new_standalone("testdb".to_string(), "testuser".to_string());
        for &statement in setup {
            session.execute(statement).await.unwrap();
        }
        Arc::new(Mutex::new(session))
    }

    /// Builds a pending schedule with the given id and SQL, due immediately.
    fn pending_schedule(id: &str, sql: &str) -> ScheduleRecord {
        ScheduleRecord {
            id: id.to_string(),
            sql: sql.to_string(),
            params: None,
            run_at: Utc::now(),
            created_at: Utc::now(),
            status: ScheduleStatus::Pending,
            attempts: 0,
            last_error: None,
            completed_at: None,
        }
    }

    /// Asserts that `result` describes a successful run affecting `expected_rows` rows.
    fn assert_completed(result: &ExecutionHistoryRecord, expected_rows: i64) {
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(expected_rows));
    }

    /// Asserts that `result` describes a failed run carrying an error message.
    fn assert_failed(result: &ExecutionHistoryRecord) {
        assert_eq!(result.status, ScheduleStatus::Failed);
        assert!(result.error.is_some());
        assert!(result.rows_affected.is_none());
    }

    #[test]
    fn test_backoff_calculation() {
        let config = ScheduleExecutorConfig {
            initial_backoff: StdDuration::from_secs(5),
            max_backoff: StdDuration::from_secs(300),
            backoff_multiplier: 2.0,
            ..Default::default()
        };

        let executor = ScheduleExecutor::new(config);

        // 5s * 2^0, 5s * 2^1, 5s * 2^2
        assert_eq!(executor.calculate_backoff(0), StdDuration::from_secs(5));
        assert_eq!(executor.calculate_backoff(1), StdDuration::from_secs(10));
        assert_eq!(executor.calculate_backoff(2), StdDuration::from_secs(20));

        // Far past the cap, the delay is limited to max_backoff.
        assert_eq!(executor.calculate_backoff(100), StdDuration::from_secs(300));
    }

    #[tokio::test]
    async fn test_execute_schedule_insert() {
        let session =
            session_with(&["CREATE TABLE schedule_test (id INT, value VARCHAR(100))"]).await;
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-1",
            "INSERT INTO schedule_test VALUES (1, 'scheduled')",
        );

        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();
        assert_completed(&result, 1);

        // Confirm the row really landed in the table.
        let mut verify_session = session.lock().await;
        let verify =
            verify_session.execute("SELECT * FROM schedule_test WHERE id = 1").await.unwrap();
        match verify {
            crate::session::ExecutionResult::Select { rows, .. } => {
                assert_eq!(rows.len(), 1);
            }
            _ => panic!("Expected Select result"),
        }
    }

    #[tokio::test]
    async fn test_execute_schedule_update() {
        let session = session_with(&[
            "CREATE TABLE update_test (id INT, value VARCHAR(100))",
            "INSERT INTO update_test VALUES (1, 'original')",
        ])
        .await;
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-2",
            "UPDATE update_test SET value = 'updated' WHERE id = 1",
        );

        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();
        assert_completed(&result, 1);
    }

    #[tokio::test]
    async fn test_execute_schedule_delete() {
        let session = session_with(&[
            "CREATE TABLE delete_test (id INT, value VARCHAR(100))",
            "INSERT INTO delete_test VALUES (1, 'to_delete')",
            "INSERT INTO delete_test VALUES (2, 'to_keep')",
        ])
        .await;
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule("test-schedule-3", "DELETE FROM delete_test WHERE id = 1");

        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();
        assert_completed(&result, 1);
    }

    #[tokio::test]
    async fn test_execute_schedule_select() {
        let session = session_with(&[
            "CREATE TABLE select_test (id INT, value VARCHAR(100))",
            "INSERT INTO select_test VALUES (1, 'row1')",
            "INSERT INTO select_test VALUES (2, 'row2')",
        ])
        .await;
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule("test-schedule-4", "SELECT * FROM select_test");

        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();
        assert_completed(&result, 2);
    }

    #[tokio::test]
    async fn test_execute_schedule_invalid_sql() {
        let session = session_with(&[]).await;
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule("test-schedule-5", "INVALID SQL SYNTAX HERE");

        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();
        assert_failed(&result);
    }

    #[tokio::test]
    async fn test_execute_schedule_table_not_found() {
        let session = session_with(&[]).await;
        // A single attempt with a tiny backoff keeps the failing test fast.
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig {
            max_retries: 1,
            initial_backoff: StdDuration::from_millis(10),
            ..Default::default()
        });
        let schedule = pending_schedule(
            "test-schedule-6",
            "INSERT INTO nonexistent_table VALUES (1, 'test')",
        );

        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();
        assert_failed(&result);
    }

    #[tokio::test]
    async fn test_execute_schedule_create_table() {
        let session = session_with(&[]).await;
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-7",
            "CREATE TABLE scheduled_table (id INT, name VARCHAR(100))",
        );

        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();
        assert_completed(&result, 0);

        // The table created by the schedule must be usable afterwards.
        let mut session_guard = session.lock().await;
        let insert_result =
            session_guard.execute("INSERT INTO scheduled_table VALUES (1, 'test')").await;
        assert!(insert_result.is_ok());
    }
}