1use crate::error::{Result, SchedulerError};
4use crate::events::SchedulerEvent;
5use crate::schedule::Schedule;
6use chrono::{DateTime, Utc};
7use rusqlite::{params, Connection, OptionalExtension};
8use std::sync::Arc;
9
10pub struct ScheduleProjection {
12 conn: Arc<Connection>,
13}
14
15impl ScheduleProjection {
16 pub fn new(conn: Arc<Connection>) -> Self {
18 Self { conn }
19 }
20
21 pub fn init_schema(&self) -> Result<()> {
23 self.conn.execute_batch(
24 r#"
25 CREATE TABLE IF NOT EXISTS scheduled_tasks (
26 task_id TEXT PRIMARY KEY,
27 task_type TEXT NOT NULL,
28 schedule_type TEXT NOT NULL,
29 schedule_data TEXT NOT NULL,
30 payload BLOB NOT NULL,
31 enabled INTEGER NOT NULL DEFAULT 1,
32 next_run_time INTEGER NOT NULL,
33 last_run_time INTEGER,
34 last_execution_id TEXT,
35 retry_count INTEGER NOT NULL DEFAULT 0,
36 max_retries INTEGER NOT NULL DEFAULT 3,
37 timeout_secs INTEGER NOT NULL DEFAULT 300,
38 created_at INTEGER NOT NULL,
39 updated_at INTEGER NOT NULL
40 );
41
42 CREATE INDEX IF NOT EXISTS idx_next_run_time
43 ON scheduled_tasks(next_run_time, enabled);
44
45 CREATE TABLE IF NOT EXISTS task_executions (
46 execution_id TEXT PRIMARY KEY,
47 task_id TEXT NOT NULL,
48 triggered_event_id INTEGER NOT NULL,
49 started_at INTEGER NOT NULL,
50 completed_at INTEGER,
51 success INTEGER,
52 error TEXT,
53 retry_attempt INTEGER NOT NULL DEFAULT 0,
54 FOREIGN KEY (task_id) REFERENCES scheduled_tasks(task_id)
55 );
56
57 CREATE INDEX IF NOT EXISTS idx_task_executions_task_id
58 ON task_executions(task_id, started_at DESC);
59 "#,
60 )?;
61 Ok(())
62 }
63
64 pub fn get_due_tasks(&self, now: i64) -> Result<Vec<DueTask>> {
66 let mut stmt = self.conn.prepare(
67 r#"
68 SELECT task_id, task_type, payload, retry_count, max_retries, timeout_secs
69 FROM scheduled_tasks
70 WHERE enabled = 1 AND next_run_time <= ?
71 ORDER BY next_run_time ASC
72 "#,
73 )?;
74
75 let tasks = stmt
76 .query_map(params![now], |row| {
77 Ok(DueTask {
78 task_id: row.get(0)?,
79 task_type: row.get(1)?,
80 payload: row.get(2)?,
81 retry_count: row.get(3)?,
82 max_retries: row.get(4)?,
83 timeout_secs: row.get(5)?,
84 })
85 })?
86 .collect::<std::result::Result<Vec<_>, _>>()?;
87
88 Ok(tasks)
89 }
90
91 pub fn get_next_wake_time(&self) -> Result<Option<DateTime<Utc>>> {
93 let next: Option<i64> = self
94 .conn
95 .query_row(
96 r#"
97 SELECT next_run_time
98 FROM scheduled_tasks
99 WHERE enabled = 1
100 ORDER BY next_run_time ASC
101 LIMIT 1
102 "#,
103 [],
104 |row| row.get(0),
105 )
106 .optional()?;
107
108 Ok(next.and_then(|ts| DateTime::from_timestamp(ts, 0)))
109 }
110
111 pub fn get_task(&self, task_id: &str) -> Result<Option<ScheduledTask>> {
113 let task: Option<ScheduledTask> = self
114 .conn
115 .query_row(
116 r#"
117 SELECT task_id, task_type, schedule_type, schedule_data, payload,
118 enabled, next_run_time, last_run_time, last_execution_id,
119 retry_count, max_retries, timeout_secs, created_at, updated_at
120 FROM scheduled_tasks
121 WHERE task_id = ?
122 "#,
123 params![task_id],
124 |row| {
125 Ok(ScheduledTask {
126 task_id: row.get(0)?,
127 task_type: row.get(1)?,
128 schedule: serde_json::from_str(&row.get::<_, String>(3)?).unwrap(),
129 payload: row.get(4)?,
130 enabled: row.get(5)?,
131 next_run_time: row
132 .get::<_, Option<i64>>(6)?
133 .and_then(|ts| DateTime::from_timestamp(ts, 0)),
134 last_run_time: row
135 .get::<_, Option<i64>>(7)?
136 .and_then(|ts| DateTime::from_timestamp(ts, 0)),
137 last_execution_id: row.get(8)?,
138 retry_count: row.get(9)?,
139 max_retries: row.get(10)?,
140 timeout_secs: row.get(11)?,
141 created_at: DateTime::from_timestamp(row.get(12)?, 0).unwrap(),
142 updated_at: DateTime::from_timestamp(row.get(13)?, 0).unwrap(),
143 })
144 },
145 )
146 .optional()?;
147
148 Ok(task)
149 }
150
151 pub fn list_tasks(&self, filter: &TaskFilter) -> Result<Vec<ScheduledTask>> {
155 let mut query = String::from(
156 r#"
157 SELECT task_id, task_type, schedule_type, schedule_data, payload,
158 enabled, next_run_time, last_run_time, last_execution_id,
159 retry_count, max_retries, timeout_secs, created_at, updated_at
160 FROM scheduled_tasks
161 WHERE 1=1
162 "#,
163 );
164
165 let mut bound_params: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
167
168 if let Some(enabled) = filter.enabled {
169 query.push_str(" AND enabled = ?");
170 bound_params.push(Box::new(if enabled { 1_i32 } else { 0_i32 }));
171 }
172
173 if let Some(task_type) = &filter.task_type {
174 query.push_str(" AND task_type = ?");
175 bound_params.push(Box::new(task_type.clone()));
176 }
177
178 query.push_str(" ORDER BY created_at DESC");
179
180 let mut stmt = self.conn.prepare(&query)?;
181 let tasks = stmt
182 .query_map(rusqlite::params_from_iter(bound_params.iter()), |row| {
183 Ok(ScheduledTask {
184 task_id: row.get(0)?,
185 task_type: row.get(1)?,
186 schedule: serde_json::from_str(&row.get::<_, String>(3)?).unwrap(),
187 payload: row.get(4)?,
188 enabled: row.get(5)?,
189 next_run_time: row
190 .get::<_, Option<i64>>(6)?
191 .and_then(|ts| DateTime::from_timestamp(ts, 0)),
192 last_run_time: row
193 .get::<_, Option<i64>>(7)?
194 .and_then(|ts| DateTime::from_timestamp(ts, 0)),
195 last_execution_id: row.get(8)?,
196 retry_count: row.get(9)?,
197 max_retries: row.get(10)?,
198 timeout_secs: row.get(11)?,
199 created_at: DateTime::from_timestamp(row.get(12)?, 0).unwrap(),
200 updated_at: DateTime::from_timestamp(row.get(13)?, 0).unwrap(),
201 })
202 })?
203 .collect::<std::result::Result<Vec<_>, _>>()?;
204
205 Ok(tasks)
206 }
207
208 pub fn apply_event(&self, event: &SchedulerEvent) -> Result<()> {
210 match event {
211 SchedulerEvent::TaskScheduled {
212 task_id,
213 task_type,
214 schedule,
215 payload,
216 max_retries,
217 timeout_secs,
218 } => {
219 let now = Utc::now();
220 let next_run_time = schedule.next_run_time(now)?;
221
222 let schedule_type = match schedule {
223 Schedule::Cron { .. } => "cron",
224 Schedule::Interval { .. } => "interval",
225 Schedule::OneTime { .. } => "one_time",
226 Schedule::Immediate => "immediate",
227 };
228
229 let schedule_data = serde_json::to_string(schedule)?;
230
231 self.conn.execute(
232 r#"
233 INSERT INTO scheduled_tasks
234 (task_id, task_type, schedule_type, schedule_data, payload,
235 enabled, next_run_time, retry_count, max_retries, timeout_secs,
236 created_at, updated_at)
237 VALUES (?, ?, ?, ?, ?, 1, ?, 0, ?, ?, ?, ?)
238 ON CONFLICT(task_id) DO UPDATE SET
239 task_type = excluded.task_type,
240 schedule_type = excluded.schedule_type,
241 schedule_data = excluded.schedule_data,
242 payload = excluded.payload,
243 enabled = 1,
244 next_run_time = excluded.next_run_time,
245 retry_count = 0,
246 max_retries = excluded.max_retries,
247 timeout_secs = excluded.timeout_secs,
248 updated_at = excluded.updated_at
249 "#,
250 params![
251 task_id,
252 task_type,
253 schedule_type,
254 schedule_data,
255 payload,
256 next_run_time.map(|t| t.timestamp()),
257 max_retries,
258 timeout_secs,
259 now.timestamp(),
260 now.timestamp(),
261 ],
262 )?;
263 }
264 SchedulerEvent::TaskExecuted {
265 task_id,
266 execution_id,
267 triggered_event_id,
268 started_at,
269 completed_at,
270 success,
271 error,
272 } => {
273 let task = self
275 .get_task(task_id)?
276 .ok_or_else(|| SchedulerError::TaskNotFound(task_id.clone()))?;
277
278 let retry_attempt = task.retry_count;
279
280 self.conn.execute(
282 r#"
283 INSERT INTO task_executions
284 (execution_id, task_id, triggered_event_id, started_at,
285 completed_at, success, error, retry_attempt)
286 VALUES (?, ?, ?, ?, ?, ?, ?, ?)
287 "#,
288 params![
289 execution_id,
290 task_id,
291 triggered_event_id,
292 started_at,
293 completed_at,
294 if *success { 1 } else { 0 },
295 error,
296 retry_attempt,
297 ],
298 )?;
299
300 if *success {
302 let next_run_time = if task.schedule.is_recurring() {
304 let last_run =
305 DateTime::from_timestamp(*started_at, 0).ok_or_else(|| {
306 SchedulerError::InvalidSchedule("Invalid timestamp".into())
307 })?;
308 task.schedule.next_run_time(last_run)?
309 } else {
310 None
311 };
312
313 self.conn.execute(
314 r#"
315 UPDATE scheduled_tasks
316 SET last_run_time = ?,
317 last_execution_id = ?,
318 next_run_time = ?,
319 retry_count = 0,
320 updated_at = ?
321 WHERE task_id = ?
322 "#,
323 params![
324 started_at,
325 execution_id,
326 next_run_time.map(|t| t.timestamp()),
327 Utc::now().timestamp(),
328 task_id,
329 ],
330 )?;
331 } else {
332 let new_retry_count = task.retry_count + 1;
334 let enabled = if new_retry_count >= task.max_retries {
335 0
336 } else {
337 1
338 };
339
340 let next_run_time = if enabled == 1 {
342 let backoff_seconds = 2_i64.pow(new_retry_count) * 60; Some(Utc::now() + chrono::Duration::seconds(backoff_seconds))
344 } else {
345 None
346 };
347
348 self.conn.execute(
349 r#"
350 UPDATE scheduled_tasks
351 SET retry_count = ?,
352 enabled = ?,
353 next_run_time = ?,
354 updated_at = ?
355 WHERE task_id = ?
356 "#,
357 params![
358 new_retry_count,
359 enabled,
360 next_run_time.map(|t| t.timestamp()),
361 Utc::now().timestamp(),
362 task_id,
363 ],
364 )?;
365 }
366 }
367 SchedulerEvent::TaskCancelled { task_id, .. } => {
368 self.conn.execute(
369 "DELETE FROM scheduled_tasks WHERE task_id = ?",
370 params![task_id],
371 )?;
372 }
373 }
374
375 Ok(())
376 }
377}
378
379#[derive(Debug, Clone)]
381pub struct DueTask {
382 pub task_id: String,
384 pub task_type: String,
386 pub payload: Vec<u8>,
388 pub retry_count: u32,
390 pub max_retries: u32,
392 pub timeout_secs: u64,
394}
395
396#[derive(Debug, Clone)]
398pub struct ScheduledTask {
399 pub task_id: String,
401 pub task_type: String,
403 pub schedule: Schedule,
405 pub payload: Vec<u8>,
407 pub enabled: bool,
409 pub next_run_time: Option<DateTime<Utc>>,
411 pub last_run_time: Option<DateTime<Utc>>,
413 pub last_execution_id: Option<String>,
415 pub retry_count: u32,
417 pub max_retries: u32,
419 pub timeout_secs: u64,
421 pub created_at: DateTime<Utc>,
423 pub updated_at: DateTime<Utc>,
425}
426
427#[derive(Debug, Clone, Default)]
429pub struct TaskFilter {
430 pub enabled: Option<bool>,
432 pub task_type: Option<String>,
434}
435
436impl TaskFilter {
437 pub fn new() -> Self {
439 Self::default()
440 }
441
442 pub fn enabled(mut self, enabled: bool) -> Self {
444 self.enabled = Some(enabled);
445 self
446 }
447
448 pub fn task_type(mut self, task_type: impl Into<String>) -> Self {
450 self.task_type = Some(task_type.into());
451 self
452 }
453}