Skip to main content

vibesql_server/scheduler/
executor.rs

1//! Schedule execution engine with retry logic
2//!
3//! Handles background execution of scheduled tasks and cron jobs with automatic retry.
4
5use crate::Session;
6use anyhow::Result;
7use chrono::Utc;
8use std::sync::Arc;
9use std::time::Duration as StdDuration;
10use tokio::sync::Mutex;
11use tracing::{error, info, warn};
12use vibesql_parser::Parser;
13
14use super::storage::{ExecutionHistoryRecord, ScheduleRecord, ScheduleStatus};
15
/// Retry and backoff settings for the schedule executor.
#[derive(Debug, Clone)]
pub struct ScheduleExecutorConfig {
    /// Upper bound on the number of retry attempts.
    pub max_retries: u32,
    /// Backoff duration the retry sequence starts from.
    pub initial_backoff: StdDuration,
    /// Ceiling that the retry backoff duration is capped at.
    pub max_backoff: StdDuration,
    /// Factor applied for exponential backoff growth.
    pub backoff_multiplier: f64,
}
28
29impl Default for ScheduleExecutorConfig {
30    fn default() -> Self {
31        Self {
32            max_retries: 3,
33            initial_backoff: StdDuration::from_secs(5),
34            max_backoff: StdDuration::from_secs(300),
35            backoff_multiplier: 2.0,
36        }
37    }
38}
39
40/// Executes scheduled tasks with retry logic
41pub struct ScheduleExecutor {
42    config: ScheduleExecutorConfig,
43}
44
45impl ScheduleExecutor {
46    pub fn new(config: ScheduleExecutorConfig) -> Self {
47        Self { config }
48    }
49
50    /// Execute a scheduled task with retry logic
51    pub async fn execute_schedule(
52        &self,
53        schedule: &ScheduleRecord,
54        session: Arc<Mutex<Session>>,
55    ) -> Result<ExecutionHistoryRecord> {
56        let started_at = Utc::now();
57
58        // Validate SQL parses before retrying (don't retry parse errors)
59        if let Err(e) = Parser::parse_sql(&schedule.sql) {
60            return Ok(ExecutionHistoryRecord {
61                id: None,
62                schedule_id: Some(schedule.id.clone()),
63                cron_name: None,
64                started_at,
65                completed_at: Some(Utc::now()),
66                status: ScheduleStatus::Failed,
67                error: Some(e.to_string()),
68                rows_affected: None,
69            });
70        }
71
72        // Execute with retries
73        #[allow(unused_assignments)] // Initial None is never read, but keeps the code clear
74        let mut last_error: Option<String> = None;
75        let mut attempt = 0;
76
77        loop {
78            attempt += 1;
79            let backoff = self.calculate_backoff(attempt - 1);
80
81            match self.execute_statement(&schedule.sql, &session).await {
82                Ok(rows_affected) => {
83                    info!(
84                        schedule_id = %schedule.id,
85                        rows_affected = rows_affected,
86                        attempts = attempt,
87                        "Schedule executed successfully"
88                    );
89                    return Ok(ExecutionHistoryRecord {
90                        id: None,
91                        schedule_id: Some(schedule.id.clone()),
92                        cron_name: None,
93                        started_at,
94                        completed_at: Some(Utc::now()),
95                        status: ScheduleStatus::Completed,
96                        error: None,
97                        rows_affected: Some(rows_affected as i64),
98                    });
99                }
100                Err(e) => {
101                    last_error = Some(e.to_string());
102                    warn!(
103                        schedule_id = %schedule.id,
104                        attempt = attempt,
105                        error = %e,
106                        "Schedule execution failed, will retry"
107                    );
108
109                    if attempt >= self.config.max_retries {
110                        error!(
111                            schedule_id = %schedule.id,
112                            attempts = attempt,
113                            error = %last_error.as_ref().unwrap(),
114                            "Schedule execution failed after all retries"
115                        );
116                        return Ok(ExecutionHistoryRecord {
117                            id: None,
118                            schedule_id: Some(schedule.id.clone()),
119                            cron_name: None,
120                            started_at,
121                            completed_at: Some(Utc::now()),
122                            status: ScheduleStatus::Failed,
123                            error: last_error,
124                            rows_affected: None,
125                        });
126                    }
127
128                    tokio::time::sleep(backoff).await;
129                }
130            }
131        }
132    }
133
134    /// Calculate exponential backoff duration
135    fn calculate_backoff(&self, attempt: u32) -> StdDuration {
136        let backoff_secs = self.config.initial_backoff.as_secs_f64()
137            * self.config.backoff_multiplier.powi(attempt as i32);
138
139        let max_secs = self.config.max_backoff.as_secs_f64();
140        let capped_secs = backoff_secs.min(max_secs);
141
142        StdDuration::from_secs_f64(capped_secs)
143    }
144
145    /// Execute a single SQL statement via the session
146    async fn execute_statement(&self, sql: &str, session: &Arc<Mutex<Session>>) -> Result<usize> {
147        let mut session_guard = session.lock().await;
148        let result = session_guard.execute(sql).await?;
149        Ok(result.rows_affected() as usize)
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a fresh standalone session for the test database and user.
    fn standalone_session() -> Session {
        Session::new_standalone("testdb".to_string(), "testuser".to_string())
    }

    /// Wraps a session in the shared handle that `execute_schedule` expects.
    fn shared(session: Session) -> Arc<Mutex<Session>> {
        Arc::new(Mutex::new(session))
    }

    /// Builds a pending, never-attempted schedule record for `sql`.
    fn pending_schedule(id: &str, sql: &str) -> ScheduleRecord {
        ScheduleRecord {
            id: id.to_string(),
            sql: sql.to_string(),
            params: None,
            run_at: Utc::now(),
            created_at: Utc::now(),
            status: ScheduleStatus::Pending,
            attempts: 0,
            last_error: None,
            completed_at: None,
        }
    }

    #[test]
    fn test_backoff_calculation() {
        let config = ScheduleExecutorConfig {
            initial_backoff: StdDuration::from_secs(5),
            max_backoff: StdDuration::from_secs(300),
            backoff_multiplier: 2.0,
            ..Default::default()
        };

        let executor = ScheduleExecutor::new(config);

        // First retry: 5 seconds
        assert_eq!(executor.calculate_backoff(0), StdDuration::from_secs(5));

        // Second retry: 10 seconds
        assert_eq!(executor.calculate_backoff(1), StdDuration::from_secs(10));

        // Third retry: 20 seconds
        assert_eq!(executor.calculate_backoff(2), StdDuration::from_secs(20));

        // Should cap at max_backoff (300 seconds)
        let very_high = executor.calculate_backoff(100);
        assert_eq!(very_high, StdDuration::from_secs(300));
    }

    #[tokio::test]
    async fn test_execute_schedule_insert() {
        // Create a session with a table
        let mut session = standalone_session();
        session.execute("CREATE TABLE schedule_test (id INT, value VARCHAR(100))").await.unwrap();
        let session = shared(session);

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-1",
            "INSERT INTO schedule_test VALUES (1, 'scheduled')",
        );

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(1));

        // Verify data was inserted by reading it back through the same session
        let mut verify_session = session.lock().await;
        let verify =
            verify_session.execute("SELECT * FROM schedule_test WHERE id = 1").await.unwrap();
        match verify {
            crate::session::ExecutionResult::Select { rows, .. } => {
                assert_eq!(rows.len(), 1);
            }
            _ => panic!("Expected Select result"),
        }
    }

    #[tokio::test]
    async fn test_execute_schedule_update() {
        // Create a session with a table and initial data
        let mut session = standalone_session();
        session.execute("CREATE TABLE update_test (id INT, value VARCHAR(100))").await.unwrap();
        session.execute("INSERT INTO update_test VALUES (1, 'original')").await.unwrap();
        let session = shared(session);

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-2",
            "UPDATE update_test SET value = 'updated' WHERE id = 1",
        );

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(1));
    }

    #[tokio::test]
    async fn test_execute_schedule_delete() {
        // Create a session with a table and initial data
        let mut session = standalone_session();
        session.execute("CREATE TABLE delete_test (id INT, value VARCHAR(100))").await.unwrap();
        session.execute("INSERT INTO delete_test VALUES (1, 'to_delete')").await.unwrap();
        session.execute("INSERT INTO delete_test VALUES (2, 'to_keep')").await.unwrap();
        let session = shared(session);

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule("test-schedule-3", "DELETE FROM delete_test WHERE id = 1");

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(1));
    }

    #[tokio::test]
    async fn test_execute_schedule_select() {
        // Create a session with a table and data
        let mut session = standalone_session();
        session.execute("CREATE TABLE select_test (id INT, value VARCHAR(100))").await.unwrap();
        session.execute("INSERT INTO select_test VALUES (1, 'row1')").await.unwrap();
        session.execute("INSERT INTO select_test VALUES (2, 'row2')").await.unwrap();
        let session = shared(session);

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule("test-schedule-4", "SELECT * FROM select_test");

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success - SELECT returns rows as rows_affected
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(2));
    }

    #[tokio::test]
    async fn test_execute_schedule_invalid_sql() {
        let session = shared(standalone_session());

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule("test-schedule-5", "INVALID SQL SYNTAX HERE");

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify failure (parse error - no retries)
        assert_eq!(result.status, ScheduleStatus::Failed);
        assert!(result.error.is_some());
        assert!(result.rows_affected.is_none());
    }

    #[tokio::test]
    async fn test_execute_schedule_table_not_found() {
        // Create a session without the table
        let session = shared(standalone_session());

        // Create executor with minimal retries for faster test
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig {
            max_retries: 1,
            initial_backoff: StdDuration::from_millis(10),
            ..Default::default()
        });

        let schedule = pending_schedule(
            "test-schedule-6",
            "INSERT INTO nonexistent_table VALUES (1, 'test')",
        );

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify failure (execution error after retries)
        assert_eq!(result.status, ScheduleStatus::Failed);
        assert!(result.error.is_some());
        assert!(result.rows_affected.is_none());
    }

    #[tokio::test]
    async fn test_execute_schedule_create_table() {
        let session = shared(standalone_session());

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-7",
            "CREATE TABLE scheduled_table (id INT, name VARCHAR(100))",
        );

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success - DDL returns 0 rows_affected
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(0));

        // Verify table was created by inserting into it
        let mut session_guard = session.lock().await;
        let insert_result =
            session_guard.execute("INSERT INTO scheduled_table VALUES (1, 'test')").await;
        assert!(insert_result.is_ok());
    }
}