mockforge_core/scenario_studio/
flow.rs

//! Flow execution engine
//!
//! This module provides the execution engine for business flows defined in the Scenario Studio.
4
5use crate::error::{Error, Result};
6use crate::scenario_studio::types::{
7    ConditionOperator, FlowCondition, FlowDefinition, FlowStep, StepType,
8};
9use chrono::Utc;
10use regex::Regex;
11use reqwest::Client;
12use serde_json::Value;
13use std::collections::HashMap;
14
15/// Result of executing a flow step
16#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct FlowStepResult {
18    /// ID of the step that was executed
19    pub step_id: String,
20    /// Whether the step executed successfully
21    pub success: bool,
22    /// Response data (if applicable)
23    pub response: Option<Value>,
24    /// Error message (if execution failed)
25    pub error: Option<String>,
26    /// Duration in milliseconds
27    pub duration_ms: u64,
28    /// Variables extracted from the response
29    pub extracted_variables: HashMap<String, Value>,
30}
31
32/// Result of executing an entire flow
33#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
34pub struct FlowExecutionResult {
35    /// ID of the flow that was executed
36    pub flow_id: String,
37    /// Whether the flow completed successfully
38    pub success: bool,
39    /// Results for each step that was executed
40    pub step_results: Vec<FlowStepResult>,
41    /// Final variables after flow execution
42    pub final_variables: HashMap<String, Value>,
43    /// Total duration in milliseconds
44    pub total_duration_ms: u64,
45    /// Error message (if execution failed)
46    pub error: Option<String>,
47}
48
49/// Flow execution engine
50///
51/// Executes business flows defined in the Scenario Studio, handling
52/// step sequencing, conditions, and variable extraction.
53pub struct FlowExecutor {
54    /// Variables available during execution
55    variables: HashMap<String, Value>,
56    /// HTTP client for API calls
57    http_client: Client,
58}
59
60impl FlowExecutor {
61    /// Create a new flow executor
62    pub fn new() -> Self {
63        Self {
64            variables: HashMap::new(),
65            http_client: Client::new(),
66        }
67    }
68
69    /// Create a new flow executor with initial variables
70    pub fn with_variables(variables: HashMap<String, Value>) -> Self {
71        Self {
72            variables,
73            http_client: Client::new(),
74        }
75    }
76
77    /// Execute a flow definition
78    ///
79    /// This method executes the flow steps in order, following connections
80    /// and evaluating conditions to determine the execution path.
81    pub async fn execute(&mut self, flow: &FlowDefinition) -> Result<FlowExecutionResult> {
82        let start_time = Utc::now();
83        let mut step_results = Vec::new();
84        let mut executed_step_ids = std::collections::HashSet::new();
85        let mut current_step_ids = self.find_start_steps(flow);
86
87        // Initialize variables from flow
88        for (key, value) in &flow.variables {
89            self.variables.insert(key.clone(), value.clone());
90        }
91
92        // Execute flow until no more steps to execute
93        while !current_step_ids.is_empty() {
94            let mut next_step_ids = Vec::new();
95
96            for step_id in current_step_ids {
97                if executed_step_ids.contains(&step_id) {
98                    continue; // Skip already executed steps to prevent infinite loops
99                }
100
101                let step = flow
102                    .steps
103                    .iter()
104                    .find(|s| s.id == step_id)
105                    .ok_or_else(|| Error::validation(format!("Step {} not found", step_id)))?;
106
107                // Check if step condition is met
108                if let Some(ref condition) = step.condition {
109                    if !self.evaluate_condition(condition)? {
110                        continue; // Skip this step if condition is not met
111                    }
112                }
113
114                // Handle special step types
115                match step.step_type {
116                    StepType::Loop => {
117                        // Execute loop: get child steps and iterate
118                        let loop_results = self.execute_loop(step, flow).await?;
119                        step_results.extend(loop_results);
120                        executed_step_ids.insert(step_id.clone());
121                    }
122                    StepType::Parallel => {
123                        // Execute parallel: get child steps and run in parallel
124                        let parallel_results = self.execute_parallel(step, flow).await?;
125                        step_results.extend(parallel_results);
126                        executed_step_ids.insert(step_id.clone());
127                    }
128                    _ => {
129                        // Execute the step normally
130                        let step_result = self.execute_step(step).await?;
131                        step_results.push(step_result.clone());
132                        executed_step_ids.insert(step_id.clone());
133                    }
134                }
135
136                // Find next steps based on connections
137                let connections = flow.connections.iter().filter(|c| c.from_step_id == step_id);
138
139                for connection in connections {
140                    // Check connection condition if present
141                    if let Some(ref condition) = connection.condition {
142                        if !self.evaluate_condition(condition)? {
143                            continue; // Skip this connection if condition is not met
144                        }
145                    }
146
147                    if !executed_step_ids.contains(&connection.to_step_id) {
148                        next_step_ids.push(connection.to_step_id.clone());
149                    }
150                }
151            }
152
153            current_step_ids = next_step_ids;
154        }
155
156        let end_time = Utc::now();
157        let total_duration_ms = (end_time - start_time).num_milliseconds() as u64;
158
159        let success = step_results.iter().all(|r| r.success);
160
161        // Extract error before moving step_results
162        let error = if success {
163            None
164        } else {
165            step_results.iter().find_map(|r| r.error.as_ref()).cloned()
166        };
167
168        Ok(FlowExecutionResult {
169            flow_id: flow.id.clone(),
170            success,
171            step_results,
172            final_variables: self.variables.clone(),
173            total_duration_ms,
174            error,
175        })
176    }
177
178    /// Find the starting steps in a flow (steps with no incoming connections)
179    fn find_start_steps(&self, flow: &FlowDefinition) -> Vec<String> {
180        let has_incoming: std::collections::HashSet<String> =
181            flow.connections.iter().map(|c| c.to_step_id.clone()).collect();
182
183        flow.steps
184            .iter()
185            .filter(|s| !has_incoming.contains(&s.id))
186            .map(|s| s.id.clone())
187            .collect()
188    }
189
190    /// Execute a single flow step
191    async fn execute_step(&mut self, step: &FlowStep) -> Result<FlowStepResult> {
192        let start_time = Utc::now();
193        let mut extracted_variables = HashMap::new();
194
195        // Apply delay if specified
196        if let Some(delay_ms) = step.delay_ms {
197            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
198        }
199
200        let (success, response, error) = match step.step_type {
201            StepType::ApiCall => self.execute_api_call(step).await,
202            StepType::Condition => {
203                // Conditions are evaluated before step execution
204                (true, None, None)
205            }
206            StepType::Delay => {
207                // Delay is already applied above
208                (true, None, None)
209            }
210            StepType::Loop => {
211                // Loop execution is handled at the flow level, not step level
212                // This should not be reached in normal execution
213                (false, None, Some("Loop steps must be handled at flow level".to_string()))
214            }
215            StepType::Parallel => {
216                // Parallel execution is handled at the flow level, not step level
217                // This should not be reached in normal execution
218                (false, None, Some("Parallel steps must be handled at flow level".to_string()))
219            }
220        };
221
222        // Extract variables from response
223        if let Some(ref resp) = response {
224            for (key, path) in &step.extract {
225                if let Some(value) = self.extract_value(resp, path) {
226                    extracted_variables.insert(key.clone(), value.clone());
227                    self.variables.insert(key.clone(), value);
228                }
229            }
230        }
231
232        let end_time = Utc::now();
233        let duration_ms = (end_time - start_time).num_milliseconds() as u64;
234
235        Ok(FlowStepResult {
236            step_id: step.id.clone(),
237            success,
238            response,
239            error,
240            duration_ms,
241            extracted_variables,
242        })
243    }
244
245    /// Execute an API call step
246    async fn execute_api_call(&self, step: &FlowStep) -> (bool, Option<Value>, Option<String>) {
247        // Get method and endpoint
248        let method = match step.method.as_ref() {
249            Some(m) => m,
250            None => {
251                return (false, None, Some("API call step missing method".to_string()));
252            }
253        };
254
255        let endpoint = match step.endpoint.as_ref() {
256            Some(e) => e,
257            None => {
258                return (false, None, Some("API call step missing endpoint".to_string()));
259            }
260        };
261
262        // Substitute variables in endpoint and body
263        let endpoint = self.substitute_variables(endpoint);
264        let body = step.body.as_ref().map(|b| self.substitute_variables_in_value(b));
265
266        // Build request
267        let method = match method.to_uppercase().as_str() {
268            "GET" => reqwest::Method::GET,
269            "POST" => reqwest::Method::POST,
270            "PUT" => reqwest::Method::PUT,
271            "PATCH" => reqwest::Method::PATCH,
272            "DELETE" => reqwest::Method::DELETE,
273            "HEAD" => reqwest::Method::HEAD,
274            "OPTIONS" => reqwest::Method::OPTIONS,
275            _ => {
276                return (false, None, Some(format!("Unsupported HTTP method: {}", method)));
277            }
278        };
279
280        let mut request = self.http_client.request(method, &endpoint);
281
282        // Add headers
283        for (key, value) in &step.headers {
284            let header_value = self.substitute_variables(value);
285            request = request.header(key, &header_value);
286        }
287
288        // Add body if present
289        if let Some(ref body_value) = body {
290            if let Ok(json_body) = serde_json::to_string(body_value) {
291                request = request.header("Content-Type", "application/json").body(json_body);
292            }
293        }
294
295        // Execute request
296        match request.send().await {
297            Ok(response) => {
298                let status = response.status();
299                let status_code = status.as_u16();
300
301                // Check expected status if specified
302                if let Some(expected) = step.expected_status {
303                    if status_code != expected {
304                        return (
305                            false,
306                            Some(serde_json::json!({
307                                "status": status_code,
308                                "error": format!("Expected status {}, got {}", expected, status_code)
309                            })),
310                            Some(format!(
311                                "Status code mismatch: expected {}, got {}",
312                                expected, status_code
313                            )),
314                        );
315                    }
316                }
317
318                // Parse response body
319                let response_body = match response.text().await {
320                    Ok(text) => {
321                        // Try to parse as JSON, fallback to string
322                        serde_json::from_str(&text).unwrap_or_else(|_| {
323                            serde_json::json!({
324                                "body": text,
325                                "status": status_code
326                            })
327                        })
328                    }
329                    Err(e) => {
330                        return (false, None, Some(format!("Failed to read response body: {}", e)));
331                    }
332                };
333
334                // Build full response object
335                let full_response = serde_json::json!({
336                    "status": status_code,
337                    "headers": {}, // Could extract headers if needed
338                    "body": response_body
339                });
340
341                (true, Some(full_response), None)
342            }
343            Err(e) => (false, None, Some(format!("API call failed: {}", e))),
344        }
345    }
346
347    /// Substitute variables in a string (e.g., "{{variable_name}}")
348    fn substitute_variables(&self, text: &str) -> String {
349        let re = Regex::new(r"\{\{([^}]+)\}\}").unwrap();
350        re.replace_all(text, |caps: &regex::Captures| {
351            let var_name = caps.get(1).unwrap().as_str().trim();
352            self.variables
353                .get(var_name)
354                .map(|v| {
355                    // Convert value to string
356                    if let Some(s) = v.as_str() {
357                        s.to_string()
358                    } else {
359                        v.to_string()
360                    }
361                })
362                .unwrap_or_else(|| format!("{{{{{}}}}}", var_name)) // Keep original if not found
363        })
364        .to_string()
365    }
366
367    /// Substitute variables in a JSON value
368    fn substitute_variables_in_value(&self, value: &Value) -> Value {
369        match value {
370            Value::String(s) => Value::String(self.substitute_variables(s)),
371            Value::Object(map) => {
372                let mut new_map = serde_json::Map::new();
373                for (k, v) in map {
374                    new_map.insert(k.clone(), self.substitute_variables_in_value(v));
375                }
376                Value::Object(new_map)
377            }
378            Value::Array(arr) => {
379                Value::Array(arr.iter().map(|v| self.substitute_variables_in_value(v)).collect())
380            }
381            _ => value.clone(),
382        }
383    }
384
385    /// Evaluate a flow condition
386    fn evaluate_condition(&self, condition: &FlowCondition) -> Result<bool> {
387        // Substitute variables in expression
388        let expression = self.substitute_variables(&condition.expression);
389
390        // Get the value to compare (from variables or literal)
391        let left_value = if expression.starts_with("{{") && expression.ends_with("}}") {
392            // Extract variable name
393            let var_name = expression
394                .strip_prefix("{{")
395                .and_then(|s| s.strip_suffix("}}"))
396                .map(|s| s.trim());
397            var_name
398                .and_then(|name| self.variables.get(name))
399                .cloned()
400                .unwrap_or(Value::Null)
401        } else {
402            // Try to parse as JSON value
403            serde_json::from_str(&expression).unwrap_or(Value::String(expression))
404        };
405
406        let right_value = &condition.value;
407
408        // Apply operator
409        let result = match condition.operator {
410            ConditionOperator::Eq => left_value == *right_value,
411            ConditionOperator::Ne => left_value != *right_value,
412            ConditionOperator::Gt => self.compare_values(&left_value, right_value, |a, b| a > b),
413            ConditionOperator::Gte => self.compare_values(&left_value, right_value, |a, b| a >= b),
414            ConditionOperator::Lt => self.compare_values(&left_value, right_value, |a, b| a < b),
415            ConditionOperator::Lte => self.compare_values(&left_value, right_value, |a, b| a <= b),
416            ConditionOperator::Contains => {
417                if let (Some(left_str), Some(right_str)) =
418                    (left_value.as_str(), right_value.as_str())
419                {
420                    left_str.contains(right_str)
421                } else {
422                    false
423                }
424            }
425            ConditionOperator::NotContains => {
426                if let (Some(left_str), Some(right_str)) =
427                    (left_value.as_str(), right_value.as_str())
428                {
429                    !left_str.contains(right_str)
430                } else {
431                    true
432                }
433            }
434            ConditionOperator::Matches => {
435                if let (Some(left_str), Some(right_str)) =
436                    (left_value.as_str(), right_value.as_str())
437                {
438                    Regex::new(right_str).map(|re| re.is_match(left_str)).unwrap_or(false)
439                } else {
440                    false
441                }
442            }
443            ConditionOperator::Exists => left_value != Value::Null,
444        };
445
446        Ok(result)
447    }
448
449    /// Compare two values numerically
450    fn compare_values<F>(&self, left: &Value, right: &Value, cmp: F) -> bool
451    where
452        F: Fn(f64, f64) -> bool,
453    {
454        match (left.as_f64(), right.as_f64()) {
455            (Some(l), Some(r)) => cmp(l, r),
456            _ => false,
457        }
458    }
459
460    /// Execute a loop step
461    ///
462    /// Loops execute their child steps multiple times based on loop configuration
463    /// stored in step metadata (e.g., "loop_count" or "loop_condition").
464    async fn execute_loop(
465        &mut self,
466        loop_step: &FlowStep,
467        flow: &FlowDefinition,
468    ) -> Result<Vec<FlowStepResult>> {
469        let mut all_results = Vec::new();
470
471        // Get loop configuration from metadata
472        let loop_count = loop_step.metadata.get("loop_count").and_then(|v| v.as_u64()).unwrap_or(1);
473
474        let loop_condition = loop_step.metadata.get("loop_condition");
475
476        // Find child steps (steps connected from this loop step)
477        let child_step_ids: Vec<String> = flow
478            .connections
479            .iter()
480            .filter(|c| c.from_step_id == loop_step.id)
481            .map(|c| c.to_step_id.clone())
482            .collect();
483
484        if child_step_ids.is_empty() {
485            return Ok(all_results);
486        }
487
488        // Execute loop iterations
489        for iteration in 0..loop_count {
490            // Set loop iteration variable
491            self.variables
492                .insert("loop_iteration".to_string(), serde_json::json!(iteration));
493            self.variables.insert("loop_index".to_string(), serde_json::json!(iteration));
494
495            // Check loop condition if specified
496            if let Some(condition_value) = loop_condition {
497                if let Some(condition_str) = condition_value.as_str() {
498                    // Evaluate condition (simplified - could be enhanced)
499                    let condition_result = self
500                        .evaluate_condition(&FlowCondition {
501                            expression: condition_str.to_string(),
502                            operator: ConditionOperator::Eq,
503                            value: Value::Bool(true),
504                        })
505                        .unwrap_or(false);
506
507                    if !condition_result {
508                        break; // Exit loop if condition fails
509                    }
510                }
511            }
512
513            // Execute child steps for this iteration
514            for child_step_id in &child_step_ids {
515                if let Some(child_step) = flow.steps.iter().find(|s| s.id == *child_step_id) {
516                    // Check if step condition is met
517                    if let Some(ref condition) = child_step.condition {
518                        if !self.evaluate_condition(condition)? {
519                            continue;
520                        }
521                    }
522
523                    let step_result = self.execute_step(child_step).await?;
524                    all_results.push(step_result);
525                }
526            }
527        }
528
529        Ok(all_results)
530    }
531
532    /// Execute a parallel step
533    ///
534    /// Parallel steps execute their child steps concurrently.
535    async fn execute_parallel(
536        &mut self,
537        parallel_step: &FlowStep,
538        flow: &FlowDefinition,
539    ) -> Result<Vec<FlowStepResult>> {
540        // Find child steps (steps connected from this parallel step)
541        let child_step_ids: Vec<String> = flow
542            .connections
543            .iter()
544            .filter(|c| c.from_step_id == parallel_step.id)
545            .map(|c| c.to_step_id.clone())
546            .collect();
547
548        if child_step_ids.is_empty() {
549            return Ok(Vec::new());
550        }
551
552        // Collect child steps
553        let child_steps: Vec<&FlowStep> = child_step_ids
554            .iter()
555            .filter_map(|step_id| flow.steps.iter().find(|s| s.id == *step_id))
556            .collect();
557
558        // Execute all child steps in parallel using tokio::spawn
559        // Note: We need to clone variables for each parallel execution
560        let mut tasks = Vec::new();
561
562        for child_step in child_steps {
563            // Clone variables for this parallel branch
564            let variables_clone = self.variables.clone();
565            let step_clone = child_step.clone();
566            let http_client = self.http_client.clone();
567
568            // Create a task for this parallel step
569            let task = tokio::spawn(async move {
570                // Create a temporary executor for this parallel branch
571                let mut branch_executor = FlowExecutor {
572                    variables: variables_clone,
573                    http_client,
574                };
575
576                // Check condition if present
577                if let Some(ref condition) = step_clone.condition {
578                    match branch_executor.evaluate_condition(condition) {
579                        Ok(true) => {}
580                        Ok(false) => {
581                            return FlowStepResult {
582                                step_id: step_clone.id.clone(),
583                                success: false,
584                                response: None,
585                                error: Some("Condition not met".to_string()),
586                                duration_ms: 0,
587                                extracted_variables: HashMap::new(),
588                            };
589                        }
590                        Err(e) => {
591                            return FlowStepResult {
592                                step_id: step_clone.id.clone(),
593                                success: false,
594                                response: None,
595                                error: Some(format!("Condition evaluation error: {}", e)),
596                                duration_ms: 0,
597                                extracted_variables: HashMap::new(),
598                            };
599                        }
600                    }
601                }
602
603                // Execute the step
604                branch_executor
605                    .execute_step(&step_clone)
606                    .await
607                    .unwrap_or_else(|e| FlowStepResult {
608                        step_id: step_clone.id.clone(),
609                        success: false,
610                        response: None,
611                        error: Some(format!("Execution error: {}", e)),
612                        duration_ms: 0,
613                        extracted_variables: HashMap::new(),
614                    })
615            });
616
617            tasks.push(task);
618        }
619
620        // Wait for all parallel tasks to complete
621        let mut results = Vec::new();
622        for task in tasks {
623            match task.await {
624                Ok(result) => {
625                    results.push(result);
626                }
627                Err(e) => {
628                    results.push(FlowStepResult {
629                        step_id: "unknown".to_string(),
630                        success: false,
631                        response: None,
632                        error: Some(format!("Parallel task error: {}", e)),
633                        duration_ms: 0,
634                        extracted_variables: HashMap::new(),
635                    });
636                }
637            }
638        }
639
640        // Merge variables from all parallel branches
641        // (Last write wins for conflicts)
642        for result in &results {
643            for (key, value) in &result.extracted_variables {
644                self.variables.insert(key.clone(), value.clone());
645            }
646        }
647
648        Ok(results)
649    }
650
651    /// Extract a value from a JSON object using a path expression
652    fn extract_value(&self, json: &Value, path: &str) -> Option<Value> {
653        // Simple path extraction (e.g., "body.id" or "status")
654        let parts: Vec<&str> = path.split('.').collect();
655        let mut current = json;
656
657        for part in parts {
658            match current {
659                Value::Object(map) => {
660                    current = map.get(part)?;
661                }
662                Value::Array(arr) => {
663                    let index: usize = part.parse().ok()?;
664                    current = arr.get(index)?;
665                }
666                _ => return None,
667            }
668        }
669
670        Some(current.clone())
671    }
672}
673
674impl Default for FlowExecutor {
675    fn default() -> Self {
676        Self::new()
677    }
678}