Skip to main content

pylon_runtime/
workflows.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Mutex;
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6use serde::{Deserialize, Serialize};
7
8// ---------------------------------------------------------------------------
9// Workflow definitions
10// ---------------------------------------------------------------------------
11
12#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
13pub enum WorkflowStatus {
14    Pending,
15    Running,
16    Sleeping,
17    WaitingForEvent,
18    Completed,
19    Failed,
20    Cancelled,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub enum StepStatus {
25    Pending,
26    Running,
27    Completed,
28    Failed,
29    Skipped,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct StepResult {
34    pub step_id: String,
35    pub name: String,
36    pub status: StepStatus,
37    pub output: Option<serde_json::Value>,
38    pub error: Option<String>,
39    pub started_at: Option<String>,
40    pub completed_at: Option<String>,
41    pub duration_ms: Option<u64>,
42    pub retry_count: u32,
43}
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct WorkflowInstance {
47    pub id: String,
48    pub name: String,
49    pub input: serde_json::Value,
50    pub status: WorkflowStatus,
51    pub steps: Vec<StepResult>,
52    pub output: Option<serde_json::Value>,
53    pub error: Option<String>,
54    pub created_at: String,
55    pub started_at: Option<String>,
56    pub completed_at: Option<String>,
57    /// If sleeping, when to wake up (unix timestamp seconds).
58    pub wake_at: Option<u64>,
59    /// If waiting for an event, the event name.
60    pub waiting_for: Option<String>,
61    /// Current step index being executed.
62    pub current_step: usize,
63    /// Max retries per step.
64    pub max_retries: u32,
65}
66
67/// A registered workflow definition.
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct WorkflowDef {
70    pub name: String,
71    pub description: String,
72    /// The TypeScript file that defines this workflow.
73    pub file: String,
74    /// Max retries per step.
75    pub max_retries: u32,
76    /// Timeout per step in seconds.
77    pub step_timeout_secs: u64,
78}
79
80// ---------------------------------------------------------------------------
81// Workflow Engine
82// ---------------------------------------------------------------------------
83
84pub struct WorkflowEngine {
85    /// Registered workflow definitions.
86    definitions: Mutex<HashMap<String, WorkflowDef>>,
87    /// Active and historical workflow instances.
88    instances: Mutex<HashMap<String, WorkflowInstance>>,
89    /// URL of the TypeScript workflow runner.
90    runner_url: String,
91    /// Max instances to keep in history (unused currently, reserved for GC).
92    #[allow(dead_code)]
93    max_history: usize,
94}
95
96impl WorkflowEngine {
97    pub fn new(runner_url: &str, max_history: usize) -> Self {
98        Self {
99            definitions: Mutex::new(HashMap::new()),
100            instances: Mutex::new(HashMap::new()),
101            runner_url: runner_url.to_string(),
102            max_history,
103        }
104    }
105
106    /// Register a workflow definition.
107    pub fn register(&self, def: WorkflowDef) {
108        self.definitions
109            .lock()
110            .unwrap()
111            .insert(def.name.clone(), def);
112    }
113
114    /// Start a new workflow instance. Returns the instance ID.
115    pub fn start(&self, name: &str, input: serde_json::Value) -> Result<String, String> {
116        let defs = self.definitions.lock().unwrap();
117        let def = defs
118            .get(name)
119            .ok_or_else(|| format!("Workflow '{}' not registered", name))?;
120
121        let id = generate_workflow_id();
122        let instance = WorkflowInstance {
123            id: id.clone(),
124            name: name.to_string(),
125            input,
126            status: WorkflowStatus::Pending,
127            steps: Vec::new(),
128            output: None,
129            error: None,
130            created_at: now_iso(),
131            started_at: None,
132            completed_at: None,
133            wake_at: None,
134            waiting_for: None,
135            current_step: 0,
136            max_retries: def.max_retries,
137        };
138
139        self.instances.lock().unwrap().insert(id.clone(), instance);
140        Ok(id)
141    }
142
143    /// Execute the next step of a workflow by calling the TS runner.
144    ///
145    /// The TS runner returns an action object describing what happened:
146    /// - `{ "action": "step_complete", "step_name": "...", "output": ... }`
147    /// - `{ "action": "sleep", "duration": "24h" }`
148    /// - `{ "action": "wait_event", "event": "user_confirmed" }`
149    /// - `{ "action": "complete", "output": ... }`
150    /// - `{ "action": "fail", "error": "...", "step_name": "..." }`
151    pub fn advance(&self, workflow_id: &str) -> Result<WorkflowStatus, String> {
152        let instance = {
153            let instances = self.instances.lock().unwrap();
154            instances
155                .get(workflow_id)
156                .cloned()
157                .ok_or_else(|| format!("Workflow '{}' not found", workflow_id))?
158        };
159
160        // Terminal states: nothing to do.
161        match instance.status {
162            WorkflowStatus::Completed | WorkflowStatus::Failed | WorkflowStatus::Cancelled => {
163                return Ok(instance.status);
164            }
165            WorkflowStatus::Sleeping => {
166                if let Some(wake_at) = instance.wake_at {
167                    let now = SystemTime::now()
168                        .duration_since(UNIX_EPOCH)
169                        .unwrap_or_default()
170                        .as_secs();
171                    if now < wake_at {
172                        return Ok(WorkflowStatus::Sleeping);
173                    }
174                }
175                // Timer expired -- fall through to advance.
176            }
177            _ => {}
178        }
179
180        let request = serde_json::json!({
181            "workflow_id": workflow_id,
182            "workflow_name": instance.name,
183            "input": instance.input,
184            "current_step": instance.current_step,
185            "completed_steps": instance.steps,
186        });
187
188        let response = self.call_runner(&request)?;
189        self.apply_response(workflow_id, &response)
190    }
191
192    /// Advance a workflow with a pre-provided response (for testing without
193    /// a running TS runner).
194    pub fn advance_with_response(
195        &self,
196        workflow_id: &str,
197        response: serde_json::Value,
198    ) -> Result<WorkflowStatus, String> {
199        // Verify the workflow exists and is advanceable.
200        {
201            let instances = self.instances.lock().unwrap();
202            let instance = instances
203                .get(workflow_id)
204                .ok_or_else(|| format!("Workflow '{}' not found", workflow_id))?;
205
206            match instance.status {
207                WorkflowStatus::Completed | WorkflowStatus::Failed | WorkflowStatus::Cancelled => {
208                    return Ok(instance.status.clone());
209                }
210                _ => {}
211            }
212        }
213
214        self.apply_response(workflow_id, &response)
215    }
216
217    /// Send an event to a waiting workflow.
218    pub fn send_event(
219        &self,
220        workflow_id: &str,
221        event: &str,
222        data: serde_json::Value,
223    ) -> Result<(), String> {
224        let mut instances = self.instances.lock().unwrap();
225        let inst = instances.get_mut(workflow_id).ok_or("Workflow not found")?;
226
227        if inst.status != WorkflowStatus::WaitingForEvent {
228            return Err("Workflow is not waiting for an event".into());
229        }
230
231        if inst.waiting_for.as_deref() != Some(event) {
232            return Err(format!(
233                "Workflow is waiting for '{}', not '{event}'",
234                inst.waiting_for.as_deref().unwrap_or("")
235            ));
236        }
237
238        inst.steps.push(StepResult {
239            step_id: format!("step_{}", inst.steps.len()),
240            name: format!("event:{event}"),
241            status: StepStatus::Completed,
242            output: Some(data),
243            error: None,
244            started_at: Some(now_iso()),
245            completed_at: Some(now_iso()),
246            duration_ms: None,
247            retry_count: 0,
248        });
249        inst.current_step += 1;
250        inst.status = WorkflowStatus::Running;
251        inst.waiting_for = None;
252
253        Ok(())
254    }
255
256    /// Cancel a workflow.
257    pub fn cancel(&self, workflow_id: &str) -> Result<(), String> {
258        let mut instances = self.instances.lock().unwrap();
259        let inst = instances.get_mut(workflow_id).ok_or("Workflow not found")?;
260        inst.status = WorkflowStatus::Cancelled;
261        inst.completed_at = Some(now_iso());
262        Ok(())
263    }
264
265    /// Get a workflow instance by ID.
266    pub fn get(&self, workflow_id: &str) -> Option<WorkflowInstance> {
267        self.instances.lock().unwrap().get(workflow_id).cloned()
268    }
269
270    /// List all workflow instances with optional status filter.
271    pub fn list(&self, status: Option<&WorkflowStatus>) -> Vec<WorkflowInstance> {
272        let instances = self.instances.lock().unwrap();
273        instances
274            .values()
275            .filter(|i| {
276                status
277                    .map(|s| std::mem::discriminant(&i.status) == std::mem::discriminant(s))
278                    .unwrap_or(true)
279            })
280            .cloned()
281            .collect()
282    }
283
284    /// List registered workflow definitions.
285    pub fn definitions(&self) -> Vec<WorkflowDef> {
286        self.definitions.lock().unwrap().values().cloned().collect()
287    }
288
289    /// Wake sleeping workflows whose timer has expired. Returns the IDs of
290    /// workflows that were woken.
291    pub fn wake_sleeping(&self) -> Vec<String> {
292        let now = SystemTime::now()
293            .duration_since(UNIX_EPOCH)
294            .unwrap_or_default()
295            .as_secs();
296        let mut woken = Vec::new();
297        let mut instances = self.instances.lock().unwrap();
298
299        for (id, inst) in instances.iter_mut() {
300            if inst.status == WorkflowStatus::Sleeping {
301                if let Some(wake_at) = inst.wake_at {
302                    if now >= wake_at {
303                        inst.status = WorkflowStatus::Running;
304                        inst.wake_at = None;
305                        woken.push(id.clone());
306                    }
307                }
308            }
309        }
310
311        woken
312    }
313
314    // -----------------------------------------------------------------------
315    // Persistence
316    // -----------------------------------------------------------------------
317
318    /// Restore active and sleeping workflows from a persistent store.
319    ///
320    /// Loads all non-terminal workflows and inserts them into the in-memory
321    /// instance map. Returns the number of workflows restored.
322    ///
323    /// Call this once at startup, before the engine begins processing.
324    pub fn restore_from(&self, store: &crate::workflow_store::WorkflowStore) -> usize {
325        let mut count = 0;
326
327        let active = store.load_active().unwrap_or_default();
328        let sleeping = store.load_sleeping().unwrap_or_default();
329
330        let mut instances = self.instances.lock().unwrap();
331
332        for wf in active {
333            instances.insert(wf.id.clone(), wf);
334            count += 1;
335        }
336        for wf in sleeping {
337            // Avoid double-counting if load_active and load_sleeping overlap
338            // (they shouldn't given the status filters, but guard anyway).
339            if !instances.contains_key(&wf.id) {
340                instances.insert(wf.id.clone(), wf);
341                count += 1;
342            }
343        }
344
345        count
346    }
347
348    // -----------------------------------------------------------------------
349    // Internal
350    // -----------------------------------------------------------------------
351
352    /// Apply a runner response to the workflow, updating state accordingly.
353    fn apply_response(
354        &self,
355        workflow_id: &str,
356        response: &serde_json::Value,
357    ) -> Result<WorkflowStatus, String> {
358        let action = response
359            .get("action")
360            .and_then(|v| v.as_str())
361            .unwrap_or("fail");
362
363        let mut instances = self.instances.lock().unwrap();
364        let inst = instances
365            .get_mut(workflow_id)
366            .ok_or_else(|| format!("Workflow '{}' not found", workflow_id))?;
367
368        if inst.started_at.is_none() {
369            inst.started_at = Some(now_iso());
370        }
371
372        match action {
373            "step_complete" => {
374                let step_name = response
375                    .get("step_name")
376                    .and_then(|v| v.as_str())
377                    .unwrap_or("unknown");
378                let output = response.get("output").cloned();
379
380                inst.steps.push(StepResult {
381                    step_id: format!("step_{}", inst.steps.len()),
382                    name: step_name.to_string(),
383                    status: StepStatus::Completed,
384                    output,
385                    error: None,
386                    started_at: Some(now_iso()),
387                    completed_at: Some(now_iso()),
388                    duration_ms: response.get("duration_ms").and_then(|v| v.as_u64()),
389                    retry_count: 0,
390                });
391                inst.current_step += 1;
392                inst.status = WorkflowStatus::Running;
393
394                Ok(WorkflowStatus::Running)
395            }
396            "sleep" => {
397                let duration_str = response
398                    .get("duration")
399                    .and_then(|v| v.as_str())
400                    .unwrap_or("0s");
401                let secs = parse_duration_str(duration_str);
402                let wake_at = SystemTime::now()
403                    .duration_since(UNIX_EPOCH)
404                    .unwrap_or_default()
405                    .as_secs()
406                    + secs;
407
408                inst.status = WorkflowStatus::Sleeping;
409                inst.wake_at = Some(wake_at);
410                inst.current_step += 1;
411
412                Ok(WorkflowStatus::Sleeping)
413            }
414            "wait_event" => {
415                let event = response
416                    .get("event")
417                    .and_then(|v| v.as_str())
418                    .unwrap_or("")
419                    .to_string();
420                inst.status = WorkflowStatus::WaitingForEvent;
421                inst.waiting_for = Some(event);
422
423                Ok(WorkflowStatus::WaitingForEvent)
424            }
425            "complete" => {
426                inst.status = WorkflowStatus::Completed;
427                inst.output = response.get("output").cloned();
428                inst.completed_at = Some(now_iso());
429
430                Ok(WorkflowStatus::Completed)
431            }
432            "fail" => {
433                let error = response
434                    .get("error")
435                    .and_then(|v| v.as_str())
436                    .unwrap_or("Unknown error")
437                    .to_string();
438
439                let step_name = response
440                    .get("step_name")
441                    .and_then(|v| v.as_str())
442                    .unwrap_or("unknown");
443
444                // Count previous failures for the same step to decide retry.
445                let retry_count = inst
446                    .steps
447                    .iter()
448                    .filter(|s| s.name == step_name && s.status == StepStatus::Failed)
449                    .count() as u32;
450
451                if retry_count < inst.max_retries {
452                    inst.steps.push(StepResult {
453                        step_id: format!("step_{}", inst.steps.len()),
454                        name: step_name.to_string(),
455                        status: StepStatus::Failed,
456                        output: None,
457                        error: Some(error),
458                        started_at: Some(now_iso()),
459                        completed_at: Some(now_iso()),
460                        duration_ms: None,
461                        retry_count: retry_count + 1,
462                    });
463                    // Don't advance current_step -- retry the same step.
464                    Ok(WorkflowStatus::Running)
465                } else {
466                    inst.status = WorkflowStatus::Failed;
467                    inst.error = Some(error);
468                    inst.completed_at = Some(now_iso());
469                    Ok(WorkflowStatus::Failed)
470                }
471            }
472            _ => Err(format!("Unknown action: {action}")),
473        }
474    }
475
476    /// Call the TypeScript workflow runner via HTTP.
477    fn call_runner(&self, request: &serde_json::Value) -> Result<serde_json::Value, String> {
478        use std::io::{Read, Write};
479        use std::net::TcpStream;
480
481        let url = &self.runner_url;
482        let host = url.strip_prefix("http://").unwrap_or(url);
483        let (host_port, path) = match host.find('/') {
484            Some(i) => (&host[..i], &host[i..]),
485            None => (host, "/"),
486        };
487
488        let body = request.to_string();
489        let http_request = format!(
490            "POST {} HTTP/1.1\r\n\
491             Host: {}\r\n\
492             Content-Type: application/json\r\n\
493             Content-Length: {}\r\n\
494             Connection: close\r\n\
495             \r\n\
496             {}",
497            path,
498            host_port,
499            body.len(),
500            body
501        );
502
503        let mut stream = TcpStream::connect(host_port)
504            .map_err(|e| format!("Failed to connect to workflow runner: {e}"))?;
505        stream.set_read_timeout(Some(Duration::from_secs(30))).ok();
506        stream
507            .write_all(http_request.as_bytes())
508            .map_err(|e| format!("Write failed: {e}"))?;
509
510        let mut response = String::new();
511        stream.read_to_string(&mut response).ok();
512
513        let body = response.split("\r\n\r\n").nth(1).unwrap_or("{}");
514        serde_json::from_str(body).map_err(|e| format!("Failed to parse runner response: {e}"))
515    }
516}
517
518// ---------------------------------------------------------------------------
519// Helpers
520// ---------------------------------------------------------------------------
521
/// Parse a human-readable duration such as `"24h"`, `"30m"`, `"5s"`, `"1d"`
/// or `"1500ms"` into whole seconds.
///
/// A bare number (`"60"`) is taken as seconds. Whitespace around the number
/// and the unit is ignored, and milliseconds are truncated to whole seconds.
/// Unparseable input yields `0`. Results too large for `u64` saturate at
/// `u64::MAX` instead of overflowing, which would panic in debug builds.
fn parse_duration_str(s: &str) -> u64 {
    // Seconds per unit, keyed by single-character suffix.
    const UNITS: [(char, u64); 4] = [('s', 1), ('m', 60), ('h', 3_600), ('d', 86_400)];

    let count = |n: &str| n.trim().parse::<u64>().unwrap_or(0);
    let s = s.trim();

    // Must be checked before the single-letter units, since "1500ms" also ends in 's'.
    if let Some(n) = s.strip_suffix("ms") {
        return count(n) / 1_000;
    }
    for (suffix, secs_per_unit) in UNITS {
        if let Some(n) = s.strip_suffix(suffix) {
            return count(n).saturating_mul(secs_per_unit);
        }
    }
    count(s)
}
537
/// Generate a new workflow instance ID of the form `wf_` followed by 16
/// lowercase hex digits.
///
/// Within a process, IDs differ because of a monotonically increasing
/// counter. The wall-clock time and the process ID are also mixed in, so IDs
/// minted by different processes are very unlikely to collide. This matters
/// because a restarted process can see IDs restored from a store.
fn generate_workflow_id() -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // Relaxed suffices: `fetch_add` is atomic and we only need distinct values.
    let count = COUNTER.fetch_add(1, Ordering::Relaxed);

    let mut hasher = DefaultHasher::new();
    ts.as_nanos().hash(&mut hasher);
    count.hash(&mut hasher);
    std::process::id().hash(&mut hasher);

    format!("wf_{:016x}", hasher.finish())
}
555
/// Return the current wall-clock time as `"<unix seconds>Z"`, e.g.
/// `"1700000000Z"`. A clock set before the Unix epoch yields `"0Z"`.
///
/// NOTE(review): despite its name, this is not an ISO-8601 date-time string.
/// Switching to RFC 3339 would change the timestamps stored on workflow
/// instances and steps, so confirm every consumer before changing the format.
fn now_iso() -> String {
    let elapsed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch,
        Err(_) => Duration::ZERO,
    };
    let mut stamp = elapsed.as_secs().to_string();
    stamp.push('Z');
    stamp
}
563
564// ---------------------------------------------------------------------------
565// Tests
566// ---------------------------------------------------------------------------
567
568#[cfg(test)]
569mod tests {
570    use super::*;
571
572    fn engine() -> WorkflowEngine {
573        let e = WorkflowEngine::new("http://127.0.0.1:19999/run", 100);
574        e.register(WorkflowDef {
575            name: "onboarding".into(),
576            description: "User onboarding flow".into(),
577            file: "workflows/onboarding.ts".into(),
578            max_retries: 3,
579            step_timeout_secs: 30,
580        });
581        e
582    }
583
584    // -- Registration & start -----------------------------------------------
585
586    #[test]
587    fn register_and_list_definitions() {
588        let e = engine();
589        let defs = e.definitions();
590        assert_eq!(defs.len(), 1);
591        assert_eq!(defs[0].name, "onboarding");
592    }
593
594    #[test]
595    fn start_creates_pending_instance() {
596        let e = engine();
597        let id = e
598            .start("onboarding", serde_json::json!({"user": "alice"}))
599            .unwrap();
600        let inst = e.get(&id).unwrap();
601        assert_eq!(inst.status, WorkflowStatus::Pending);
602        assert_eq!(inst.name, "onboarding");
603        assert_eq!(inst.input, serde_json::json!({"user": "alice"}));
604        assert_eq!(inst.current_step, 0);
605    }
606
607    #[test]
608    fn start_unknown_workflow_errors() {
609        let e = engine();
610        let err = e.start("nonexistent", serde_json::json!({})).unwrap_err();
611        assert!(err.contains("not registered"));
612    }
613
614    // -- Step recording via advance_with_response ---------------------------
615
616    #[test]
617    fn step_complete_advances_workflow() {
618        let e = engine();
619        let id = e.start("onboarding", serde_json::json!({})).unwrap();
620
621        let status = e
622            .advance_with_response(
623                &id,
624                serde_json::json!({
625                    "action": "step_complete",
626                    "step_name": "create_account",
627                    "output": {"account_id": 42},
628                    "duration_ms": 120
629                }),
630            )
631            .unwrap();
632
633        assert_eq!(status, WorkflowStatus::Running);
634        let inst = e.get(&id).unwrap();
635        assert_eq!(inst.current_step, 1);
636        assert_eq!(inst.steps.len(), 1);
637        assert_eq!(inst.steps[0].name, "create_account");
638        assert_eq!(inst.steps[0].status, StepStatus::Completed);
639        assert_eq!(
640            inst.steps[0].output,
641            Some(serde_json::json!({"account_id": 42}))
642        );
643        assert_eq!(inst.steps[0].duration_ms, Some(120));
644        assert!(inst.started_at.is_some());
645    }
646
647    #[test]
648    fn multiple_steps_advance_sequentially() {
649        let e = engine();
650        let id = e.start("onboarding", serde_json::json!({})).unwrap();
651
652        e.advance_with_response(
653            &id,
654            serde_json::json!({"action": "step_complete", "step_name": "step_a"}),
655        )
656        .unwrap();
657
658        e.advance_with_response(
659            &id,
660            serde_json::json!({"action": "step_complete", "step_name": "step_b"}),
661        )
662        .unwrap();
663
664        let inst = e.get(&id).unwrap();
665        assert_eq!(inst.current_step, 2);
666        assert_eq!(inst.steps.len(), 2);
667        assert_eq!(inst.steps[0].name, "step_a");
668        assert_eq!(inst.steps[1].name, "step_b");
669    }
670
671    // -- Sleep & wake -------------------------------------------------------
672
673    #[test]
674    fn sleep_sets_wake_at_and_status() {
675        let e = engine();
676        let id = e.start("onboarding", serde_json::json!({})).unwrap();
677
678        let status = e
679            .advance_with_response(
680                &id,
681                serde_json::json!({"action": "sleep", "duration": "1h"}),
682            )
683            .unwrap();
684
685        assert_eq!(status, WorkflowStatus::Sleeping);
686        let inst = e.get(&id).unwrap();
687        assert!(inst.wake_at.is_some());
688        // wake_at should be roughly now + 3600
689        let now = SystemTime::now()
690            .duration_since(UNIX_EPOCH)
691            .unwrap()
692            .as_secs();
693        let delta = inst.wake_at.unwrap().abs_diff(now + 3600);
694        assert!(delta < 5, "wake_at should be ~1h from now, delta={delta}");
695    }
696
697    #[test]
698    fn wake_sleeping_wakes_expired_workflows() {
699        let e = engine();
700        let id = e.start("onboarding", serde_json::json!({})).unwrap();
701
702        // Sleep for 0 seconds (immediately expired).
703        e.advance_with_response(
704            &id,
705            serde_json::json!({"action": "sleep", "duration": "0s"}),
706        )
707        .unwrap();
708
709        let woken = e.wake_sleeping();
710        assert!(woken.contains(&id));
711
712        let inst = e.get(&id).unwrap();
713        assert_eq!(inst.status, WorkflowStatus::Running);
714        assert!(inst.wake_at.is_none());
715    }
716
717    #[test]
718    fn wake_sleeping_does_not_wake_future_timers() {
719        let e = engine();
720        let id = e.start("onboarding", serde_json::json!({})).unwrap();
721
722        e.advance_with_response(
723            &id,
724            serde_json::json!({"action": "sleep", "duration": "24h"}),
725        )
726        .unwrap();
727
728        let woken = e.wake_sleeping();
729        assert!(woken.is_empty());
730
731        let inst = e.get(&id).unwrap();
732        assert_eq!(inst.status, WorkflowStatus::Sleeping);
733    }
734
735    // -- Event sending ------------------------------------------------------
736
737    #[test]
738    fn wait_event_and_send_event() {
739        let e = engine();
740        let id = e.start("onboarding", serde_json::json!({})).unwrap();
741
742        let status = e
743            .advance_with_response(
744                &id,
745                serde_json::json!({"action": "wait_event", "event": "user_confirmed"}),
746            )
747            .unwrap();
748        assert_eq!(status, WorkflowStatus::WaitingForEvent);
749
750        e.send_event(
751            &id,
752            "user_confirmed",
753            serde_json::json!({"confirmed": true}),
754        )
755        .unwrap();
756
757        let inst = e.get(&id).unwrap();
758        assert_eq!(inst.status, WorkflowStatus::Running);
759        assert!(inst.waiting_for.is_none());
760        assert_eq!(inst.steps.last().unwrap().name, "event:user_confirmed");
761        assert_eq!(
762            inst.steps.last().unwrap().output,
763            Some(serde_json::json!({"confirmed": true}))
764        );
765    }
766
767    #[test]
768    fn send_event_wrong_name_errors() {
769        let e = engine();
770        let id = e.start("onboarding", serde_json::json!({})).unwrap();
771
772        e.advance_with_response(
773            &id,
774            serde_json::json!({"action": "wait_event", "event": "user_confirmed"}),
775        )
776        .unwrap();
777
778        let err = e
779            .send_event(&id, "wrong_event", serde_json::json!({}))
780            .unwrap_err();
781        assert!(err.contains("waiting for 'user_confirmed'"));
782    }
783
784    #[test]
785    fn send_event_not_waiting_errors() {
786        let e = engine();
787        let id = e.start("onboarding", serde_json::json!({})).unwrap();
788
789        let err = e
790            .send_event(&id, "anything", serde_json::json!({}))
791            .unwrap_err();
792        assert!(err.contains("not waiting"));
793    }
794
795    // -- Cancel -------------------------------------------------------------
796
797    #[test]
798    fn cancel_sets_status_and_completed_at() {
799        let e = engine();
800        let id = e.start("onboarding", serde_json::json!({})).unwrap();
801
802        e.cancel(&id).unwrap();
803
804        let inst = e.get(&id).unwrap();
805        assert_eq!(inst.status, WorkflowStatus::Cancelled);
806        assert!(inst.completed_at.is_some());
807    }
808
809    #[test]
810    fn cancel_unknown_workflow_errors() {
811        let e = engine();
812        let err = e.cancel("wf_nonexistent").unwrap_err();
813        assert!(err.contains("not found"));
814    }
815
816    // -- Completion ---------------------------------------------------------
817
818    #[test]
819    fn complete_sets_output_and_status() {
820        let e = engine();
821        let id = e.start("onboarding", serde_json::json!({})).unwrap();
822
823        let status = e
824            .advance_with_response(
825                &id,
826                serde_json::json!({"action": "complete", "output": {"result": "done"}}),
827            )
828            .unwrap();
829
830        assert_eq!(status, WorkflowStatus::Completed);
831        let inst = e.get(&id).unwrap();
832        assert_eq!(inst.output, Some(serde_json::json!({"result": "done"})));
833        assert!(inst.completed_at.is_some());
834    }
835
836    #[test]
837    fn advance_completed_workflow_returns_status() {
838        let e = engine();
839        let id = e.start("onboarding", serde_json::json!({})).unwrap();
840
841        e.advance_with_response(
842            &id,
843            serde_json::json!({"action": "complete", "output": null}),
844        )
845        .unwrap();
846
847        let status = e
848            .advance_with_response(
849                &id,
850                serde_json::json!({"action": "step_complete", "step_name": "ignored"}),
851            )
852            .unwrap();
853        assert_eq!(status, WorkflowStatus::Completed);
854    }
855
856    // -- Retry on failure ---------------------------------------------------
857
858    #[test]
859    fn failure_retries_up_to_max() {
860        let e = engine(); // max_retries = 3
861        let id = e.start("onboarding", serde_json::json!({})).unwrap();
862
863        // First 3 failures should retry (not mark workflow as Failed).
864        for i in 0..3 {
865            let status = e
866                .advance_with_response(
867                    &id,
868                    serde_json::json!({
869                        "action": "fail",
870                        "step_name": "flaky_step",
871                        "error": format!("attempt {i}")
872                    }),
873                )
874                .unwrap();
875            assert_eq!(
876                status,
877                WorkflowStatus::Running,
878                "retry {i} should keep running"
879            );
880        }
881
882        // 4th failure exceeds max_retries, workflow should fail.
883        let status = e
884            .advance_with_response(
885                &id,
886                serde_json::json!({
887                    "action": "fail",
888                    "step_name": "flaky_step",
889                    "error": "final failure"
890                }),
891            )
892            .unwrap();
893        assert_eq!(status, WorkflowStatus::Failed);
894
895        let inst = e.get(&id).unwrap();
896        assert_eq!(inst.error, Some("final failure".into()));
897        assert!(inst.completed_at.is_some());
898        // current_step should not have advanced (all retries on same step).
899        assert_eq!(inst.current_step, 0);
900    }
901
902    #[test]
903    fn failure_then_success_works() {
904        let e = engine();
905        let id = e.start("onboarding", serde_json::json!({})).unwrap();
906
907        // Fail once.
908        e.advance_with_response(
909            &id,
910            serde_json::json!({"action": "fail", "step_name": "flakey", "error": "oops"}),
911        )
912        .unwrap();
913
914        // Succeed on retry.
915        e.advance_with_response(
916            &id,
917            serde_json::json!({"action": "step_complete", "step_name": "flakey", "output": "ok"}),
918        )
919        .unwrap();
920
921        let inst = e.get(&id).unwrap();
922        assert_eq!(inst.current_step, 1);
923        assert_eq!(inst.steps.len(), 2);
924        assert_eq!(inst.steps[0].status, StepStatus::Failed);
925        assert_eq!(inst.steps[1].status, StepStatus::Completed);
926    }
927
928    // -- parse_duration_str -------------------------------------------------
929
930    #[test]
931    fn parse_duration_seconds() {
932        assert_eq!(parse_duration_str("30s"), 30);
933    }
934
935    #[test]
936    fn parse_duration_minutes() {
937        assert_eq!(parse_duration_str("5m"), 300);
938    }
939
940    #[test]
941    fn parse_duration_hours() {
942        assert_eq!(parse_duration_str("24h"), 86400);
943    }
944
945    #[test]
946    fn parse_duration_days() {
947        assert_eq!(parse_duration_str("7d"), 604800);
948    }
949
950    #[test]
951    fn parse_duration_bare_number() {
952        assert_eq!(parse_duration_str("60"), 60);
953    }
954
955    #[test]
956    fn parse_duration_invalid() {
957        assert_eq!(parse_duration_str("abc"), 0);
958    }
959
960    #[test]
961    fn parse_duration_with_whitespace() {
962        assert_eq!(parse_duration_str("  10s  "), 10);
963    }
964
965    // -- List by status -----------------------------------------------------
966
967    #[test]
968    fn list_all_instances() {
969        let e = engine();
970        e.start("onboarding", serde_json::json!({})).unwrap();
971        e.start("onboarding", serde_json::json!({})).unwrap();
972
973        let all = e.list(None);
974        assert_eq!(all.len(), 2);
975    }
976
977    #[test]
978    fn list_filters_by_status() {
979        let e = engine();
980        let id1 = e.start("onboarding", serde_json::json!({})).unwrap();
981        let _id2 = e.start("onboarding", serde_json::json!({})).unwrap();
982
983        // Complete one.
984        e.advance_with_response(
985            &id1,
986            serde_json::json!({"action": "complete", "output": null}),
987        )
988        .unwrap();
989
990        let completed = e.list(Some(&WorkflowStatus::Completed));
991        assert_eq!(completed.len(), 1);
992        assert_eq!(completed[0].id, id1);
993
994        let pending = e.list(Some(&WorkflowStatus::Pending));
995        assert_eq!(pending.len(), 1);
996    }
997
998    // -- Unknown action returns error ---------------------------------------
999
1000    #[test]
1001    fn unknown_action_returns_error() {
1002        let e = engine();
1003        let id = e.start("onboarding", serde_json::json!({})).unwrap();
1004
1005        let err = e
1006            .advance_with_response(&id, serde_json::json!({"action": "bogus"}))
1007            .unwrap_err();
1008        assert!(err.contains("Unknown action"));
1009    }
1010
1011    // -- ID generation uniqueness -------------------------------------------
1012
1013    #[test]
1014    fn generated_ids_are_unique() {
1015        let mut ids = std::collections::HashSet::new();
1016        for _ in 0..100 {
1017            let id = generate_workflow_id();
1018            assert!(ids.insert(id), "duplicate workflow ID generated");
1019        }
1020    }
1021
1022    // -- Restore from store -------------------------------------------------
1023
1024    #[test]
1025    fn restore_from_store() {
1026        let store = crate::workflow_store::WorkflowStore::in_memory().unwrap();
1027
1028        // Save a pending workflow.
1029        let wf_pending = WorkflowInstance {
1030            id: "wf_aaa".into(),
1031            name: "onboarding".into(),
1032            input: serde_json::json!({"user": "bob"}),
1033            status: WorkflowStatus::Pending,
1034            steps: Vec::new(),
1035            output: None,
1036            error: None,
1037            created_at: "1000Z".into(),
1038            started_at: None,
1039            completed_at: None,
1040            wake_at: None,
1041            waiting_for: None,
1042            current_step: 0,
1043            max_retries: 3,
1044        };
1045
1046        // Save a sleeping workflow.
1047        let wf_sleeping = WorkflowInstance {
1048            id: "wf_bbb".into(),
1049            name: "onboarding".into(),
1050            input: serde_json::json!({}),
1051            status: WorkflowStatus::Sleeping,
1052            steps: vec![StepResult {
1053                step_id: "step_0".into(),
1054                name: "init".into(),
1055                status: StepStatus::Completed,
1056                output: Some(serde_json::json!({"ok": true})),
1057                error: None,
1058                started_at: Some("1000Z".into()),
1059                completed_at: Some("1001Z".into()),
1060                duration_ms: Some(50),
1061                retry_count: 0,
1062            }],
1063            output: None,
1064            error: None,
1065            created_at: "1000Z".into(),
1066            started_at: Some("1000Z".into()),
1067            completed_at: None,
1068            wake_at: Some(99999999),
1069            waiting_for: None,
1070            current_step: 1,
1071            max_retries: 3,
1072        };
1073
1074        // Save a completed workflow (should NOT be restored).
1075        let wf_completed = WorkflowInstance {
1076            id: "wf_ccc".into(),
1077            name: "onboarding".into(),
1078            input: serde_json::json!({}),
1079            status: WorkflowStatus::Completed,
1080            steps: Vec::new(),
1081            output: Some(serde_json::json!({"done": true})),
1082            error: None,
1083            created_at: "500Z".into(),
1084            started_at: Some("500Z".into()),
1085            completed_at: Some("600Z".into()),
1086            wake_at: None,
1087            waiting_for: None,
1088            current_step: 0,
1089            max_retries: 3,
1090        };
1091
1092        store.save(&wf_pending).unwrap();
1093        store.save(&wf_sleeping).unwrap();
1094        store.save(&wf_completed).unwrap();
1095
1096        let e = WorkflowEngine::new("http://127.0.0.1:19999/run", 100);
1097        let restored = e.restore_from(&store);
1098        assert_eq!(restored, 2);
1099
1100        // Verify the pending workflow is present.
1101        let inst = e.get("wf_aaa").unwrap();
1102        assert_eq!(inst.status, WorkflowStatus::Pending);
1103        assert_eq!(inst.input, serde_json::json!({"user": "bob"}));
1104
1105        // Verify the sleeping workflow is present with its step.
1106        let inst = e.get("wf_bbb").unwrap();
1107        assert_eq!(inst.status, WorkflowStatus::Sleeping);
1108        assert_eq!(inst.steps.len(), 1);
1109        assert_eq!(inst.wake_at, Some(99999999));
1110
1111        // Verify the completed workflow was NOT restored.
1112        assert!(e.get("wf_ccc").is_none());
1113    }
1114}