mockforge_core/scenario_studio/
flow.rs

1//! Flow execution engine
2//!
3//! This module provides the execution engine for business flows defined in the Scenario Studio.
4
5use crate::error::{Error, Result};
6use crate::scenario_studio::types::{
7    FlowCondition, FlowConnection, FlowDefinition, FlowStep, StepType,
8};
9use chrono::Utc;
10use serde_json::Value;
11use std::collections::HashMap;
12
13/// Result of executing a flow step
14#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
15pub struct FlowStepResult {
16    /// ID of the step that was executed
17    pub step_id: String,
18    /// Whether the step executed successfully
19    pub success: bool,
20    /// Response data (if applicable)
21    pub response: Option<Value>,
22    /// Error message (if execution failed)
23    pub error: Option<String>,
24    /// Duration in milliseconds
25    pub duration_ms: u64,
26    /// Variables extracted from the response
27    pub extracted_variables: HashMap<String, Value>,
28}
29
30/// Result of executing an entire flow
31#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
32pub struct FlowExecutionResult {
33    /// ID of the flow that was executed
34    pub flow_id: String,
35    /// Whether the flow completed successfully
36    pub success: bool,
37    /// Results for each step that was executed
38    pub step_results: Vec<FlowStepResult>,
39    /// Final variables after flow execution
40    pub final_variables: HashMap<String, Value>,
41    /// Total duration in milliseconds
42    pub total_duration_ms: u64,
43    /// Error message (if execution failed)
44    pub error: Option<String>,
45}
46
47/// Flow execution engine
48///
49/// Executes business flows defined in the Scenario Studio, handling
50/// step sequencing, conditions, and variable extraction.
51pub struct FlowExecutor {
52    /// Variables available during execution
53    variables: HashMap<String, Value>,
54}
55
56impl FlowExecutor {
57    /// Create a new flow executor
58    pub fn new() -> Self {
59        Self {
60            variables: HashMap::new(),
61        }
62    }
63
64    /// Create a new flow executor with initial variables
65    pub fn with_variables(variables: HashMap<String, Value>) -> Self {
66        Self { variables }
67    }
68
69    /// Execute a flow definition
70    ///
71    /// This method executes the flow steps in order, following connections
72    /// and evaluating conditions to determine the execution path.
73    pub async fn execute(&mut self, flow: &FlowDefinition) -> Result<FlowExecutionResult> {
74        let start_time = Utc::now();
75        let mut step_results = Vec::new();
76        let mut executed_step_ids = std::collections::HashSet::new();
77        let mut current_step_ids = self.find_start_steps(flow);
78
79        // Initialize variables from flow
80        for (key, value) in &flow.variables {
81            self.variables.insert(key.clone(), value.clone());
82        }
83
84        // Execute flow until no more steps to execute
85        while !current_step_ids.is_empty() {
86            let mut next_step_ids = Vec::new();
87
88            for step_id in current_step_ids {
89                if executed_step_ids.contains(&step_id) {
90                    continue; // Skip already executed steps to prevent infinite loops
91                }
92
93                let step = flow
94                    .steps
95                    .iter()
96                    .find(|s| s.id == step_id)
97                    .ok_or_else(|| Error::validation(format!("Step {} not found", step_id)))?;
98
99                // Check if step condition is met
100                if let Some(ref condition) = step.condition {
101                    if !self.evaluate_condition(condition)? {
102                        continue; // Skip this step if condition is not met
103                    }
104                }
105
106                // Execute the step
107                let step_result = self.execute_step(step).await?;
108                step_results.push(step_result.clone());
109                executed_step_ids.insert(step_id.clone());
110
111                // Find next steps based on connections
112                let connections = flow.connections.iter().filter(|c| c.from_step_id == step_id);
113
114                for connection in connections {
115                    // Check connection condition if present
116                    if let Some(ref condition) = connection.condition {
117                        if !self.evaluate_condition(condition)? {
118                            continue; // Skip this connection if condition is not met
119                        }
120                    }
121
122                    if !executed_step_ids.contains(&connection.to_step_id) {
123                        next_step_ids.push(connection.to_step_id.clone());
124                    }
125                }
126            }
127
128            current_step_ids = next_step_ids;
129        }
130
131        let end_time = Utc::now();
132        let total_duration_ms = (end_time - start_time).num_milliseconds() as u64;
133
134        let success = step_results.iter().all(|r| r.success);
135
136        // Extract error before moving step_results
137        let error = if success {
138            None
139        } else {
140            step_results.iter().find_map(|r| r.error.as_ref()).map(|e| e.clone())
141        };
142
143        Ok(FlowExecutionResult {
144            flow_id: flow.id.clone(),
145            success,
146            step_results,
147            final_variables: self.variables.clone(),
148            total_duration_ms,
149            error,
150        })
151    }
152
153    /// Find the starting steps in a flow (steps with no incoming connections)
154    fn find_start_steps(&self, flow: &FlowDefinition) -> Vec<String> {
155        let mut has_incoming: std::collections::HashSet<String> =
156            flow.connections.iter().map(|c| c.to_step_id.clone()).collect();
157
158        flow.steps
159            .iter()
160            .filter(|s| !has_incoming.contains(&s.id))
161            .map(|s| s.id.clone())
162            .collect()
163    }
164
165    /// Execute a single flow step
166    async fn execute_step(&mut self, step: &FlowStep) -> Result<FlowStepResult> {
167        let start_time = Utc::now();
168        let mut extracted_variables = HashMap::new();
169
170        // Apply delay if specified
171        if let Some(delay_ms) = step.delay_ms {
172            tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
173        }
174
175        let (success, response, error) = match step.step_type {
176            StepType::ApiCall => {
177                // TODO: Implement actual API call execution
178                // For now, return a placeholder response
179                (
180                    true,
181                    Some(serde_json::json!({"status": "ok", "message": "API call executed"})),
182                    None,
183                )
184            }
185            StepType::Condition => {
186                // Conditions are evaluated before step execution
187                (true, None, None)
188            }
189            StepType::Delay => {
190                // Delay is already applied above
191                (true, None, None)
192            }
193            StepType::Loop => {
194                // TODO: Implement loop execution
195                (true, None, None)
196            }
197            StepType::Parallel => {
198                // TODO: Implement parallel execution
199                (true, None, None)
200            }
201        };
202
203        // Extract variables from response
204        if let Some(ref resp) = response {
205            for (key, path) in &step.extract {
206                if let Some(value) = self.extract_value(resp, path) {
207                    extracted_variables.insert(key.clone(), value.clone());
208                    self.variables.insert(key.clone(), value);
209                }
210            }
211        }
212
213        let end_time = Utc::now();
214        let duration_ms = (end_time - start_time).num_milliseconds() as u64;
215
216        Ok(FlowStepResult {
217            step_id: step.id.clone(),
218            success,
219            response,
220            error,
221            duration_ms,
222            extracted_variables,
223        })
224    }
225
226    /// Evaluate a flow condition
227    fn evaluate_condition(&self, condition: &FlowCondition) -> Result<bool> {
228        // TODO: Implement proper condition evaluation
229        // For now, return true for all conditions
230        // This should evaluate the expression against the current variables
231        Ok(true)
232    }
233
234    /// Extract a value from a JSON object using a path expression
235    fn extract_value(&self, json: &Value, path: &str) -> Option<Value> {
236        // Simple path extraction (e.g., "body.id" or "status")
237        let parts: Vec<&str> = path.split('.').collect();
238        let mut current = json;
239
240        for part in parts {
241            match current {
242                Value::Object(map) => {
243                    current = map.get(part)?;
244                }
245                Value::Array(arr) => {
246                    let index: usize = part.parse().ok()?;
247                    current = arr.get(index)?;
248                }
249                _ => return None,
250            }
251        }
252
253        Some(current.clone())
254    }
255}
256
257impl Default for FlowExecutor {
258    fn default() -> Self {
259        Self::new()
260    }
261}