1use crate::error::{Error, Result};
6use crate::scenario_studio::types::{
7 ConditionOperator, FlowCondition, FlowDefinition, FlowStep, StepType,
8};
9use chrono::Utc;
10use regex::Regex;
11use reqwest::Client;
12use serde_json::Value;
13use std::collections::HashMap;
14
15#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct FlowStepResult {
18 pub step_id: String,
20 pub success: bool,
22 pub response: Option<Value>,
24 pub error: Option<String>,
26 pub duration_ms: u64,
28 pub extracted_variables: HashMap<String, Value>,
30}
31
32#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
34pub struct FlowExecutionResult {
35 pub flow_id: String,
37 pub success: bool,
39 pub step_results: Vec<FlowStepResult>,
41 pub final_variables: HashMap<String, Value>,
43 pub total_duration_ms: u64,
45 pub error: Option<String>,
47}
48
49pub struct FlowExecutor {
54 variables: HashMap<String, Value>,
56 http_client: Client,
58}
59
60impl FlowExecutor {
61 pub fn new() -> Self {
63 Self {
64 variables: HashMap::new(),
65 http_client: Client::new(),
66 }
67 }
68
69 pub fn with_variables(variables: HashMap<String, Value>) -> Self {
71 Self {
72 variables,
73 http_client: Client::new(),
74 }
75 }
76
77 pub async fn execute(&mut self, flow: &FlowDefinition) -> Result<FlowExecutionResult> {
82 let start_time = Utc::now();
83 let mut step_results = Vec::new();
84 let mut executed_step_ids = std::collections::HashSet::new();
85 let mut current_step_ids = self.find_start_steps(flow);
86
87 for (key, value) in &flow.variables {
89 self.variables.insert(key.clone(), value.clone());
90 }
91
92 while !current_step_ids.is_empty() {
94 let mut next_step_ids = Vec::new();
95
96 for step_id in current_step_ids {
97 if executed_step_ids.contains(&step_id) {
98 continue; }
100
101 let step = flow
102 .steps
103 .iter()
104 .find(|s| s.id == step_id)
105 .ok_or_else(|| Error::validation(format!("Step {} not found", step_id)))?;
106
107 if let Some(ref condition) = step.condition {
109 if !self.evaluate_condition(condition)? {
110 continue; }
112 }
113
114 match step.step_type {
116 StepType::Loop => {
117 let loop_results = self.execute_loop(step, flow).await?;
119 step_results.extend(loop_results);
120 executed_step_ids.insert(step_id.clone());
121 }
122 StepType::Parallel => {
123 let parallel_results = self.execute_parallel(step, flow).await?;
125 step_results.extend(parallel_results);
126 executed_step_ids.insert(step_id.clone());
127 }
128 _ => {
129 let step_result = self.execute_step(step).await?;
131 step_results.push(step_result.clone());
132 executed_step_ids.insert(step_id.clone());
133 }
134 }
135
136 let connections = flow.connections.iter().filter(|c| c.from_step_id == step_id);
138
139 for connection in connections {
140 if let Some(ref condition) = connection.condition {
142 if !self.evaluate_condition(condition)? {
143 continue; }
145 }
146
147 if !executed_step_ids.contains(&connection.to_step_id) {
148 next_step_ids.push(connection.to_step_id.clone());
149 }
150 }
151 }
152
153 current_step_ids = next_step_ids;
154 }
155
156 let end_time = Utc::now();
157 let total_duration_ms = (end_time - start_time).num_milliseconds() as u64;
158
159 let success = step_results.iter().all(|r| r.success);
160
161 let error = if success {
163 None
164 } else {
165 step_results.iter().find_map(|r| r.error.as_ref()).cloned()
166 };
167
168 Ok(FlowExecutionResult {
169 flow_id: flow.id.clone(),
170 success,
171 step_results,
172 final_variables: self.variables.clone(),
173 total_duration_ms,
174 error,
175 })
176 }
177
178 fn find_start_steps(&self, flow: &FlowDefinition) -> Vec<String> {
180 let has_incoming: std::collections::HashSet<String> =
181 flow.connections.iter().map(|c| c.to_step_id.clone()).collect();
182
183 flow.steps
184 .iter()
185 .filter(|s| !has_incoming.contains(&s.id))
186 .map(|s| s.id.clone())
187 .collect()
188 }
189
190 async fn execute_step(&mut self, step: &FlowStep) -> Result<FlowStepResult> {
192 let start_time = Utc::now();
193 let mut extracted_variables = HashMap::new();
194
195 if let Some(delay_ms) = step.delay_ms {
197 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
198 }
199
200 let (success, response, error) = match step.step_type {
201 StepType::ApiCall => self.execute_api_call(step).await,
202 StepType::Condition => {
203 (true, None, None)
205 }
206 StepType::Delay => {
207 (true, None, None)
209 }
210 StepType::Loop => {
211 (false, None, Some("Loop steps must be handled at flow level".to_string()))
214 }
215 StepType::Parallel => {
216 (false, None, Some("Parallel steps must be handled at flow level".to_string()))
219 }
220 };
221
222 if let Some(ref resp) = response {
224 for (key, path) in &step.extract {
225 if let Some(value) = self.extract_value(resp, path) {
226 extracted_variables.insert(key.clone(), value.clone());
227 self.variables.insert(key.clone(), value);
228 }
229 }
230 }
231
232 let end_time = Utc::now();
233 let duration_ms = (end_time - start_time).num_milliseconds() as u64;
234
235 Ok(FlowStepResult {
236 step_id: step.id.clone(),
237 success,
238 response,
239 error,
240 duration_ms,
241 extracted_variables,
242 })
243 }
244
245 async fn execute_api_call(&self, step: &FlowStep) -> (bool, Option<Value>, Option<String>) {
247 let method = match step.method.as_ref() {
249 Some(m) => m,
250 None => {
251 return (false, None, Some("API call step missing method".to_string()));
252 }
253 };
254
255 let endpoint = match step.endpoint.as_ref() {
256 Some(e) => e,
257 None => {
258 return (false, None, Some("API call step missing endpoint".to_string()));
259 }
260 };
261
262 let endpoint = self.substitute_variables(endpoint);
264 let body = step.body.as_ref().map(|b| self.substitute_variables_in_value(b));
265
266 let method = match method.to_uppercase().as_str() {
268 "GET" => reqwest::Method::GET,
269 "POST" => reqwest::Method::POST,
270 "PUT" => reqwest::Method::PUT,
271 "PATCH" => reqwest::Method::PATCH,
272 "DELETE" => reqwest::Method::DELETE,
273 "HEAD" => reqwest::Method::HEAD,
274 "OPTIONS" => reqwest::Method::OPTIONS,
275 _ => {
276 return (false, None, Some(format!("Unsupported HTTP method: {}", method)));
277 }
278 };
279
280 let mut request = self.http_client.request(method, &endpoint);
281
282 for (key, value) in &step.headers {
284 let header_value = self.substitute_variables(value);
285 request = request.header(key, &header_value);
286 }
287
288 if let Some(ref body_value) = body {
290 if let Ok(json_body) = serde_json::to_string(body_value) {
291 request = request.header("Content-Type", "application/json").body(json_body);
292 }
293 }
294
295 match request.send().await {
297 Ok(response) => {
298 let status = response.status();
299 let status_code = status.as_u16();
300
301 if let Some(expected) = step.expected_status {
303 if status_code != expected {
304 return (
305 false,
306 Some(serde_json::json!({
307 "status": status_code,
308 "error": format!("Expected status {}, got {}", expected, status_code)
309 })),
310 Some(format!(
311 "Status code mismatch: expected {}, got {}",
312 expected, status_code
313 )),
314 );
315 }
316 }
317
318 let response_body = match response.text().await {
320 Ok(text) => {
321 serde_json::from_str(&text).unwrap_or_else(|_| {
323 serde_json::json!({
324 "body": text,
325 "status": status_code
326 })
327 })
328 }
329 Err(e) => {
330 return (false, None, Some(format!("Failed to read response body: {}", e)));
331 }
332 };
333
334 let full_response = serde_json::json!({
336 "status": status_code,
337 "headers": {}, "body": response_body
339 });
340
341 (true, Some(full_response), None)
342 }
343 Err(e) => (false, None, Some(format!("API call failed: {}", e))),
344 }
345 }
346
347 fn substitute_variables(&self, text: &str) -> String {
349 let re = Regex::new(r"\{\{([^}]+)\}\}").unwrap();
350 re.replace_all(text, |caps: ®ex::Captures| {
351 let var_name = caps.get(1).unwrap().as_str().trim();
352 self.variables
353 .get(var_name)
354 .map(|v| {
355 if let Some(s) = v.as_str() {
357 s.to_string()
358 } else {
359 v.to_string()
360 }
361 })
362 .unwrap_or_else(|| format!("{{{{{}}}}}", var_name)) })
364 .to_string()
365 }
366
367 fn substitute_variables_in_value(&self, value: &Value) -> Value {
369 match value {
370 Value::String(s) => Value::String(self.substitute_variables(s)),
371 Value::Object(map) => {
372 let mut new_map = serde_json::Map::new();
373 for (k, v) in map {
374 new_map.insert(k.clone(), self.substitute_variables_in_value(v));
375 }
376 Value::Object(new_map)
377 }
378 Value::Array(arr) => {
379 Value::Array(arr.iter().map(|v| self.substitute_variables_in_value(v)).collect())
380 }
381 _ => value.clone(),
382 }
383 }
384
385 fn evaluate_condition(&self, condition: &FlowCondition) -> Result<bool> {
387 let expression = self.substitute_variables(&condition.expression);
389
390 let left_value = if expression.starts_with("{{") && expression.ends_with("}}") {
392 let var_name = expression
394 .strip_prefix("{{")
395 .and_then(|s| s.strip_suffix("}}"))
396 .map(|s| s.trim());
397 var_name
398 .and_then(|name| self.variables.get(name))
399 .cloned()
400 .unwrap_or(Value::Null)
401 } else {
402 serde_json::from_str(&expression).unwrap_or(Value::String(expression))
404 };
405
406 let right_value = &condition.value;
407
408 let result = match condition.operator {
410 ConditionOperator::Eq => left_value == *right_value,
411 ConditionOperator::Ne => left_value != *right_value,
412 ConditionOperator::Gt => self.compare_values(&left_value, right_value, |a, b| a > b),
413 ConditionOperator::Gte => self.compare_values(&left_value, right_value, |a, b| a >= b),
414 ConditionOperator::Lt => self.compare_values(&left_value, right_value, |a, b| a < b),
415 ConditionOperator::Lte => self.compare_values(&left_value, right_value, |a, b| a <= b),
416 ConditionOperator::Contains => {
417 if let (Some(left_str), Some(right_str)) =
418 (left_value.as_str(), right_value.as_str())
419 {
420 left_str.contains(right_str)
421 } else {
422 false
423 }
424 }
425 ConditionOperator::NotContains => {
426 if let (Some(left_str), Some(right_str)) =
427 (left_value.as_str(), right_value.as_str())
428 {
429 !left_str.contains(right_str)
430 } else {
431 true
432 }
433 }
434 ConditionOperator::Matches => {
435 if let (Some(left_str), Some(right_str)) =
436 (left_value.as_str(), right_value.as_str())
437 {
438 Regex::new(right_str).map(|re| re.is_match(left_str)).unwrap_or(false)
439 } else {
440 false
441 }
442 }
443 ConditionOperator::Exists => left_value != Value::Null,
444 };
445
446 Ok(result)
447 }
448
449 fn compare_values<F>(&self, left: &Value, right: &Value, cmp: F) -> bool
451 where
452 F: Fn(f64, f64) -> bool,
453 {
454 match (left.as_f64(), right.as_f64()) {
455 (Some(l), Some(r)) => cmp(l, r),
456 _ => false,
457 }
458 }
459
460 async fn execute_loop(
465 &mut self,
466 loop_step: &FlowStep,
467 flow: &FlowDefinition,
468 ) -> Result<Vec<FlowStepResult>> {
469 let mut all_results = Vec::new();
470
471 let loop_count = loop_step.metadata.get("loop_count").and_then(|v| v.as_u64()).unwrap_or(1);
473
474 let loop_condition = loop_step.metadata.get("loop_condition");
475
476 let child_step_ids: Vec<String> = flow
478 .connections
479 .iter()
480 .filter(|c| c.from_step_id == loop_step.id)
481 .map(|c| c.to_step_id.clone())
482 .collect();
483
484 if child_step_ids.is_empty() {
485 return Ok(all_results);
486 }
487
488 for iteration in 0..loop_count {
490 self.variables
492 .insert("loop_iteration".to_string(), serde_json::json!(iteration));
493 self.variables.insert("loop_index".to_string(), serde_json::json!(iteration));
494
495 if let Some(condition_value) = loop_condition {
497 if let Some(condition_str) = condition_value.as_str() {
498 let condition_result = self
500 .evaluate_condition(&FlowCondition {
501 expression: condition_str.to_string(),
502 operator: ConditionOperator::Eq,
503 value: Value::Bool(true),
504 })
505 .unwrap_or(false);
506
507 if !condition_result {
508 break; }
510 }
511 }
512
513 for child_step_id in &child_step_ids {
515 if let Some(child_step) = flow.steps.iter().find(|s| s.id == *child_step_id) {
516 if let Some(ref condition) = child_step.condition {
518 if !self.evaluate_condition(condition)? {
519 continue;
520 }
521 }
522
523 let step_result = self.execute_step(child_step).await?;
524 all_results.push(step_result);
525 }
526 }
527 }
528
529 Ok(all_results)
530 }
531
532 async fn execute_parallel(
536 &mut self,
537 parallel_step: &FlowStep,
538 flow: &FlowDefinition,
539 ) -> Result<Vec<FlowStepResult>> {
540 let child_step_ids: Vec<String> = flow
542 .connections
543 .iter()
544 .filter(|c| c.from_step_id == parallel_step.id)
545 .map(|c| c.to_step_id.clone())
546 .collect();
547
548 if child_step_ids.is_empty() {
549 return Ok(Vec::new());
550 }
551
552 let child_steps: Vec<&FlowStep> = child_step_ids
554 .iter()
555 .filter_map(|step_id| flow.steps.iter().find(|s| s.id == *step_id))
556 .collect();
557
558 let mut tasks = Vec::new();
561
562 for child_step in child_steps {
563 let variables_clone = self.variables.clone();
565 let step_clone = child_step.clone();
566 let http_client = self.http_client.clone();
567
568 let task = tokio::spawn(async move {
570 let mut branch_executor = FlowExecutor {
572 variables: variables_clone,
573 http_client,
574 };
575
576 if let Some(ref condition) = step_clone.condition {
578 match branch_executor.evaluate_condition(condition) {
579 Ok(true) => {}
580 Ok(false) => {
581 return FlowStepResult {
582 step_id: step_clone.id.clone(),
583 success: false,
584 response: None,
585 error: Some("Condition not met".to_string()),
586 duration_ms: 0,
587 extracted_variables: HashMap::new(),
588 };
589 }
590 Err(e) => {
591 return FlowStepResult {
592 step_id: step_clone.id.clone(),
593 success: false,
594 response: None,
595 error: Some(format!("Condition evaluation error: {}", e)),
596 duration_ms: 0,
597 extracted_variables: HashMap::new(),
598 };
599 }
600 }
601 }
602
603 branch_executor
605 .execute_step(&step_clone)
606 .await
607 .unwrap_or_else(|e| FlowStepResult {
608 step_id: step_clone.id.clone(),
609 success: false,
610 response: None,
611 error: Some(format!("Execution error: {}", e)),
612 duration_ms: 0,
613 extracted_variables: HashMap::new(),
614 })
615 });
616
617 tasks.push(task);
618 }
619
620 let mut results = Vec::new();
622 for task in tasks {
623 match task.await {
624 Ok(result) => {
625 results.push(result);
626 }
627 Err(e) => {
628 results.push(FlowStepResult {
629 step_id: "unknown".to_string(),
630 success: false,
631 response: None,
632 error: Some(format!("Parallel task error: {}", e)),
633 duration_ms: 0,
634 extracted_variables: HashMap::new(),
635 });
636 }
637 }
638 }
639
640 for result in &results {
643 for (key, value) in &result.extracted_variables {
644 self.variables.insert(key.clone(), value.clone());
645 }
646 }
647
648 Ok(results)
649 }
650
651 fn extract_value(&self, json: &Value, path: &str) -> Option<Value> {
653 let parts: Vec<&str> = path.split('.').collect();
655 let mut current = json;
656
657 for part in parts {
658 match current {
659 Value::Object(map) => {
660 current = map.get(part)?;
661 }
662 Value::Array(arr) => {
663 let index: usize = part.parse().ok()?;
664 current = arr.get(index)?;
665 }
666 _ => return None,
667 }
668 }
669
670 Some(current.clone())
671 }
672}
673
674impl Default for FlowExecutor {
675 fn default() -> Self {
676 Self::new()
677 }
678}