use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WorkflowTrigger {
Manual,
Webhook {
event: String,
},
Schedule {
cron: String,
},
Threshold {
metric: String,
condition: String,
value: f64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StepType {
AgentTask {
agent_role: String,
prompt_template: String,
},
HttpCall {
method: String,
url: String,
body_template: Option<String>,
},
Condition {
expression: String,
if_true: String,
if_false: String,
},
Delay {
seconds: u64,
},
Notification {
channel: String,
message_template: String,
},
AssignToHuman {
team: String,
message: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StepCondition {
Always,
IfPreviousSucceeded,
IfFieldEquals {
field: String,
value: String,
},
IfScoreAbove {
field: String,
threshold: f64,
},
Expression(String),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum FailureAction {
Abort,
Skip,
Retry {
max: u32,
},
GoTo {
step_id: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowStepDef {
pub id: String,
pub name: String,
pub step_type: StepType,
pub condition: Option<StepCondition>,
pub on_failure: FailureAction,
pub timeout_seconds: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowDefinition {
pub id: String,
pub name: String,
pub description: String,
pub trigger: WorkflowTrigger,
pub steps: Vec<WorkflowStepDef>,
pub timeout_seconds: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum RunStatus {
Pending,
Running,
Completed,
Failed,
Paused,
TimedOut,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepStatus {
Completed,
Failed,
Skipped,
TimedOut,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepResult {
pub step_id: String,
pub status: StepStatus,
pub output: serde_json::Value,
pub duration_ms: u64,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowRun {
pub run_id: String,
pub workflow_id: String,
pub status: RunStatus,
pub current_step_index: usize,
pub trigger_data: serde_json::Value,
pub step_results: Vec<StepResult>,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
#[derive(Clone)]
pub struct WorkflowEngine {
workflows: Arc<RwLock<HashMap<String, WorkflowDefinition>>>,
runs: Arc<RwLock<HashMap<String, WorkflowRun>>>,
}
impl WorkflowEngine {
pub fn new() -> Self {
Self {
workflows: Arc::new(RwLock::new(HashMap::new())),
runs: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn register_workflow(&self, workflow: WorkflowDefinition) {
let id = workflow.id.clone();
self.workflows.write().await.insert(id, workflow);
}
pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowDefinition> {
self.workflows.read().await.get(workflow_id).cloned()
}
pub async fn start(
&self,
workflow_id: &str,
trigger_data: serde_json::Value,
) -> Option<String> {
let workflows = self.workflows.read().await;
if !workflows.contains_key(workflow_id) {
return None;
}
drop(workflows);
let run_id = Uuid::new_v4().to_string();
let now = Utc::now();
let run = WorkflowRun {
run_id: run_id.clone(),
workflow_id: workflow_id.to_string(),
status: RunStatus::Pending,
current_step_index: 0,
trigger_data,
step_results: Vec::new(),
created_at: now,
updated_at: now,
};
self.runs.write().await.insert(run_id.clone(), run);
Some(run_id)
}
pub async fn advance(&self, run_id: &str) -> Result<bool, String> {
let (mut run, workflow) = {
let runs = self.runs.read().await;
let run = runs
.get(run_id)
.ok_or_else(|| format!("run {run_id} not found"))?
.clone();
let workflows = self.workflows.read().await;
let workflow = workflows
.get(&run.workflow_id)
.ok_or_else(|| format!("workflow {} not found", run.workflow_id))?
.clone();
(run, workflow)
};
if matches!(
run.status,
RunStatus::Completed | RunStatus::Failed | RunStatus::TimedOut
) {
return Ok(false);
}
if run.status == RunStatus::Pending {
run.status = RunStatus::Running;
}
if run.current_step_index >= workflow.steps.len() {
run.status = RunStatus::Completed;
run.updated_at = Utc::now();
self.runs.write().await.insert(run_id.to_string(), run);
return Ok(false);
}
let step = &workflow.steps[run.current_step_index];
let should_execute = evaluate_condition(&step.condition, &run);
let start = std::time::Instant::now();
let result = if should_execute {
execute_step(step, &run)
} else {
StepResult {
step_id: step.id.clone(),
status: StepStatus::Skipped,
output: serde_json::json!({ "skipped": true }),
duration_ms: 0,
error: None,
}
};
let duration_ms = start.elapsed().as_millis() as u64;
let result = if should_execute {
StepResult {
duration_ms,
..result
}
} else {
result
};
let mut next_index = run.current_step_index + 1;
if let StepType::Condition {
ref expression,
ref if_true,
ref if_false,
} = step.step_type
{
let branch_target = if evaluate_expression(expression, &run) {
if_true
} else {
if_false
};
if let Some(idx) = workflow.steps.iter().position(|s| s.id == *branch_target) {
next_index = idx;
}
}
if result.status == StepStatus::Failed {
match &step.on_failure {
FailureAction::Abort => {
run.step_results.push(result);
run.status = RunStatus::Failed;
run.updated_at = Utc::now();
self.runs.write().await.insert(run_id.to_string(), run);
return Ok(true);
}
FailureAction::Skip => {
}
FailureAction::Retry { max } => {
let retry_count = run
.step_results
.iter()
.filter(|r| r.step_id == step.id && r.status == StepStatus::Failed)
.count() as u32;
if retry_count < *max {
run.step_results.push(result);
run.updated_at = Utc::now();
self.runs.write().await.insert(run_id.to_string(), run);
return Ok(true);
}
run.step_results.push(result);
run.status = RunStatus::Failed;
run.updated_at = Utc::now();
self.runs.write().await.insert(run_id.to_string(), run);
return Ok(true);
}
FailureAction::GoTo { step_id } => {
if let Some(idx) = workflow.steps.iter().position(|s| s.id == *step_id) {
next_index = idx;
}
}
}
}
run.step_results.push(result);
run.current_step_index = next_index;
if run.current_step_index >= workflow.steps.len() {
run.status = RunStatus::Completed;
}
run.updated_at = Utc::now();
self.runs.write().await.insert(run_id.to_string(), run);
Ok(true)
}
pub async fn get_run(&self, run_id: &str) -> Option<WorkflowRun> {
self.runs.read().await.get(run_id).cloned()
}
pub async fn list_runs(&self, workflow_id: &str) -> Vec<WorkflowRun> {
self.runs
.read()
.await
.values()
.filter(|r| r.workflow_id == workflow_id)
.cloned()
.collect()
}
pub async fn pause(&self, run_id: &str) -> Result<(), String> {
let mut runs = self.runs.write().await;
let run = runs
.get_mut(run_id)
.ok_or_else(|| format!("run {run_id} not found"))?;
if run.status != RunStatus::Running {
return Err(format!(
"run {run_id} is not running (status: {:?})",
run.status
));
}
run.status = RunStatus::Paused;
run.updated_at = Utc::now();
Ok(())
}
pub async fn resume(&self, run_id: &str) -> Result<(), String> {
let mut runs = self.runs.write().await;
let run = runs
.get_mut(run_id)
.ok_or_else(|| format!("run {run_id} not found"))?;
if run.status != RunStatus::Paused {
return Err(format!(
"run {run_id} is not paused (status: {:?})",
run.status
));
}
run.status = RunStatus::Running;
run.updated_at = Utc::now();
Ok(())
}
pub async fn run_to_completion(&self, run_id: &str) -> Result<WorkflowRun, String> {
let mut iterations = 0u32;
loop {
let advanced = self.advance(run_id).await?;
if !advanced {
break;
}
iterations += 1;
if iterations > 1000 {
return Err("workflow exceeded 1000 iterations — possible infinite loop".into());
}
}
self.get_run(run_id)
.await
.ok_or_else(|| format!("run {run_id} disappeared"))
}
}
impl Default for WorkflowEngine {
fn default() -> Self {
Self::new()
}
}
fn evaluate_condition(condition: &Option<StepCondition>, run: &WorkflowRun) -> bool {
match condition {
None | Some(StepCondition::Always) => true,
Some(StepCondition::IfPreviousSucceeded) => run
.step_results
.last()
.map(|r| r.status == StepStatus::Completed)
.unwrap_or(true),
Some(StepCondition::IfFieldEquals { field, value }) => {
extract_field(&run.trigger_data, field)
.and_then(|v| v.as_str().map(std::string::ToString::to_string))
.map(|v| v == *value)
.unwrap_or(false)
}
Some(StepCondition::IfScoreAbove { field, threshold }) => {
extract_field(&run.trigger_data, field)
.and_then(serde_json::Value::as_f64)
.map(|v| v > *threshold)
.unwrap_or(false)
}
Some(StepCondition::Expression(expr)) => evaluate_expression(expr, run),
}
}
fn evaluate_expression(expr: &str, run: &WorkflowRun) -> bool {
let expr = expr.trim();
if expr.eq_ignore_ascii_case("true") {
return true;
}
if expr.eq_ignore_ascii_case("false") {
return false;
}
if let Some((lhs, rhs)) = expr.split_once("==") {
let field = lhs.trim();
let expected = rhs.trim().trim_matches('"').trim_matches('\'');
let actual = resolve_field(field, run);
return actual.as_deref() == Some(expected);
}
if let Some((lhs, rhs)) = expr.split_once('>') {
let field = lhs.trim();
let threshold: f64 = match rhs.trim().parse() {
Ok(v) => v,
Err(_) => return false,
};
let actual: f64 = match resolve_field(field, run).and_then(|s| s.parse().ok()) {
Some(v) => v,
None => return false,
};
return actual > threshold;
}
if let Some((lhs, rhs)) = expr.split_once('<') {
let field = lhs.trim();
let threshold: f64 = match rhs.trim().parse() {
Ok(v) => v,
Err(_) => return false,
};
let actual: f64 = match resolve_field(field, run).and_then(|s| s.parse().ok()) {
Some(v) => v,
None => return false,
};
return actual < threshold;
}
false
}
fn resolve_field(field: &str, run: &WorkflowRun) -> Option<String> {
if let Some(v) = extract_field(&run.trigger_data, field) {
return value_to_string(v);
}
for result in run.step_results.iter().rev() {
if let Some(v) = extract_field(&result.output, field) {
return value_to_string(v);
}
}
None
}
fn extract_field<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
let mut current = value;
for segment in path.split('.') {
current = current.get(segment)?;
}
Some(current)
}
fn value_to_string(v: &serde_json::Value) -> Option<String> {
match v {
serde_json::Value::String(s) => Some(s.clone()),
serde_json::Value::Number(n) => Some(n.to_string()),
serde_json::Value::Bool(b) => Some(b.to_string()),
_ => Some(v.to_string()),
}
}
fn execute_step(step: &WorkflowStepDef, _run: &WorkflowRun) -> StepResult {
match &step.step_type {
StepType::AgentTask {
agent_role,
prompt_template,
} => StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: serde_json::json!({
"agent_role": agent_role,
"prompt": prompt_template,
"response": format!("Simulated response from {agent_role} agent"),
}),
duration_ms: 0,
error: None,
},
StepType::HttpCall {
method,
url,
body_template,
} => StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: serde_json::json!({
"method": method,
"url": url,
"body": body_template,
"status_code": 200,
"response_body": "{}",
}),
duration_ms: 0,
error: None,
},
StepType::Condition { expression, .. } => StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: serde_json::json!({
"evaluated_expression": expression,
}),
duration_ms: 0,
error: None,
},
StepType::Delay { seconds } => StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: serde_json::json!({ "delayed_seconds": seconds }),
duration_ms: 0,
error: None,
},
StepType::Notification {
channel,
message_template,
} => StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: serde_json::json!({
"channel": channel,
"message": message_template,
"sent": true,
}),
duration_ms: 0,
error: None,
},
StepType::AssignToHuman { team, message } => StepResult {
step_id: step.id.clone(),
status: StepStatus::Completed,
output: serde_json::json!({
"team": team,
"message": message,
"assigned": true,
}),
duration_ms: 0,
error: None,
},
}
}
pub fn lead_qualification_workflow() -> WorkflowDefinition {
WorkflowDefinition {
id: "lead_qualification".to_string(),
name: "Lead Qualification Pipeline".to_string(),
description: "Qualifies incoming leads and routes hot prospects to sales.".to_string(),
trigger: WorkflowTrigger::Webhook {
event: "new_lead".to_string(),
},
timeout_seconds: Some(3600),
steps: vec![
WorkflowStepDef {
id: "qualify".to_string(),
name: "Qualify Lead".to_string(),
step_type: StepType::AgentTask {
agent_role: "analyst".to_string(),
prompt_template: "Analyze the following lead data and classify as HOT, WARM, or COLD: {{lead_data}}".to_string(),
},
condition: None,
on_failure: FailureAction::Abort,
timeout_seconds: Some(120),
},
WorkflowStepDef {
id: "check_hot".to_string(),
name: "Check if Lead is HOT".to_string(),
step_type: StepType::Condition {
expression: "score > 80".to_string(),
if_true: "assign_sales".to_string(),
if_false: "notify_marketing".to_string(),
},
condition: Some(StepCondition::IfPreviousSucceeded),
on_failure: FailureAction::Abort,
timeout_seconds: None,
},
WorkflowStepDef {
id: "assign_sales".to_string(),
name: "Assign to Sales Team".to_string(),
step_type: StepType::AssignToHuman {
team: "sales".to_string(),
message: "New HOT lead requires immediate follow-up.".to_string(),
},
condition: None,
on_failure: FailureAction::Retry { max: 2 },
timeout_seconds: Some(60),
},
WorkflowStepDef {
id: "compose_outreach".to_string(),
name: "Compose Outreach Email".to_string(),
step_type: StepType::AgentTask {
agent_role: "copywriter".to_string(),
prompt_template: "Draft a personalized outreach email for this lead: {{lead_data}}".to_string(),
},
condition: Some(StepCondition::IfPreviousSucceeded),
on_failure: FailureAction::Skip,
timeout_seconds: Some(180),
},
WorkflowStepDef {
id: "schedule_followup".to_string(),
name: "Schedule Follow-up".to_string(),
step_type: StepType::Notification {
channel: "calendar".to_string(),
message_template: "Follow-up with lead {{lead_name}} in 48 hours.".to_string(),
},
condition: Some(StepCondition::IfPreviousSucceeded),
on_failure: FailureAction::Skip,
timeout_seconds: Some(30),
},
WorkflowStepDef {
id: "notify_marketing".to_string(),
name: "Notify Marketing (warm/cold lead)".to_string(),
step_type: StepType::Notification {
channel: "slack".to_string(),
message_template: "New lead classified as warm/cold — added to nurture campaign.".to_string(),
},
condition: None,
on_failure: FailureAction::Skip,
timeout_seconds: Some(30),
},
],
}
}
pub fn support_ticket_workflow() -> WorkflowDefinition {
WorkflowDefinition {
id: "support_ticket".to_string(),
name: "Support Ticket Pipeline".to_string(),
description: "Routes, triages, and responds to support tickets.".to_string(),
trigger: WorkflowTrigger::Webhook {
event: "new_ticket".to_string(),
},
timeout_seconds: Some(1800),
steps: vec![
WorkflowStepDef {
id: "route".to_string(),
name: "Route Ticket".to_string(),
step_type: StepType::AgentTask {
agent_role: "router".to_string(),
prompt_template: "Classify this support ticket by urgency (critical/high/medium/low) and category: {{ticket}}".to_string(),
},
condition: None,
on_failure: FailureAction::Abort,
timeout_seconds: Some(60),
},
WorkflowStepDef {
id: "check_urgent".to_string(),
name: "Check Urgency".to_string(),
step_type: StepType::Condition {
expression: "priority == critical".to_string(),
if_true: "notify_team".to_string(),
if_false: "generate_response".to_string(),
},
condition: Some(StepCondition::IfPreviousSucceeded),
on_failure: FailureAction::Abort,
timeout_seconds: None,
},
WorkflowStepDef {
id: "notify_team".to_string(),
name: "Notify On-Call Team".to_string(),
step_type: StepType::Notification {
channel: "pagerduty".to_string(),
message_template: "CRITICAL ticket requires immediate attention: {{ticket_id}}".to_string(),
},
condition: None,
on_failure: FailureAction::Retry { max: 3 },
timeout_seconds: Some(30),
},
WorkflowStepDef {
id: "generate_response".to_string(),
name: "Generate Response".to_string(),
step_type: StepType::AgentTask {
agent_role: "support".to_string(),
prompt_template: "Generate a helpful response for this support ticket: {{ticket}}".to_string(),
},
condition: None,
on_failure: FailureAction::Retry { max: 2 },
timeout_seconds: Some(120),
},
WorkflowStepDef {
id: "quality_check".to_string(),
name: "Quality Check".to_string(),
step_type: StepType::AgentTask {
agent_role: "reviewer".to_string(),
prompt_template: "Review this support response for accuracy and tone. Score 0-100: {{response}}".to_string(),
},
condition: Some(StepCondition::IfPreviousSucceeded),
on_failure: FailureAction::Skip,
timeout_seconds: Some(60),
},
WorkflowStepDef {
id: "check_quality".to_string(),
name: "Check Quality Score".to_string(),
step_type: StepType::Condition {
expression: "quality_score > 70".to_string(),
if_true: "send_response".to_string(),
if_false: "assign_human".to_string(),
},
condition: Some(StepCondition::IfPreviousSucceeded),
on_failure: FailureAction::Abort,
timeout_seconds: None,
},
WorkflowStepDef {
id: "send_response".to_string(),
name: "Send Response to Customer".to_string(),
step_type: StepType::Notification {
channel: "email".to_string(),
message_template: "Your support request has been addressed: {{response}}".to_string(),
},
condition: None,
on_failure: FailureAction::Retry { max: 2 },
timeout_seconds: Some(30),
},
WorkflowStepDef {
id: "assign_human".to_string(),
name: "Assign to Human Agent".to_string(),
step_type: StepType::AssignToHuman {
team: "support_l2".to_string(),
message: "AI-generated response did not meet quality threshold — please handle manually.".to_string(),
},
condition: None,
on_failure: FailureAction::Abort,
timeout_seconds: Some(60),
},
],
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
fn simple_workflow(steps: Vec<WorkflowStepDef>) -> WorkflowDefinition {
WorkflowDefinition {
id: "test_wf".to_string(),
name: "Test Workflow".to_string(),
description: "A test workflow".to_string(),
trigger: WorkflowTrigger::Manual,
steps,
timeout_seconds: None,
}
}
fn agent_step(id: &str, name: &str) -> WorkflowStepDef {
WorkflowStepDef {
id: id.to_string(),
name: name.to_string(),
step_type: StepType::AgentTask {
agent_role: "tester".to_string(),
prompt_template: "Do something".to_string(),
},
condition: None,
on_failure: FailureAction::Abort,
timeout_seconds: None,
}
}
#[tokio::test]
async fn test_engine_new() {
let engine = WorkflowEngine::new();
assert!(engine.workflows.read().await.is_empty());
assert!(engine.runs.read().await.is_empty());
}
#[tokio::test]
async fn test_engine_default() {
let engine = WorkflowEngine::default();
assert!(engine.workflows.read().await.is_empty());
}
#[tokio::test]
async fn test_register_workflow() {
let engine = WorkflowEngine::new();
let wf = simple_workflow(vec![agent_step("s1", "Step 1")]);
engine.register_workflow(wf).await;
assert!(engine.get_workflow("test_wf").await.is_some());
}
#[tokio::test]
async fn test_register_workflow_overwrite() {
let engine = WorkflowEngine::new();
let wf1 = simple_workflow(vec![agent_step("s1", "Step 1")]);
engine.register_workflow(wf1).await;
let mut wf2 = simple_workflow(vec![agent_step("s1", "Step 1"), agent_step("s2", "Step 2")]);
wf2.id = "test_wf".to_string();
engine.register_workflow(wf2).await;
let wf = engine.get_workflow("test_wf").await.unwrap();
assert_eq!(wf.steps.len(), 2);
}
#[tokio::test]
async fn test_get_workflow_missing() {
let engine = WorkflowEngine::new();
assert!(engine.get_workflow("nope").await.is_none());
}
#[tokio::test]
async fn test_start_returns_run_id() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
assert!(!run_id.is_empty());
}
#[tokio::test]
async fn test_start_unknown_workflow_returns_none() {
let engine = WorkflowEngine::new();
assert!(engine
.start("no_such", serde_json::json!({}))
.await
.is_none());
}
#[tokio::test]
async fn test_get_run_initial_state() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({"key": "val"}))
.await
.unwrap();
let run = engine.get_run(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Pending);
assert_eq!(run.current_step_index, 0);
assert_eq!(run.trigger_data["key"], "val");
assert!(run.step_results.is_empty());
}
#[tokio::test]
async fn test_get_run_missing() {
let engine = WorkflowEngine::new();
assert!(engine.get_run("nope").await.is_none());
}
#[tokio::test]
async fn test_advance_single_step() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let advanced = engine.advance(&run_id).await.unwrap();
assert!(advanced);
let run = engine.get_run(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Completed);
assert_eq!(run.step_results.len(), 1);
assert_eq!(run.step_results[0].status, StepStatus::Completed);
}
#[tokio::test]
async fn test_advance_multi_step() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![
agent_step("s1", "Step 1"),
agent_step("s2", "Step 2"),
agent_step("s3", "Step 3"),
]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
assert!(engine.advance(&run_id).await.unwrap());
let run = engine.get_run(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Running);
assert_eq!(run.step_results.len(), 1);
assert!(engine.advance(&run_id).await.unwrap());
assert!(engine.advance(&run_id).await.unwrap());
let run = engine.get_run(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Completed);
assert_eq!(run.step_results.len(), 3);
}
#[tokio::test]
async fn test_advance_completed_run_returns_false() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
engine.advance(&run_id).await.unwrap();
assert!(!engine.advance(&run_id).await.unwrap());
}
#[tokio::test]
async fn test_advance_unknown_run_errors() {
let engine = WorkflowEngine::new();
let result = engine.advance("nope").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_list_runs() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
.await;
engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let runs = engine.list_runs("test_wf").await;
assert_eq!(runs.len(), 2);
}
#[tokio::test]
async fn test_list_runs_empty() {
let engine = WorkflowEngine::new();
let runs = engine.list_runs("nothing").await;
assert!(runs.is_empty());
}
#[tokio::test]
async fn test_run_to_completion() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![
agent_step("s1", "Step 1"),
agent_step("s2", "Step 2"),
]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Completed);
assert_eq!(run.step_results.len(), 2);
}
#[tokio::test]
async fn test_pause_resume() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![
agent_step("s1", "Step 1"),
agent_step("s2", "Step 2"),
]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
engine.advance(&run_id).await.unwrap();
let run = engine.get_run(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Running);
engine.pause(&run_id).await.unwrap();
let run = engine.get_run(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Paused);
engine.resume(&run_id).await.unwrap();
let run = engine.get_run(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Running);
}
#[tokio::test]
async fn test_pause_non_running_fails() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
assert!(engine.pause(&run_id).await.is_err());
}
#[tokio::test]
async fn test_resume_non_paused_fails() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![
agent_step("s1", "Step 1"),
agent_step("s2", "Step 2"),
]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
engine.advance(&run_id).await.unwrap();
assert!(engine.resume(&run_id).await.is_err());
}
#[tokio::test]
async fn test_condition_if_previous_succeeded() {
let engine = WorkflowEngine::new();
let mut step2 = agent_step("s2", "Step 2");
step2.condition = Some(StepCondition::IfPreviousSucceeded);
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1"), step2]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Completed);
assert_eq!(run.step_results.len(), 2);
assert_eq!(run.step_results[1].status, StepStatus::Completed);
}
#[tokio::test]
async fn test_condition_if_field_equals() {
let engine = WorkflowEngine::new();
let mut step = agent_step("s1", "Conditional Step");
step.condition = Some(StepCondition::IfFieldEquals {
field: "tier".to_string(),
value: "gold".to_string(),
});
engine.register_workflow(simple_workflow(vec![step])).await;
let run_id = engine
.start("test_wf", serde_json::json!({"tier": "gold"}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.step_results[0].status, StepStatus::Completed);
}
#[tokio::test]
async fn test_condition_if_field_equals_mismatch_skips() {
let engine = WorkflowEngine::new();
let mut step = agent_step("s1", "Conditional Step");
step.condition = Some(StepCondition::IfFieldEquals {
field: "tier".to_string(),
value: "gold".to_string(),
});
engine.register_workflow(simple_workflow(vec![step])).await;
let run_id = engine
.start("test_wf", serde_json::json!({"tier": "silver"}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.step_results[0].status, StepStatus::Skipped);
}
#[tokio::test]
async fn test_condition_if_score_above() {
let engine = WorkflowEngine::new();
let mut step = agent_step("s1", "High Score Step");
step.condition = Some(StepCondition::IfScoreAbove {
field: "score".to_string(),
threshold: 50.0,
});
engine.register_workflow(simple_workflow(vec![step])).await;
let run_id = engine
.start("test_wf", serde_json::json!({"score": 80}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.step_results[0].status, StepStatus::Completed);
}
#[tokio::test]
async fn test_condition_if_score_below_skips() {
let engine = WorkflowEngine::new();
let mut step = agent_step("s1", "High Score Step");
step.condition = Some(StepCondition::IfScoreAbove {
field: "score".to_string(),
threshold: 50.0,
});
engine.register_workflow(simple_workflow(vec![step])).await;
let run_id = engine
.start("test_wf", serde_json::json!({"score": 30}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.step_results[0].status, StepStatus::Skipped);
}
#[tokio::test]
async fn test_delay_step() {
let engine = WorkflowEngine::new();
let wf = simple_workflow(vec![WorkflowStepDef {
id: "d1".to_string(),
name: "Delay".to_string(),
step_type: StepType::Delay { seconds: 5 },
condition: None,
on_failure: FailureAction::Abort,
timeout_seconds: None,
}]);
engine.register_workflow(wf).await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.step_results[0].output["delayed_seconds"], 5);
}
#[tokio::test]
async fn test_notification_step() {
let engine = WorkflowEngine::new();
let wf = simple_workflow(vec![WorkflowStepDef {
id: "n1".to_string(),
name: "Notify".to_string(),
step_type: StepType::Notification {
channel: "slack".to_string(),
message_template: "Hello!".to_string(),
},
condition: None,
on_failure: FailureAction::Abort,
timeout_seconds: None,
}]);
engine.register_workflow(wf).await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.step_results[0].output["sent"], true);
}
#[tokio::test]
async fn test_assign_to_human_step() {
let engine = WorkflowEngine::new();
let wf = simple_workflow(vec![WorkflowStepDef {
id: "h1".to_string(),
name: "Human".to_string(),
step_type: StepType::AssignToHuman {
team: "ops".to_string(),
message: "Please handle".to_string(),
},
condition: None,
on_failure: FailureAction::Abort,
timeout_seconds: None,
}]);
engine.register_workflow(wf).await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.step_results[0].output["assigned"], true);
}
#[tokio::test]
async fn test_http_call_step() {
let engine = WorkflowEngine::new();
let wf = simple_workflow(vec![WorkflowStepDef {
id: "http1".to_string(),
name: "HTTP".to_string(),
step_type: StepType::HttpCall {
method: "POST".to_string(),
url: "https://api.example.com/hook".to_string(),
body_template: Some(r#"{"data": "{{payload}}"}"#.to_string()),
},
condition: None,
on_failure: FailureAction::Abort,
timeout_seconds: None,
}]);
engine.register_workflow(wf).await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.step_results[0].output["status_code"], 200);
}
#[tokio::test]
async fn test_failure_action_skip() {
let engine = WorkflowEngine::new();
let mut step1 = agent_step("s1", "Step 1");
step1.condition = Some(StepCondition::IfFieldEquals {
field: "x".to_string(),
value: "impossible".to_string(),
});
step1.on_failure = FailureAction::Skip;
let step2 = agent_step("s2", "Step 2");
engine
.register_workflow(simple_workflow(vec![step1, step2]))
.await;
let run_id = engine
.start("test_wf", serde_json::json!({}))
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert_eq!(run.status, RunStatus::Completed);
assert_eq!(run.step_results[0].status, StepStatus::Skipped);
assert_eq!(run.step_results[1].status, StepStatus::Completed);
}
#[tokio::test]
async fn test_expression_true_literal() {
let run = WorkflowRun {
run_id: "r1".into(),
workflow_id: "w1".into(),
status: RunStatus::Running,
current_step_index: 0,
trigger_data: serde_json::json!({}),
step_results: vec![],
created_at: Utc::now(),
updated_at: Utc::now(),
};
assert!(evaluate_expression("true", &run));
assert!(!evaluate_expression("false", &run));
}
#[tokio::test]
async fn test_expression_field_equals() {
let run = WorkflowRun {
run_id: "r1".into(),
workflow_id: "w1".into(),
status: RunStatus::Running,
current_step_index: 0,
trigger_data: serde_json::json!({"priority": "critical"}),
step_results: vec![],
created_at: Utc::now(),
updated_at: Utc::now(),
};
assert!(evaluate_expression("priority == critical", &run));
assert!(!evaluate_expression("priority == low", &run));
}
#[tokio::test]
async fn test_expression_numeric_comparison() {
let run = WorkflowRun {
run_id: "r1".into(),
workflow_id: "w1".into(),
status: RunStatus::Running,
current_step_index: 0,
trigger_data: serde_json::json!({"score": 85}),
step_results: vec![],
created_at: Utc::now(),
updated_at: Utc::now(),
};
assert!(evaluate_expression("score > 80", &run));
assert!(!evaluate_expression("score > 90", &run));
assert!(evaluate_expression("score < 90", &run));
assert!(!evaluate_expression("score < 80", &run));
}
#[tokio::test]
async fn test_lead_qualification_template() {
let wf = lead_qualification_workflow();
assert_eq!(wf.id, "lead_qualification");
assert!(!wf.steps.is_empty());
assert!(matches!(wf.trigger, WorkflowTrigger::Webhook { .. }));
assert!(wf.timeout_seconds.is_some());
let ids: Vec<&str> = wf.steps.iter().map(|s| s.id.as_str()).collect();
let unique: std::collections::HashSet<&str> = ids.iter().copied().collect();
assert_eq!(ids.len(), unique.len());
}
#[tokio::test]
async fn test_support_ticket_template() {
let wf = support_ticket_workflow();
assert_eq!(wf.id, "support_ticket");
assert!(!wf.steps.is_empty());
assert!(matches!(wf.trigger, WorkflowTrigger::Webhook { .. }));
let ids: Vec<&str> = wf.steps.iter().map(|s| s.id.as_str()).collect();
let unique: std::collections::HashSet<&str> = ids.iter().copied().collect();
assert_eq!(ids.len(), unique.len());
}
#[tokio::test]
async fn test_lead_qualification_run_to_completion() {
let engine = WorkflowEngine::new();
engine
.register_workflow(lead_qualification_workflow())
.await;
let run_id = engine
.start(
"lead_qualification",
serde_json::json!({"lead_name": "Acme Corp", "score": 90}),
)
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert!(matches!(
run.status,
RunStatus::Completed | RunStatus::Failed
));
assert!(!run.step_results.is_empty());
}
#[tokio::test]
async fn test_support_ticket_run_to_completion() {
let engine = WorkflowEngine::new();
engine.register_workflow(support_ticket_workflow()).await;
let run_id = engine
.start(
"support_ticket",
serde_json::json!({"ticket": "My app crashes", "priority": "high"}),
)
.await
.unwrap();
let run = engine.run_to_completion(&run_id).await.unwrap();
assert!(matches!(
run.status,
RunStatus::Completed | RunStatus::Failed
));
assert!(!run.step_results.is_empty());
}
#[tokio::test]
async fn test_workflow_definition_serde() {
let wf = lead_qualification_workflow();
let json = serde_json::to_string_pretty(&wf).unwrap();
let deserialized: WorkflowDefinition = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized.id, wf.id);
assert_eq!(deserialized.steps.len(), wf.steps.len());
}
#[tokio::test]
async fn test_trigger_variants_serde() {
let triggers = vec![
WorkflowTrigger::Manual,
WorkflowTrigger::Webhook {
event: "push".into(),
},
WorkflowTrigger::Schedule {
cron: "0 * * * *".into(),
},
WorkflowTrigger::Threshold {
metric: "cpu".into(),
condition: "above".into(),
value: 90.0,
},
];
for t in &triggers {
let json = serde_json::to_string(t).unwrap();
let back: WorkflowTrigger = serde_json::from_str(&json).unwrap();
assert_eq!(*t, back);
}
}
#[tokio::test]
async fn test_engine_clone_shared_state() {
let engine = WorkflowEngine::new();
let engine2 = engine.clone();
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
.await;
assert!(engine2.get_workflow("test_wf").await.is_some());
}
#[tokio::test]
async fn test_concurrent_starts() {
let engine = WorkflowEngine::new();
engine
.register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
.await;
let mut handles = vec![];
for _ in 0..10 {
let e = engine.clone();
handles.push(tokio::spawn(async move {
e.start("test_wf", serde_json::json!({})).await.unwrap()
}));
}
let mut ids = vec![];
for h in handles {
ids.push(h.await.unwrap());
}
let unique: std::collections::HashSet<&str> =
ids.iter().map(std::string::String::as_str).collect();
assert_eq!(unique.len(), 10);
}
}