1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Mutex;
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub enum WorkflowStatus {
14 Pending,
15 Running,
16 Sleeping,
17 WaitingForEvent,
18 Completed,
19 Failed,
20 Cancelled,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub enum StepStatus {
25 Pending,
26 Running,
27 Completed,
28 Failed,
29 Skipped,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct StepResult {
34 pub step_id: String,
35 pub name: String,
36 pub status: StepStatus,
37 pub output: Option<serde_json::Value>,
38 pub error: Option<String>,
39 pub started_at: Option<String>,
40 pub completed_at: Option<String>,
41 pub duration_ms: Option<u64>,
42 pub retry_count: u32,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct WorkflowInstance {
47 pub id: String,
48 pub name: String,
49 pub input: serde_json::Value,
50 pub status: WorkflowStatus,
51 pub steps: Vec<StepResult>,
52 pub output: Option<serde_json::Value>,
53 pub error: Option<String>,
54 pub created_at: String,
55 pub started_at: Option<String>,
56 pub completed_at: Option<String>,
57 pub wake_at: Option<u64>,
59 pub waiting_for: Option<String>,
61 pub current_step: usize,
63 pub max_retries: u32,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct WorkflowDef {
70 pub name: String,
71 pub description: String,
72 pub file: String,
74 pub max_retries: u32,
76 pub step_timeout_secs: u64,
78}
79
80pub struct WorkflowEngine {
85 definitions: Mutex<HashMap<String, WorkflowDef>>,
87 instances: Mutex<HashMap<String, WorkflowInstance>>,
89 runner_url: String,
91 #[allow(dead_code)]
93 max_history: usize,
94}
95
96impl WorkflowEngine {
97 pub fn new(runner_url: &str, max_history: usize) -> Self {
98 Self {
99 definitions: Mutex::new(HashMap::new()),
100 instances: Mutex::new(HashMap::new()),
101 runner_url: runner_url.to_string(),
102 max_history,
103 }
104 }
105
106 pub fn register(&self, def: WorkflowDef) {
108 self.definitions
109 .lock()
110 .unwrap()
111 .insert(def.name.clone(), def);
112 }
113
114 pub fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String> {
116 let defs = self.definitions.lock().unwrap();
117 let def = defs
118 .get(name)
119 .ok_or_else(|| format!("Workflow '{}' not registered", name))?;
120
121 let id = generate_workflow_id();
122 let instance = WorkflowInstance {
123 id: id.clone(),
124 name: name.to_string(),
125 input,
126 status: WorkflowStatus::Pending,
127 steps: Vec::new(),
128 output: None,
129 error: None,
130 created_at: now_iso(),
131 started_at: None,
132 completed_at: None,
133 wake_at: None,
134 waiting_for: None,
135 current_step: 0,
136 max_retries: def.max_retries,
137 };
138
139 self.instances.lock().unwrap().insert(id.clone(), instance);
140 Ok(id)
141 }
142
143 pub fn advance(&self, workflow_id: &str) -> Result<WorkflowStatus, String> {
152 let instance = {
153 let instances = self.instances.lock().unwrap();
154 instances
155 .get(workflow_id)
156 .cloned()
157 .ok_or_else(|| format!("Workflow '{}' not found", workflow_id))?
158 };
159
160 match instance.status {
162 WorkflowStatus::Completed | WorkflowStatus::Failed | WorkflowStatus::Cancelled => {
163 return Ok(instance.status);
164 }
165 WorkflowStatus::Sleeping => {
166 if let Some(wake_at) = instance.wake_at {
167 let now = SystemTime::now()
168 .duration_since(UNIX_EPOCH)
169 .unwrap_or_default()
170 .as_secs();
171 if now < wake_at {
172 return Ok(WorkflowStatus::Sleeping);
173 }
174 }
175 }
177 _ => {}
178 }
179
180 let request = serde_json::json!({
181 "workflow_id": workflow_id,
182 "workflow_name": instance.name,
183 "input": instance.input,
184 "current_step": instance.current_step,
185 "completed_steps": instance.steps,
186 });
187
188 let response = self.call_runner(&request)?;
189 self.apply_response(workflow_id, &response)
190 }
191
192 pub fn advance_with_response(
195 &self,
196 workflow_id: &str,
197 response: serde_json::Value,
198 ) -> Result<WorkflowStatus, String> {
199 {
201 let instances = self.instances.lock().unwrap();
202 let instance = instances
203 .get(workflow_id)
204 .ok_or_else(|| format!("Workflow '{}' not found", workflow_id))?;
205
206 match instance.status {
207 WorkflowStatus::Completed | WorkflowStatus::Failed | WorkflowStatus::Cancelled => {
208 return Ok(instance.status.clone());
209 }
210 _ => {}
211 }
212 }
213
214 self.apply_response(workflow_id, &response)
215 }
216
217 pub fn send_event(
219 &self,
220 workflow_id: &str,
221 event: &str,
222 data: serde_json::Value,
223 ) -> Result<(), String> {
224 let mut instances = self.instances.lock().unwrap();
225 let inst = instances.get_mut(workflow_id).ok_or("Workflow not found")?;
226
227 if inst.status != WorkflowStatus::WaitingForEvent {
228 return Err("Workflow is not waiting for an event".into());
229 }
230
231 if inst.waiting_for.as_deref() != Some(event) {
232 return Err(format!(
233 "Workflow is waiting for '{}', not '{event}'",
234 inst.waiting_for.as_deref().unwrap_or("")
235 ));
236 }
237
238 inst.steps.push(StepResult {
239 step_id: format!("step_{}", inst.steps.len()),
240 name: format!("event:{event}"),
241 status: StepStatus::Completed,
242 output: Some(data),
243 error: None,
244 started_at: Some(now_iso()),
245 completed_at: Some(now_iso()),
246 duration_ms: None,
247 retry_count: 0,
248 });
249 inst.current_step += 1;
250 inst.status = WorkflowStatus::Running;
251 inst.waiting_for = None;
252
253 Ok(())
254 }
255
256 pub fn cancel(&self, workflow_id: &str) -> Result<(), String> {
258 let mut instances = self.instances.lock().unwrap();
259 let inst = instances.get_mut(workflow_id).ok_or("Workflow not found")?;
260 inst.status = WorkflowStatus::Cancelled;
261 inst.completed_at = Some(now_iso());
262 Ok(())
263 }
264
265 pub fn get(&self, workflow_id: &str) -> Option<WorkflowInstance> {
267 self.instances.lock().unwrap().get(workflow_id).cloned()
268 }
269
270 pub fn list(&self, status: Option<&WorkflowStatus>) -> Vec<WorkflowInstance> {
272 let instances = self.instances.lock().unwrap();
273 instances
274 .values()
275 .filter(|i| {
276 status
277 .map(|s| std::mem::discriminant(&i.status) == std::mem::discriminant(s))
278 .unwrap_or(true)
279 })
280 .cloned()
281 .collect()
282 }
283
284 pub fn definitions(&self) -> Vec<WorkflowDef> {
286 self.definitions.lock().unwrap().values().cloned().collect()
287 }
288
289 pub fn wake_sleeping(&self) -> Vec<String> {
292 let now = SystemTime::now()
293 .duration_since(UNIX_EPOCH)
294 .unwrap_or_default()
295 .as_secs();
296 let mut woken = Vec::new();
297 let mut instances = self.instances.lock().unwrap();
298
299 for (id, inst) in instances.iter_mut() {
300 if inst.status == WorkflowStatus::Sleeping {
301 if let Some(wake_at) = inst.wake_at {
302 if now >= wake_at {
303 inst.status = WorkflowStatus::Running;
304 inst.wake_at = None;
305 woken.push(id.clone());
306 }
307 }
308 }
309 }
310
311 woken
312 }
313
314 pub fn restore_from(&self, store: &crate::workflow_store::WorkflowStore) -> usize {
325 let mut count = 0;
326
327 let active = store.load_active().unwrap_or_default();
328 let sleeping = store.load_sleeping().unwrap_or_default();
329
330 let mut instances = self.instances.lock().unwrap();
331
332 for wf in active {
333 instances.insert(wf.id.clone(), wf);
334 count += 1;
335 }
336 for wf in sleeping {
337 if !instances.contains_key(&wf.id) {
340 instances.insert(wf.id.clone(), wf);
341 count += 1;
342 }
343 }
344
345 count
346 }
347
348 fn apply_response(
354 &self,
355 workflow_id: &str,
356 response: &serde_json::Value,
357 ) -> Result<WorkflowStatus, String> {
358 let action = response
359 .get("action")
360 .and_then(|v| v.as_str())
361 .unwrap_or("fail");
362
363 let mut instances = self.instances.lock().unwrap();
364 let inst = instances
365 .get_mut(workflow_id)
366 .ok_or_else(|| format!("Workflow '{}' not found", workflow_id))?;
367
368 if inst.started_at.is_none() {
369 inst.started_at = Some(now_iso());
370 }
371
372 match action {
373 "step_complete" => {
374 let step_name = response
375 .get("step_name")
376 .and_then(|v| v.as_str())
377 .unwrap_or("unknown");
378 let output = response.get("output").cloned();
379
380 inst.steps.push(StepResult {
381 step_id: format!("step_{}", inst.steps.len()),
382 name: step_name.to_string(),
383 status: StepStatus::Completed,
384 output,
385 error: None,
386 started_at: Some(now_iso()),
387 completed_at: Some(now_iso()),
388 duration_ms: response.get("duration_ms").and_then(|v| v.as_u64()),
389 retry_count: 0,
390 });
391 inst.current_step += 1;
392 inst.status = WorkflowStatus::Running;
393
394 Ok(WorkflowStatus::Running)
395 }
396 "sleep" => {
397 let duration_str = response
398 .get("duration")
399 .and_then(|v| v.as_str())
400 .unwrap_or("0s");
401 let secs = parse_duration_str(duration_str);
402 let wake_at = SystemTime::now()
403 .duration_since(UNIX_EPOCH)
404 .unwrap_or_default()
405 .as_secs()
406 + secs;
407
408 inst.status = WorkflowStatus::Sleeping;
409 inst.wake_at = Some(wake_at);
410 inst.current_step += 1;
411
412 Ok(WorkflowStatus::Sleeping)
413 }
414 "wait_event" => {
415 let event = response
416 .get("event")
417 .and_then(|v| v.as_str())
418 .unwrap_or("")
419 .to_string();
420 inst.status = WorkflowStatus::WaitingForEvent;
421 inst.waiting_for = Some(event);
422
423 Ok(WorkflowStatus::WaitingForEvent)
424 }
425 "complete" => {
426 inst.status = WorkflowStatus::Completed;
427 inst.output = response.get("output").cloned();
428 inst.completed_at = Some(now_iso());
429
430 Ok(WorkflowStatus::Completed)
431 }
432 "fail" => {
433 let error = response
434 .get("error")
435 .and_then(|v| v.as_str())
436 .unwrap_or("Unknown error")
437 .to_string();
438
439 let step_name = response
440 .get("step_name")
441 .and_then(|v| v.as_str())
442 .unwrap_or("unknown");
443
444 let retry_count = inst
446 .steps
447 .iter()
448 .filter(|s| s.name == step_name && s.status == StepStatus::Failed)
449 .count() as u32;
450
451 if retry_count < inst.max_retries {
452 inst.steps.push(StepResult {
453 step_id: format!("step_{}", inst.steps.len()),
454 name: step_name.to_string(),
455 status: StepStatus::Failed,
456 output: None,
457 error: Some(error),
458 started_at: Some(now_iso()),
459 completed_at: Some(now_iso()),
460 duration_ms: None,
461 retry_count: retry_count + 1,
462 });
463 Ok(WorkflowStatus::Running)
465 } else {
466 inst.status = WorkflowStatus::Failed;
467 inst.error = Some(error);
468 inst.completed_at = Some(now_iso());
469 Ok(WorkflowStatus::Failed)
470 }
471 }
472 _ => Err(format!("Unknown action: {action}")),
473 }
474 }
475
476 fn call_runner(&self, request: &serde_json::Value) -> Result<serde_json::Value, String> {
478 use std::io::{Read, Write};
479 use std::net::TcpStream;
480
481 let url = &self.runner_url;
482 let host = url.strip_prefix("http://").unwrap_or(url);
483 let (host_port, path) = match host.find('/') {
484 Some(i) => (&host[..i], &host[i..]),
485 None => (host, "/"),
486 };
487
488 let body = request.to_string();
489 let http_request = format!(
490 "POST {} HTTP/1.1\r\n\
491 Host: {}\r\n\
492 Content-Type: application/json\r\n\
493 Content-Length: {}\r\n\
494 Connection: close\r\n\
495 \r\n\
496 {}",
497 path,
498 host_port,
499 body.len(),
500 body
501 );
502
503 let mut stream = TcpStream::connect(host_port)
504 .map_err(|e| format!("Failed to connect to workflow runner: {e}"))?;
505 stream.set_read_timeout(Some(Duration::from_secs(30))).ok();
506 stream
507 .write_all(http_request.as_bytes())
508 .map_err(|e| format!("Write failed: {e}"))?;
509
510 let mut response = String::new();
511 stream.read_to_string(&mut response).ok();
512
513 let body = response.split("\r\n\r\n").nth(1).unwrap_or("{}");
514 serde_json::from_str(body).map_err(|e| format!("Failed to parse runner response: {e}"))
515 }
516}
517
/// Parses a duration such as `30s`, `5m`, `24h`, `7d` or a bare number of
/// seconds into whole seconds.
///
/// Surrounding whitespace and whitespace between the number and the unit are
/// ignored. Anything that cannot be parsed yields `0`, and values too large
/// for `u64` seconds saturate at `u64::MAX` instead of overflowing.
fn parse_duration_str(s: &str) -> u64 {
    // Unit suffix and the number of seconds it stands for.
    const UNITS: [(char, u64); 4] = [('s', 1), ('m', 60), ('h', 3_600), ('d', 86_400)];

    let s = s.trim();
    let (number, factor) = UNITS
        .iter()
        .find_map(|&(suffix, factor)| s.strip_suffix(suffix).map(|number| (number, factor)))
        .unwrap_or((s, 1));

    number
        .trim_end()
        .parse::<u64>()
        .map_or(0, |n| n.saturating_mul(factor))
}
537
/// Produces a workflow id of the form `wf_` followed by 16 lowercase hex
/// digits.
///
/// The digits are a hash of the current time in nanoseconds and a
/// process-wide counter that is bumped on every call.
fn generate_workflow_id() -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed);

    let mut state = DefaultHasher::new();
    nanos.hash(&mut state);
    sequence.hash(&mut state);
    let digest = state.finish();

    format!("wf_{digest:016x}")
}
555
/// Returns the current time as whole seconds since the Unix epoch followed by
/// `Z` (for example `1700000000Z`).
///
/// Despite the name this is not an ISO-8601 timestamp; the format is kept
/// as-is because it is what the rest of the file records.
fn now_iso() -> String {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    let mut stamp = since_epoch.as_secs().to_string();
    stamp.push('Z');
    stamp
}
563
564#[cfg(test)]
569mod tests {
570 use super::*;
571
572 fn engine() -> WorkflowEngine {
573 let e = WorkflowEngine::new("http://127.0.0.1:19999/run", 100);
574 e.register(WorkflowDef {
575 name: "onboarding".into(),
576 description: "User onboarding flow".into(),
577 file: "workflows/onboarding.ts".into(),
578 max_retries: 3,
579 step_timeout_secs: 30,
580 });
581 e
582 }
583
584 #[test]
587 fn register_and_list_definitions() {
588 let e = engine();
589 let defs = e.definitions();
590 assert_eq!(defs.len(), 1);
591 assert_eq!(defs[0].name, "onboarding");
592 }
593
594 #[test]
595 fn start_creates_pending_instance() {
596 let e = engine();
597 let id = e
598 .start("onboarding", serde_json::json!({"user": "alice"}))
599 .unwrap();
600 let inst = e.get(&id).unwrap();
601 assert_eq!(inst.status, WorkflowStatus::Pending);
602 assert_eq!(inst.name, "onboarding");
603 assert_eq!(inst.input, serde_json::json!({"user": "alice"}));
604 assert_eq!(inst.current_step, 0);
605 }
606
607 #[test]
608 fn start_unknown_workflow_errors() {
609 let e = engine();
610 let err = e.start("nonexistent", serde_json::json!({})).unwrap_err();
611 assert!(err.contains("not registered"));
612 }
613
614 #[test]
617 fn step_complete_advances_workflow() {
618 let e = engine();
619 let id = e.start("onboarding", serde_json::json!({})).unwrap();
620
621 let status = e
622 .advance_with_response(
623 &id,
624 serde_json::json!({
625 "action": "step_complete",
626 "step_name": "create_account",
627 "output": {"account_id": 42},
628 "duration_ms": 120
629 }),
630 )
631 .unwrap();
632
633 assert_eq!(status, WorkflowStatus::Running);
634 let inst = e.get(&id).unwrap();
635 assert_eq!(inst.current_step, 1);
636 assert_eq!(inst.steps.len(), 1);
637 assert_eq!(inst.steps[0].name, "create_account");
638 assert_eq!(inst.steps[0].status, StepStatus::Completed);
639 assert_eq!(
640 inst.steps[0].output,
641 Some(serde_json::json!({"account_id": 42}))
642 );
643 assert_eq!(inst.steps[0].duration_ms, Some(120));
644 assert!(inst.started_at.is_some());
645 }
646
647 #[test]
648 fn multiple_steps_advance_sequentially() {
649 let e = engine();
650 let id = e.start("onboarding", serde_json::json!({})).unwrap();
651
652 e.advance_with_response(
653 &id,
654 serde_json::json!({"action": "step_complete", "step_name": "step_a"}),
655 )
656 .unwrap();
657
658 e.advance_with_response(
659 &id,
660 serde_json::json!({"action": "step_complete", "step_name": "step_b"}),
661 )
662 .unwrap();
663
664 let inst = e.get(&id).unwrap();
665 assert_eq!(inst.current_step, 2);
666 assert_eq!(inst.steps.len(), 2);
667 assert_eq!(inst.steps[0].name, "step_a");
668 assert_eq!(inst.steps[1].name, "step_b");
669 }
670
671 #[test]
674 fn sleep_sets_wake_at_and_status() {
675 let e = engine();
676 let id = e.start("onboarding", serde_json::json!({})).unwrap();
677
678 let status = e
679 .advance_with_response(
680 &id,
681 serde_json::json!({"action": "sleep", "duration": "1h"}),
682 )
683 .unwrap();
684
685 assert_eq!(status, WorkflowStatus::Sleeping);
686 let inst = e.get(&id).unwrap();
687 assert!(inst.wake_at.is_some());
688 let now = SystemTime::now()
690 .duration_since(UNIX_EPOCH)
691 .unwrap()
692 .as_secs();
693 let delta = inst.wake_at.unwrap().abs_diff(now + 3600);
694 assert!(delta < 5, "wake_at should be ~1h from now, delta={delta}");
695 }
696
697 #[test]
698 fn wake_sleeping_wakes_expired_workflows() {
699 let e = engine();
700 let id = e.start("onboarding", serde_json::json!({})).unwrap();
701
702 e.advance_with_response(
704 &id,
705 serde_json::json!({"action": "sleep", "duration": "0s"}),
706 )
707 .unwrap();
708
709 let woken = e.wake_sleeping();
710 assert!(woken.contains(&id));
711
712 let inst = e.get(&id).unwrap();
713 assert_eq!(inst.status, WorkflowStatus::Running);
714 assert!(inst.wake_at.is_none());
715 }
716
717 #[test]
718 fn wake_sleeping_does_not_wake_future_timers() {
719 let e = engine();
720 let id = e.start("onboarding", serde_json::json!({})).unwrap();
721
722 e.advance_with_response(
723 &id,
724 serde_json::json!({"action": "sleep", "duration": "24h"}),
725 )
726 .unwrap();
727
728 let woken = e.wake_sleeping();
729 assert!(woken.is_empty());
730
731 let inst = e.get(&id).unwrap();
732 assert_eq!(inst.status, WorkflowStatus::Sleeping);
733 }
734
735 #[test]
738 fn wait_event_and_send_event() {
739 let e = engine();
740 let id = e.start("onboarding", serde_json::json!({})).unwrap();
741
742 let status = e
743 .advance_with_response(
744 &id,
745 serde_json::json!({"action": "wait_event", "event": "user_confirmed"}),
746 )
747 .unwrap();
748 assert_eq!(status, WorkflowStatus::WaitingForEvent);
749
750 e.send_event(
751 &id,
752 "user_confirmed",
753 serde_json::json!({"confirmed": true}),
754 )
755 .unwrap();
756
757 let inst = e.get(&id).unwrap();
758 assert_eq!(inst.status, WorkflowStatus::Running);
759 assert!(inst.waiting_for.is_none());
760 assert_eq!(inst.steps.last().unwrap().name, "event:user_confirmed");
761 assert_eq!(
762 inst.steps.last().unwrap().output,
763 Some(serde_json::json!({"confirmed": true}))
764 );
765 }
766
767 #[test]
768 fn send_event_wrong_name_errors() {
769 let e = engine();
770 let id = e.start("onboarding", serde_json::json!({})).unwrap();
771
772 e.advance_with_response(
773 &id,
774 serde_json::json!({"action": "wait_event", "event": "user_confirmed"}),
775 )
776 .unwrap();
777
778 let err = e
779 .send_event(&id, "wrong_event", serde_json::json!({}))
780 .unwrap_err();
781 assert!(err.contains("waiting for 'user_confirmed'"));
782 }
783
784 #[test]
785 fn send_event_not_waiting_errors() {
786 let e = engine();
787 let id = e.start("onboarding", serde_json::json!({})).unwrap();
788
789 let err = e
790 .send_event(&id, "anything", serde_json::json!({}))
791 .unwrap_err();
792 assert!(err.contains("not waiting"));
793 }
794
795 #[test]
798 fn cancel_sets_status_and_completed_at() {
799 let e = engine();
800 let id = e.start("onboarding", serde_json::json!({})).unwrap();
801
802 e.cancel(&id).unwrap();
803
804 let inst = e.get(&id).unwrap();
805 assert_eq!(inst.status, WorkflowStatus::Cancelled);
806 assert!(inst.completed_at.is_some());
807 }
808
809 #[test]
810 fn cancel_unknown_workflow_errors() {
811 let e = engine();
812 let err = e.cancel("wf_nonexistent").unwrap_err();
813 assert!(err.contains("not found"));
814 }
815
816 #[test]
819 fn complete_sets_output_and_status() {
820 let e = engine();
821 let id = e.start("onboarding", serde_json::json!({})).unwrap();
822
823 let status = e
824 .advance_with_response(
825 &id,
826 serde_json::json!({"action": "complete", "output": {"result": "done"}}),
827 )
828 .unwrap();
829
830 assert_eq!(status, WorkflowStatus::Completed);
831 let inst = e.get(&id).unwrap();
832 assert_eq!(inst.output, Some(serde_json::json!({"result": "done"})));
833 assert!(inst.completed_at.is_some());
834 }
835
836 #[test]
837 fn advance_completed_workflow_returns_status() {
838 let e = engine();
839 let id = e.start("onboarding", serde_json::json!({})).unwrap();
840
841 e.advance_with_response(
842 &id,
843 serde_json::json!({"action": "complete", "output": null}),
844 )
845 .unwrap();
846
847 let status = e
848 .advance_with_response(
849 &id,
850 serde_json::json!({"action": "step_complete", "step_name": "ignored"}),
851 )
852 .unwrap();
853 assert_eq!(status, WorkflowStatus::Completed);
854 }
855
856 #[test]
859 fn failure_retries_up_to_max() {
860 let e = engine(); let id = e.start("onboarding", serde_json::json!({})).unwrap();
862
863 for i in 0..3 {
865 let status = e
866 .advance_with_response(
867 &id,
868 serde_json::json!({
869 "action": "fail",
870 "step_name": "flaky_step",
871 "error": format!("attempt {i}")
872 }),
873 )
874 .unwrap();
875 assert_eq!(
876 status,
877 WorkflowStatus::Running,
878 "retry {i} should keep running"
879 );
880 }
881
882 let status = e
884 .advance_with_response(
885 &id,
886 serde_json::json!({
887 "action": "fail",
888 "step_name": "flaky_step",
889 "error": "final failure"
890 }),
891 )
892 .unwrap();
893 assert_eq!(status, WorkflowStatus::Failed);
894
895 let inst = e.get(&id).unwrap();
896 assert_eq!(inst.error, Some("final failure".into()));
897 assert!(inst.completed_at.is_some());
898 assert_eq!(inst.current_step, 0);
900 }
901
902 #[test]
903 fn failure_then_success_works() {
904 let e = engine();
905 let id = e.start("onboarding", serde_json::json!({})).unwrap();
906
907 e.advance_with_response(
909 &id,
910 serde_json::json!({"action": "fail", "step_name": "flakey", "error": "oops"}),
911 )
912 .unwrap();
913
914 e.advance_with_response(
916 &id,
917 serde_json::json!({"action": "step_complete", "step_name": "flakey", "output": "ok"}),
918 )
919 .unwrap();
920
921 let inst = e.get(&id).unwrap();
922 assert_eq!(inst.current_step, 1);
923 assert_eq!(inst.steps.len(), 2);
924 assert_eq!(inst.steps[0].status, StepStatus::Failed);
925 assert_eq!(inst.steps[1].status, StepStatus::Completed);
926 }
927
928 #[test]
931 fn parse_duration_seconds() {
932 assert_eq!(parse_duration_str("30s"), 30);
933 }
934
935 #[test]
936 fn parse_duration_minutes() {
937 assert_eq!(parse_duration_str("5m"), 300);
938 }
939
940 #[test]
941 fn parse_duration_hours() {
942 assert_eq!(parse_duration_str("24h"), 86400);
943 }
944
945 #[test]
946 fn parse_duration_days() {
947 assert_eq!(parse_duration_str("7d"), 604800);
948 }
949
950 #[test]
951 fn parse_duration_bare_number() {
952 assert_eq!(parse_duration_str("60"), 60);
953 }
954
955 #[test]
956 fn parse_duration_invalid() {
957 assert_eq!(parse_duration_str("abc"), 0);
958 }
959
960 #[test]
961 fn parse_duration_with_whitespace() {
962 assert_eq!(parse_duration_str(" 10s "), 10);
963 }
964
965 #[test]
968 fn list_all_instances() {
969 let e = engine();
970 e.start("onboarding", serde_json::json!({})).unwrap();
971 e.start("onboarding", serde_json::json!({})).unwrap();
972
973 let all = e.list(None);
974 assert_eq!(all.len(), 2);
975 }
976
977 #[test]
978 fn list_filters_by_status() {
979 let e = engine();
980 let id1 = e.start("onboarding", serde_json::json!({})).unwrap();
981 let _id2 = e.start("onboarding", serde_json::json!({})).unwrap();
982
983 e.advance_with_response(
985 &id1,
986 serde_json::json!({"action": "complete", "output": null}),
987 )
988 .unwrap();
989
990 let completed = e.list(Some(&WorkflowStatus::Completed));
991 assert_eq!(completed.len(), 1);
992 assert_eq!(completed[0].id, id1);
993
994 let pending = e.list(Some(&WorkflowStatus::Pending));
995 assert_eq!(pending.len(), 1);
996 }
997
998 #[test]
1001 fn unknown_action_returns_error() {
1002 let e = engine();
1003 let id = e.start("onboarding", serde_json::json!({})).unwrap();
1004
1005 let err = e
1006 .advance_with_response(&id, serde_json::json!({"action": "bogus"}))
1007 .unwrap_err();
1008 assert!(err.contains("Unknown action"));
1009 }
1010
1011 #[test]
1014 fn generated_ids_are_unique() {
1015 let mut ids = std::collections::HashSet::new();
1016 for _ in 0..100 {
1017 let id = generate_workflow_id();
1018 assert!(ids.insert(id), "duplicate workflow ID generated");
1019 }
1020 }
1021
1022 #[test]
1025 fn restore_from_store() {
1026 let store = crate::workflow_store::WorkflowStore::in_memory().unwrap();
1027
1028 let wf_pending = WorkflowInstance {
1030 id: "wf_aaa".into(),
1031 name: "onboarding".into(),
1032 input: serde_json::json!({"user": "bob"}),
1033 status: WorkflowStatus::Pending,
1034 steps: Vec::new(),
1035 output: None,
1036 error: None,
1037 created_at: "1000Z".into(),
1038 started_at: None,
1039 completed_at: None,
1040 wake_at: None,
1041 waiting_for: None,
1042 current_step: 0,
1043 max_retries: 3,
1044 };
1045
1046 let wf_sleeping = WorkflowInstance {
1048 id: "wf_bbb".into(),
1049 name: "onboarding".into(),
1050 input: serde_json::json!({}),
1051 status: WorkflowStatus::Sleeping,
1052 steps: vec![StepResult {
1053 step_id: "step_0".into(),
1054 name: "init".into(),
1055 status: StepStatus::Completed,
1056 output: Some(serde_json::json!({"ok": true})),
1057 error: None,
1058 started_at: Some("1000Z".into()),
1059 completed_at: Some("1001Z".into()),
1060 duration_ms: Some(50),
1061 retry_count: 0,
1062 }],
1063 output: None,
1064 error: None,
1065 created_at: "1000Z".into(),
1066 started_at: Some("1000Z".into()),
1067 completed_at: None,
1068 wake_at: Some(99999999),
1069 waiting_for: None,
1070 current_step: 1,
1071 max_retries: 3,
1072 };
1073
1074 let wf_completed = WorkflowInstance {
1076 id: "wf_ccc".into(),
1077 name: "onboarding".into(),
1078 input: serde_json::json!({}),
1079 status: WorkflowStatus::Completed,
1080 steps: Vec::new(),
1081 output: Some(serde_json::json!({"done": true})),
1082 error: None,
1083 created_at: "500Z".into(),
1084 started_at: Some("500Z".into()),
1085 completed_at: Some("600Z".into()),
1086 wake_at: None,
1087 waiting_for: None,
1088 current_step: 0,
1089 max_retries: 3,
1090 };
1091
1092 store.save(&wf_pending).unwrap();
1093 store.save(&wf_sleeping).unwrap();
1094 store.save(&wf_completed).unwrap();
1095
1096 let e = WorkflowEngine::new("http://127.0.0.1:19999/run", 100);
1097 let restored = e.restore_from(&store);
1098 assert_eq!(restored, 2);
1099
1100 let inst = e.get("wf_aaa").unwrap();
1102 assert_eq!(inst.status, WorkflowStatus::Pending);
1103 assert_eq!(inst.input, serde_json::json!({"user": "bob"}));
1104
1105 let inst = e.get("wf_bbb").unwrap();
1107 assert_eq!(inst.status, WorkflowStatus::Sleeping);
1108 assert_eq!(inst.steps.len(), 1);
1109 assert_eq!(inst.wake_at, Some(99999999));
1110
1111 assert!(e.get("wf_ccc").is_none());
1113 }
1114}