Skip to main content

argentor_orchestrator/
workflow.rs

1//! Configurable workflow engine for automating business pipelines.
2//!
3//! Provides a declarative way to define multi-step workflows with conditional
4//! branching, failure handling, and multiple trigger types. Each workflow is
5//! a sequence of [`WorkflowStepDef`] items executed by the [`WorkflowEngine`],
6//! which tracks state through [`WorkflowRun`] instances.
7//!
8//! # Pre-built templates
9//!
10//! - [`lead_qualification_workflow`] — CRM lead qualification pipeline
11//! - [`support_ticket_workflow`] — Customer support ticket routing pipeline
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
20// ---------------------------------------------------------------------------
21// Trigger
22// ---------------------------------------------------------------------------
23
/// What starts a workflow.
///
/// Uses serde's internally tagged representation: the snake_case variant name
/// is stored under a `"type"` key alongside the variant's own fields, e.g.
/// `{"type": "webhook", "event": "new_lead"}`.
///
/// The trigger is descriptive metadata as far as `WorkflowEngine::start` and
/// `WorkflowEngine::advance` are concerned: neither of them inspects it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WorkflowTrigger {
    /// Started manually by a user or API call.
    Manual,
    /// Started by an incoming webhook event.
    Webhook {
        /// Event name or pattern that triggers the workflow.
        event: String,
    },
    /// Started on a cron schedule.
    Schedule {
        /// Cron expression (e.g., `"0 9 * * MON"`).
        cron: String,
    },
    /// Started when a metric crosses a threshold.
    Threshold {
        /// Name of the metric to monitor.
        metric: String,
        /// Comparison operator (e.g., `">"`, `">="`, `"<"`), kept as free text.
        condition: String,
        /// Threshold value the metric is compared against.
        value: f64,
    },
}
50
51// ---------------------------------------------------------------------------
52// Step types & conditions
53// ---------------------------------------------------------------------------
54
/// What a step does.
///
/// Serialized with a `"type"` tag holding the snake_case variant name next to
/// the variant's fields.
///
/// The executor in this file (`execute_step`) is a simulation: it echoes the
/// variant's fields into the step output and always reports success. Template
/// strings are copied verbatim; no variable interpolation happens there.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StepType {
    /// Dispatch a task to an AI agent.
    AgentTask {
        /// Role of the agent to dispatch to (e.g., "coder", "reviewer").
        agent_role: String,
        /// Template string for the agent's prompt.
        prompt_template: String,
    },
    /// Make an HTTP call.
    HttpCall {
        /// HTTP method (e.g., "GET", "POST").
        method: String,
        /// Target URL.
        url: String,
        /// Optional body template (supports variable interpolation).
        body_template: Option<String>,
    },
    /// Branch to one of two steps by id based on an expression.
    ///
    /// `WorkflowEngine::advance` looks the chosen id up among the workflow's
    /// steps; when no step has that id the run simply continues with the step
    /// that follows this one.
    Condition {
        /// Boolean expression to evaluate.
        expression: String,
        /// Step id to jump to when the expression is true.
        if_true: String,
        /// Step id to jump to when the expression is false.
        if_false: String,
    },
    /// Wait for a fixed duration.
    ///
    /// The simulated executor records the requested duration but does not
    /// actually wait.
    Delay {
        /// Number of seconds to wait.
        seconds: u64,
    },
    /// Send a notification.
    Notification {
        /// Target channel (e.g., "slack", "email").
        channel: String,
        /// Message template (supports variable interpolation).
        message_template: String,
    },
    /// Escalate to a human operator.
    AssignToHuman {
        /// Team or group to assign to.
        team: String,
        /// Escalation message.
        message: String,
    },
}
104
/// Conditions that gate whether a step should execute.
///
/// A step whose condition evaluates to false is recorded as skipped and the
/// run moves on. Evaluation lives in `evaluate_condition`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StepCondition {
    /// Always execute.
    Always,
    /// Execute only if the previous step succeeded.
    ///
    /// "Previous" is the most recently recorded step result; when no result
    /// has been recorded yet the condition passes.
    IfPreviousSucceeded,
    /// Execute if a field in the run context equals a value.
    ///
    /// The field is looked up in the run's trigger data only, and only JSON
    /// string values can match; numbers and booleans never compare equal.
    IfFieldEquals {
        /// Field name to check (dotted paths such as `a.b.c` are walked).
        field: String,
        /// Expected value.
        value: String,
    },
    /// Execute if a numeric field exceeds a threshold.
    ///
    /// The field is looked up in the run's trigger data only and must be a
    /// JSON number; a missing or non-numeric field fails the condition.
    IfScoreAbove {
        /// Field name to check (dotted paths such as `a.b.c` are walked).
        field: String,
        /// Minimum value (exclusive).
        threshold: f64,
    },
    /// Evaluate a simple boolean expression string.
    ///
    /// NOTE(review): this is a newtype variant wrapping a `String` inside an
    /// internally tagged enum (`tag = "type"`). serde documents that
    /// representation as supporting struct variants, unit variants, and
    /// newtype variants that contain structs or maps, so serializing or
    /// deserializing this variant is expected to fail at runtime — confirm
    /// with a JSON round-trip test before relying on it.
    Expression(String),
}
130
/// What to do when a step fails.
///
/// Unlike the other enums in this module there is no `tag` attribute here, so
/// serde's default enum representation applies (with snake_case names).
///
/// The simulated `execute_step` in this file never reports a failure, so these
/// actions only come into play once a real executor is plugged in.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum FailureAction {
    /// Stop the entire workflow; the run is marked as failed.
    Abort,
    /// Skip the failed step and continue.
    ///
    /// The failed result is still recorded before the run moves on.
    Skip,
    /// Retry the step up to `max` times.
    ///
    /// Each failed attempt is recorded. Once `max` retries have also failed
    /// the run is marked as failed.
    Retry {
        /// Maximum number of retry attempts.
        max: u32,
    },
    /// Jump to a specific step by id.
    ///
    /// When no step has the given id the run continues with the next step.
    GoTo {
        /// Target step identifier.
        step_id: String,
    },
}
150
151// ---------------------------------------------------------------------------
152// Workflow definition
153// ---------------------------------------------------------------------------
154
/// A single step inside a workflow definition.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStepDef {
    /// Unique identifier for this step within its workflow.
    ///
    /// Branch and go-to targets refer to steps by this id.
    pub id: String,
    /// Human-readable step name.
    pub name: String,
    /// What this step does (agent task, HTTP call, condition, etc.).
    pub step_type: StepType,
    /// Optional guard that gates whether this step should execute.
    ///
    /// `None` behaves like [`StepCondition::Always`].
    pub condition: Option<StepCondition>,
    /// Strategy when this step fails (abort, skip, retry, goto).
    pub on_failure: FailureAction,
    /// Maximum seconds this step may run before being timed out.
    ///
    /// NOTE(review): `WorkflowEngine::advance` does not read this field, so
    /// the limit looks unenforced in this file — confirm where it is applied.
    pub timeout_seconds: Option<u64>,
}
171
/// Complete workflow definition -- the "blueprint" for a pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowDefinition {
    /// Unique identifier for this workflow definition.
    ///
    /// Used as the registry key; registering another definition with the same
    /// id replaces the earlier one.
    pub id: String,
    /// Human-readable name.
    pub name: String,
    /// Free-text description of the workflow's purpose.
    pub description: String,
    /// What starts this workflow (manual, webhook, schedule, threshold).
    pub trigger: WorkflowTrigger,
    /// Ordered list of steps that make up the pipeline.
    ///
    /// Steps run in list order unless a condition step or a go-to failure
    /// action jumps elsewhere. Jump targets are resolved to the first step
    /// with a matching id.
    pub steps: Vec<WorkflowStepDef>,
    /// Maximum seconds the entire workflow may run before timing out.
    ///
    /// NOTE(review): `WorkflowEngine::advance` does not read this field, so
    /// the limit looks unenforced in this file — confirm where it is applied.
    pub timeout_seconds: Option<u64>,
}
188
189// ---------------------------------------------------------------------------
190// Run-time state
191// ---------------------------------------------------------------------------
192
/// Lifecycle status of a workflow run.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
    /// Run has been created but not yet started.
    ///
    /// This is the status assigned by `WorkflowEngine::start`.
    Pending,
    /// Run is actively executing steps.
    Running,
    /// The run moved past its final step.
    ///
    /// Steps that failed with a skip action do not prevent completion, so
    /// inspect the individual step results to see whether every step succeeded.
    Completed,
    /// A step failed and its failure action was abort, or its retries were
    /// exhausted.
    Failed,
    /// Run was manually paused via `WorkflowEngine::pause`.
    Paused,
    /// Run exceeded the workflow-level timeout.
    ///
    /// NOTE(review): nothing in the engine code in this file assigns this
    /// status — confirm where timeouts are detected.
    TimedOut,
}
210
/// Status of an individual step execution.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepStatus {
    /// Step finished successfully.
    Completed,
    /// Step failed during execution.
    ///
    /// How the run reacts is decided by the step's [`FailureAction`].
    Failed,
    /// Step was skipped because its condition was not met.
    Skipped,
    /// Step exceeded its timeout.
    ///
    /// NOTE(review): the simulated executor in this file never produces this
    /// status — confirm where step timeouts are detected.
    TimedOut,
}
224
/// Result of executing a single step.
///
/// One entry is appended to [`WorkflowRun::step_results`] each time a step is
/// processed, whether it ran, was skipped, or failed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
    /// Identifier of the step that was executed.
    pub step_id: String,
    /// Outcome status of the step.
    pub status: StepStatus,
    /// Structured output produced by the step.
    ///
    /// Skipped steps carry `{"skipped": true}`.
    pub output: serde_json::Value,
    /// Wall-clock execution time in milliseconds (`0` for skipped steps).
    pub duration_ms: u64,
    /// Error message if the step failed.
    pub error: Option<String>,
}
239
/// A running (or completed) instance of a workflow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRun {
    /// Unique run identifier (a freshly generated v4 UUID string).
    pub run_id: String,
    /// Identifier of the workflow definition being executed.
    pub workflow_id: String,
    /// Current lifecycle status of the run.
    pub status: RunStatus,
    /// Index of the next step to execute.
    ///
    /// Equals the number of steps in the definition once the run has moved
    /// past its final step.
    pub current_step_index: usize,
    /// Data that triggered this run (e.g., webhook payload).
    ///
    /// Conditions and expressions read their fields from here first.
    pub trigger_data: serde_json::Value,
    /// Results of steps already executed, in order.
    ///
    /// A step id can appear more than once, for example after retries or when
    /// a jump revisits an earlier step.
    pub step_results: Vec<StepResult>,
    /// UTC timestamp of when the run was created.
    pub created_at: DateTime<Utc>,
    /// UTC timestamp of the last state change.
    pub updated_at: DateTime<Utc>,
}
260
261// ---------------------------------------------------------------------------
262// Engine
263// ---------------------------------------------------------------------------
264
/// Thread-safe workflow engine that stores definitions and runs.
///
/// Both maps sit behind `Arc<RwLock<..>>`, so cloning the engine yields a
/// handle onto the same definitions and runs rather than an independent copy.
#[derive(Clone)]
pub struct WorkflowEngine {
    // Registered workflow definitions, keyed by workflow id.
    workflows: Arc<RwLock<HashMap<String, WorkflowDefinition>>>,
    // Run state, keyed by run id.
    runs: Arc<RwLock<HashMap<String, WorkflowRun>>>,
}
271
272impl WorkflowEngine {
273    /// Create a new, empty workflow engine.
274    pub fn new() -> Self {
275        Self {
276            workflows: Arc::new(RwLock::new(HashMap::new())),
277            runs: Arc::new(RwLock::new(HashMap::new())),
278        }
279    }
280
281    /// Register a workflow definition. Overwrites if id already exists.
282    pub async fn register_workflow(&self, workflow: WorkflowDefinition) {
283        let id = workflow.id.clone();
284        self.workflows.write().await.insert(id, workflow);
285    }
286
287    /// Get a registered workflow definition by id.
288    pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowDefinition> {
289        self.workflows.read().await.get(workflow_id).cloned()
290    }
291
292    /// Start a new run of the given workflow. Returns the `run_id`.
293    ///
294    /// Returns `None` if the workflow id is not registered.
295    pub async fn start(
296        &self,
297        workflow_id: &str,
298        trigger_data: serde_json::Value,
299    ) -> Option<String> {
300        let workflows = self.workflows.read().await;
301        if !workflows.contains_key(workflow_id) {
302            return None;
303        }
304        drop(workflows);
305
306        let run_id = Uuid::new_v4().to_string();
307        let now = Utc::now();
308        let run = WorkflowRun {
309            run_id: run_id.clone(),
310            workflow_id: workflow_id.to_string(),
311            status: RunStatus::Pending,
312            current_step_index: 0,
313            trigger_data,
314            step_results: Vec::new(),
315            created_at: now,
316            updated_at: now,
317        };
318        self.runs.write().await.insert(run_id.clone(), run);
319        Some(run_id)
320    }
321
322    /// Advance a run to the next step. Evaluates the step condition, executes
323    /// a simulated step, records the result, and moves the index forward.
324    ///
325    /// Returns `Ok(true)` if the step was advanced, `Ok(false)` if the run
326    /// is already completed/failed/timed-out, and `Err` on unknown run id.
327    pub async fn advance(&self, run_id: &str) -> Result<bool, String> {
328        // --- Retrieve run and workflow snapshots (drop locks early) --------
329        let (mut run, workflow) = {
330            let runs = self.runs.read().await;
331            let run = runs
332                .get(run_id)
333                .ok_or_else(|| format!("run {run_id} not found"))?
334                .clone();
335
336            let workflows = self.workflows.read().await;
337            let workflow = workflows
338                .get(&run.workflow_id)
339                .ok_or_else(|| format!("workflow {} not found", run.workflow_id))?
340                .clone();
341            (run, workflow)
342        };
343
344        // Terminal states — nothing to advance.
345        if matches!(
346            run.status,
347            RunStatus::Completed | RunStatus::Failed | RunStatus::TimedOut
348        ) {
349            return Ok(false);
350        }
351
352        // If still Pending, transition to Running.
353        if run.status == RunStatus::Pending {
354            run.status = RunStatus::Running;
355        }
356
357        // Check if we've exhausted all steps.
358        if run.current_step_index >= workflow.steps.len() {
359            run.status = RunStatus::Completed;
360            run.updated_at = Utc::now();
361            self.runs.write().await.insert(run_id.to_string(), run);
362            return Ok(false);
363        }
364
365        let step = &workflow.steps[run.current_step_index];
366
367        // --- Evaluate condition -------------------------------------------
368        let should_execute = evaluate_condition(&step.condition, &run);
369
370        let start = std::time::Instant::now();
371
372        let result = if should_execute {
373            execute_step(step, &run)
374        } else {
375            StepResult {
376                step_id: step.id.clone(),
377                status: StepStatus::Skipped,
378                output: serde_json::json!({ "skipped": true }),
379                duration_ms: 0,
380                error: None,
381            }
382        };
383
384        let duration_ms = start.elapsed().as_millis() as u64;
385
386        // Resolve the result with actual timing if it was executed.
387        let result = if should_execute {
388            StepResult {
389                duration_ms,
390                ..result
391            }
392        } else {
393            result
394        };
395
396        // Handle branching for Condition steps.
397        let mut next_index = run.current_step_index + 1;
398        if let StepType::Condition {
399            ref expression,
400            ref if_true,
401            ref if_false,
402        } = step.step_type
403        {
404            let branch_target = if evaluate_expression(expression, &run) {
405                if_true
406            } else {
407                if_false
408            };
409            // Find the target step index by id.
410            if let Some(idx) = workflow.steps.iter().position(|s| s.id == *branch_target) {
411                next_index = idx;
412            }
413        }
414
415        // Handle failure actions.
416        if result.status == StepStatus::Failed {
417            match &step.on_failure {
418                FailureAction::Abort => {
419                    run.step_results.push(result);
420                    run.status = RunStatus::Failed;
421                    run.updated_at = Utc::now();
422                    self.runs.write().await.insert(run_id.to_string(), run);
423                    return Ok(true);
424                }
425                FailureAction::Skip => {
426                    // fall through to advance
427                }
428                FailureAction::Retry { max } => {
429                    let retry_count = run
430                        .step_results
431                        .iter()
432                        .filter(|r| r.step_id == step.id && r.status == StepStatus::Failed)
433                        .count() as u32;
434                    if retry_count < *max {
435                        // Record the failure but do not advance the index.
436                        run.step_results.push(result);
437                        run.updated_at = Utc::now();
438                        self.runs.write().await.insert(run_id.to_string(), run);
439                        return Ok(true);
440                    }
441                    // Exhausted retries — abort.
442                    run.step_results.push(result);
443                    run.status = RunStatus::Failed;
444                    run.updated_at = Utc::now();
445                    self.runs.write().await.insert(run_id.to_string(), run);
446                    return Ok(true);
447                }
448                FailureAction::GoTo { step_id } => {
449                    if let Some(idx) = workflow.steps.iter().position(|s| s.id == *step_id) {
450                        next_index = idx;
451                    }
452                }
453            }
454        }
455
456        run.step_results.push(result);
457        run.current_step_index = next_index;
458
459        // Check if run is complete.
460        if run.current_step_index >= workflow.steps.len() {
461            run.status = RunStatus::Completed;
462        }
463
464        run.updated_at = Utc::now();
465        self.runs.write().await.insert(run_id.to_string(), run);
466        Ok(true)
467    }
468
469    /// Retrieve the current state of a run.
470    pub async fn get_run(&self, run_id: &str) -> Option<WorkflowRun> {
471        self.runs.read().await.get(run_id).cloned()
472    }
473
474    /// List all runs for a given workflow id.
475    pub async fn list_runs(&self, workflow_id: &str) -> Vec<WorkflowRun> {
476        self.runs
477            .read()
478            .await
479            .values()
480            .filter(|r| r.workflow_id == workflow_id)
481            .cloned()
482            .collect()
483    }
484
485    /// Pause a running workflow run.
486    pub async fn pause(&self, run_id: &str) -> Result<(), String> {
487        let mut runs = self.runs.write().await;
488        let run = runs
489            .get_mut(run_id)
490            .ok_or_else(|| format!("run {run_id} not found"))?;
491        if run.status != RunStatus::Running {
492            return Err(format!(
493                "run {run_id} is not running (status: {:?})",
494                run.status
495            ));
496        }
497        run.status = RunStatus::Paused;
498        run.updated_at = Utc::now();
499        Ok(())
500    }
501
502    /// Resume a paused workflow run.
503    pub async fn resume(&self, run_id: &str) -> Result<(), String> {
504        let mut runs = self.runs.write().await;
505        let run = runs
506            .get_mut(run_id)
507            .ok_or_else(|| format!("run {run_id} not found"))?;
508        if run.status != RunStatus::Paused {
509            return Err(format!(
510                "run {run_id} is not paused (status: {:?})",
511                run.status
512            ));
513        }
514        run.status = RunStatus::Running;
515        run.updated_at = Utc::now();
516        Ok(())
517    }
518
519    /// Run a workflow to completion (advance until done). Returns the final run state.
520    ///
521    /// Applies an internal safety limit of 1000 iterations.
522    pub async fn run_to_completion(&self, run_id: &str) -> Result<WorkflowRun, String> {
523        let mut iterations = 0u32;
524        loop {
525            let advanced = self.advance(run_id).await?;
526            if !advanced {
527                break;
528            }
529            iterations += 1;
530            if iterations > 1000 {
531                return Err("workflow exceeded 1000 iterations — possible infinite loop".into());
532            }
533        }
534        self.get_run(run_id)
535            .await
536            .ok_or_else(|| format!("run {run_id} disappeared"))
537    }
538}
539
540impl Default for WorkflowEngine {
541    fn default() -> Self {
542        Self::new()
543    }
544}
545
546// ---------------------------------------------------------------------------
547// Condition / expression evaluation helpers
548// ---------------------------------------------------------------------------
549
550/// Decide whether a step should execute based on its condition.
551fn evaluate_condition(condition: &Option<StepCondition>, run: &WorkflowRun) -> bool {
552    match condition {
553        None | Some(StepCondition::Always) => true,
554        Some(StepCondition::IfPreviousSucceeded) => run
555            .step_results
556            .last()
557            .map(|r| r.status == StepStatus::Completed)
558            .unwrap_or(true),
559        Some(StepCondition::IfFieldEquals { field, value }) => {
560            extract_field(&run.trigger_data, field)
561                .and_then(|v| v.as_str().map(std::string::ToString::to_string))
562                .map(|v| v == *value)
563                .unwrap_or(false)
564        }
565        Some(StepCondition::IfScoreAbove { field, threshold }) => {
566            extract_field(&run.trigger_data, field)
567                .and_then(serde_json::Value::as_f64)
568                .map(|v| v > *threshold)
569                .unwrap_or(false)
570        }
571        Some(StepCondition::Expression(expr)) => evaluate_expression(expr, run),
572    }
573}
574
575/// Very small expression evaluator. Supports:
576/// - `"true"` / `"false"` literals
577/// - `"field_name == value"` (string equality from trigger_data or last step output)
578/// - `"field_name > number"` (numeric comparison)
579///
580/// For anything more complex, callers should implement a custom evaluator.
581fn evaluate_expression(expr: &str, run: &WorkflowRun) -> bool {
582    let expr = expr.trim();
583    if expr.eq_ignore_ascii_case("true") {
584        return true;
585    }
586    if expr.eq_ignore_ascii_case("false") {
587        return false;
588    }
589
590    // Try `field == value`
591    if let Some((lhs, rhs)) = expr.split_once("==") {
592        let field = lhs.trim();
593        let expected = rhs.trim().trim_matches('"').trim_matches('\'');
594        let actual = resolve_field(field, run);
595        return actual.as_deref() == Some(expected);
596    }
597
598    // Try `field > number`
599    if let Some((lhs, rhs)) = expr.split_once('>') {
600        let field = lhs.trim();
601        let threshold: f64 = match rhs.trim().parse() {
602            Ok(v) => v,
603            Err(_) => return false,
604        };
605        let actual: f64 = match resolve_field(field, run).and_then(|s| s.parse().ok()) {
606            Some(v) => v,
607            None => return false,
608        };
609        return actual > threshold;
610    }
611
612    // Try `field < number`
613    if let Some((lhs, rhs)) = expr.split_once('<') {
614        let field = lhs.trim();
615        let threshold: f64 = match rhs.trim().parse() {
616            Ok(v) => v,
617            Err(_) => return false,
618        };
619        let actual: f64 = match resolve_field(field, run).and_then(|s| s.parse().ok()) {
620            Some(v) => v,
621            None => return false,
622        };
623        return actual < threshold;
624    }
625
626    false
627}
628
629/// Resolve a dotted field name from trigger_data or the latest step output.
630fn resolve_field(field: &str, run: &WorkflowRun) -> Option<String> {
631    // First try trigger_data.
632    if let Some(v) = extract_field(&run.trigger_data, field) {
633        return value_to_string(v);
634    }
635    // Then try the latest step output (walk backwards).
636    for result in run.step_results.iter().rev() {
637        if let Some(v) = extract_field(&result.output, field) {
638            return value_to_string(v);
639        }
640    }
641    None
642}
643
644/// Walk a dotted path (`a.b.c`) inside a JSON value.
645fn extract_field<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
646    let mut current = value;
647    for segment in path.split('.') {
648        current = current.get(segment)?;
649    }
650    Some(current)
651}
652
653/// Convert a JSON value to a string for comparison purposes.
654fn value_to_string(v: &serde_json::Value) -> Option<String> {
655    match v {
656        serde_json::Value::String(s) => Some(s.clone()),
657        serde_json::Value::Number(n) => Some(n.to_string()),
658        serde_json::Value::Bool(b) => Some(b.to_string()),
659        _ => Some(v.to_string()),
660    }
661}
662
663// ---------------------------------------------------------------------------
664// Step execution (simulated — real HTTP / agent dispatch is out of scope)
665// ---------------------------------------------------------------------------
666
667/// Simulate execution of a step. In production this would dispatch to an
668/// agent runner, make HTTP calls, etc.
669fn execute_step(step: &WorkflowStepDef, _run: &WorkflowRun) -> StepResult {
670    match &step.step_type {
671        StepType::AgentTask {
672            agent_role,
673            prompt_template,
674        } => StepResult {
675            step_id: step.id.clone(),
676            status: StepStatus::Completed,
677            output: serde_json::json!({
678                "agent_role": agent_role,
679                "prompt": prompt_template,
680                "response": format!("Simulated response from {agent_role} agent"),
681            }),
682            duration_ms: 0,
683            error: None,
684        },
685        StepType::HttpCall {
686            method,
687            url,
688            body_template,
689        } => StepResult {
690            step_id: step.id.clone(),
691            status: StepStatus::Completed,
692            output: serde_json::json!({
693                "method": method,
694                "url": url,
695                "body": body_template,
696                "status_code": 200,
697                "response_body": "{}",
698            }),
699            duration_ms: 0,
700            error: None,
701        },
702        StepType::Condition { expression, .. } => StepResult {
703            step_id: step.id.clone(),
704            status: StepStatus::Completed,
705            output: serde_json::json!({
706                "evaluated_expression": expression,
707            }),
708            duration_ms: 0,
709            error: None,
710        },
711        StepType::Delay { seconds } => StepResult {
712            step_id: step.id.clone(),
713            status: StepStatus::Completed,
714            output: serde_json::json!({ "delayed_seconds": seconds }),
715            duration_ms: 0,
716            error: None,
717        },
718        StepType::Notification {
719            channel,
720            message_template,
721        } => StepResult {
722            step_id: step.id.clone(),
723            status: StepStatus::Completed,
724            output: serde_json::json!({
725                "channel": channel,
726                "message": message_template,
727                "sent": true,
728            }),
729            duration_ms: 0,
730            error: None,
731        },
732        StepType::AssignToHuman { team, message } => StepResult {
733            step_id: step.id.clone(),
734            status: StepStatus::Completed,
735            output: serde_json::json!({
736                "team": team,
737                "message": message,
738                "assigned": true,
739            }),
740            duration_ms: 0,
741            error: None,
742        },
743    }
744}
745
746// ---------------------------------------------------------------------------
747// Pre-built workflow templates
748// ---------------------------------------------------------------------------
749
750/// Pre-built lead qualification workflow.
751///
752/// Pipeline: Webhook → qualify lead → if HOT: assign to sales → compose
753/// outreach → schedule follow-up.
754pub fn lead_qualification_workflow() -> WorkflowDefinition {
755    WorkflowDefinition {
756        id: "lead_qualification".to_string(),
757        name: "Lead Qualification Pipeline".to_string(),
758        description: "Qualifies incoming leads and routes hot prospects to sales.".to_string(),
759        trigger: WorkflowTrigger::Webhook {
760            event: "new_lead".to_string(),
761        },
762        timeout_seconds: Some(3600),
763        steps: vec![
764            WorkflowStepDef {
765                id: "qualify".to_string(),
766                name: "Qualify Lead".to_string(),
767                step_type: StepType::AgentTask {
768                    agent_role: "analyst".to_string(),
769                    prompt_template: "Analyze the following lead data and classify as HOT, WARM, or COLD: {{lead_data}}".to_string(),
770                },
771                condition: None,
772                on_failure: FailureAction::Abort,
773                timeout_seconds: Some(120),
774            },
775            WorkflowStepDef {
776                id: "check_hot".to_string(),
777                name: "Check if Lead is HOT".to_string(),
778                step_type: StepType::Condition {
779                    expression: "score > 80".to_string(),
780                    if_true: "assign_sales".to_string(),
781                    if_false: "notify_marketing".to_string(),
782                },
783                condition: Some(StepCondition::IfPreviousSucceeded),
784                on_failure: FailureAction::Abort,
785                timeout_seconds: None,
786            },
787            WorkflowStepDef {
788                id: "assign_sales".to_string(),
789                name: "Assign to Sales Team".to_string(),
790                step_type: StepType::AssignToHuman {
791                    team: "sales".to_string(),
792                    message: "New HOT lead requires immediate follow-up.".to_string(),
793                },
794                condition: None,
795                on_failure: FailureAction::Retry { max: 2 },
796                timeout_seconds: Some(60),
797            },
798            WorkflowStepDef {
799                id: "compose_outreach".to_string(),
800                name: "Compose Outreach Email".to_string(),
801                step_type: StepType::AgentTask {
802                    agent_role: "copywriter".to_string(),
803                    prompt_template: "Draft a personalized outreach email for this lead: {{lead_data}}".to_string(),
804                },
805                condition: Some(StepCondition::IfPreviousSucceeded),
806                on_failure: FailureAction::Skip,
807                timeout_seconds: Some(180),
808            },
809            WorkflowStepDef {
810                id: "schedule_followup".to_string(),
811                name: "Schedule Follow-up".to_string(),
812                step_type: StepType::Notification {
813                    channel: "calendar".to_string(),
814                    message_template: "Follow-up with lead {{lead_name}} in 48 hours.".to_string(),
815                },
816                condition: Some(StepCondition::IfPreviousSucceeded),
817                on_failure: FailureAction::Skip,
818                timeout_seconds: Some(30),
819            },
820            WorkflowStepDef {
821                id: "notify_marketing".to_string(),
822                name: "Notify Marketing (warm/cold lead)".to_string(),
823                step_type: StepType::Notification {
824                    channel: "slack".to_string(),
825                    message_template: "New lead classified as warm/cold — added to nurture campaign.".to_string(),
826                },
827                condition: None,
828                on_failure: FailureAction::Skip,
829                timeout_seconds: Some(30),
830            },
831        ],
832    }
833}
834
835/// Pre-built support ticket workflow.
836///
837/// Pipeline: Webhook → route ticket → if urgent: notify team → generate
838/// response → quality check → if low quality: assign to human.
839pub fn support_ticket_workflow() -> WorkflowDefinition {
840    WorkflowDefinition {
841        id: "support_ticket".to_string(),
842        name: "Support Ticket Pipeline".to_string(),
843        description: "Routes, triages, and responds to support tickets.".to_string(),
844        trigger: WorkflowTrigger::Webhook {
845            event: "new_ticket".to_string(),
846        },
847        timeout_seconds: Some(1800),
848        steps: vec![
849            WorkflowStepDef {
850                id: "route".to_string(),
851                name: "Route Ticket".to_string(),
852                step_type: StepType::AgentTask {
853                    agent_role: "router".to_string(),
854                    prompt_template: "Classify this support ticket by urgency (critical/high/medium/low) and category: {{ticket}}".to_string(),
855                },
856                condition: None,
857                on_failure: FailureAction::Abort,
858                timeout_seconds: Some(60),
859            },
860            WorkflowStepDef {
861                id: "check_urgent".to_string(),
862                name: "Check Urgency".to_string(),
863                step_type: StepType::Condition {
864                    expression: "priority == critical".to_string(),
865                    if_true: "notify_team".to_string(),
866                    if_false: "generate_response".to_string(),
867                },
868                condition: Some(StepCondition::IfPreviousSucceeded),
869                on_failure: FailureAction::Abort,
870                timeout_seconds: None,
871            },
872            WorkflowStepDef {
873                id: "notify_team".to_string(),
874                name: "Notify On-Call Team".to_string(),
875                step_type: StepType::Notification {
876                    channel: "pagerduty".to_string(),
877                    message_template: "CRITICAL ticket requires immediate attention: {{ticket_id}}".to_string(),
878                },
879                condition: None,
880                on_failure: FailureAction::Retry { max: 3 },
881                timeout_seconds: Some(30),
882            },
883            WorkflowStepDef {
884                id: "generate_response".to_string(),
885                name: "Generate Response".to_string(),
886                step_type: StepType::AgentTask {
887                    agent_role: "support".to_string(),
888                    prompt_template: "Generate a helpful response for this support ticket: {{ticket}}".to_string(),
889                },
890                condition: None,
891                on_failure: FailureAction::Retry { max: 2 },
892                timeout_seconds: Some(120),
893            },
894            WorkflowStepDef {
895                id: "quality_check".to_string(),
896                name: "Quality Check".to_string(),
897                step_type: StepType::AgentTask {
898                    agent_role: "reviewer".to_string(),
899                    prompt_template: "Review this support response for accuracy and tone. Score 0-100: {{response}}".to_string(),
900                },
901                condition: Some(StepCondition::IfPreviousSucceeded),
902                on_failure: FailureAction::Skip,
903                timeout_seconds: Some(60),
904            },
905            WorkflowStepDef {
906                id: "check_quality".to_string(),
907                name: "Check Quality Score".to_string(),
908                step_type: StepType::Condition {
909                    expression: "quality_score > 70".to_string(),
910                    if_true: "send_response".to_string(),
911                    if_false: "assign_human".to_string(),
912                },
913                condition: Some(StepCondition::IfPreviousSucceeded),
914                on_failure: FailureAction::Abort,
915                timeout_seconds: None,
916            },
917            WorkflowStepDef {
918                id: "send_response".to_string(),
919                name: "Send Response to Customer".to_string(),
920                step_type: StepType::Notification {
921                    channel: "email".to_string(),
922                    message_template: "Your support request has been addressed: {{response}}".to_string(),
923                },
924                condition: None,
925                on_failure: FailureAction::Retry { max: 2 },
926                timeout_seconds: Some(30),
927            },
928            WorkflowStepDef {
929                id: "assign_human".to_string(),
930                name: "Assign to Human Agent".to_string(),
931                step_type: StepType::AssignToHuman {
932                    team: "support_l2".to_string(),
933                    message: "AI-generated response did not meet quality threshold — please handle manually.".to_string(),
934                },
935                condition: None,
936                on_failure: FailureAction::Abort,
937                timeout_seconds: Some(60),
938            },
939        ],
940    }
941}
942
943// ===========================================================================
944// Tests
945// ===========================================================================
946
#[cfg(test)]
// Tests unwrap freely: a panic is exactly the failure signal wanted here.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    //! Unit tests for the workflow engine, its step/condition handling, the
    //! expression evaluator, the pre-built templates and serde round-trips.

    use super::*;
    use serde_json::json;
    use std::collections::HashSet;

    // --- Helpers -----------------------------------------------------------

    /// Id given to every workflow produced by [`simple_workflow`].
    const WORKFLOW_ID: &str = "test_wf";

    /// Wraps `steps` in a manually triggered workflow with id
    /// [`WORKFLOW_ID`] and no workflow-level timeout.
    fn simple_workflow(steps: Vec<WorkflowStepDef>) -> WorkflowDefinition {
        WorkflowDefinition {
            id: WORKFLOW_ID.to_string(),
            name: "Test Workflow".to_string(),
            description: "A test workflow".to_string(),
            trigger: WorkflowTrigger::Manual,
            steps,
            timeout_seconds: None,
        }
    }

    /// Builds an unconditional step of the given `step_type` that aborts on
    /// failure and has no timeout.
    fn typed_step(id: &str, name: &str, step_type: StepType) -> WorkflowStepDef {
        WorkflowStepDef {
            id: id.to_string(),
            name: name.to_string(),
            step_type,
            condition: None,
            on_failure: FailureAction::Abort,
            timeout_seconds: None,
        }
    }

    /// Builds an unconditional agent-task step for the `tester` role.
    fn agent_step(id: &str, name: &str) -> WorkflowStepDef {
        typed_step(
            id,
            name,
            StepType::AgentTask {
                agent_role: "tester".to_string(),
                prompt_template: "Do something".to_string(),
            },
        )
    }

    /// Same as [`agent_step`], but guarded by `condition`.
    fn gated_step(id: &str, name: &str, condition: StepCondition) -> WorkflowStepDef {
        WorkflowStepDef {
            condition: Some(condition),
            ..agent_step(id, name)
        }
    }

    /// Agent step `s1` that only runs when trigger field `tier` equals `gold`.
    fn gold_tier_step() -> WorkflowStepDef {
        gated_step(
            "s1",
            "Conditional Step",
            StepCondition::IfFieldEquals {
                field: "tier".to_string(),
                value: "gold".to_string(),
            },
        )
    }

    /// Agent step `s1` guarded by `IfScoreAbove` on field `score` with a
    /// threshold of 50.
    fn high_score_step() -> WorkflowStepDef {
        gated_step(
            "s1",
            "High Score Step",
            StepCondition::IfScoreAbove {
                field: "score".to_string(),
                threshold: 50.0,
            },
        )
    }

    /// Creates a fresh engine with `simple_workflow(steps)` registered.
    async fn engine_with(steps: Vec<WorkflowStepDef>) -> WorkflowEngine {
        let engine = WorkflowEngine::new();
        engine.register_workflow(simple_workflow(steps)).await;
        engine
    }

    /// Starts a run of the [`WORKFLOW_ID`] workflow and returns its run id.
    ///
    /// Panics if the workflow has not been registered on `engine`.
    async fn start_run(engine: &WorkflowEngine, trigger_data: serde_json::Value) -> String {
        engine.start(WORKFLOW_ID, trigger_data).await.unwrap()
    }

    /// Builds a stand-alone `Running` run carrying `trigger_data`, for tests
    /// that call `evaluate_expression` without an engine.
    fn run_with_trigger_data(trigger_data: serde_json::Value) -> WorkflowRun {
        WorkflowRun {
            run_id: "r1".into(),
            workflow_id: "w1".into(),
            status: RunStatus::Running,
            current_step_index: 0,
            trigger_data,
            step_results: vec![],
            created_at: Utc::now(),
            updated_at: Utc::now(),
        }
    }

    /// Asserts that no two steps of `wf` share an id.
    fn assert_unique_step_ids(wf: &WorkflowDefinition) {
        let unique: HashSet<&str> = wf.steps.iter().map(|s| s.id.as_str()).collect();
        assert_eq!(wf.steps.len(), unique.len());
    }

    // --- Engine basics -----------------------------------------------------

    #[tokio::test]
    async fn test_engine_new() {
        let engine = WorkflowEngine::new();
        assert!(engine.workflows.read().await.is_empty());
        assert!(engine.runs.read().await.is_empty());
    }

    #[tokio::test]
    async fn test_engine_default() {
        let engine = WorkflowEngine::default();
        assert!(engine.workflows.read().await.is_empty());
    }

    #[tokio::test]
    async fn test_register_workflow() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;
        assert!(engine.get_workflow(WORKFLOW_ID).await.is_some());
    }

    #[tokio::test]
    async fn test_register_workflow_overwrite() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;

        // `simple_workflow` always uses `WORKFLOW_ID`, so registering a second
        // definition must replace the first one.
        let replacement =
            simple_workflow(vec![agent_step("s1", "Step 1"), agent_step("s2", "Step 2")]);
        engine.register_workflow(replacement).await;

        let wf = engine.get_workflow(WORKFLOW_ID).await.unwrap();
        assert_eq!(wf.steps.len(), 2);
    }

    #[tokio::test]
    async fn test_get_workflow_missing() {
        let engine = WorkflowEngine::new();
        assert!(engine.get_workflow("nope").await.is_none());
    }

    // --- Start / get_run ---------------------------------------------------

    #[tokio::test]
    async fn test_start_returns_run_id() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;
        let run_id = start_run(&engine, json!({})).await;
        assert!(!run_id.is_empty());
    }

    #[tokio::test]
    async fn test_start_unknown_workflow_returns_none() {
        let engine = WorkflowEngine::new();
        assert!(engine.start("no_such", json!({})).await.is_none());
    }

    #[tokio::test]
    async fn test_get_run_initial_state() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;
        let run_id = start_run(&engine, json!({"key": "val"})).await;

        let run = engine.get_run(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Pending);
        assert_eq!(run.current_step_index, 0);
        assert_eq!(run.trigger_data["key"], "val");
        assert!(run.step_results.is_empty());
    }

    #[tokio::test]
    async fn test_get_run_missing() {
        let engine = WorkflowEngine::new();
        assert!(engine.get_run("nope").await.is_none());
    }

    // --- Advance -----------------------------------------------------------

    #[tokio::test]
    async fn test_advance_single_step() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;
        let run_id = start_run(&engine, json!({})).await;

        let advanced = engine.advance(&run_id).await.unwrap();
        assert!(advanced);

        let run = engine.get_run(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Completed);
        assert_eq!(run.step_results.len(), 1);
        assert_eq!(run.step_results[0].status, StepStatus::Completed);
    }

    #[tokio::test]
    async fn test_advance_multi_step() {
        let engine = engine_with(vec![
            agent_step("s1", "Step 1"),
            agent_step("s2", "Step 2"),
            agent_step("s3", "Step 3"),
        ])
        .await;
        let run_id = start_run(&engine, json!({})).await;

        // After the first of three steps the run is still in progress.
        assert!(engine.advance(&run_id).await.unwrap());
        let run = engine.get_run(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Running);
        assert_eq!(run.step_results.len(), 1);

        // Second and third steps.
        assert!(engine.advance(&run_id).await.unwrap());
        assert!(engine.advance(&run_id).await.unwrap());

        let run = engine.get_run(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Completed);
        assert_eq!(run.step_results.len(), 3);
    }

    #[tokio::test]
    async fn test_advance_completed_run_returns_false() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;
        let run_id = start_run(&engine, json!({})).await;

        engine.advance(&run_id).await.unwrap();
        // The single step already ran, so there is nothing left to advance.
        assert!(!engine.advance(&run_id).await.unwrap());
    }

    #[tokio::test]
    async fn test_advance_unknown_run_errors() {
        let engine = WorkflowEngine::new();
        assert!(engine.advance("nope").await.is_err());
    }

    // --- list_runs ---------------------------------------------------------

    #[tokio::test]
    async fn test_list_runs() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;
        start_run(&engine, json!({})).await;
        start_run(&engine, json!({})).await;

        let runs = engine.list_runs(WORKFLOW_ID).await;
        assert_eq!(runs.len(), 2);
    }

    #[tokio::test]
    async fn test_list_runs_empty() {
        let engine = WorkflowEngine::new();
        assert!(engine.list_runs("nothing").await.is_empty());
    }

    // --- run_to_completion -------------------------------------------------

    #[tokio::test]
    async fn test_run_to_completion() {
        let engine =
            engine_with(vec![agent_step("s1", "Step 1"), agent_step("s2", "Step 2")]).await;
        let run_id = start_run(&engine, json!({})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Completed);
        assert_eq!(run.step_results.len(), 2);
    }

    // --- Pause / Resume ----------------------------------------------------

    #[tokio::test]
    async fn test_pause_resume() {
        let engine =
            engine_with(vec![agent_step("s1", "Step 1"), agent_step("s2", "Step 2")]).await;
        let run_id = start_run(&engine, json!({})).await;

        // One advance out of two steps leaves the run in `Running`.
        engine.advance(&run_id).await.unwrap();
        let run = engine.get_run(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Running);

        engine.pause(&run_id).await.unwrap();
        let run = engine.get_run(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Paused);

        engine.resume(&run_id).await.unwrap();
        let run = engine.get_run(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Running);
    }

    #[tokio::test]
    async fn test_pause_non_running_fails() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;
        let run_id = start_run(&engine, json!({})).await;
        // The run was never advanced, so it is Pending rather than Running.
        assert!(engine.pause(&run_id).await.is_err());
    }

    #[tokio::test]
    async fn test_resume_non_paused_fails() {
        let engine =
            engine_with(vec![agent_step("s1", "Step 1"), agent_step("s2", "Step 2")]).await;
        let run_id = start_run(&engine, json!({})).await;

        engine.advance(&run_id).await.unwrap();
        // The run is Running, not Paused.
        assert!(engine.resume(&run_id).await.is_err());
    }

    // --- Conditions --------------------------------------------------------

    #[tokio::test]
    async fn test_condition_if_previous_succeeded() {
        let engine = engine_with(vec![
            agent_step("s1", "Step 1"),
            gated_step("s2", "Step 2", StepCondition::IfPreviousSucceeded),
        ])
        .await;
        let run_id = start_run(&engine, json!({})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Completed);
        assert_eq!(run.step_results.len(), 2);
        assert_eq!(run.step_results[1].status, StepStatus::Completed);
    }

    #[tokio::test]
    async fn test_condition_if_field_equals() {
        let engine = engine_with(vec![gold_tier_step()]).await;
        // Trigger data matches the condition.
        let run_id = start_run(&engine, json!({"tier": "gold"})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.step_results[0].status, StepStatus::Completed);
    }

    #[tokio::test]
    async fn test_condition_if_field_equals_mismatch_skips() {
        let engine = engine_with(vec![gold_tier_step()]).await;
        let run_id = start_run(&engine, json!({"tier": "silver"})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.step_results[0].status, StepStatus::Skipped);
    }

    #[tokio::test]
    async fn test_condition_if_score_above() {
        let engine = engine_with(vec![high_score_step()]).await;
        let run_id = start_run(&engine, json!({"score": 80})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.step_results[0].status, StepStatus::Completed);
    }

    #[tokio::test]
    async fn test_condition_if_score_below_skips() {
        let engine = engine_with(vec![high_score_step()]).await;
        let run_id = start_run(&engine, json!({"score": 30})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.step_results[0].status, StepStatus::Skipped);
    }

    // --- Step types --------------------------------------------------------

    #[tokio::test]
    async fn test_delay_step() {
        let step = typed_step("d1", "Delay", StepType::Delay { seconds: 5 });
        let engine = engine_with(vec![step]).await;
        let run_id = start_run(&engine, json!({})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.step_results[0].output["delayed_seconds"], 5);
    }

    #[tokio::test]
    async fn test_notification_step() {
        let step = typed_step(
            "n1",
            "Notify",
            StepType::Notification {
                channel: "slack".to_string(),
                message_template: "Hello!".to_string(),
            },
        );
        let engine = engine_with(vec![step]).await;
        let run_id = start_run(&engine, json!({})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.step_results[0].output["sent"], true);
    }

    #[tokio::test]
    async fn test_assign_to_human_step() {
        let step = typed_step(
            "h1",
            "Human",
            StepType::AssignToHuman {
                team: "ops".to_string(),
                message: "Please handle".to_string(),
            },
        );
        let engine = engine_with(vec![step]).await;
        let run_id = start_run(&engine, json!({})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.step_results[0].output["assigned"], true);
    }

    #[tokio::test]
    async fn test_http_call_step() {
        let step = typed_step(
            "http1",
            "HTTP",
            StepType::HttpCall {
                method: "POST".to_string(),
                url: "https://api.example.com/hook".to_string(),
                body_template: Some(r#"{"data": "{{payload}}"}"#.to_string()),
            },
        );
        let engine = engine_with(vec![step]).await;
        let run_id = start_run(&engine, json!({})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.step_results[0].output["status_code"], 200);
    }

    // --- Failure actions ---------------------------------------------------

    #[tokio::test]
    async fn test_failure_action_skip() {
        // No step in this test actually fails, so `FailureAction::Skip` is
        // not triggered directly. Instead the first step is skipped by a
        // condition the (empty) trigger data cannot satisfy, and the run must
        // still continue with the second step.
        let skipped = WorkflowStepDef {
            on_failure: FailureAction::Skip,
            ..gated_step(
                "s1",
                "Step 1",
                StepCondition::IfFieldEquals {
                    field: "x".to_string(),
                    value: "impossible".to_string(),
                },
            )
        };
        let engine = engine_with(vec![skipped, agent_step("s2", "Step 2")]).await;
        let run_id = start_run(&engine, json!({})).await;

        let run = engine.run_to_completion(&run_id).await.unwrap();
        assert_eq!(run.status, RunStatus::Completed);
        // First step skipped, second completed.
        assert_eq!(run.step_results[0].status, StepStatus::Skipped);
        assert_eq!(run.step_results[1].status, StepStatus::Completed);
    }

    // --- Expression evaluator ----------------------------------------------
    //
    // `evaluate_expression` is synchronous, so these are plain `#[test]`s and
    // do not need a tokio runtime.

    #[test]
    fn test_expression_true_literal() {
        let run = run_with_trigger_data(json!({}));
        assert!(evaluate_expression("true", &run));
        assert!(!evaluate_expression("false", &run));
    }

    #[test]
    fn test_expression_field_equals() {
        let run = run_with_trigger_data(json!({"priority": "critical"}));
        assert!(evaluate_expression("priority == critical", &run));
        assert!(!evaluate_expression("priority == low", &run));
    }

    #[test]
    fn test_expression_numeric_comparison() {
        let run = run_with_trigger_data(json!({"score": 85}));
        assert!(evaluate_expression("score > 80", &run));
        assert!(!evaluate_expression("score > 90", &run));
        assert!(evaluate_expression("score < 90", &run));
        assert!(!evaluate_expression("score < 80", &run));
    }

    // --- Templates ---------------------------------------------------------

    #[test]
    fn test_lead_qualification_template() {
        let wf = lead_qualification_workflow();
        assert_eq!(wf.id, "lead_qualification");
        assert!(!wf.steps.is_empty());
        assert!(matches!(wf.trigger, WorkflowTrigger::Webhook { .. }));
        assert!(wf.timeout_seconds.is_some());
        assert_unique_step_ids(&wf);
    }

    #[test]
    fn test_support_ticket_template() {
        let wf = support_ticket_workflow();
        assert_eq!(wf.id, "support_ticket");
        assert!(!wf.steps.is_empty());
        assert!(matches!(wf.trigger, WorkflowTrigger::Webhook { .. }));
        assert_unique_step_ids(&wf);
    }

    #[tokio::test]
    async fn test_lead_qualification_run_to_completion() {
        let engine = WorkflowEngine::new();
        engine
            .register_workflow(lead_qualification_workflow())
            .await;
        let run_id = engine
            .start(
                "lead_qualification",
                json!({"lead_name": "Acme Corp", "score": 90}),
            )
            .await
            .unwrap();

        let run = engine.run_to_completion(&run_id).await.unwrap();
        // Smoke test: the run must end as either Completed or Failed and must
        // have recorded at least one step result.
        assert!(matches!(
            run.status,
            RunStatus::Completed | RunStatus::Failed
        ));
        assert!(!run.step_results.is_empty());
    }

    #[tokio::test]
    async fn test_support_ticket_run_to_completion() {
        let engine = WorkflowEngine::new();
        engine.register_workflow(support_ticket_workflow()).await;
        let run_id = engine
            .start(
                "support_ticket",
                json!({"ticket": "My app crashes", "priority": "high"}),
            )
            .await
            .unwrap();

        let run = engine.run_to_completion(&run_id).await.unwrap();
        // Smoke test: the run must end as either Completed or Failed and must
        // have recorded at least one step result.
        assert!(matches!(
            run.status,
            RunStatus::Completed | RunStatus::Failed
        ));
        assert!(!run.step_results.is_empty());
    }

    // --- Serialization roundtrip -------------------------------------------

    #[test]
    fn test_workflow_definition_serde() {
        let wf = lead_qualification_workflow();
        let json = serde_json::to_string_pretty(&wf).unwrap();
        let deserialized: WorkflowDefinition = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.id, wf.id);
        assert_eq!(deserialized.steps.len(), wf.steps.len());
    }

    #[test]
    fn test_trigger_variants_serde() {
        let triggers = [
            WorkflowTrigger::Manual,
            WorkflowTrigger::Webhook {
                event: "push".into(),
            },
            WorkflowTrigger::Schedule {
                cron: "0 * * * *".into(),
            },
            WorkflowTrigger::Threshold {
                metric: "cpu".into(),
                condition: "above".into(),
                value: 90.0,
            },
        ];
        for trigger in &triggers {
            let json = serde_json::to_string(trigger).unwrap();
            let back: WorkflowTrigger = serde_json::from_str(&json).unwrap();
            assert_eq!(*trigger, back);
        }
    }

    // --- Thread safety (clone + concurrent) --------------------------------

    #[tokio::test]
    async fn test_engine_clone_shared_state() {
        let engine = WorkflowEngine::new();
        let engine2 = engine.clone();
        engine
            .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
            .await;
        // A workflow registered through `engine` must be visible via the clone.
        assert!(engine2.get_workflow(WORKFLOW_ID).await.is_some());
    }

    #[tokio::test]
    async fn test_concurrent_starts() {
        let engine = engine_with(vec![agent_step("s1", "Step 1")]).await;

        let handles: Vec<_> = (0..10)
            .map(|_| {
                let e = engine.clone();
                tokio::spawn(async move { e.start(WORKFLOW_ID, json!({})).await.unwrap() })
            })
            .collect();

        let mut ids = Vec::with_capacity(handles.len());
        for handle in handles {
            ids.push(handle.await.unwrap());
        }

        // All run ids should be unique.
        let unique: HashSet<&str> = ids.iter().map(String::as_str).collect();
        assert_eq!(unique.len(), 10);
    }
}