mockforge_core/scenario_studio/
flow.rs1use crate::error::{Error, Result};
6use crate::scenario_studio::types::{
7 FlowCondition, FlowConnection, FlowDefinition, FlowStep, StepType,
8};
9use chrono::Utc;
10use serde_json::Value;
11use std::collections::HashMap;
12
13#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
15pub struct FlowStepResult {
16 pub step_id: String,
18 pub success: bool,
20 pub response: Option<Value>,
22 pub error: Option<String>,
24 pub duration_ms: u64,
26 pub extracted_variables: HashMap<String, Value>,
28}
29
30#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
32pub struct FlowExecutionResult {
33 pub flow_id: String,
35 pub success: bool,
37 pub step_results: Vec<FlowStepResult>,
39 pub final_variables: HashMap<String, Value>,
41 pub total_duration_ms: u64,
43 pub error: Option<String>,
45}
46
47pub struct FlowExecutor {
52 variables: HashMap<String, Value>,
54}
55
56impl FlowExecutor {
57 pub fn new() -> Self {
59 Self {
60 variables: HashMap::new(),
61 }
62 }
63
64 pub fn with_variables(variables: HashMap<String, Value>) -> Self {
66 Self { variables }
67 }
68
69 pub async fn execute(&mut self, flow: &FlowDefinition) -> Result<FlowExecutionResult> {
74 let start_time = Utc::now();
75 let mut step_results = Vec::new();
76 let mut executed_step_ids = std::collections::HashSet::new();
77 let mut current_step_ids = self.find_start_steps(flow);
78
79 for (key, value) in &flow.variables {
81 self.variables.insert(key.clone(), value.clone());
82 }
83
84 while !current_step_ids.is_empty() {
86 let mut next_step_ids = Vec::new();
87
88 for step_id in current_step_ids {
89 if executed_step_ids.contains(&step_id) {
90 continue; }
92
93 let step = flow
94 .steps
95 .iter()
96 .find(|s| s.id == step_id)
97 .ok_or_else(|| Error::validation(format!("Step {} not found", step_id)))?;
98
99 if let Some(ref condition) = step.condition {
101 if !self.evaluate_condition(condition)? {
102 continue; }
104 }
105
106 let step_result = self.execute_step(step).await?;
108 step_results.push(step_result.clone());
109 executed_step_ids.insert(step_id.clone());
110
111 let connections = flow.connections.iter().filter(|c| c.from_step_id == step_id);
113
114 for connection in connections {
115 if let Some(ref condition) = connection.condition {
117 if !self.evaluate_condition(condition)? {
118 continue; }
120 }
121
122 if !executed_step_ids.contains(&connection.to_step_id) {
123 next_step_ids.push(connection.to_step_id.clone());
124 }
125 }
126 }
127
128 current_step_ids = next_step_ids;
129 }
130
131 let end_time = Utc::now();
132 let total_duration_ms = (end_time - start_time).num_milliseconds() as u64;
133
134 let success = step_results.iter().all(|r| r.success);
135
136 let error = if success {
138 None
139 } else {
140 step_results.iter().find_map(|r| r.error.as_ref()).map(|e| e.clone())
141 };
142
143 Ok(FlowExecutionResult {
144 flow_id: flow.id.clone(),
145 success,
146 step_results,
147 final_variables: self.variables.clone(),
148 total_duration_ms,
149 error,
150 })
151 }
152
153 fn find_start_steps(&self, flow: &FlowDefinition) -> Vec<String> {
155 let mut has_incoming: std::collections::HashSet<String> =
156 flow.connections.iter().map(|c| c.to_step_id.clone()).collect();
157
158 flow.steps
159 .iter()
160 .filter(|s| !has_incoming.contains(&s.id))
161 .map(|s| s.id.clone())
162 .collect()
163 }
164
165 async fn execute_step(&mut self, step: &FlowStep) -> Result<FlowStepResult> {
167 let start_time = Utc::now();
168 let mut extracted_variables = HashMap::new();
169
170 if let Some(delay_ms) = step.delay_ms {
172 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
173 }
174
175 let (success, response, error) = match step.step_type {
176 StepType::ApiCall => {
177 (
180 true,
181 Some(serde_json::json!({"status": "ok", "message": "API call executed"})),
182 None,
183 )
184 }
185 StepType::Condition => {
186 (true, None, None)
188 }
189 StepType::Delay => {
190 (true, None, None)
192 }
193 StepType::Loop => {
194 (true, None, None)
196 }
197 StepType::Parallel => {
198 (true, None, None)
200 }
201 };
202
203 if let Some(ref resp) = response {
205 for (key, path) in &step.extract {
206 if let Some(value) = self.extract_value(resp, path) {
207 extracted_variables.insert(key.clone(), value.clone());
208 self.variables.insert(key.clone(), value);
209 }
210 }
211 }
212
213 let end_time = Utc::now();
214 let duration_ms = (end_time - start_time).num_milliseconds() as u64;
215
216 Ok(FlowStepResult {
217 step_id: step.id.clone(),
218 success,
219 response,
220 error,
221 duration_ms,
222 extracted_variables,
223 })
224 }
225
226 fn evaluate_condition(&self, condition: &FlowCondition) -> Result<bool> {
228 Ok(true)
232 }
233
234 fn extract_value(&self, json: &Value, path: &str) -> Option<Value> {
236 let parts: Vec<&str> = path.split('.').collect();
238 let mut current = json;
239
240 for part in parts {
241 match current {
242 Value::Object(map) => {
243 current = map.get(part)?;
244 }
245 Value::Array(arr) => {
246 let index: usize = part.parse().ok()?;
247 current = arr.get(index)?;
248 }
249 _ => return None,
250 }
251 }
252
253 Some(current.clone())
254 }
255}
256
257impl Default for FlowExecutor {
258 fn default() -> Self {
259 Self::new()
260 }
261}