1use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18use uuid::Uuid;
19
20#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
26#[serde(tag = "type", rename_all = "snake_case")]
27pub enum WorkflowTrigger {
28 Manual,
30 Webhook {
32 event: String,
34 },
35 Schedule {
37 cron: String,
39 },
40 Threshold {
42 metric: String,
44 condition: String,
46 value: f64,
48 },
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
57#[serde(tag = "type", rename_all = "snake_case")]
58pub enum StepType {
59 AgentTask {
61 agent_role: String,
63 prompt_template: String,
65 },
66 HttpCall {
68 method: String,
70 url: String,
72 body_template: Option<String>,
74 },
75 Condition {
77 expression: String,
79 if_true: String,
81 if_false: String,
83 },
84 Delay {
86 seconds: u64,
88 },
89 Notification {
91 channel: String,
93 message_template: String,
95 },
96 AssignToHuman {
98 team: String,
100 message: String,
102 },
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
107#[serde(tag = "type", rename_all = "snake_case")]
108pub enum StepCondition {
109 Always,
111 IfPreviousSucceeded,
113 IfFieldEquals {
115 field: String,
117 value: String,
119 },
120 IfScoreAbove {
122 field: String,
124 threshold: f64,
126 },
127 Expression(String),
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
133#[serde(rename_all = "snake_case")]
134pub enum FailureAction {
135 Abort,
137 Skip,
139 Retry {
141 max: u32,
143 },
144 GoTo {
146 step_id: String,
148 },
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize)]
157pub struct WorkflowStepDef {
158 pub id: String,
160 pub name: String,
162 pub step_type: StepType,
164 pub condition: Option<StepCondition>,
166 pub on_failure: FailureAction,
168 pub timeout_seconds: Option<u64>,
170}
171
172#[derive(Debug, Clone, Serialize, Deserialize)]
174pub struct WorkflowDefinition {
175 pub id: String,
177 pub name: String,
179 pub description: String,
181 pub trigger: WorkflowTrigger,
183 pub steps: Vec<WorkflowStepDef>,
185 pub timeout_seconds: Option<u64>,
187}
188
189#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
195#[serde(rename_all = "snake_case")]
196pub enum RunStatus {
197 Pending,
199 Running,
201 Completed,
203 Failed,
205 Paused,
207 TimedOut,
209}
210
211#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
213#[serde(rename_all = "snake_case")]
214pub enum StepStatus {
215 Completed,
217 Failed,
219 Skipped,
221 TimedOut,
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
227pub struct StepResult {
228 pub step_id: String,
230 pub status: StepStatus,
232 pub output: serde_json::Value,
234 pub duration_ms: u64,
236 pub error: Option<String>,
238}
239
240#[derive(Debug, Clone, Serialize, Deserialize)]
242pub struct WorkflowRun {
243 pub run_id: String,
245 pub workflow_id: String,
247 pub status: RunStatus,
249 pub current_step_index: usize,
251 pub trigger_data: serde_json::Value,
253 pub step_results: Vec<StepResult>,
255 pub created_at: DateTime<Utc>,
257 pub updated_at: DateTime<Utc>,
259}
260
261#[derive(Clone)]
267pub struct WorkflowEngine {
268 workflows: Arc<RwLock<HashMap<String, WorkflowDefinition>>>,
269 runs: Arc<RwLock<HashMap<String, WorkflowRun>>>,
270}
271
272impl WorkflowEngine {
273 pub fn new() -> Self {
275 Self {
276 workflows: Arc::new(RwLock::new(HashMap::new())),
277 runs: Arc::new(RwLock::new(HashMap::new())),
278 }
279 }
280
281 pub async fn register_workflow(&self, workflow: WorkflowDefinition) {
283 let id = workflow.id.clone();
284 self.workflows.write().await.insert(id, workflow);
285 }
286
287 pub async fn get_workflow(&self, workflow_id: &str) -> Option<WorkflowDefinition> {
289 self.workflows.read().await.get(workflow_id).cloned()
290 }
291
292 pub async fn start(
296 &self,
297 workflow_id: &str,
298 trigger_data: serde_json::Value,
299 ) -> Option<String> {
300 let workflows = self.workflows.read().await;
301 if !workflows.contains_key(workflow_id) {
302 return None;
303 }
304 drop(workflows);
305
306 let run_id = Uuid::new_v4().to_string();
307 let now = Utc::now();
308 let run = WorkflowRun {
309 run_id: run_id.clone(),
310 workflow_id: workflow_id.to_string(),
311 status: RunStatus::Pending,
312 current_step_index: 0,
313 trigger_data,
314 step_results: Vec::new(),
315 created_at: now,
316 updated_at: now,
317 };
318 self.runs.write().await.insert(run_id.clone(), run);
319 Some(run_id)
320 }
321
322 pub async fn advance(&self, run_id: &str) -> Result<bool, String> {
328 let (mut run, workflow) = {
330 let runs = self.runs.read().await;
331 let run = runs
332 .get(run_id)
333 .ok_or_else(|| format!("run {run_id} not found"))?
334 .clone();
335
336 let workflows = self.workflows.read().await;
337 let workflow = workflows
338 .get(&run.workflow_id)
339 .ok_or_else(|| format!("workflow {} not found", run.workflow_id))?
340 .clone();
341 (run, workflow)
342 };
343
344 if matches!(
346 run.status,
347 RunStatus::Completed | RunStatus::Failed | RunStatus::TimedOut
348 ) {
349 return Ok(false);
350 }
351
352 if run.status == RunStatus::Pending {
354 run.status = RunStatus::Running;
355 }
356
357 if run.current_step_index >= workflow.steps.len() {
359 run.status = RunStatus::Completed;
360 run.updated_at = Utc::now();
361 self.runs.write().await.insert(run_id.to_string(), run);
362 return Ok(false);
363 }
364
365 let step = &workflow.steps[run.current_step_index];
366
367 let should_execute = evaluate_condition(&step.condition, &run);
369
370 let start = std::time::Instant::now();
371
372 let result = if should_execute {
373 execute_step(step, &run)
374 } else {
375 StepResult {
376 step_id: step.id.clone(),
377 status: StepStatus::Skipped,
378 output: serde_json::json!({ "skipped": true }),
379 duration_ms: 0,
380 error: None,
381 }
382 };
383
384 let duration_ms = start.elapsed().as_millis() as u64;
385
386 let result = if should_execute {
388 StepResult {
389 duration_ms,
390 ..result
391 }
392 } else {
393 result
394 };
395
396 let mut next_index = run.current_step_index + 1;
398 if let StepType::Condition {
399 ref expression,
400 ref if_true,
401 ref if_false,
402 } = step.step_type
403 {
404 let branch_target = if evaluate_expression(expression, &run) {
405 if_true
406 } else {
407 if_false
408 };
409 if let Some(idx) = workflow.steps.iter().position(|s| s.id == *branch_target) {
411 next_index = idx;
412 }
413 }
414
415 if result.status == StepStatus::Failed {
417 match &step.on_failure {
418 FailureAction::Abort => {
419 run.step_results.push(result);
420 run.status = RunStatus::Failed;
421 run.updated_at = Utc::now();
422 self.runs.write().await.insert(run_id.to_string(), run);
423 return Ok(true);
424 }
425 FailureAction::Skip => {
426 }
428 FailureAction::Retry { max } => {
429 let retry_count = run
430 .step_results
431 .iter()
432 .filter(|r| r.step_id == step.id && r.status == StepStatus::Failed)
433 .count() as u32;
434 if retry_count < *max {
435 run.step_results.push(result);
437 run.updated_at = Utc::now();
438 self.runs.write().await.insert(run_id.to_string(), run);
439 return Ok(true);
440 }
441 run.step_results.push(result);
443 run.status = RunStatus::Failed;
444 run.updated_at = Utc::now();
445 self.runs.write().await.insert(run_id.to_string(), run);
446 return Ok(true);
447 }
448 FailureAction::GoTo { step_id } => {
449 if let Some(idx) = workflow.steps.iter().position(|s| s.id == *step_id) {
450 next_index = idx;
451 }
452 }
453 }
454 }
455
456 run.step_results.push(result);
457 run.current_step_index = next_index;
458
459 if run.current_step_index >= workflow.steps.len() {
461 run.status = RunStatus::Completed;
462 }
463
464 run.updated_at = Utc::now();
465 self.runs.write().await.insert(run_id.to_string(), run);
466 Ok(true)
467 }
468
469 pub async fn get_run(&self, run_id: &str) -> Option<WorkflowRun> {
471 self.runs.read().await.get(run_id).cloned()
472 }
473
474 pub async fn list_runs(&self, workflow_id: &str) -> Vec<WorkflowRun> {
476 self.runs
477 .read()
478 .await
479 .values()
480 .filter(|r| r.workflow_id == workflow_id)
481 .cloned()
482 .collect()
483 }
484
485 pub async fn pause(&self, run_id: &str) -> Result<(), String> {
487 let mut runs = self.runs.write().await;
488 let run = runs
489 .get_mut(run_id)
490 .ok_or_else(|| format!("run {run_id} not found"))?;
491 if run.status != RunStatus::Running {
492 return Err(format!(
493 "run {run_id} is not running (status: {:?})",
494 run.status
495 ));
496 }
497 run.status = RunStatus::Paused;
498 run.updated_at = Utc::now();
499 Ok(())
500 }
501
502 pub async fn resume(&self, run_id: &str) -> Result<(), String> {
504 let mut runs = self.runs.write().await;
505 let run = runs
506 .get_mut(run_id)
507 .ok_or_else(|| format!("run {run_id} not found"))?;
508 if run.status != RunStatus::Paused {
509 return Err(format!(
510 "run {run_id} is not paused (status: {:?})",
511 run.status
512 ));
513 }
514 run.status = RunStatus::Running;
515 run.updated_at = Utc::now();
516 Ok(())
517 }
518
519 pub async fn run_to_completion(&self, run_id: &str) -> Result<WorkflowRun, String> {
523 let mut iterations = 0u32;
524 loop {
525 let advanced = self.advance(run_id).await?;
526 if !advanced {
527 break;
528 }
529 iterations += 1;
530 if iterations > 1000 {
531 return Err("workflow exceeded 1000 iterations — possible infinite loop".into());
532 }
533 }
534 self.get_run(run_id)
535 .await
536 .ok_or_else(|| format!("run {run_id} disappeared"))
537 }
538}
539
540impl Default for WorkflowEngine {
541 fn default() -> Self {
542 Self::new()
543 }
544}
545
546fn evaluate_condition(condition: &Option<StepCondition>, run: &WorkflowRun) -> bool {
552 match condition {
553 None | Some(StepCondition::Always) => true,
554 Some(StepCondition::IfPreviousSucceeded) => run
555 .step_results
556 .last()
557 .map(|r| r.status == StepStatus::Completed)
558 .unwrap_or(true),
559 Some(StepCondition::IfFieldEquals { field, value }) => {
560 extract_field(&run.trigger_data, field)
561 .and_then(|v| v.as_str().map(std::string::ToString::to_string))
562 .map(|v| v == *value)
563 .unwrap_or(false)
564 }
565 Some(StepCondition::IfScoreAbove { field, threshold }) => {
566 extract_field(&run.trigger_data, field)
567 .and_then(serde_json::Value::as_f64)
568 .map(|v| v > *threshold)
569 .unwrap_or(false)
570 }
571 Some(StepCondition::Expression(expr)) => evaluate_expression(expr, run),
572 }
573}
574
575fn evaluate_expression(expr: &str, run: &WorkflowRun) -> bool {
582 let expr = expr.trim();
583 if expr.eq_ignore_ascii_case("true") {
584 return true;
585 }
586 if expr.eq_ignore_ascii_case("false") {
587 return false;
588 }
589
590 if let Some((lhs, rhs)) = expr.split_once("==") {
592 let field = lhs.trim();
593 let expected = rhs.trim().trim_matches('"').trim_matches('\'');
594 let actual = resolve_field(field, run);
595 return actual.as_deref() == Some(expected);
596 }
597
598 if let Some((lhs, rhs)) = expr.split_once('>') {
600 let field = lhs.trim();
601 let threshold: f64 = match rhs.trim().parse() {
602 Ok(v) => v,
603 Err(_) => return false,
604 };
605 let actual: f64 = match resolve_field(field, run).and_then(|s| s.parse().ok()) {
606 Some(v) => v,
607 None => return false,
608 };
609 return actual > threshold;
610 }
611
612 if let Some((lhs, rhs)) = expr.split_once('<') {
614 let field = lhs.trim();
615 let threshold: f64 = match rhs.trim().parse() {
616 Ok(v) => v,
617 Err(_) => return false,
618 };
619 let actual: f64 = match resolve_field(field, run).and_then(|s| s.parse().ok()) {
620 Some(v) => v,
621 None => return false,
622 };
623 return actual < threshold;
624 }
625
626 false
627}
628
629fn resolve_field(field: &str, run: &WorkflowRun) -> Option<String> {
631 if let Some(v) = extract_field(&run.trigger_data, field) {
633 return value_to_string(v);
634 }
635 for result in run.step_results.iter().rev() {
637 if let Some(v) = extract_field(&result.output, field) {
638 return value_to_string(v);
639 }
640 }
641 None
642}
643
644fn extract_field<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
646 let mut current = value;
647 for segment in path.split('.') {
648 current = current.get(segment)?;
649 }
650 Some(current)
651}
652
653fn value_to_string(v: &serde_json::Value) -> Option<String> {
655 match v {
656 serde_json::Value::String(s) => Some(s.clone()),
657 serde_json::Value::Number(n) => Some(n.to_string()),
658 serde_json::Value::Bool(b) => Some(b.to_string()),
659 _ => Some(v.to_string()),
660 }
661}
662
663fn execute_step(step: &WorkflowStepDef, _run: &WorkflowRun) -> StepResult {
670 match &step.step_type {
671 StepType::AgentTask {
672 agent_role,
673 prompt_template,
674 } => StepResult {
675 step_id: step.id.clone(),
676 status: StepStatus::Completed,
677 output: serde_json::json!({
678 "agent_role": agent_role,
679 "prompt": prompt_template,
680 "response": format!("Simulated response from {agent_role} agent"),
681 }),
682 duration_ms: 0,
683 error: None,
684 },
685 StepType::HttpCall {
686 method,
687 url,
688 body_template,
689 } => StepResult {
690 step_id: step.id.clone(),
691 status: StepStatus::Completed,
692 output: serde_json::json!({
693 "method": method,
694 "url": url,
695 "body": body_template,
696 "status_code": 200,
697 "response_body": "{}",
698 }),
699 duration_ms: 0,
700 error: None,
701 },
702 StepType::Condition { expression, .. } => StepResult {
703 step_id: step.id.clone(),
704 status: StepStatus::Completed,
705 output: serde_json::json!({
706 "evaluated_expression": expression,
707 }),
708 duration_ms: 0,
709 error: None,
710 },
711 StepType::Delay { seconds } => StepResult {
712 step_id: step.id.clone(),
713 status: StepStatus::Completed,
714 output: serde_json::json!({ "delayed_seconds": seconds }),
715 duration_ms: 0,
716 error: None,
717 },
718 StepType::Notification {
719 channel,
720 message_template,
721 } => StepResult {
722 step_id: step.id.clone(),
723 status: StepStatus::Completed,
724 output: serde_json::json!({
725 "channel": channel,
726 "message": message_template,
727 "sent": true,
728 }),
729 duration_ms: 0,
730 error: None,
731 },
732 StepType::AssignToHuman { team, message } => StepResult {
733 step_id: step.id.clone(),
734 status: StepStatus::Completed,
735 output: serde_json::json!({
736 "team": team,
737 "message": message,
738 "assigned": true,
739 }),
740 duration_ms: 0,
741 error: None,
742 },
743 }
744}
745
746pub fn lead_qualification_workflow() -> WorkflowDefinition {
755 WorkflowDefinition {
756 id: "lead_qualification".to_string(),
757 name: "Lead Qualification Pipeline".to_string(),
758 description: "Qualifies incoming leads and routes hot prospects to sales.".to_string(),
759 trigger: WorkflowTrigger::Webhook {
760 event: "new_lead".to_string(),
761 },
762 timeout_seconds: Some(3600),
763 steps: vec![
764 WorkflowStepDef {
765 id: "qualify".to_string(),
766 name: "Qualify Lead".to_string(),
767 step_type: StepType::AgentTask {
768 agent_role: "analyst".to_string(),
769 prompt_template: "Analyze the following lead data and classify as HOT, WARM, or COLD: {{lead_data}}".to_string(),
770 },
771 condition: None,
772 on_failure: FailureAction::Abort,
773 timeout_seconds: Some(120),
774 },
775 WorkflowStepDef {
776 id: "check_hot".to_string(),
777 name: "Check if Lead is HOT".to_string(),
778 step_type: StepType::Condition {
779 expression: "score > 80".to_string(),
780 if_true: "assign_sales".to_string(),
781 if_false: "notify_marketing".to_string(),
782 },
783 condition: Some(StepCondition::IfPreviousSucceeded),
784 on_failure: FailureAction::Abort,
785 timeout_seconds: None,
786 },
787 WorkflowStepDef {
788 id: "assign_sales".to_string(),
789 name: "Assign to Sales Team".to_string(),
790 step_type: StepType::AssignToHuman {
791 team: "sales".to_string(),
792 message: "New HOT lead requires immediate follow-up.".to_string(),
793 },
794 condition: None,
795 on_failure: FailureAction::Retry { max: 2 },
796 timeout_seconds: Some(60),
797 },
798 WorkflowStepDef {
799 id: "compose_outreach".to_string(),
800 name: "Compose Outreach Email".to_string(),
801 step_type: StepType::AgentTask {
802 agent_role: "copywriter".to_string(),
803 prompt_template: "Draft a personalized outreach email for this lead: {{lead_data}}".to_string(),
804 },
805 condition: Some(StepCondition::IfPreviousSucceeded),
806 on_failure: FailureAction::Skip,
807 timeout_seconds: Some(180),
808 },
809 WorkflowStepDef {
810 id: "schedule_followup".to_string(),
811 name: "Schedule Follow-up".to_string(),
812 step_type: StepType::Notification {
813 channel: "calendar".to_string(),
814 message_template: "Follow-up with lead {{lead_name}} in 48 hours.".to_string(),
815 },
816 condition: Some(StepCondition::IfPreviousSucceeded),
817 on_failure: FailureAction::Skip,
818 timeout_seconds: Some(30),
819 },
820 WorkflowStepDef {
821 id: "notify_marketing".to_string(),
822 name: "Notify Marketing (warm/cold lead)".to_string(),
823 step_type: StepType::Notification {
824 channel: "slack".to_string(),
825 message_template: "New lead classified as warm/cold — added to nurture campaign.".to_string(),
826 },
827 condition: None,
828 on_failure: FailureAction::Skip,
829 timeout_seconds: Some(30),
830 },
831 ],
832 }
833}
834
835pub fn support_ticket_workflow() -> WorkflowDefinition {
840 WorkflowDefinition {
841 id: "support_ticket".to_string(),
842 name: "Support Ticket Pipeline".to_string(),
843 description: "Routes, triages, and responds to support tickets.".to_string(),
844 trigger: WorkflowTrigger::Webhook {
845 event: "new_ticket".to_string(),
846 },
847 timeout_seconds: Some(1800),
848 steps: vec![
849 WorkflowStepDef {
850 id: "route".to_string(),
851 name: "Route Ticket".to_string(),
852 step_type: StepType::AgentTask {
853 agent_role: "router".to_string(),
854 prompt_template: "Classify this support ticket by urgency (critical/high/medium/low) and category: {{ticket}}".to_string(),
855 },
856 condition: None,
857 on_failure: FailureAction::Abort,
858 timeout_seconds: Some(60),
859 },
860 WorkflowStepDef {
861 id: "check_urgent".to_string(),
862 name: "Check Urgency".to_string(),
863 step_type: StepType::Condition {
864 expression: "priority == critical".to_string(),
865 if_true: "notify_team".to_string(),
866 if_false: "generate_response".to_string(),
867 },
868 condition: Some(StepCondition::IfPreviousSucceeded),
869 on_failure: FailureAction::Abort,
870 timeout_seconds: None,
871 },
872 WorkflowStepDef {
873 id: "notify_team".to_string(),
874 name: "Notify On-Call Team".to_string(),
875 step_type: StepType::Notification {
876 channel: "pagerduty".to_string(),
877 message_template: "CRITICAL ticket requires immediate attention: {{ticket_id}}".to_string(),
878 },
879 condition: None,
880 on_failure: FailureAction::Retry { max: 3 },
881 timeout_seconds: Some(30),
882 },
883 WorkflowStepDef {
884 id: "generate_response".to_string(),
885 name: "Generate Response".to_string(),
886 step_type: StepType::AgentTask {
887 agent_role: "support".to_string(),
888 prompt_template: "Generate a helpful response for this support ticket: {{ticket}}".to_string(),
889 },
890 condition: None,
891 on_failure: FailureAction::Retry { max: 2 },
892 timeout_seconds: Some(120),
893 },
894 WorkflowStepDef {
895 id: "quality_check".to_string(),
896 name: "Quality Check".to_string(),
897 step_type: StepType::AgentTask {
898 agent_role: "reviewer".to_string(),
899 prompt_template: "Review this support response for accuracy and tone. Score 0-100: {{response}}".to_string(),
900 },
901 condition: Some(StepCondition::IfPreviousSucceeded),
902 on_failure: FailureAction::Skip,
903 timeout_seconds: Some(60),
904 },
905 WorkflowStepDef {
906 id: "check_quality".to_string(),
907 name: "Check Quality Score".to_string(),
908 step_type: StepType::Condition {
909 expression: "quality_score > 70".to_string(),
910 if_true: "send_response".to_string(),
911 if_false: "assign_human".to_string(),
912 },
913 condition: Some(StepCondition::IfPreviousSucceeded),
914 on_failure: FailureAction::Abort,
915 timeout_seconds: None,
916 },
917 WorkflowStepDef {
918 id: "send_response".to_string(),
919 name: "Send Response to Customer".to_string(),
920 step_type: StepType::Notification {
921 channel: "email".to_string(),
922 message_template: "Your support request has been addressed: {{response}}".to_string(),
923 },
924 condition: None,
925 on_failure: FailureAction::Retry { max: 2 },
926 timeout_seconds: Some(30),
927 },
928 WorkflowStepDef {
929 id: "assign_human".to_string(),
930 name: "Assign to Human Agent".to_string(),
931 step_type: StepType::AssignToHuman {
932 team: "support_l2".to_string(),
933 message: "AI-generated response did not meet quality threshold — please handle manually.".to_string(),
934 },
935 condition: None,
936 on_failure: FailureAction::Abort,
937 timeout_seconds: Some(60),
938 },
939 ],
940 }
941}
942
943#[cfg(test)]
948#[allow(clippy::unwrap_used, clippy::expect_used)]
949mod tests {
950 use super::*;
951
952 fn simple_workflow(steps: Vec<WorkflowStepDef>) -> WorkflowDefinition {
955 WorkflowDefinition {
956 id: "test_wf".to_string(),
957 name: "Test Workflow".to_string(),
958 description: "A test workflow".to_string(),
959 trigger: WorkflowTrigger::Manual,
960 steps,
961 timeout_seconds: None,
962 }
963 }
964
965 fn agent_step(id: &str, name: &str) -> WorkflowStepDef {
966 WorkflowStepDef {
967 id: id.to_string(),
968 name: name.to_string(),
969 step_type: StepType::AgentTask {
970 agent_role: "tester".to_string(),
971 prompt_template: "Do something".to_string(),
972 },
973 condition: None,
974 on_failure: FailureAction::Abort,
975 timeout_seconds: None,
976 }
977 }
978
979 #[tokio::test]
982 async fn test_engine_new() {
983 let engine = WorkflowEngine::new();
984 assert!(engine.workflows.read().await.is_empty());
985 assert!(engine.runs.read().await.is_empty());
986 }
987
988 #[tokio::test]
989 async fn test_engine_default() {
990 let engine = WorkflowEngine::default();
991 assert!(engine.workflows.read().await.is_empty());
992 }
993
994 #[tokio::test]
995 async fn test_register_workflow() {
996 let engine = WorkflowEngine::new();
997 let wf = simple_workflow(vec![agent_step("s1", "Step 1")]);
998 engine.register_workflow(wf).await;
999 assert!(engine.get_workflow("test_wf").await.is_some());
1000 }
1001
1002 #[tokio::test]
1003 async fn test_register_workflow_overwrite() {
1004 let engine = WorkflowEngine::new();
1005 let wf1 = simple_workflow(vec![agent_step("s1", "Step 1")]);
1006 engine.register_workflow(wf1).await;
1007
1008 let mut wf2 = simple_workflow(vec![agent_step("s1", "Step 1"), agent_step("s2", "Step 2")]);
1009 wf2.id = "test_wf".to_string();
1010 engine.register_workflow(wf2).await;
1011
1012 let wf = engine.get_workflow("test_wf").await.unwrap();
1013 assert_eq!(wf.steps.len(), 2);
1014 }
1015
1016 #[tokio::test]
1017 async fn test_get_workflow_missing() {
1018 let engine = WorkflowEngine::new();
1019 assert!(engine.get_workflow("nope").await.is_none());
1020 }
1021
1022 #[tokio::test]
1025 async fn test_start_returns_run_id() {
1026 let engine = WorkflowEngine::new();
1027 engine
1028 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
1029 .await;
1030 let run_id = engine
1031 .start("test_wf", serde_json::json!({}))
1032 .await
1033 .unwrap();
1034 assert!(!run_id.is_empty());
1035 }
1036
1037 #[tokio::test]
1038 async fn test_start_unknown_workflow_returns_none() {
1039 let engine = WorkflowEngine::new();
1040 assert!(engine
1041 .start("no_such", serde_json::json!({}))
1042 .await
1043 .is_none());
1044 }
1045
1046 #[tokio::test]
1047 async fn test_get_run_initial_state() {
1048 let engine = WorkflowEngine::new();
1049 engine
1050 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
1051 .await;
1052 let run_id = engine
1053 .start("test_wf", serde_json::json!({"key": "val"}))
1054 .await
1055 .unwrap();
1056 let run = engine.get_run(&run_id).await.unwrap();
1057 assert_eq!(run.status, RunStatus::Pending);
1058 assert_eq!(run.current_step_index, 0);
1059 assert_eq!(run.trigger_data["key"], "val");
1060 assert!(run.step_results.is_empty());
1061 }
1062
1063 #[tokio::test]
1064 async fn test_get_run_missing() {
1065 let engine = WorkflowEngine::new();
1066 assert!(engine.get_run("nope").await.is_none());
1067 }
1068
1069 #[tokio::test]
1072 async fn test_advance_single_step() {
1073 let engine = WorkflowEngine::new();
1074 engine
1075 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
1076 .await;
1077 let run_id = engine
1078 .start("test_wf", serde_json::json!({}))
1079 .await
1080 .unwrap();
1081
1082 let advanced = engine.advance(&run_id).await.unwrap();
1083 assert!(advanced);
1084
1085 let run = engine.get_run(&run_id).await.unwrap();
1086 assert_eq!(run.status, RunStatus::Completed);
1087 assert_eq!(run.step_results.len(), 1);
1088 assert_eq!(run.step_results[0].status, StepStatus::Completed);
1089 }
1090
1091 #[tokio::test]
1092 async fn test_advance_multi_step() {
1093 let engine = WorkflowEngine::new();
1094 engine
1095 .register_workflow(simple_workflow(vec![
1096 agent_step("s1", "Step 1"),
1097 agent_step("s2", "Step 2"),
1098 agent_step("s3", "Step 3"),
1099 ]))
1100 .await;
1101 let run_id = engine
1102 .start("test_wf", serde_json::json!({}))
1103 .await
1104 .unwrap();
1105
1106 assert!(engine.advance(&run_id).await.unwrap());
1108 let run = engine.get_run(&run_id).await.unwrap();
1109 assert_eq!(run.status, RunStatus::Running);
1110 assert_eq!(run.step_results.len(), 1);
1111
1112 assert!(engine.advance(&run_id).await.unwrap());
1114 assert!(engine.advance(&run_id).await.unwrap());
1116
1117 let run = engine.get_run(&run_id).await.unwrap();
1118 assert_eq!(run.status, RunStatus::Completed);
1119 assert_eq!(run.step_results.len(), 3);
1120 }
1121
1122 #[tokio::test]
1123 async fn test_advance_completed_run_returns_false() {
1124 let engine = WorkflowEngine::new();
1125 engine
1126 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
1127 .await;
1128 let run_id = engine
1129 .start("test_wf", serde_json::json!({}))
1130 .await
1131 .unwrap();
1132 engine.advance(&run_id).await.unwrap();
1133 assert!(!engine.advance(&run_id).await.unwrap());
1135 }
1136
1137 #[tokio::test]
1138 async fn test_advance_unknown_run_errors() {
1139 let engine = WorkflowEngine::new();
1140 let result = engine.advance("nope").await;
1141 assert!(result.is_err());
1142 }
1143
1144 #[tokio::test]
1147 async fn test_list_runs() {
1148 let engine = WorkflowEngine::new();
1149 engine
1150 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
1151 .await;
1152 engine
1153 .start("test_wf", serde_json::json!({}))
1154 .await
1155 .unwrap();
1156 engine
1157 .start("test_wf", serde_json::json!({}))
1158 .await
1159 .unwrap();
1160
1161 let runs = engine.list_runs("test_wf").await;
1162 assert_eq!(runs.len(), 2);
1163 }
1164
1165 #[tokio::test]
1166 async fn test_list_runs_empty() {
1167 let engine = WorkflowEngine::new();
1168 let runs = engine.list_runs("nothing").await;
1169 assert!(runs.is_empty());
1170 }
1171
1172 #[tokio::test]
1175 async fn test_run_to_completion() {
1176 let engine = WorkflowEngine::new();
1177 engine
1178 .register_workflow(simple_workflow(vec![
1179 agent_step("s1", "Step 1"),
1180 agent_step("s2", "Step 2"),
1181 ]))
1182 .await;
1183 let run_id = engine
1184 .start("test_wf", serde_json::json!({}))
1185 .await
1186 .unwrap();
1187 let run = engine.run_to_completion(&run_id).await.unwrap();
1188 assert_eq!(run.status, RunStatus::Completed);
1189 assert_eq!(run.step_results.len(), 2);
1190 }
1191
1192 #[tokio::test]
1195 async fn test_pause_resume() {
1196 let engine = WorkflowEngine::new();
1197 engine
1198 .register_workflow(simple_workflow(vec![
1199 agent_step("s1", "Step 1"),
1200 agent_step("s2", "Step 2"),
1201 ]))
1202 .await;
1203 let run_id = engine
1204 .start("test_wf", serde_json::json!({}))
1205 .await
1206 .unwrap();
1207
1208 engine.advance(&run_id).await.unwrap();
1210 let run = engine.get_run(&run_id).await.unwrap();
1211 assert_eq!(run.status, RunStatus::Running);
1212
1213 engine.pause(&run_id).await.unwrap();
1215 let run = engine.get_run(&run_id).await.unwrap();
1216 assert_eq!(run.status, RunStatus::Paused);
1217
1218 engine.resume(&run_id).await.unwrap();
1220 let run = engine.get_run(&run_id).await.unwrap();
1221 assert_eq!(run.status, RunStatus::Running);
1222 }
1223
1224 #[tokio::test]
1225 async fn test_pause_non_running_fails() {
1226 let engine = WorkflowEngine::new();
1227 engine
1228 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
1229 .await;
1230 let run_id = engine
1231 .start("test_wf", serde_json::json!({}))
1232 .await
1233 .unwrap();
1234 assert!(engine.pause(&run_id).await.is_err());
1236 }
1237
1238 #[tokio::test]
1239 async fn test_resume_non_paused_fails() {
1240 let engine = WorkflowEngine::new();
1241 engine
1242 .register_workflow(simple_workflow(vec![
1243 agent_step("s1", "Step 1"),
1244 agent_step("s2", "Step 2"),
1245 ]))
1246 .await;
1247 let run_id = engine
1248 .start("test_wf", serde_json::json!({}))
1249 .await
1250 .unwrap();
1251 engine.advance(&run_id).await.unwrap();
1252 assert!(engine.resume(&run_id).await.is_err());
1254 }
1255
1256 #[tokio::test]
1259 async fn test_condition_if_previous_succeeded() {
1260 let engine = WorkflowEngine::new();
1261 let mut step2 = agent_step("s2", "Step 2");
1262 step2.condition = Some(StepCondition::IfPreviousSucceeded);
1263 engine
1264 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1"), step2]))
1265 .await;
1266 let run_id = engine
1267 .start("test_wf", serde_json::json!({}))
1268 .await
1269 .unwrap();
1270 let run = engine.run_to_completion(&run_id).await.unwrap();
1271 assert_eq!(run.status, RunStatus::Completed);
1272 assert_eq!(run.step_results.len(), 2);
1273 assert_eq!(run.step_results[1].status, StepStatus::Completed);
1274 }
1275
1276 #[tokio::test]
1277 async fn test_condition_if_field_equals() {
1278 let engine = WorkflowEngine::new();
1279 let mut step = agent_step("s1", "Conditional Step");
1280 step.condition = Some(StepCondition::IfFieldEquals {
1281 field: "tier".to_string(),
1282 value: "gold".to_string(),
1283 });
1284 engine.register_workflow(simple_workflow(vec![step])).await;
1285
1286 let run_id = engine
1288 .start("test_wf", serde_json::json!({"tier": "gold"}))
1289 .await
1290 .unwrap();
1291 let run = engine.run_to_completion(&run_id).await.unwrap();
1292 assert_eq!(run.step_results[0].status, StepStatus::Completed);
1293 }
1294
1295 #[tokio::test]
1296 async fn test_condition_if_field_equals_mismatch_skips() {
1297 let engine = WorkflowEngine::new();
1298 let mut step = agent_step("s1", "Conditional Step");
1299 step.condition = Some(StepCondition::IfFieldEquals {
1300 field: "tier".to_string(),
1301 value: "gold".to_string(),
1302 });
1303 engine.register_workflow(simple_workflow(vec![step])).await;
1304
1305 let run_id = engine
1306 .start("test_wf", serde_json::json!({"tier": "silver"}))
1307 .await
1308 .unwrap();
1309 let run = engine.run_to_completion(&run_id).await.unwrap();
1310 assert_eq!(run.step_results[0].status, StepStatus::Skipped);
1311 }
1312
1313 #[tokio::test]
1314 async fn test_condition_if_score_above() {
1315 let engine = WorkflowEngine::new();
1316 let mut step = agent_step("s1", "High Score Step");
1317 step.condition = Some(StepCondition::IfScoreAbove {
1318 field: "score".to_string(),
1319 threshold: 50.0,
1320 });
1321 engine.register_workflow(simple_workflow(vec![step])).await;
1322
1323 let run_id = engine
1324 .start("test_wf", serde_json::json!({"score": 80}))
1325 .await
1326 .unwrap();
1327 let run = engine.run_to_completion(&run_id).await.unwrap();
1328 assert_eq!(run.step_results[0].status, StepStatus::Completed);
1329 }
1330
1331 #[tokio::test]
1332 async fn test_condition_if_score_below_skips() {
1333 let engine = WorkflowEngine::new();
1334 let mut step = agent_step("s1", "High Score Step");
1335 step.condition = Some(StepCondition::IfScoreAbove {
1336 field: "score".to_string(),
1337 threshold: 50.0,
1338 });
1339 engine.register_workflow(simple_workflow(vec![step])).await;
1340
1341 let run_id = engine
1342 .start("test_wf", serde_json::json!({"score": 30}))
1343 .await
1344 .unwrap();
1345 let run = engine.run_to_completion(&run_id).await.unwrap();
1346 assert_eq!(run.step_results[0].status, StepStatus::Skipped);
1347 }
1348
1349 #[tokio::test]
1352 async fn test_delay_step() {
1353 let engine = WorkflowEngine::new();
1354 let wf = simple_workflow(vec![WorkflowStepDef {
1355 id: "d1".to_string(),
1356 name: "Delay".to_string(),
1357 step_type: StepType::Delay { seconds: 5 },
1358 condition: None,
1359 on_failure: FailureAction::Abort,
1360 timeout_seconds: None,
1361 }]);
1362 engine.register_workflow(wf).await;
1363 let run_id = engine
1364 .start("test_wf", serde_json::json!({}))
1365 .await
1366 .unwrap();
1367 let run = engine.run_to_completion(&run_id).await.unwrap();
1368 assert_eq!(run.step_results[0].output["delayed_seconds"], 5);
1369 }
1370
1371 #[tokio::test]
1372 async fn test_notification_step() {
1373 let engine = WorkflowEngine::new();
1374 let wf = simple_workflow(vec![WorkflowStepDef {
1375 id: "n1".to_string(),
1376 name: "Notify".to_string(),
1377 step_type: StepType::Notification {
1378 channel: "slack".to_string(),
1379 message_template: "Hello!".to_string(),
1380 },
1381 condition: None,
1382 on_failure: FailureAction::Abort,
1383 timeout_seconds: None,
1384 }]);
1385 engine.register_workflow(wf).await;
1386 let run_id = engine
1387 .start("test_wf", serde_json::json!({}))
1388 .await
1389 .unwrap();
1390 let run = engine.run_to_completion(&run_id).await.unwrap();
1391 assert_eq!(run.step_results[0].output["sent"], true);
1392 }
1393
1394 #[tokio::test]
1395 async fn test_assign_to_human_step() {
1396 let engine = WorkflowEngine::new();
1397 let wf = simple_workflow(vec![WorkflowStepDef {
1398 id: "h1".to_string(),
1399 name: "Human".to_string(),
1400 step_type: StepType::AssignToHuman {
1401 team: "ops".to_string(),
1402 message: "Please handle".to_string(),
1403 },
1404 condition: None,
1405 on_failure: FailureAction::Abort,
1406 timeout_seconds: None,
1407 }]);
1408 engine.register_workflow(wf).await;
1409 let run_id = engine
1410 .start("test_wf", serde_json::json!({}))
1411 .await
1412 .unwrap();
1413 let run = engine.run_to_completion(&run_id).await.unwrap();
1414 assert_eq!(run.step_results[0].output["assigned"], true);
1415 }
1416
1417 #[tokio::test]
1418 async fn test_http_call_step() {
1419 let engine = WorkflowEngine::new();
1420 let wf = simple_workflow(vec![WorkflowStepDef {
1421 id: "http1".to_string(),
1422 name: "HTTP".to_string(),
1423 step_type: StepType::HttpCall {
1424 method: "POST".to_string(),
1425 url: "https://api.example.com/hook".to_string(),
1426 body_template: Some(r#"{"data": "{{payload}}"}"#.to_string()),
1427 },
1428 condition: None,
1429 on_failure: FailureAction::Abort,
1430 timeout_seconds: None,
1431 }]);
1432 engine.register_workflow(wf).await;
1433 let run_id = engine
1434 .start("test_wf", serde_json::json!({}))
1435 .await
1436 .unwrap();
1437 let run = engine.run_to_completion(&run_id).await.unwrap();
1438 assert_eq!(run.step_results[0].output["status_code"], 200);
1439 }
1440
1441 #[tokio::test]
1444 async fn test_failure_action_skip() {
1445 let engine = WorkflowEngine::new();
1446 let mut step1 = agent_step("s1", "Step 1");
1450 step1.condition = Some(StepCondition::IfFieldEquals {
1451 field: "x".to_string(),
1452 value: "impossible".to_string(),
1453 });
1454 step1.on_failure = FailureAction::Skip;
1455 let step2 = agent_step("s2", "Step 2");
1456 engine
1457 .register_workflow(simple_workflow(vec![step1, step2]))
1458 .await;
1459 let run_id = engine
1460 .start("test_wf", serde_json::json!({}))
1461 .await
1462 .unwrap();
1463 let run = engine.run_to_completion(&run_id).await.unwrap();
1464 assert_eq!(run.status, RunStatus::Completed);
1465 assert_eq!(run.step_results[0].status, StepStatus::Skipped);
1467 assert_eq!(run.step_results[1].status, StepStatus::Completed);
1468 }
1469
1470 #[tokio::test]
1473 async fn test_expression_true_literal() {
1474 let run = WorkflowRun {
1475 run_id: "r1".into(),
1476 workflow_id: "w1".into(),
1477 status: RunStatus::Running,
1478 current_step_index: 0,
1479 trigger_data: serde_json::json!({}),
1480 step_results: vec![],
1481 created_at: Utc::now(),
1482 updated_at: Utc::now(),
1483 };
1484 assert!(evaluate_expression("true", &run));
1485 assert!(!evaluate_expression("false", &run));
1486 }
1487
1488 #[tokio::test]
1489 async fn test_expression_field_equals() {
1490 let run = WorkflowRun {
1491 run_id: "r1".into(),
1492 workflow_id: "w1".into(),
1493 status: RunStatus::Running,
1494 current_step_index: 0,
1495 trigger_data: serde_json::json!({"priority": "critical"}),
1496 step_results: vec![],
1497 created_at: Utc::now(),
1498 updated_at: Utc::now(),
1499 };
1500 assert!(evaluate_expression("priority == critical", &run));
1501 assert!(!evaluate_expression("priority == low", &run));
1502 }
1503
1504 #[tokio::test]
1505 async fn test_expression_numeric_comparison() {
1506 let run = WorkflowRun {
1507 run_id: "r1".into(),
1508 workflow_id: "w1".into(),
1509 status: RunStatus::Running,
1510 current_step_index: 0,
1511 trigger_data: serde_json::json!({"score": 85}),
1512 step_results: vec![],
1513 created_at: Utc::now(),
1514 updated_at: Utc::now(),
1515 };
1516 assert!(evaluate_expression("score > 80", &run));
1517 assert!(!evaluate_expression("score > 90", &run));
1518 assert!(evaluate_expression("score < 90", &run));
1519 assert!(!evaluate_expression("score < 80", &run));
1520 }
1521
1522 #[tokio::test]
1525 async fn test_lead_qualification_template() {
1526 let wf = lead_qualification_workflow();
1527 assert_eq!(wf.id, "lead_qualification");
1528 assert!(!wf.steps.is_empty());
1529 assert!(matches!(wf.trigger, WorkflowTrigger::Webhook { .. }));
1530 assert!(wf.timeout_seconds.is_some());
1531
1532 let ids: Vec<&str> = wf.steps.iter().map(|s| s.id.as_str()).collect();
1534 let unique: std::collections::HashSet<&str> = ids.iter().copied().collect();
1535 assert_eq!(ids.len(), unique.len());
1536 }
1537
1538 #[tokio::test]
1539 async fn test_support_ticket_template() {
1540 let wf = support_ticket_workflow();
1541 assert_eq!(wf.id, "support_ticket");
1542 assert!(!wf.steps.is_empty());
1543 assert!(matches!(wf.trigger, WorkflowTrigger::Webhook { .. }));
1544
1545 let ids: Vec<&str> = wf.steps.iter().map(|s| s.id.as_str()).collect();
1546 let unique: std::collections::HashSet<&str> = ids.iter().copied().collect();
1547 assert_eq!(ids.len(), unique.len());
1548 }
1549
1550 #[tokio::test]
1551 async fn test_lead_qualification_run_to_completion() {
1552 let engine = WorkflowEngine::new();
1553 engine
1554 .register_workflow(lead_qualification_workflow())
1555 .await;
1556 let run_id = engine
1557 .start(
1558 "lead_qualification",
1559 serde_json::json!({"lead_name": "Acme Corp", "score": 90}),
1560 )
1561 .await
1562 .unwrap();
1563 let run = engine.run_to_completion(&run_id).await.unwrap();
1564 assert!(matches!(
1566 run.status,
1567 RunStatus::Completed | RunStatus::Failed
1568 ));
1569 assert!(!run.step_results.is_empty());
1570 }
1571
1572 #[tokio::test]
1573 async fn test_support_ticket_run_to_completion() {
1574 let engine = WorkflowEngine::new();
1575 engine.register_workflow(support_ticket_workflow()).await;
1576 let run_id = engine
1577 .start(
1578 "support_ticket",
1579 serde_json::json!({"ticket": "My app crashes", "priority": "high"}),
1580 )
1581 .await
1582 .unwrap();
1583 let run = engine.run_to_completion(&run_id).await.unwrap();
1584 assert!(matches!(
1585 run.status,
1586 RunStatus::Completed | RunStatus::Failed
1587 ));
1588 assert!(!run.step_results.is_empty());
1589 }
1590
1591 #[tokio::test]
1594 async fn test_workflow_definition_serde() {
1595 let wf = lead_qualification_workflow();
1596 let json = serde_json::to_string_pretty(&wf).unwrap();
1597 let deserialized: WorkflowDefinition = serde_json::from_str(&json).unwrap();
1598 assert_eq!(deserialized.id, wf.id);
1599 assert_eq!(deserialized.steps.len(), wf.steps.len());
1600 }
1601
1602 #[tokio::test]
1603 async fn test_trigger_variants_serde() {
1604 let triggers = vec![
1605 WorkflowTrigger::Manual,
1606 WorkflowTrigger::Webhook {
1607 event: "push".into(),
1608 },
1609 WorkflowTrigger::Schedule {
1610 cron: "0 * * * *".into(),
1611 },
1612 WorkflowTrigger::Threshold {
1613 metric: "cpu".into(),
1614 condition: "above".into(),
1615 value: 90.0,
1616 },
1617 ];
1618 for t in &triggers {
1619 let json = serde_json::to_string(t).unwrap();
1620 let back: WorkflowTrigger = serde_json::from_str(&json).unwrap();
1621 assert_eq!(*t, back);
1622 }
1623 }
1624
1625 #[tokio::test]
1628 async fn test_engine_clone_shared_state() {
1629 let engine = WorkflowEngine::new();
1630 let engine2 = engine.clone();
1631 engine
1632 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
1633 .await;
1634 assert!(engine2.get_workflow("test_wf").await.is_some());
1636 }
1637
1638 #[tokio::test]
1639 async fn test_concurrent_starts() {
1640 let engine = WorkflowEngine::new();
1641 engine
1642 .register_workflow(simple_workflow(vec![agent_step("s1", "Step 1")]))
1643 .await;
1644
1645 let mut handles = vec![];
1646 for _ in 0..10 {
1647 let e = engine.clone();
1648 handles.push(tokio::spawn(async move {
1649 e.start("test_wf", serde_json::json!({})).await.unwrap()
1650 }));
1651 }
1652
1653 let mut ids = vec![];
1654 for h in handles {
1655 ids.push(h.await.unwrap());
1656 }
1657 let unique: std::collections::HashSet<&str> =
1659 ids.iter().map(std::string::String::as_str).collect();
1660 assert_eq!(unique.len(), 10);
1661 }
1662}