Skip to main content

pylon_runtime/
workflow_store.rs

1//! SQLite-backed persistent storage for workflow instances.
2//!
3//! Persists workflow state (including step history) to a SQLite database so
4//! active workflows survive server restarts. The in-memory engine remains the
5//! source of truth at runtime; this store is written to on state changes and
6//! read from at startup to restore unfinished work.
7
8use rusqlite::Connection;
9use std::sync::Mutex;
10
11use crate::workflows::{StepResult, StepStatus, WorkflowInstance, WorkflowStatus};
12
13/// SQLite-backed persistent storage for workflow instances.
14pub struct WorkflowStore {
15    conn: Mutex<Connection>,
16}
17
18impl WorkflowStore {
19    /// Open or create the workflow store database at `path`.
20    pub fn open(path: &str) -> Result<Self, String> {
21        let conn =
22            Connection::open(path).map_err(|e| format!("Failed to open workflow store: {e}"))?;
23        let store = Self {
24            conn: Mutex::new(conn),
25        };
26        store.init_schema()?;
27        Ok(store)
28    }
29
30    /// Create an in-memory store (useful for tests).
31    pub fn in_memory() -> Result<Self, String> {
32        let conn = Connection::open_in_memory()
33            .map_err(|e| format!("Failed to open in-memory store: {e}"))?;
34        let store = Self {
35            conn: Mutex::new(conn),
36        };
37        store.init_schema()?;
38        Ok(store)
39    }
40
41    fn init_schema(&self) -> Result<(), String> {
42        let conn = self.conn.lock().unwrap();
43        conn.execute_batch(
44            "
45            PRAGMA journal_mode=WAL;
46            CREATE TABLE IF NOT EXISTS workflows (
47                id TEXT PRIMARY KEY NOT NULL,
48                name TEXT NOT NULL,
49                input TEXT NOT NULL,
50                status TEXT NOT NULL DEFAULT 'Pending',
51                output TEXT,
52                error TEXT,
53                created_at TEXT NOT NULL,
54                started_at TEXT,
55                completed_at TEXT,
56                wake_at INTEGER,
57                waiting_for TEXT,
58                current_step INTEGER NOT NULL DEFAULT 0,
59                max_retries INTEGER NOT NULL DEFAULT 3
60            );
61            CREATE INDEX IF NOT EXISTS idx_wf_status ON workflows(status);
62
63            CREATE TABLE IF NOT EXISTS workflow_steps (
64                workflow_id TEXT NOT NULL,
65                step_index INTEGER NOT NULL,
66                step_id TEXT NOT NULL,
67                name TEXT NOT NULL,
68                status TEXT NOT NULL,
69                output TEXT,
70                error TEXT,
71                started_at TEXT,
72                completed_at TEXT,
73                duration_ms INTEGER,
74                retry_count INTEGER NOT NULL DEFAULT 0,
75                PRIMARY KEY (workflow_id, step_index),
76                FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE
77            );
78        ",
79        )
80        .map_err(|e| format!("Schema init failed: {e}"))
81    }
82
83    /// Save a workflow instance (insert or update), including all steps.
84    pub fn save(&self, wf: &WorkflowInstance) -> Result<(), String> {
85        let conn = self.conn.lock().unwrap();
86
87        conn.execute(
88            "INSERT OR REPLACE INTO workflows \
89             (id, name, input, status, output, error, created_at, started_at, \
90              completed_at, wake_at, waiting_for, current_step, max_retries) \
91             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
92            rusqlite::params![
93                wf.id,
94                wf.name,
95                wf.input.to_string(),
96                wf_status_to_str(&wf.status),
97                wf.output.as_ref().map(|v| v.to_string()),
98                wf.error,
99                wf.created_at,
100                wf.started_at,
101                wf.completed_at,
102                wf.wake_at.map(|v| v as i64),
103                wf.waiting_for,
104                wf.current_step as i64,
105                wf.max_retries,
106            ],
107        )
108        .map_err(|e| format!("Save workflow failed: {e}"))?;
109
110        // Replace all steps: delete then re-insert.
111        conn.execute(
112            "DELETE FROM workflow_steps WHERE workflow_id = ?1",
113            rusqlite::params![wf.id],
114        )
115        .map_err(|e| format!("Delete steps failed: {e}"))?;
116
117        let mut stmt = conn
118            .prepare(
119                "INSERT INTO workflow_steps \
120                 (workflow_id, step_index, step_id, name, status, output, error, \
121                  started_at, completed_at, duration_ms, retry_count) \
122                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
123            )
124            .map_err(|e| format!("Prepare step insert failed: {e}"))?;
125
126        for (i, step) in wf.steps.iter().enumerate() {
127            stmt.execute(rusqlite::params![
128                wf.id,
129                i as i64,
130                step.step_id,
131                step.name,
132                step_status_to_str(&step.status),
133                step.output.as_ref().map(|v| v.to_string()),
134                step.error,
135                step.started_at,
136                step.completed_at,
137                step.duration_ms.map(|v| v as i64),
138                step.retry_count,
139            ])
140            .map_err(|e| format!("Insert step failed: {e}"))?;
141        }
142
143        Ok(())
144    }
145
146    /// Load a workflow instance by ID, including its steps.
147    pub fn load(&self, id: &str) -> Result<Option<WorkflowInstance>, String> {
148        let conn = self.conn.lock().unwrap();
149        let mut stmt = conn
150            .prepare(
151                "SELECT id, name, input, status, output, error, created_at, \
152                 started_at, completed_at, wake_at, waiting_for, current_step, max_retries \
153                 FROM workflows WHERE id = ?1",
154            )
155            .map_err(|e| format!("Prepare failed: {e}"))?;
156
157        let wf = stmt
158            .query_row(rusqlite::params![id], |row| Ok(row_to_workflow(row)))
159            .ok();
160
161        match wf {
162            Some(mut wf) => {
163                wf.steps = load_steps(&conn, &wf.id)?;
164                Ok(Some(wf))
165            }
166            None => Ok(None),
167        }
168    }
169
170    /// Load all active workflows (Pending, Running, WaitingForEvent).
171    pub fn load_active(&self) -> Result<Vec<WorkflowInstance>, String> {
172        let conn = self.conn.lock().unwrap();
173        let mut stmt = conn
174            .prepare(
175                "SELECT id, name, input, status, output, error, created_at, \
176                 started_at, completed_at, wake_at, waiting_for, current_step, max_retries \
177                 FROM workflows \
178                 WHERE status IN ('Pending', 'Running', 'WaitingForEvent') \
179                 ORDER BY created_at ASC",
180            )
181            .map_err(|e| format!("Prepare failed: {e}"))?;
182
183        let rows = stmt
184            .query_map([], |row| Ok(row_to_workflow(row)))
185            .map_err(|e| format!("Query failed: {e}"))?;
186
187        let mut workflows = Vec::new();
188        for row in rows {
189            if let Ok(mut wf) = row {
190                wf.steps = load_steps(&conn, &wf.id)?;
191                workflows.push(wf);
192            }
193        }
194        Ok(workflows)
195    }
196
197    /// Load sleeping workflows.
198    pub fn load_sleeping(&self) -> Result<Vec<WorkflowInstance>, String> {
199        let conn = self.conn.lock().unwrap();
200        let mut stmt = conn
201            .prepare(
202                "SELECT id, name, input, status, output, error, created_at, \
203                 started_at, completed_at, wake_at, waiting_for, current_step, max_retries \
204                 FROM workflows \
205                 WHERE status = 'Sleeping' \
206                 ORDER BY wake_at ASC",
207            )
208            .map_err(|e| format!("Prepare failed: {e}"))?;
209
210        let rows = stmt
211            .query_map([], |row| Ok(row_to_workflow(row)))
212            .map_err(|e| format!("Query failed: {e}"))?;
213
214        let mut workflows = Vec::new();
215        for row in rows {
216            if let Ok(mut wf) = row {
217                wf.steps = load_steps(&conn, &wf.id)?;
218                workflows.push(wf);
219            }
220        }
221        Ok(workflows)
222    }
223
224    /// Count workflow instances by status.
225    pub fn count_by_status(&self, status: &str) -> usize {
226        let conn = self.conn.lock().unwrap();
227        conn.query_row(
228            "SELECT COUNT(*) FROM workflows WHERE status = ?1",
229            rusqlite::params![status],
230            |row| row.get::<_, i64>(0),
231        )
232        .unwrap_or(0) as usize
233    }
234}
235
236// ---------------------------------------------------------------------------
237// Row mapping helpers
238// ---------------------------------------------------------------------------
239
240fn row_to_workflow(row: &rusqlite::Row<'_>) -> WorkflowInstance {
241    WorkflowInstance {
242        id: row.get(0).unwrap_or_default(),
243        name: row.get(1).unwrap_or_default(),
244        input: serde_json::from_str(&row.get::<_, String>(2).unwrap_or_default())
245            .unwrap_or(serde_json::json!({})),
246        status: str_to_wf_status(&row.get::<_, String>(3).unwrap_or_default()),
247        output: row
248            .get::<_, String>(4)
249            .ok()
250            .and_then(|s| serde_json::from_str(&s).ok()),
251        error: row.get(5).ok(),
252        created_at: row.get(6).unwrap_or_default(),
253        started_at: row.get(7).ok(),
254        completed_at: row.get(8).ok(),
255        wake_at: row.get::<_, i64>(9).ok().map(|v| v as u64),
256        waiting_for: row.get(10).ok(),
257        current_step: row.get::<_, i64>(11).unwrap_or(0) as usize,
258        max_retries: row.get(12).unwrap_or(3),
259        steps: Vec::new(), // filled in by caller
260    }
261}
262
263fn load_steps(conn: &Connection, workflow_id: &str) -> Result<Vec<StepResult>, String> {
264    let mut stmt = conn
265        .prepare(
266            "SELECT step_id, name, status, output, error, started_at, \
267             completed_at, duration_ms, retry_count \
268             FROM workflow_steps \
269             WHERE workflow_id = ?1 \
270             ORDER BY step_index ASC",
271        )
272        .map_err(|e| format!("Prepare steps failed: {e}"))?;
273
274    let rows = stmt
275        .query_map(rusqlite::params![workflow_id], |row| {
276            Ok(StepResult {
277                step_id: row.get(0).unwrap_or_default(),
278                name: row.get(1).unwrap_or_default(),
279                status: str_to_step_status(&row.get::<_, String>(2).unwrap_or_default()),
280                output: row
281                    .get::<_, String>(3)
282                    .ok()
283                    .and_then(|s| serde_json::from_str(&s).ok()),
284                error: row.get(4).ok(),
285                started_at: row.get(5).ok(),
286                completed_at: row.get(6).ok(),
287                duration_ms: row.get::<_, i64>(7).ok().map(|v| v as u64),
288                retry_count: row.get(8).unwrap_or(0),
289            })
290        })
291        .map_err(|e| format!("Query steps failed: {e}"))?;
292
293    let mut steps = Vec::new();
294    for row in rows {
295        if let Ok(step) = row {
296            steps.push(step);
297        }
298    }
299    Ok(steps)
300}
301
302fn wf_status_to_str(s: &WorkflowStatus) -> &'static str {
303    match s {
304        WorkflowStatus::Pending => "Pending",
305        WorkflowStatus::Running => "Running",
306        WorkflowStatus::Sleeping => "Sleeping",
307        WorkflowStatus::WaitingForEvent => "WaitingForEvent",
308        WorkflowStatus::Completed => "Completed",
309        WorkflowStatus::Failed => "Failed",
310        WorkflowStatus::Cancelled => "Cancelled",
311    }
312}
313
314fn str_to_wf_status(s: &str) -> WorkflowStatus {
315    match s {
316        "Pending" => WorkflowStatus::Pending,
317        "Running" => WorkflowStatus::Running,
318        "Sleeping" => WorkflowStatus::Sleeping,
319        "WaitingForEvent" => WorkflowStatus::WaitingForEvent,
320        "Completed" => WorkflowStatus::Completed,
321        "Failed" => WorkflowStatus::Failed,
322        "Cancelled" => WorkflowStatus::Cancelled,
323        _ => WorkflowStatus::Pending,
324    }
325}
326
327fn step_status_to_str(s: &StepStatus) -> &'static str {
328    match s {
329        StepStatus::Pending => "Pending",
330        StepStatus::Running => "Running",
331        StepStatus::Completed => "Completed",
332        StepStatus::Failed => "Failed",
333        StepStatus::Skipped => "Skipped",
334    }
335}
336
337fn str_to_step_status(s: &str) -> StepStatus {
338    match s {
339        "Pending" => StepStatus::Pending,
340        "Running" => StepStatus::Running,
341        "Completed" => StepStatus::Completed,
342        "Failed" => StepStatus::Failed,
343        "Skipped" => StepStatus::Skipped,
344        _ => StepStatus::Pending,
345    }
346}
347
348// ---------------------------------------------------------------------------
349// Tests
350// ---------------------------------------------------------------------------
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a workflow named "onboarding" with the given id and status, no
    /// steps, and every optional field unset.
    fn make_workflow(id: &str, status: WorkflowStatus) -> WorkflowInstance {
        WorkflowInstance {
            id: id.to_string(),
            name: "onboarding".to_string(),
            input: serde_json::json!({"user": "alice"}),
            status,
            steps: Vec::new(),
            output: None,
            error: None,
            created_at: "1000Z".to_string(),
            started_at: None,
            completed_at: None,
            wake_at: None,
            waiting_for: None,
            current_step: 0,
            max_retries: 3,
        }
    }

    /// Builds a step called `name` with the given status, a JSON output of
    /// `{"result": name}`, fixed timestamps and a 42 ms duration.
    fn make_step(name: &str, status: StepStatus) -> StepResult {
        StepResult {
            step_id: format!("step_{name}"),
            name: name.to_string(),
            status,
            output: Some(serde_json::json!({"result": name})),
            error: None,
            started_at: Some("1000Z".into()),
            completed_at: Some("1001Z".into()),
            duration_ms: Some(42),
            retry_count: 0,
        }
    }

    #[test]
    fn in_memory_opens_without_error() {
        let store = WorkflowStore::in_memory().unwrap();
        assert_eq!(store.count_by_status("Pending"), 0);
    }

    #[test]
    fn save_and_load_roundtrip_without_steps() {
        let store = WorkflowStore::in_memory().unwrap();

        let mut wf = make_workflow("wf_1", WorkflowStatus::Running);
        wf.started_at = Some("1500Z".into());
        wf.current_step = 2;
        wf.max_retries = 5;

        store.save(&wf).unwrap();

        let loaded = store.load("wf_1").unwrap().unwrap();
        assert_eq!(loaded.id, "wf_1");
        assert_eq!(loaded.name, "onboarding");
        assert_eq!(loaded.input, serde_json::json!({"user": "alice"}));
        assert_eq!(loaded.status, WorkflowStatus::Running);
        assert_eq!(loaded.current_step, 2);
        assert_eq!(loaded.max_retries, 5);
        assert_eq!(loaded.started_at, Some("1500Z".into()));
        assert!(loaded.steps.is_empty());
    }

    #[test]
    fn save_and_load_roundtrip_with_steps() {
        let store = WorkflowStore::in_memory().unwrap();

        let mut wf = make_workflow("wf_2", WorkflowStatus::Running);
        wf.steps = vec![
            make_step("create_account", StepStatus::Completed),
            make_step("send_email", StepStatus::Failed),
        ];
        wf.steps[1].error = Some("SMTP timeout".into());
        wf.steps[1].retry_count = 2;
        wf.current_step = 1;

        store.save(&wf).unwrap();

        let loaded = store.load("wf_2").unwrap().unwrap();
        assert_eq!(loaded.steps.len(), 2);

        assert_eq!(loaded.steps[0].name, "create_account");
        assert_eq!(loaded.steps[0].status, StepStatus::Completed);
        assert_eq!(
            loaded.steps[0].output,
            Some(serde_json::json!({"result": "create_account"}))
        );
        assert_eq!(loaded.steps[0].duration_ms, Some(42));

        assert_eq!(loaded.steps[1].name, "send_email");
        assert_eq!(loaded.steps[1].status, StepStatus::Failed);
        assert_eq!(loaded.steps[1].error, Some("SMTP timeout".into()));
        assert_eq!(loaded.steps[1].retry_count, 2);
    }

    #[test]
    fn save_updates_existing_workflow() {
        let store = WorkflowStore::in_memory().unwrap();

        let mut wf = make_workflow("wf_3", WorkflowStatus::Pending);
        store.save(&wf).unwrap();

        wf.status = WorkflowStatus::Running;
        wf.started_at = Some("2000Z".into());
        wf.steps.push(make_step("step_a", StepStatus::Completed));
        store.save(&wf).unwrap();

        let loaded = store.load("wf_3").unwrap().unwrap();
        assert_eq!(loaded.status, WorkflowStatus::Running);
        assert_eq!(loaded.steps.len(), 1);
        // Still exactly one row for this id.
        assert_eq!(store.count_by_status("Running"), 1);
        assert_eq!(store.count_by_status("Pending"), 0);
    }

    #[test]
    fn resave_with_fewer_steps_drops_stale_steps() {
        let store = WorkflowStore::in_memory().unwrap();

        let mut wf = make_workflow("wf_shrink", WorkflowStatus::Running);
        wf.steps = vec![
            make_step("first", StepStatus::Completed),
            make_step("second", StepStatus::Completed),
        ];
        store.save(&wf).unwrap();

        wf.steps.truncate(1);
        store.save(&wf).unwrap();

        let loaded = store.load("wf_shrink").unwrap().unwrap();
        assert_eq!(loaded.steps.len(), 1);
        assert_eq!(loaded.steps[0].name, "first");
    }

    #[test]
    fn load_nonexistent_returns_none() {
        let store = WorkflowStore::in_memory().unwrap();
        assert!(store.load("nonexistent").unwrap().is_none());
    }

    #[test]
    fn load_active_returns_pending_running_waiting() {
        let store = WorkflowStore::in_memory().unwrap();

        let fixtures = [
            ("wf_pending", WorkflowStatus::Pending),
            ("wf_running", WorkflowStatus::Running),
            ("wf_waiting", WorkflowStatus::WaitingForEvent),
            ("wf_sleeping", WorkflowStatus::Sleeping),
            ("wf_completed", WorkflowStatus::Completed),
            ("wf_failed", WorkflowStatus::Failed),
            ("wf_cancelled", WorkflowStatus::Cancelled),
        ];
        for (id, status) in fixtures.iter() {
            store.save(&make_workflow(id, status.clone())).unwrap();
        }

        let active = store.load_active().unwrap();
        assert_eq!(active.len(), 3);
        let ids: Vec<&str> = active.iter().map(|w| w.id.as_str()).collect();
        assert!(ids.contains(&"wf_pending"));
        assert!(ids.contains(&"wf_running"));
        assert!(ids.contains(&"wf_waiting"));
    }

    #[test]
    fn load_sleeping_returns_only_sleeping() {
        let store = WorkflowStore::in_memory().unwrap();

        let mut sleeping = make_workflow("wf_sleep", WorkflowStatus::Sleeping);
        sleeping.wake_at = Some(99999);
        store.save(&sleeping).unwrap();

        store
            .save(&make_workflow("wf_run", WorkflowStatus::Running))
            .unwrap();

        let result = store.load_sleeping().unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].id, "wf_sleep");
        assert_eq!(result[0].wake_at, Some(99999));
    }

    #[test]
    fn load_sleeping_orders_by_wake_at() {
        let store = WorkflowStore::in_memory().unwrap();

        let mut late = make_workflow("wf_late", WorkflowStatus::Sleeping);
        late.wake_at = Some(200);
        store.save(&late).unwrap();

        let mut early = make_workflow("wf_early", WorkflowStatus::Sleeping);
        early.wake_at = Some(100);
        store.save(&early).unwrap();

        let result = store.load_sleeping().unwrap();
        let ids: Vec<&str> = result.iter().map(|w| w.id.as_str()).collect();
        assert_eq!(ids, vec!["wf_early", "wf_late"]);
    }

    #[test]
    fn count_by_status_counts_correctly() {
        let store = WorkflowStore::in_memory().unwrap();

        store
            .save(&make_workflow("w1", WorkflowStatus::Pending))
            .unwrap();
        store
            .save(&make_workflow("w2", WorkflowStatus::Pending))
            .unwrap();
        store
            .save(&make_workflow("w3", WorkflowStatus::Running))
            .unwrap();
        store
            .save(&make_workflow("w4", WorkflowStatus::Completed))
            .unwrap();

        assert_eq!(store.count_by_status("Pending"), 2);
        assert_eq!(store.count_by_status("Running"), 1);
        assert_eq!(store.count_by_status("Completed"), 1);
        assert_eq!(store.count_by_status("Failed"), 0);
    }

    /// Covers the optional workflow columns (`output`, `error`,
    /// `completed_at`, `waiting_for`) when they are all populated.
    #[test]
    fn optional_workflow_fields_roundtrip() {
        let store = WorkflowStore::in_memory().unwrap();

        let mut wf = make_workflow("wf_out", WorkflowStatus::Completed);
        wf.output = Some(serde_json::json!({"final": "result"}));
        wf.error = Some("partial failure".into());
        wf.completed_at = Some("5000Z".into());
        wf.waiting_for = Some("user_confirmed".into());

        store.save(&wf).unwrap();

        let loaded = store.load("wf_out").unwrap().unwrap();
        assert_eq!(loaded.output, Some(serde_json::json!({"final": "result"})));
        assert_eq!(loaded.error, Some("partial failure".into()));
        assert_eq!(loaded.completed_at, Some("5000Z".into()));
        assert_eq!(loaded.waiting_for, Some("user_confirmed".into()));
    }

    #[test]
    fn all_statuses_roundtrip() {
        let store = WorkflowStore::in_memory().unwrap();
        let statuses = [
            WorkflowStatus::Pending,
            WorkflowStatus::Running,
            WorkflowStatus::Sleeping,
            WorkflowStatus::WaitingForEvent,
            WorkflowStatus::Completed,
            WorkflowStatus::Failed,
            WorkflowStatus::Cancelled,
        ];
        for (i, status) in statuses.iter().enumerate() {
            let wf = make_workflow(&format!("wf_{i}"), status.clone());
            store.save(&wf).unwrap();
            let loaded = store.load(&format!("wf_{i}")).unwrap().unwrap();
            assert_eq!(loaded.status, *status);
        }
    }

    #[test]
    fn all_step_statuses_roundtrip() {
        let store = WorkflowStore::in_memory().unwrap();
        let step_statuses = [
            StepStatus::Pending,
            StepStatus::Running,
            StepStatus::Completed,
            StepStatus::Failed,
            StepStatus::Skipped,
        ];

        let mut wf = make_workflow("wf_steps", WorkflowStatus::Running);
        for (i, status) in step_statuses.iter().enumerate() {
            wf.steps.push(make_step(&format!("s{i}"), status.clone()));
        }
        store.save(&wf).unwrap();

        let loaded = store.load("wf_steps").unwrap().unwrap();
        assert_eq!(loaded.steps.len(), step_statuses.len());
        for (i, status) in step_statuses.iter().enumerate() {
            assert_eq!(loaded.steps[i].status, *status);
        }
    }
}