Skip to main content

azoth_scheduler/
projection.rs

1//! Schedule projection for task state management.
2
3use crate::error::{Result, SchedulerError};
4use crate::events::SchedulerEvent;
5use crate::schedule::Schedule;
6use chrono::{DateTime, Utc};
7use rusqlite::{params, Connection, OptionalExtension};
8use std::sync::Arc;
9
10/// Projection that maintains task schedules in SQLite.
11pub struct ScheduleProjection {
12    conn: Arc<Connection>,
13}
14
15impl ScheduleProjection {
16    /// Create a new schedule projection.
17    pub fn new(conn: Arc<Connection>) -> Self {
18        Self { conn }
19    }
20
21    /// Initialize the database schema.
22    pub fn init_schema(&self) -> Result<()> {
23        self.conn.execute_batch(
24            r#"
25            CREATE TABLE IF NOT EXISTS scheduled_tasks (
26                task_id TEXT PRIMARY KEY,
27                task_type TEXT NOT NULL,
28                schedule_type TEXT NOT NULL,
29                schedule_data TEXT NOT NULL,
30                payload BLOB NOT NULL,
31                enabled INTEGER NOT NULL DEFAULT 1,
32                next_run_time INTEGER NOT NULL,
33                last_run_time INTEGER,
34                last_execution_id TEXT,
35                retry_count INTEGER NOT NULL DEFAULT 0,
36                max_retries INTEGER NOT NULL DEFAULT 3,
37                timeout_secs INTEGER NOT NULL DEFAULT 300,
38                created_at INTEGER NOT NULL,
39                updated_at INTEGER NOT NULL
40            );
41
42            CREATE INDEX IF NOT EXISTS idx_next_run_time
43                ON scheduled_tasks(next_run_time, enabled);
44
45            CREATE TABLE IF NOT EXISTS task_executions (
46                execution_id TEXT PRIMARY KEY,
47                task_id TEXT NOT NULL,
48                triggered_event_id INTEGER NOT NULL,
49                started_at INTEGER NOT NULL,
50                completed_at INTEGER,
51                success INTEGER,
52                error TEXT,
53                retry_attempt INTEGER NOT NULL DEFAULT 0,
54                FOREIGN KEY (task_id) REFERENCES scheduled_tasks(task_id)
55            );
56
57            CREATE INDEX IF NOT EXISTS idx_task_executions_task_id
58                ON task_executions(task_id, started_at DESC);
59            "#,
60        )?;
61        Ok(())
62    }
63
64    /// Get tasks that are due to run.
65    pub fn get_due_tasks(&self, now: i64) -> Result<Vec<DueTask>> {
66        let mut stmt = self.conn.prepare(
67            r#"
68            SELECT task_id, task_type, payload, retry_count, max_retries, timeout_secs
69            FROM scheduled_tasks
70            WHERE enabled = 1 AND next_run_time <= ?
71            ORDER BY next_run_time ASC
72            "#,
73        )?;
74
75        let tasks = stmt
76            .query_map(params![now], |row| {
77                Ok(DueTask {
78                    task_id: row.get(0)?,
79                    task_type: row.get(1)?,
80                    payload: row.get(2)?,
81                    retry_count: row.get(3)?,
82                    max_retries: row.get(4)?,
83                    timeout_secs: row.get(5)?,
84                })
85            })?
86            .collect::<std::result::Result<Vec<_>, _>>()?;
87
88        Ok(tasks)
89    }
90
91    /// Get the next wake time (earliest next_run_time).
92    pub fn get_next_wake_time(&self) -> Result<Option<DateTime<Utc>>> {
93        let next: Option<i64> = self
94            .conn
95            .query_row(
96                r#"
97                SELECT next_run_time
98                FROM scheduled_tasks
99                WHERE enabled = 1
100                ORDER BY next_run_time ASC
101                LIMIT 1
102                "#,
103                [],
104                |row| row.get(0),
105            )
106            .optional()?;
107
108        Ok(next.and_then(|ts| DateTime::from_timestamp(ts, 0)))
109    }
110
111    /// Get a task by ID.
112    pub fn get_task(&self, task_id: &str) -> Result<Option<ScheduledTask>> {
113        let task: Option<ScheduledTask> = self
114            .conn
115            .query_row(
116                r#"
117                SELECT task_id, task_type, schedule_type, schedule_data, payload,
118                       enabled, next_run_time, last_run_time, last_execution_id,
119                       retry_count, max_retries, timeout_secs, created_at, updated_at
120                FROM scheduled_tasks
121                WHERE task_id = ?
122                "#,
123                params![task_id],
124                |row| {
125                    Ok(ScheduledTask {
126                        task_id: row.get(0)?,
127                        task_type: row.get(1)?,
128                        schedule: serde_json::from_str(&row.get::<_, String>(3)?).unwrap(),
129                        payload: row.get(4)?,
130                        enabled: row.get(5)?,
131                        next_run_time: row
132                            .get::<_, Option<i64>>(6)?
133                            .and_then(|ts| DateTime::from_timestamp(ts, 0)),
134                        last_run_time: row
135                            .get::<_, Option<i64>>(7)?
136                            .and_then(|ts| DateTime::from_timestamp(ts, 0)),
137                        last_execution_id: row.get(8)?,
138                        retry_count: row.get(9)?,
139                        max_retries: row.get(10)?,
140                        timeout_secs: row.get(11)?,
141                        created_at: DateTime::from_timestamp(row.get(12)?, 0).unwrap(),
142                        updated_at: DateTime::from_timestamp(row.get(13)?, 0).unwrap(),
143                    })
144                },
145            )
146            .optional()?;
147
148        Ok(task)
149    }
150
151    /// List all tasks with optional filtering.
152    ///
153    /// All filter values are passed as bound parameters to prevent SQL injection.
154    pub fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<ScheduledTask>> {
155        let mut query = String::from(
156            r#"
157            SELECT task_id, task_type, schedule_type, schedule_data, payload,
158                   enabled, next_run_time, last_run_time, last_execution_id,
159                   retry_count, max_retries, timeout_secs, created_at, updated_at
160            FROM scheduled_tasks
161            WHERE 1=1
162            "#,
163        );
164
165        // Collect bound parameters to prevent SQL injection
166        let mut bound_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
167
168        if let Some(enabled) = filter.enabled {
169            query.push_str(" AND enabled = ?");
170            bound_params.push(Box::new(if enabled { 1_i32 } else { 0_i32 }));
171        }
172
173        if let Some(task_type) = &filter.task_type {
174            query.push_str(" AND task_type = ?");
175            bound_params.push(Box::new(task_type.clone()));
176        }
177
178        query.push_str(" ORDER BY created_at DESC");
179
180        let mut stmt = self.conn.prepare(&query)?;
181        let tasks = stmt
182            .query_map(rusqlite::params_from_iter(bound_params.iter()), |row| {
183                Ok(ScheduledTask {
184                    task_id: row.get(0)?,
185                    task_type: row.get(1)?,
186                    schedule: serde_json::from_str(&row.get::<_, String>(3)?).unwrap(),
187                    payload: row.get(4)?,
188                    enabled: row.get(5)?,
189                    next_run_time: row
190                        .get::<_, Option<i64>>(6)?
191                        .and_then(|ts| DateTime::from_timestamp(ts, 0)),
192                    last_run_time: row
193                        .get::<_, Option<i64>>(7)?
194                        .and_then(|ts| DateTime::from_timestamp(ts, 0)),
195                    last_execution_id: row.get(8)?,
196                    retry_count: row.get(9)?,
197                    max_retries: row.get(10)?,
198                    timeout_secs: row.get(11)?,
199                    created_at: DateTime::from_timestamp(row.get(12)?, 0).unwrap(),
200                    updated_at: DateTime::from_timestamp(row.get(13)?, 0).unwrap(),
201                })
202            })?
203            .collect::<std::result::Result<Vec<_>, _>>()?;
204
205        Ok(tasks)
206    }
207
208    /// Apply an event to the projection.
209    pub fn apply_event(&self, event: &SchedulerEvent) -> Result<()> {
210        match event {
211            SchedulerEvent::TaskScheduled {
212                task_id,
213                task_type,
214                schedule,
215                payload,
216                max_retries,
217                timeout_secs,
218            } => {
219                let now = Utc::now();
220                let next_run_time = schedule.next_run_time(now)?;
221
222                let schedule_type = match schedule {
223                    Schedule::Cron { .. } => "cron",
224                    Schedule::Interval { .. } => "interval",
225                    Schedule::OneTime { .. } => "one_time",
226                    Schedule::Immediate => "immediate",
227                };
228
229                let schedule_data = serde_json::to_string(schedule)?;
230
231                self.conn.execute(
232                    r#"
233                    INSERT INTO scheduled_tasks
234                    (task_id, task_type, schedule_type, schedule_data, payload,
235                     enabled, next_run_time, retry_count, max_retries, timeout_secs,
236                     created_at, updated_at)
237                    VALUES (?, ?, ?, ?, ?, 1, ?, 0, ?, ?, ?, ?)
238                    ON CONFLICT(task_id) DO UPDATE SET
239                        task_type = excluded.task_type,
240                        schedule_type = excluded.schedule_type,
241                        schedule_data = excluded.schedule_data,
242                        payload = excluded.payload,
243                        enabled = 1,
244                        next_run_time = excluded.next_run_time,
245                        retry_count = 0,
246                        max_retries = excluded.max_retries,
247                        timeout_secs = excluded.timeout_secs,
248                        updated_at = excluded.updated_at
249                    "#,
250                    params![
251                        task_id,
252                        task_type,
253                        schedule_type,
254                        schedule_data,
255                        payload,
256                        next_run_time.map(|t| t.timestamp()),
257                        max_retries,
258                        timeout_secs,
259                        now.timestamp(),
260                        now.timestamp(),
261                    ],
262                )?;
263            }
264            SchedulerEvent::TaskExecuted {
265                task_id,
266                execution_id,
267                triggered_event_id,
268                started_at,
269                completed_at,
270                success,
271                error,
272            } => {
273                // Get the current task to determine next run time
274                let task = self
275                    .get_task(task_id)?
276                    .ok_or_else(|| SchedulerError::TaskNotFound(task_id.clone()))?;
277
278                let retry_attempt = task.retry_count;
279
280                // Record execution
281                self.conn.execute(
282                    r#"
283                    INSERT INTO task_executions
284                    (execution_id, task_id, triggered_event_id, started_at,
285                     completed_at, success, error, retry_attempt)
286                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
287                    "#,
288                    params![
289                        execution_id,
290                        task_id,
291                        triggered_event_id,
292                        started_at,
293                        completed_at,
294                        if *success { 1 } else { 0 },
295                        error,
296                        retry_attempt,
297                    ],
298                )?;
299
300                // Update task state
301                if *success {
302                    // Calculate next run time for recurring tasks
303                    let next_run_time = if task.schedule.is_recurring() {
304                        let last_run =
305                            DateTime::from_timestamp(*started_at, 0).ok_or_else(|| {
306                                SchedulerError::InvalidSchedule("Invalid timestamp".into())
307                            })?;
308                        task.schedule.next_run_time(last_run)?
309                    } else {
310                        None
311                    };
312
313                    self.conn.execute(
314                        r#"
315                        UPDATE scheduled_tasks
316                        SET last_run_time = ?,
317                            last_execution_id = ?,
318                            next_run_time = ?,
319                            retry_count = 0,
320                            updated_at = ?
321                        WHERE task_id = ?
322                        "#,
323                        params![
324                            started_at,
325                            execution_id,
326                            next_run_time.map(|t| t.timestamp()),
327                            Utc::now().timestamp(),
328                            task_id,
329                        ],
330                    )?;
331                } else {
332                    // Increment retry count or disable if max retries reached
333                    let new_retry_count = task.retry_count + 1;
334                    let enabled = if new_retry_count >= task.max_retries {
335                        0
336                    } else {
337                        1
338                    };
339
340                    // Calculate next retry time (exponential backoff)
341                    let next_run_time = if enabled == 1 {
342                        let backoff_seconds = 2_i64.pow(new_retry_count) * 60; // 1min, 2min, 4min, etc.
343                        Some(Utc::now() + chrono::Duration::seconds(backoff_seconds))
344                    } else {
345                        None
346                    };
347
348                    self.conn.execute(
349                        r#"
350                        UPDATE scheduled_tasks
351                        SET retry_count = ?,
352                            enabled = ?,
353                            next_run_time = ?,
354                            updated_at = ?
355                        WHERE task_id = ?
356                        "#,
357                        params![
358                            new_retry_count,
359                            enabled,
360                            next_run_time.map(|t| t.timestamp()),
361                            Utc::now().timestamp(),
362                            task_id,
363                        ],
364                    )?;
365                }
366            }
367            SchedulerEvent::TaskCancelled { task_id, .. } => {
368                self.conn.execute(
369                    "DELETE FROM scheduled_tasks WHERE task_id = ?",
370                    params![task_id],
371                )?;
372            }
373        }
374
375        Ok(())
376    }
377}
378
379/// A task that is due to run.
380#[derive(Debug, Clone)]
381pub struct DueTask {
382    /// Task identifier.
383    pub task_id: String,
384    /// Task type.
385    pub task_type: String,
386    /// Task payload.
387    pub payload: Vec<u8>,
388    /// Current retry count.
389    pub retry_count: u32,
390    /// Maximum retries.
391    pub max_retries: u32,
392    /// Timeout in seconds.
393    pub timeout_secs: u64,
394}
395
396/// A scheduled task.
397#[derive(Debug, Clone)]
398pub struct ScheduledTask {
399    /// Task identifier.
400    pub task_id: String,
401    /// Task type.
402    pub task_type: String,
403    /// Schedule configuration.
404    pub schedule: Schedule,
405    /// Task payload.
406    pub payload: Vec<u8>,
407    /// Whether the task is enabled.
408    pub enabled: bool,
409    /// Next scheduled run time.
410    pub next_run_time: Option<DateTime<Utc>>,
411    /// Last run time.
412    pub last_run_time: Option<DateTime<Utc>>,
413    /// Last execution ID.
414    pub last_execution_id: Option<String>,
415    /// Current retry count.
416    pub retry_count: u32,
417    /// Maximum retries.
418    pub max_retries: u32,
419    /// Timeout in seconds.
420    pub timeout_secs: u64,
421    /// When the task was created.
422    pub created_at: DateTime<Utc>,
423    /// When the task was last updated.
424    pub updated_at: DateTime<Utc>,
425}
426
427/// Filter for listing tasks.
428#[derive(Debug, Clone, Default)]
429pub struct TaskFilter {
430    /// Filter by enabled status.
431    pub enabled: Option<bool>,
432    /// Filter by task type.
433    pub task_type: Option<String>,
434}
435
436impl TaskFilter {
437    /// Create a new empty filter.
438    pub fn new() -> Self {
439        Self::default()
440    }
441
442    /// Filter by enabled status.
443    pub fn enabled(mut self, enabled: bool) -> Self {
444        self.enabled = Some(enabled);
445        self
446    }
447
448    /// Filter by task type.
449    pub fn task_type(mut self, task_type: impl Into<String>) -> Self {
450        self.task_type = Some(task_type.into());
451        self
452    }
453}