1use rusqlite::Connection;
9use std::sync::Mutex;
10
11use crate::workflows::{StepResult, StepStatus, WorkflowInstance, WorkflowStatus};
12
13pub struct WorkflowStore {
15 conn: Mutex<Connection>,
16}
17
18impl WorkflowStore {
19 pub fn open(path: &str) -> Result<Self, String> {
21 let conn =
22 Connection::open(path).map_err(|e| format!("Failed to open workflow store: {e}"))?;
23 let store = Self {
24 conn: Mutex::new(conn),
25 };
26 store.init_schema()?;
27 Ok(store)
28 }
29
30 pub fn in_memory() -> Result<Self, String> {
32 let conn = Connection::open_in_memory()
33 .map_err(|e| format!("Failed to open in-memory store: {e}"))?;
34 let store = Self {
35 conn: Mutex::new(conn),
36 };
37 store.init_schema()?;
38 Ok(store)
39 }
40
41 fn init_schema(&self) -> Result<(), String> {
42 let conn = self.conn.lock().unwrap();
43 conn.execute_batch(
44 "
45 PRAGMA journal_mode=WAL;
46 CREATE TABLE IF NOT EXISTS workflows (
47 id TEXT PRIMARY KEY NOT NULL,
48 name TEXT NOT NULL,
49 input TEXT NOT NULL,
50 status TEXT NOT NULL DEFAULT 'Pending',
51 output TEXT,
52 error TEXT,
53 created_at TEXT NOT NULL,
54 started_at TEXT,
55 completed_at TEXT,
56 wake_at INTEGER,
57 waiting_for TEXT,
58 current_step INTEGER NOT NULL DEFAULT 0,
59 max_retries INTEGER NOT NULL DEFAULT 3
60 );
61 CREATE INDEX IF NOT EXISTS idx_wf_status ON workflows(status);
62
63 CREATE TABLE IF NOT EXISTS workflow_steps (
64 workflow_id TEXT NOT NULL,
65 step_index INTEGER NOT NULL,
66 step_id TEXT NOT NULL,
67 name TEXT NOT NULL,
68 status TEXT NOT NULL,
69 output TEXT,
70 error TEXT,
71 started_at TEXT,
72 completed_at TEXT,
73 duration_ms INTEGER,
74 retry_count INTEGER NOT NULL DEFAULT 0,
75 PRIMARY KEY (workflow_id, step_index),
76 FOREIGN KEY (workflow_id) REFERENCES workflows(id) ON DELETE CASCADE
77 );
78 ",
79 )
80 .map_err(|e| format!("Schema init failed: {e}"))
81 }
82
83 pub fn save(&self, wf: &WorkflowInstance) -> Result<(), String> {
85 let conn = self.conn.lock().unwrap();
86
87 conn.execute(
88 "INSERT OR REPLACE INTO workflows \
89 (id, name, input, status, output, error, created_at, started_at, \
90 completed_at, wake_at, waiting_for, current_step, max_retries) \
91 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
92 rusqlite::params![
93 wf.id,
94 wf.name,
95 wf.input.to_string(),
96 wf_status_to_str(&wf.status),
97 wf.output.as_ref().map(|v| v.to_string()),
98 wf.error,
99 wf.created_at,
100 wf.started_at,
101 wf.completed_at,
102 wf.wake_at.map(|v| v as i64),
103 wf.waiting_for,
104 wf.current_step as i64,
105 wf.max_retries,
106 ],
107 )
108 .map_err(|e| format!("Save workflow failed: {e}"))?;
109
110 conn.execute(
112 "DELETE FROM workflow_steps WHERE workflow_id = ?1",
113 rusqlite::params![wf.id],
114 )
115 .map_err(|e| format!("Delete steps failed: {e}"))?;
116
117 let mut stmt = conn
118 .prepare(
119 "INSERT INTO workflow_steps \
120 (workflow_id, step_index, step_id, name, status, output, error, \
121 started_at, completed_at, duration_ms, retry_count) \
122 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
123 )
124 .map_err(|e| format!("Prepare step insert failed: {e}"))?;
125
126 for (i, step) in wf.steps.iter().enumerate() {
127 stmt.execute(rusqlite::params![
128 wf.id,
129 i as i64,
130 step.step_id,
131 step.name,
132 step_status_to_str(&step.status),
133 step.output.as_ref().map(|v| v.to_string()),
134 step.error,
135 step.started_at,
136 step.completed_at,
137 step.duration_ms.map(|v| v as i64),
138 step.retry_count,
139 ])
140 .map_err(|e| format!("Insert step failed: {e}"))?;
141 }
142
143 Ok(())
144 }
145
146 pub fn load(&self, id: &str) -> Result<Option<WorkflowInstance>, String> {
148 let conn = self.conn.lock().unwrap();
149 let mut stmt = conn
150 .prepare(
151 "SELECT id, name, input, status, output, error, created_at, \
152 started_at, completed_at, wake_at, waiting_for, current_step, max_retries \
153 FROM workflows WHERE id = ?1",
154 )
155 .map_err(|e| format!("Prepare failed: {e}"))?;
156
157 let wf = stmt
158 .query_row(rusqlite::params![id], |row| Ok(row_to_workflow(row)))
159 .ok();
160
161 match wf {
162 Some(mut wf) => {
163 wf.steps = load_steps(&conn, &wf.id)?;
164 Ok(Some(wf))
165 }
166 None => Ok(None),
167 }
168 }
169
170 pub fn load_active(&self) -> Result<Vec<WorkflowInstance>, String> {
172 let conn = self.conn.lock().unwrap();
173 let mut stmt = conn
174 .prepare(
175 "SELECT id, name, input, status, output, error, created_at, \
176 started_at, completed_at, wake_at, waiting_for, current_step, max_retries \
177 FROM workflows \
178 WHERE status IN ('Pending', 'Running', 'WaitingForEvent') \
179 ORDER BY created_at ASC",
180 )
181 .map_err(|e| format!("Prepare failed: {e}"))?;
182
183 let rows = stmt
184 .query_map([], |row| Ok(row_to_workflow(row)))
185 .map_err(|e| format!("Query failed: {e}"))?;
186
187 let mut workflows = Vec::new();
188 for row in rows {
189 if let Ok(mut wf) = row {
190 wf.steps = load_steps(&conn, &wf.id)?;
191 workflows.push(wf);
192 }
193 }
194 Ok(workflows)
195 }
196
197 pub fn load_sleeping(&self) -> Result<Vec<WorkflowInstance>, String> {
199 let conn = self.conn.lock().unwrap();
200 let mut stmt = conn
201 .prepare(
202 "SELECT id, name, input, status, output, error, created_at, \
203 started_at, completed_at, wake_at, waiting_for, current_step, max_retries \
204 FROM workflows \
205 WHERE status = 'Sleeping' \
206 ORDER BY wake_at ASC",
207 )
208 .map_err(|e| format!("Prepare failed: {e}"))?;
209
210 let rows = stmt
211 .query_map([], |row| Ok(row_to_workflow(row)))
212 .map_err(|e| format!("Query failed: {e}"))?;
213
214 let mut workflows = Vec::new();
215 for row in rows {
216 if let Ok(mut wf) = row {
217 wf.steps = load_steps(&conn, &wf.id)?;
218 workflows.push(wf);
219 }
220 }
221 Ok(workflows)
222 }
223
224 pub fn count_by_status(&self, status: &str) -> usize {
226 let conn = self.conn.lock().unwrap();
227 conn.query_row(
228 "SELECT COUNT(*) FROM workflows WHERE status = ?1",
229 rusqlite::params![status],
230 |row| row.get::<_, i64>(0),
231 )
232 .unwrap_or(0) as usize
233 }
234}
235
236fn row_to_workflow(row: &rusqlite::Row<'_>) -> WorkflowInstance {
241 WorkflowInstance {
242 id: row.get(0).unwrap_or_default(),
243 name: row.get(1).unwrap_or_default(),
244 input: serde_json::from_str(&row.get::<_, String>(2).unwrap_or_default())
245 .unwrap_or(serde_json::json!({})),
246 status: str_to_wf_status(&row.get::<_, String>(3).unwrap_or_default()),
247 output: row
248 .get::<_, String>(4)
249 .ok()
250 .and_then(|s| serde_json::from_str(&s).ok()),
251 error: row.get(5).ok(),
252 created_at: row.get(6).unwrap_or_default(),
253 started_at: row.get(7).ok(),
254 completed_at: row.get(8).ok(),
255 wake_at: row.get::<_, i64>(9).ok().map(|v| v as u64),
256 waiting_for: row.get(10).ok(),
257 current_step: row.get::<_, i64>(11).unwrap_or(0) as usize,
258 max_retries: row.get(12).unwrap_or(3),
259 steps: Vec::new(), }
261}
262
263fn load_steps(conn: &Connection, workflow_id: &str) -> Result<Vec<StepResult>, String> {
264 let mut stmt = conn
265 .prepare(
266 "SELECT step_id, name, status, output, error, started_at, \
267 completed_at, duration_ms, retry_count \
268 FROM workflow_steps \
269 WHERE workflow_id = ?1 \
270 ORDER BY step_index ASC",
271 )
272 .map_err(|e| format!("Prepare steps failed: {e}"))?;
273
274 let rows = stmt
275 .query_map(rusqlite::params![workflow_id], |row| {
276 Ok(StepResult {
277 step_id: row.get(0).unwrap_or_default(),
278 name: row.get(1).unwrap_or_default(),
279 status: str_to_step_status(&row.get::<_, String>(2).unwrap_or_default()),
280 output: row
281 .get::<_, String>(3)
282 .ok()
283 .and_then(|s| serde_json::from_str(&s).ok()),
284 error: row.get(4).ok(),
285 started_at: row.get(5).ok(),
286 completed_at: row.get(6).ok(),
287 duration_ms: row.get::<_, i64>(7).ok().map(|v| v as u64),
288 retry_count: row.get(8).unwrap_or(0),
289 })
290 })
291 .map_err(|e| format!("Query steps failed: {e}"))?;
292
293 let mut steps = Vec::new();
294 for row in rows {
295 if let Ok(step) = row {
296 steps.push(step);
297 }
298 }
299 Ok(steps)
300}
301
302fn wf_status_to_str(s: &WorkflowStatus) -> &'static str {
303 match s {
304 WorkflowStatus::Pending => "Pending",
305 WorkflowStatus::Running => "Running",
306 WorkflowStatus::Sleeping => "Sleeping",
307 WorkflowStatus::WaitingForEvent => "WaitingForEvent",
308 WorkflowStatus::Completed => "Completed",
309 WorkflowStatus::Failed => "Failed",
310 WorkflowStatus::Cancelled => "Cancelled",
311 }
312}
313
314fn str_to_wf_status(s: &str) -> WorkflowStatus {
315 match s {
316 "Pending" => WorkflowStatus::Pending,
317 "Running" => WorkflowStatus::Running,
318 "Sleeping" => WorkflowStatus::Sleeping,
319 "WaitingForEvent" => WorkflowStatus::WaitingForEvent,
320 "Completed" => WorkflowStatus::Completed,
321 "Failed" => WorkflowStatus::Failed,
322 "Cancelled" => WorkflowStatus::Cancelled,
323 _ => WorkflowStatus::Pending,
324 }
325}
326
327fn step_status_to_str(s: &StepStatus) -> &'static str {
328 match s {
329 StepStatus::Pending => "Pending",
330 StepStatus::Running => "Running",
331 StepStatus::Completed => "Completed",
332 StepStatus::Failed => "Failed",
333 StepStatus::Skipped => "Skipped",
334 }
335}
336
337fn str_to_step_status(s: &str) -> StepStatus {
338 match s {
339 "Pending" => StepStatus::Pending,
340 "Running" => StepStatus::Running,
341 "Completed" => StepStatus::Completed,
342 "Failed" => StepStatus::Failed,
343 "Skipped" => StepStatus::Skipped,
344 _ => StepStatus::Pending,
345 }
346}
347
348#[cfg(test)]
353mod tests {
354 use super::*;
355
356 fn make_workflow(id: &str, status: WorkflowStatus) -> WorkflowInstance {
357 WorkflowInstance {
358 id: id.to_string(),
359 name: "onboarding".to_string(),
360 input: serde_json::json!({"user": "alice"}),
361 status,
362 steps: Vec::new(),
363 output: None,
364 error: None,
365 created_at: "1000Z".to_string(),
366 started_at: None,
367 completed_at: None,
368 wake_at: None,
369 waiting_for: None,
370 current_step: 0,
371 max_retries: 3,
372 }
373 }
374
375 fn make_step(name: &str, status: StepStatus) -> StepResult {
376 StepResult {
377 step_id: format!("step_{name}"),
378 name: name.to_string(),
379 status,
380 output: Some(serde_json::json!({"result": name})),
381 error: None,
382 started_at: Some("1000Z".into()),
383 completed_at: Some("1001Z".into()),
384 duration_ms: Some(42),
385 retry_count: 0,
386 }
387 }
388
389 #[test]
390 fn in_memory_opens_without_error() {
391 let store = WorkflowStore::in_memory().unwrap();
392 assert_eq!(store.count_by_status("Pending"), 0);
393 }
394
395 #[test]
396 fn save_and_load_roundtrip_without_steps() {
397 let store = WorkflowStore::in_memory().unwrap();
398
399 let mut wf = make_workflow("wf_1", WorkflowStatus::Running);
400 wf.started_at = Some("1500Z".into());
401 wf.current_step = 2;
402 wf.max_retries = 5;
403
404 store.save(&wf).unwrap();
405
406 let loaded = store.load("wf_1").unwrap().unwrap();
407 assert_eq!(loaded.id, "wf_1");
408 assert_eq!(loaded.name, "onboarding");
409 assert_eq!(loaded.input, serde_json::json!({"user": "alice"}));
410 assert_eq!(loaded.status, WorkflowStatus::Running);
411 assert_eq!(loaded.current_step, 2);
412 assert_eq!(loaded.max_retries, 5);
413 assert_eq!(loaded.started_at, Some("1500Z".into()));
414 assert!(loaded.steps.is_empty());
415 }
416
417 #[test]
418 fn save_and_load_roundtrip_with_steps() {
419 let store = WorkflowStore::in_memory().unwrap();
420
421 let mut wf = make_workflow("wf_2", WorkflowStatus::Running);
422 wf.steps = vec![
423 make_step("create_account", StepStatus::Completed),
424 make_step("send_email", StepStatus::Failed),
425 ];
426 wf.steps[1].error = Some("SMTP timeout".into());
427 wf.steps[1].retry_count = 2;
428 wf.current_step = 1;
429
430 store.save(&wf).unwrap();
431
432 let loaded = store.load("wf_2").unwrap().unwrap();
433 assert_eq!(loaded.steps.len(), 2);
434
435 assert_eq!(loaded.steps[0].name, "create_account");
436 assert_eq!(loaded.steps[0].status, StepStatus::Completed);
437 assert_eq!(
438 loaded.steps[0].output,
439 Some(serde_json::json!({"result": "create_account"}))
440 );
441 assert_eq!(loaded.steps[0].duration_ms, Some(42));
442
443 assert_eq!(loaded.steps[1].name, "send_email");
444 assert_eq!(loaded.steps[1].status, StepStatus::Failed);
445 assert_eq!(loaded.steps[1].error, Some("SMTP timeout".into()));
446 assert_eq!(loaded.steps[1].retry_count, 2);
447 }
448
449 #[test]
450 fn save_updates_existing_workflow() {
451 let store = WorkflowStore::in_memory().unwrap();
452
453 let mut wf = make_workflow("wf_3", WorkflowStatus::Pending);
454 store.save(&wf).unwrap();
455
456 wf.status = WorkflowStatus::Running;
457 wf.started_at = Some("2000Z".into());
458 wf.steps.push(make_step("step_a", StepStatus::Completed));
459 store.save(&wf).unwrap();
460
461 let loaded = store.load("wf_3").unwrap().unwrap();
462 assert_eq!(loaded.status, WorkflowStatus::Running);
463 assert_eq!(loaded.steps.len(), 1);
464 }
465
466 #[test]
467 fn load_nonexistent_returns_none() {
468 let store = WorkflowStore::in_memory().unwrap();
469 assert!(store.load("nonexistent").unwrap().is_none());
470 }
471
472 #[test]
473 fn load_active_returns_pending_running_waiting() {
474 let store = WorkflowStore::in_memory().unwrap();
475
476 store
477 .save(&make_workflow("wf_pending", WorkflowStatus::Pending))
478 .unwrap();
479 store
480 .save(&make_workflow("wf_running", WorkflowStatus::Running))
481 .unwrap();
482 store
483 .save(&make_workflow(
484 "wf_waiting",
485 WorkflowStatus::WaitingForEvent,
486 ))
487 .unwrap();
488 store
489 .save(&make_workflow("wf_sleeping", WorkflowStatus::Sleeping))
490 .unwrap();
491 store
492 .save(&make_workflow("wf_completed", WorkflowStatus::Completed))
493 .unwrap();
494 store
495 .save(&make_workflow("wf_failed", WorkflowStatus::Failed))
496 .unwrap();
497 store
498 .save(&make_workflow("wf_cancelled", WorkflowStatus::Cancelled))
499 .unwrap();
500
501 let active = store.load_active().unwrap();
502 assert_eq!(active.len(), 3);
503 let ids: Vec<&str> = active.iter().map(|w| w.id.as_str()).collect();
504 assert!(ids.contains(&"wf_pending"));
505 assert!(ids.contains(&"wf_running"));
506 assert!(ids.contains(&"wf_waiting"));
507 }
508
509 #[test]
510 fn load_sleeping_returns_only_sleeping() {
511 let store = WorkflowStore::in_memory().unwrap();
512
513 let mut sleeping = make_workflow("wf_sleep", WorkflowStatus::Sleeping);
514 sleeping.wake_at = Some(99999);
515 store.save(&sleeping).unwrap();
516
517 store
518 .save(&make_workflow("wf_run", WorkflowStatus::Running))
519 .unwrap();
520
521 let result = store.load_sleeping().unwrap();
522 assert_eq!(result.len(), 1);
523 assert_eq!(result[0].id, "wf_sleep");
524 assert_eq!(result[0].wake_at, Some(99999));
525 }
526
527 #[test]
528 fn count_by_status_counts_correctly() {
529 let store = WorkflowStore::in_memory().unwrap();
530
531 store
532 .save(&make_workflow("w1", WorkflowStatus::Pending))
533 .unwrap();
534 store
535 .save(&make_workflow("w2", WorkflowStatus::Pending))
536 .unwrap();
537 store
538 .save(&make_workflow("w3", WorkflowStatus::Running))
539 .unwrap();
540 store
541 .save(&make_workflow("w4", WorkflowStatus::Completed))
542 .unwrap();
543
544 assert_eq!(store.count_by_status("Pending"), 2);
545 assert_eq!(store.count_by_status("Running"), 1);
546 assert_eq!(store.count_by_status("Completed"), 1);
547 assert_eq!(store.count_by_status("Failed"), 0);
548 }
549
550 #[test]
551 fn sleeping_workflow_with_output_roundtrips() {
552 let store = WorkflowStore::in_memory().unwrap();
553
554 let mut wf = make_workflow("wf_out", WorkflowStatus::Completed);
555 wf.output = Some(serde_json::json!({"final": "result"}));
556 wf.error = Some("partial failure".into());
557 wf.completed_at = Some("5000Z".into());
558 wf.waiting_for = Some("user_confirmed".into());
559
560 store.save(&wf).unwrap();
561
562 let loaded = store.load("wf_out").unwrap().unwrap();
563 assert_eq!(loaded.output, Some(serde_json::json!({"final": "result"})));
564 assert_eq!(loaded.error, Some("partial failure".into()));
565 assert_eq!(loaded.completed_at, Some("5000Z".into()));
566 assert_eq!(loaded.waiting_for, Some("user_confirmed".into()));
567 }
568
569 #[test]
570 fn all_statuses_roundtrip() {
571 let store = WorkflowStore::in_memory().unwrap();
572 let statuses = [
573 WorkflowStatus::Pending,
574 WorkflowStatus::Running,
575 WorkflowStatus::Sleeping,
576 WorkflowStatus::WaitingForEvent,
577 WorkflowStatus::Completed,
578 WorkflowStatus::Failed,
579 WorkflowStatus::Cancelled,
580 ];
581 for (i, status) in statuses.iter().enumerate() {
582 let wf = make_workflow(&format!("wf_{i}"), status.clone());
583 store.save(&wf).unwrap();
584 let loaded = store.load(&format!("wf_{i}")).unwrap().unwrap();
585 assert_eq!(loaded.status, *status);
586 }
587 }
588
589 #[test]
590 fn all_step_statuses_roundtrip() {
591 let store = WorkflowStore::in_memory().unwrap();
592 let step_statuses = [
593 StepStatus::Pending,
594 StepStatus::Running,
595 StepStatus::Completed,
596 StepStatus::Failed,
597 StepStatus::Skipped,
598 ];
599
600 let mut wf = make_workflow("wf_steps", WorkflowStatus::Running);
601 for (i, status) in step_statuses.iter().enumerate() {
602 wf.steps.push(make_step(&format!("s{i}"), status.clone()));
603 }
604 store.save(&wf).unwrap();
605
606 let loaded = store.load("wf_steps").unwrap().unwrap();
607 for (i, status) in step_statuses.iter().enumerate() {
608 assert_eq!(loaded.steps[i].status, *status);
609 }
610 }
611}