vibesql-server 0.1.3

Network server with PostgreSQL wire protocol for VibeSQL
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
//! Schedule execution engine with retry logic
//!
//! Handles background execution of scheduled tasks and cron jobs with automatic retry.

use crate::Session;
use anyhow::Result;
use chrono::Utc;
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tokio::sync::Mutex;
use tracing::{error, info, warn};
use vibesql_parser::Parser;

use super::storage::{ExecutionHistoryRecord, ScheduleRecord, ScheduleStatus};

/// Configuration for schedule executor
///
/// Governs how often a failing statement is re-run and how long the executor
/// sleeps between runs. The delay before retry `n` (0-based) is
/// `initial_backoff * backoff_multiplier^n`, capped at `max_backoff`.
#[derive(Debug, Clone)]
pub struct ScheduleExecutorConfig {
    /// Maximum number of retry attempts
    ///
    /// The executor gives up once the number of executions performed reaches
    /// this value, so it effectively bounds the total number of executions.
    /// The statement is always executed at least once, even when this is 0.
    pub max_retries: u32,
    /// Initial retry backoff duration
    ///
    /// This is the delay applied before the first retry.
    pub initial_backoff: StdDuration,
    /// Maximum retry backoff duration
    ///
    /// Upper bound for the computed delay, however many retries have occurred.
    pub max_backoff: StdDuration,
    /// Exponential backoff multiplier
    ///
    /// Factor by which the delay grows from one retry to the next.
    pub backoff_multiplier: f64,
}

impl Default for ScheduleExecutorConfig {
    /// Build the stock configuration: `max_retries` of 3, a 5 second initial
    /// backoff that doubles on every retry, capped at 5 minutes.
    fn default() -> Self {
        let initial_backoff = StdDuration::from_secs(5);
        let max_backoff = StdDuration::from_secs(5 * 60);

        Self { max_retries: 3, initial_backoff, max_backoff, backoff_multiplier: 2.0 }
    }
}

/// Executes scheduled tasks with retry logic
///
/// The executor itself only carries its retry configuration; the session a
/// statement runs against is supplied per call.
pub struct ScheduleExecutor {
    /// Retry limits and backoff parameters applied to every execution.
    config: ScheduleExecutorConfig,
}

impl ScheduleExecutor {
    pub fn new(config: ScheduleExecutorConfig) -> Self {
        Self { config }
    }

    /// Execute a scheduled task with retry logic
    pub async fn execute_schedule(
        &self,
        schedule: &ScheduleRecord,
        session: Arc<Mutex<Session>>,
    ) -> Result<ExecutionHistoryRecord> {
        let started_at = Utc::now();

        // Validate SQL parses before retrying (don't retry parse errors)
        if let Err(e) = Parser::parse_sql(&schedule.sql) {
            return Ok(ExecutionHistoryRecord {
                id: None,
                schedule_id: Some(schedule.id.clone()),
                cron_name: None,
                started_at,
                completed_at: Some(Utc::now()),
                status: ScheduleStatus::Failed,
                error: Some(e.to_string()),
                rows_affected: None,
            });
        }

        // Execute with retries
        #[allow(unused_assignments)] // Initial None is never read, but keeps the code clear
        let mut last_error: Option<String> = None;
        let mut attempt = 0;

        loop {
            attempt += 1;
            let backoff = self.calculate_backoff(attempt - 1);

            match self.execute_statement(&schedule.sql, &session).await {
                Ok(rows_affected) => {
                    info!(
                        schedule_id = %schedule.id,
                        rows_affected = rows_affected,
                        attempts = attempt,
                        "Schedule executed successfully"
                    );
                    return Ok(ExecutionHistoryRecord {
                        id: None,
                        schedule_id: Some(schedule.id.clone()),
                        cron_name: None,
                        started_at,
                        completed_at: Some(Utc::now()),
                        status: ScheduleStatus::Completed,
                        error: None,
                        rows_affected: Some(rows_affected as i64),
                    });
                }
                Err(e) => {
                    last_error = Some(e.to_string());
                    warn!(
                        schedule_id = %schedule.id,
                        attempt = attempt,
                        error = %e,
                        "Schedule execution failed, will retry"
                    );

                    if attempt >= self.config.max_retries {
                        error!(
                            schedule_id = %schedule.id,
                            attempts = attempt,
                            error = %last_error.as_ref().unwrap(),
                            "Schedule execution failed after all retries"
                        );
                        return Ok(ExecutionHistoryRecord {
                            id: None,
                            schedule_id: Some(schedule.id.clone()),
                            cron_name: None,
                            started_at,
                            completed_at: Some(Utc::now()),
                            status: ScheduleStatus::Failed,
                            error: last_error,
                            rows_affected: None,
                        });
                    }

                    tokio::time::sleep(backoff).await;
                }
            }
        }
    }

    /// Calculate exponential backoff duration
    fn calculate_backoff(&self, attempt: u32) -> StdDuration {
        let backoff_secs = self.config.initial_backoff.as_secs_f64()
            * self.config.backoff_multiplier.powi(attempt as i32);

        let max_secs = self.config.max_backoff.as_secs_f64();
        let capped_secs = backoff_secs.min(max_secs);

        StdDuration::from_secs_f64(capped_secs)
    }

    /// Execute a single SQL statement via the session
    async fn execute_statement(&self, sql: &str, session: &Arc<Mutex<Session>>) -> Result<usize> {
        let mut session_guard = session.lock().await;
        let result = session_guard.execute(sql).await?;
        Ok(result.rows_affected() as usize)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Create a fresh standalone session for the test database.
    fn standalone_session() -> Session {
        Session::new_standalone("testdb".to_string(), "testuser".to_string())
    }

    /// Wrap a session in the shared handle `execute_schedule` expects.
    fn shared(session: Session) -> Arc<Mutex<Session>> {
        Arc::new(Mutex::new(session))
    }

    /// Build a pending, never-attempted schedule record for `sql`.
    fn pending_schedule(id: &str, sql: &str) -> ScheduleRecord {
        let now = Utc::now();
        ScheduleRecord {
            id: id.to_string(),
            sql: sql.to_string(),
            params: None,
            run_at: now,
            created_at: now,
            status: ScheduleStatus::Pending,
            attempts: 0,
            last_error: None,
            completed_at: None,
        }
    }

    #[test]
    fn test_backoff_calculation() {
        let config = ScheduleExecutorConfig {
            initial_backoff: StdDuration::from_secs(5),
            max_backoff: StdDuration::from_secs(300),
            backoff_multiplier: 2.0,
            ..Default::default()
        };

        let executor = ScheduleExecutor::new(config);

        // First retry: 5 seconds
        assert_eq!(executor.calculate_backoff(0), StdDuration::from_secs(5));

        // Second retry: 10 seconds
        assert_eq!(executor.calculate_backoff(1), StdDuration::from_secs(10));

        // Third retry: 20 seconds
        assert_eq!(executor.calculate_backoff(2), StdDuration::from_secs(20));

        // Should cap at max_backoff (300 seconds)
        let very_high = executor.calculate_backoff(100);
        assert_eq!(very_high, StdDuration::from_secs(300));
    }

    #[tokio::test]
    async fn test_execute_schedule_insert() {
        // Create a session with a table
        let mut session = standalone_session();
        session.execute("CREATE TABLE schedule_test (id INT, value VARCHAR(100))").await.unwrap();
        let session = shared(session);

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-1",
            "INSERT INTO schedule_test VALUES (1, 'scheduled')",
        );

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(1));

        // Verify data was inserted by reading it back through the same session
        let mut verify_session = session.lock().await;
        let verify =
            verify_session.execute("SELECT * FROM schedule_test WHERE id = 1").await.unwrap();
        match verify {
            crate::session::ExecutionResult::Select { rows, .. } => {
                assert_eq!(rows.len(), 1);
            }
            _ => panic!("Expected Select result"),
        }
    }

    #[tokio::test]
    async fn test_execute_schedule_update() {
        // Create a session with a table and initial data
        let mut session = standalone_session();
        session.execute("CREATE TABLE update_test (id INT, value VARCHAR(100))").await.unwrap();
        session.execute("INSERT INTO update_test VALUES (1, 'original')").await.unwrap();
        let session = shared(session);

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-2",
            "UPDATE update_test SET value = 'updated' WHERE id = 1",
        );

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(1));
    }

    #[tokio::test]
    async fn test_execute_schedule_delete() {
        // Create a session with a table and initial data
        let mut session = standalone_session();
        session.execute("CREATE TABLE delete_test (id INT, value VARCHAR(100))").await.unwrap();
        session.execute("INSERT INTO delete_test VALUES (1, 'to_delete')").await.unwrap();
        session.execute("INSERT INTO delete_test VALUES (2, 'to_keep')").await.unwrap();
        let session = shared(session);

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule =
            pending_schedule("test-schedule-3", "DELETE FROM delete_test WHERE id = 1");

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(1));
    }

    #[tokio::test]
    async fn test_execute_schedule_select() {
        // Create a session with a table and data
        let mut session = standalone_session();
        session.execute("CREATE TABLE select_test (id INT, value VARCHAR(100))").await.unwrap();
        session.execute("INSERT INTO select_test VALUES (1, 'row1')").await.unwrap();
        session.execute("INSERT INTO select_test VALUES (2, 'row2')").await.unwrap();
        let session = shared(session);

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule("test-schedule-4", "SELECT * FROM select_test");

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success - SELECT returns rows as rows_affected
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(2));
    }

    #[tokio::test]
    async fn test_execute_schedule_invalid_sql() {
        let session = shared(standalone_session());

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule("test-schedule-5", "INVALID SQL SYNTAX HERE");

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify failure (parse error - no retries)
        assert_eq!(result.status, ScheduleStatus::Failed);
        assert!(result.error.is_some());
        assert!(result.rows_affected.is_none());
    }

    #[tokio::test]
    async fn test_execute_schedule_table_not_found() {
        // Create a session without the table
        let session = shared(standalone_session());

        // Create executor with minimal retries for faster test
        let executor = ScheduleExecutor::new(ScheduleExecutorConfig {
            max_retries: 1,
            initial_backoff: StdDuration::from_millis(10),
            ..Default::default()
        });

        let schedule = pending_schedule(
            "test-schedule-6",
            "INSERT INTO nonexistent_table VALUES (1, 'test')",
        );

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify failure (execution error after retries)
        assert_eq!(result.status, ScheduleStatus::Failed);
        assert!(result.error.is_some());
        assert!(result.rows_affected.is_none());
    }

    #[tokio::test]
    async fn test_execute_schedule_create_table() {
        let session = shared(standalone_session());

        let executor = ScheduleExecutor::new(ScheduleExecutorConfig::default());
        let schedule = pending_schedule(
            "test-schedule-7",
            "CREATE TABLE scheduled_table (id INT, name VARCHAR(100))",
        );

        // Execute the schedule
        let result = executor.execute_schedule(&schedule, session.clone()).await.unwrap();

        // Verify success - DDL returns 0 rows_affected
        assert_eq!(result.status, ScheduleStatus::Completed);
        assert!(result.error.is_none());
        assert_eq!(result.rows_affected, Some(0));

        // Verify table was created by inserting into it
        let mut session_guard = session.lock().await;
        let insert_result =
            session_guard.execute("INSERT INTO scheduled_table VALUES (1, 'test')").await;
        assert!(insert_result.is_ok());
    }
}