1use crate::error::{Error, Result};
6use crate::scenario_studio::types::{
7 ConditionOperator, FlowCondition, FlowDefinition, FlowStep, StepType,
8};
9use chrono::Utc;
10use regex::Regex;
11use reqwest::Client;
12use serde_json::Value;
13use std::collections::HashMap;
14
15#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
17pub struct FlowStepResult {
18 pub step_id: String,
20 pub success: bool,
22 pub response: Option<Value>,
24 pub error: Option<String>,
26 pub duration_ms: u64,
28 pub extracted_variables: HashMap<String, Value>,
30}
31
32#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
34pub struct FlowExecutionResult {
35 pub flow_id: String,
37 pub success: bool,
39 pub step_results: Vec<FlowStepResult>,
41 pub final_variables: HashMap<String, Value>,
43 pub total_duration_ms: u64,
45 pub error: Option<String>,
47}
48
49pub struct FlowExecutor {
54 variables: HashMap<String, Value>,
56 http_client: Client,
58}
59
60impl FlowExecutor {
61 pub fn new() -> Self {
63 Self {
64 variables: HashMap::new(),
65 http_client: Client::new(),
66 }
67 }
68
69 pub fn with_variables(variables: HashMap<String, Value>) -> Self {
71 Self {
72 variables,
73 http_client: Client::new(),
74 }
75 }
76
77 pub async fn execute(&mut self, flow: &FlowDefinition) -> Result<FlowExecutionResult> {
82 let start_time = Utc::now();
83 let mut step_results = Vec::new();
84 let mut executed_step_ids = std::collections::HashSet::new();
85 let mut current_step_ids = self.find_start_steps(flow);
86
87 for (key, value) in &flow.variables {
89 self.variables.insert(key.clone(), value.clone());
90 }
91
92 while !current_step_ids.is_empty() {
94 let mut next_step_ids = Vec::new();
95
96 for step_id in current_step_ids {
97 if executed_step_ids.contains(&step_id) {
98 continue; }
100
101 let step = flow
102 .steps
103 .iter()
104 .find(|s| s.id == step_id)
105 .ok_or_else(|| Error::validation(format!("Step {} not found", step_id)))?;
106
107 if let Some(ref condition) = step.condition {
109 if !self.evaluate_condition(condition)? {
110 continue; }
112 }
113
114 match step.step_type {
116 StepType::Loop => {
117 let loop_results = self.execute_loop(step, flow).await?;
119 step_results.extend(loop_results);
120 executed_step_ids.insert(step_id.clone());
121 }
122 StepType::Parallel => {
123 let parallel_results = self.execute_parallel(step, flow).await?;
125 step_results.extend(parallel_results);
126 executed_step_ids.insert(step_id.clone());
127 }
128 _ => {
129 let step_result = self.execute_step(step).await?;
131 step_results.push(step_result.clone());
132 executed_step_ids.insert(step_id.clone());
133 }
134 }
135
136 let connections = flow.connections.iter().filter(|c| c.from_step_id == step_id);
138
139 for connection in connections {
140 if let Some(ref condition) = connection.condition {
142 if !self.evaluate_condition(condition)? {
143 continue; }
145 }
146
147 if !executed_step_ids.contains(&connection.to_step_id) {
148 next_step_ids.push(connection.to_step_id.clone());
149 }
150 }
151 }
152
153 current_step_ids = next_step_ids;
154 }
155
156 let end_time = Utc::now();
157 let total_duration_ms = (end_time - start_time).num_milliseconds() as u64;
158
159 let success = step_results.iter().all(|r| r.success);
160
161 let error = if success {
163 None
164 } else {
165 step_results.iter().find_map(|r| r.error.as_ref()).cloned()
166 };
167
168 Ok(FlowExecutionResult {
169 flow_id: flow.id.clone(),
170 success,
171 step_results,
172 final_variables: self.variables.clone(),
173 total_duration_ms,
174 error,
175 })
176 }
177
178 fn find_start_steps(&self, flow: &FlowDefinition) -> Vec<String> {
180 let has_incoming: std::collections::HashSet<String> =
181 flow.connections.iter().map(|c| c.to_step_id.clone()).collect();
182
183 flow.steps
184 .iter()
185 .filter(|s| !has_incoming.contains(&s.id))
186 .map(|s| s.id.clone())
187 .collect()
188 }
189
190 async fn execute_step(&mut self, step: &FlowStep) -> Result<FlowStepResult> {
192 let start_time = Utc::now();
193 let mut extracted_variables = HashMap::new();
194
195 if let Some(delay_ms) = step.delay_ms {
197 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
198 }
199
200 let (success, response, error) = match step.step_type {
201 StepType::ApiCall => self.execute_api_call(step).await,
202 StepType::Condition => {
203 (true, None, None)
205 }
206 StepType::Delay => {
207 (true, None, None)
209 }
210 StepType::Loop => {
211 (false, None, Some("Loop steps must be handled at flow level".to_string()))
214 }
215 StepType::Parallel => {
216 (false, None, Some("Parallel steps must be handled at flow level".to_string()))
219 }
220 };
221
222 if let Some(ref resp) = response {
224 for (key, path) in &step.extract {
225 if let Some(value) = self.extract_value(resp, path) {
226 extracted_variables.insert(key.clone(), value.clone());
227 self.variables.insert(key.clone(), value);
228 }
229 }
230 }
231
232 let end_time = Utc::now();
233 let duration_ms = (end_time - start_time).num_milliseconds() as u64;
234
235 Ok(FlowStepResult {
236 step_id: step.id.clone(),
237 success,
238 response,
239 error,
240 duration_ms,
241 extracted_variables,
242 })
243 }
244
245 async fn execute_api_call(&self, step: &FlowStep) -> (bool, Option<Value>, Option<String>) {
247 let method = match step.method.as_ref() {
249 Some(m) => m,
250 None => {
251 return (false, None, Some("API call step missing method".to_string()));
252 }
253 };
254
255 let endpoint = match step.endpoint.as_ref() {
256 Some(e) => e,
257 None => {
258 return (false, None, Some("API call step missing endpoint".to_string()));
259 }
260 };
261
262 let endpoint = self.substitute_variables(endpoint);
264 let body = step.body.as_ref().map(|b| self.substitute_variables_in_value(b));
265
266 let method = match method.to_uppercase().as_str() {
268 "GET" => reqwest::Method::GET,
269 "POST" => reqwest::Method::POST,
270 "PUT" => reqwest::Method::PUT,
271 "PATCH" => reqwest::Method::PATCH,
272 "DELETE" => reqwest::Method::DELETE,
273 "HEAD" => reqwest::Method::HEAD,
274 "OPTIONS" => reqwest::Method::OPTIONS,
275 _ => {
276 return (false, None, Some(format!("Unsupported HTTP method: {}", method)));
277 }
278 };
279
280 let mut request = self.http_client.request(method, &endpoint);
281
282 for (key, value) in &step.headers {
284 let header_value = self.substitute_variables(value);
285 request = request.header(key, &header_value);
286 }
287
288 if let Some(ref body_value) = body {
290 if let Ok(json_body) = serde_json::to_string(body_value) {
291 request = request.header("Content-Type", "application/json").body(json_body);
292 }
293 }
294
295 match request.send().await {
297 Ok(response) => {
298 let status = response.status();
299 let status_code = status.as_u16();
300
301 if let Some(expected) = step.expected_status {
303 if status_code != expected {
304 return (
305 false,
306 Some(serde_json::json!({
307 "status": status_code,
308 "error": format!("Expected status {}, got {}", expected, status_code)
309 })),
310 Some(format!(
311 "Status code mismatch: expected {}, got {}",
312 expected, status_code
313 )),
314 );
315 }
316 }
317
318 let response_body = match response.text().await {
320 Ok(text) => {
321 serde_json::from_str(&text).unwrap_or_else(|_| {
323 serde_json::json!({
324 "body": text,
325 "status": status_code
326 })
327 })
328 }
329 Err(e) => {
330 return (false, None, Some(format!("Failed to read response body: {}", e)));
331 }
332 };
333
334 let full_response = serde_json::json!({
336 "status": status_code,
337 "headers": {}, "body": response_body
339 });
340
341 (true, Some(full_response), None)
342 }
343 Err(e) => (false, None, Some(format!("API call failed: {}", e))),
344 }
345 }
346
347 fn substitute_variables(&self, text: &str) -> String {
349 let re = Regex::new(r"\{\{([^}]+)\}\}").unwrap();
350 re.replace_all(text, |caps: ®ex::Captures| {
351 let var_name = caps.get(1).unwrap().as_str().trim();
352 self.variables
353 .get(var_name)
354 .map(|v| {
355 if let Some(s) = v.as_str() {
357 s.to_string()
358 } else {
359 v.to_string()
360 }
361 })
362 .unwrap_or_else(|| format!("{{{{{}}}}}", var_name)) })
364 .to_string()
365 }
366
367 fn substitute_variables_in_value(&self, value: &Value) -> Value {
369 match value {
370 Value::String(s) => Value::String(self.substitute_variables(s)),
371 Value::Object(map) => {
372 let mut new_map = serde_json::Map::new();
373 for (k, v) in map {
374 new_map.insert(k.clone(), self.substitute_variables_in_value(v));
375 }
376 Value::Object(new_map)
377 }
378 Value::Array(arr) => {
379 Value::Array(arr.iter().map(|v| self.substitute_variables_in_value(v)).collect())
380 }
381 _ => value.clone(),
382 }
383 }
384
385 fn evaluate_condition(&self, condition: &FlowCondition) -> Result<bool> {
387 let expression = self.substitute_variables(&condition.expression);
389
390 let left_value = if expression.starts_with("{{") && expression.ends_with("}}") {
392 let var_name = expression
394 .strip_prefix("{{")
395 .and_then(|s| s.strip_suffix("}}"))
396 .map(|s| s.trim());
397 var_name
398 .and_then(|name| self.variables.get(name))
399 .cloned()
400 .unwrap_or(Value::Null)
401 } else {
402 serde_json::from_str(&expression).unwrap_or(Value::String(expression))
404 };
405
406 let right_value = &condition.value;
407
408 let result = match condition.operator {
410 ConditionOperator::Eq => left_value == *right_value,
411 ConditionOperator::Ne => left_value != *right_value,
412 ConditionOperator::Gt => self.compare_values(&left_value, right_value, |a, b| a > b),
413 ConditionOperator::Gte => self.compare_values(&left_value, right_value, |a, b| a >= b),
414 ConditionOperator::Lt => self.compare_values(&left_value, right_value, |a, b| a < b),
415 ConditionOperator::Lte => self.compare_values(&left_value, right_value, |a, b| a <= b),
416 ConditionOperator::Contains => {
417 if let (Some(left_str), Some(right_str)) =
418 (left_value.as_str(), right_value.as_str())
419 {
420 left_str.contains(right_str)
421 } else {
422 false
423 }
424 }
425 ConditionOperator::NotContains => {
426 if let (Some(left_str), Some(right_str)) =
427 (left_value.as_str(), right_value.as_str())
428 {
429 !left_str.contains(right_str)
430 } else {
431 true
432 }
433 }
434 ConditionOperator::Matches => {
435 if let (Some(left_str), Some(right_str)) =
436 (left_value.as_str(), right_value.as_str())
437 {
438 Regex::new(right_str).map(|re| re.is_match(left_str)).unwrap_or(false)
439 } else {
440 false
441 }
442 }
443 ConditionOperator::Exists => left_value != Value::Null,
444 };
445
446 Ok(result)
447 }
448
449 fn compare_values<F>(&self, left: &Value, right: &Value, cmp: F) -> bool
451 where
452 F: Fn(f64, f64) -> bool,
453 {
454 match (left.as_f64(), right.as_f64()) {
455 (Some(l), Some(r)) => cmp(l, r),
456 _ => false,
457 }
458 }
459
460 async fn execute_loop(
465 &mut self,
466 loop_step: &FlowStep,
467 flow: &FlowDefinition,
468 ) -> Result<Vec<FlowStepResult>> {
469 let mut all_results = Vec::new();
470
471 let loop_count = loop_step.metadata.get("loop_count").and_then(|v| v.as_u64()).unwrap_or(1);
473
474 let loop_condition = loop_step.metadata.get("loop_condition");
475
476 let child_step_ids: Vec<String> = flow
478 .connections
479 .iter()
480 .filter(|c| c.from_step_id == loop_step.id)
481 .map(|c| c.to_step_id.clone())
482 .collect();
483
484 if child_step_ids.is_empty() {
485 return Ok(all_results);
486 }
487
488 for iteration in 0..loop_count {
490 self.variables
492 .insert("loop_iteration".to_string(), serde_json::json!(iteration));
493 self.variables.insert("loop_index".to_string(), serde_json::json!(iteration));
494
495 if let Some(condition_value) = loop_condition {
497 if let Some(condition_str) = condition_value.as_str() {
498 let condition_result = self
500 .evaluate_condition(&FlowCondition {
501 expression: condition_str.to_string(),
502 operator: ConditionOperator::Eq,
503 value: Value::Bool(true),
504 })
505 .unwrap_or(false);
506
507 if !condition_result {
508 break; }
510 }
511 }
512
513 for child_step_id in &child_step_ids {
515 if let Some(child_step) = flow.steps.iter().find(|s| s.id == *child_step_id) {
516 if let Some(ref condition) = child_step.condition {
518 if !self.evaluate_condition(condition)? {
519 continue;
520 }
521 }
522
523 let step_result = self.execute_step(child_step).await?;
524 all_results.push(step_result);
525 }
526 }
527 }
528
529 Ok(all_results)
530 }
531
532 async fn execute_parallel(
536 &mut self,
537 parallel_step: &FlowStep,
538 flow: &FlowDefinition,
539 ) -> Result<Vec<FlowStepResult>> {
540 let child_step_ids: Vec<String> = flow
542 .connections
543 .iter()
544 .filter(|c| c.from_step_id == parallel_step.id)
545 .map(|c| c.to_step_id.clone())
546 .collect();
547
548 if child_step_ids.is_empty() {
549 return Ok(Vec::new());
550 }
551
552 let child_steps: Vec<&FlowStep> = child_step_ids
554 .iter()
555 .filter_map(|step_id| flow.steps.iter().find(|s| s.id == *step_id))
556 .collect();
557
558 let mut tasks = Vec::new();
561
562 for child_step in child_steps {
563 let variables_clone = self.variables.clone();
565 let step_clone = child_step.clone();
566 let http_client = self.http_client.clone();
567
568 let task = tokio::spawn(async move {
570 let mut branch_executor = FlowExecutor {
572 variables: variables_clone,
573 http_client,
574 };
575
576 if let Some(ref condition) = step_clone.condition {
578 match branch_executor.evaluate_condition(condition) {
579 Ok(true) => {}
580 Ok(false) => {
581 return FlowStepResult {
582 step_id: step_clone.id.clone(),
583 success: false,
584 response: None,
585 error: Some("Condition not met".to_string()),
586 duration_ms: 0,
587 extracted_variables: HashMap::new(),
588 };
589 }
590 Err(e) => {
591 return FlowStepResult {
592 step_id: step_clone.id.clone(),
593 success: false,
594 response: None,
595 error: Some(format!("Condition evaluation error: {}", e)),
596 duration_ms: 0,
597 extracted_variables: HashMap::new(),
598 };
599 }
600 }
601 }
602
603 branch_executor
605 .execute_step(&step_clone)
606 .await
607 .unwrap_or_else(|e| FlowStepResult {
608 step_id: step_clone.id.clone(),
609 success: false,
610 response: None,
611 error: Some(format!("Execution error: {}", e)),
612 duration_ms: 0,
613 extracted_variables: HashMap::new(),
614 })
615 });
616
617 tasks.push(task);
618 }
619
620 let mut results = Vec::new();
622 for task in tasks {
623 match task.await {
624 Ok(result) => {
625 results.push(result);
626 }
627 Err(e) => {
628 results.push(FlowStepResult {
629 step_id: "unknown".to_string(),
630 success: false,
631 response: None,
632 error: Some(format!("Parallel task error: {}", e)),
633 duration_ms: 0,
634 extracted_variables: HashMap::new(),
635 });
636 }
637 }
638 }
639
640 for result in &results {
643 for (key, value) in &result.extracted_variables {
644 self.variables.insert(key.clone(), value.clone());
645 }
646 }
647
648 Ok(results)
649 }
650
651 fn extract_value(&self, json: &Value, path: &str) -> Option<Value> {
653 let parts: Vec<&str> = path.split('.').collect();
655 let mut current = json;
656
657 for part in parts {
658 match current {
659 Value::Object(map) => {
660 current = map.get(part)?;
661 }
662 Value::Array(arr) => {
663 let index: usize = part.parse().ok()?;
664 current = arr.get(index)?;
665 }
666 _ => return None,
667 }
668 }
669
670 Some(current.clone())
671 }
672}
673
674impl Default for FlowExecutor {
675 fn default() -> Self {
676 Self::new()
677 }
678}
679
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Step result with no response, no error and no extracted variables.
    fn bare_step(step_id: &str, success: bool, duration_ms: u64) -> FlowStepResult {
        FlowStepResult {
            step_id: step_id.to_string(),
            success,
            response: None,
            error: None,
            duration_ms,
            extracted_variables: HashMap::new(),
        }
    }

    /// Flow result with no steps, no variables and no error.
    fn bare_flow(flow_id: &str, success: bool, total_duration_ms: u64) -> FlowExecutionResult {
        FlowExecutionResult {
            flow_id: flow_id.to_string(),
            success,
            step_results: Vec::new(),
            final_variables: HashMap::new(),
            total_duration_ms,
            error: None,
        }
    }

    #[test]
    fn test_flow_step_result_creation() {
        let result = FlowStepResult {
            response: Some(json!({"status": "ok"})),
            extracted_variables: HashMap::from([("user_id".to_string(), json!("123"))]),
            ..bare_step("step-1", true, 150)
        };

        assert_eq!(result.step_id, "step-1");
        assert!(result.success);
        assert!(result.response.is_some());
        assert!(result.error.is_none());
        assert_eq!(result.duration_ms, 150);
        assert_eq!(result.extracted_variables.len(), 1);
    }

    #[test]
    fn test_flow_step_result_with_error() {
        let result = FlowStepResult {
            error: Some("Request failed".to_string()),
            ..bare_step("step-2", false, 50)
        };

        assert!(!result.success);
        assert!(result.error.is_some());
        assert_eq!(result.error.unwrap(), "Request failed");
    }

    #[test]
    fn test_flow_execution_result_creation() {
        let result = FlowExecutionResult {
            step_results: vec![bare_step("step-1", true, 100)],
            final_variables: HashMap::from([("result".to_string(), json!("success"))]),
            ..bare_flow("flow-123", true, 200)
        };

        assert_eq!(result.flow_id, "flow-123");
        assert!(result.success);
        assert_eq!(result.step_results.len(), 1);
        assert_eq!(result.final_variables.len(), 1);
        assert_eq!(result.total_duration_ms, 200);
    }

    #[test]
    fn test_flow_execution_result_with_error() {
        let failed_step = FlowStepResult {
            error: Some("Step failed".to_string()),
            ..bare_step("step-1", false, 50)
        };

        let result = FlowExecutionResult {
            step_results: vec![failed_step],
            error: Some("Flow execution failed".to_string()),
            ..bare_flow("flow-456", false, 100)
        };

        assert!(!result.success);
        assert!(result.error.is_some());
    }

    #[test]
    fn test_flow_executor_new() {
        // Construction must not panic.
        let _executor = FlowExecutor::new();
    }

    #[test]
    fn test_flow_executor_default() {
        // Construction must not panic.
        let _executor = FlowExecutor::default();
    }

    #[test]
    fn test_flow_executor_with_variables() {
        let variables = HashMap::from([
            ("api_key".to_string(), json!("secret123")),
            ("base_url".to_string(), json!("https://api.example.com")),
        ]);

        // Construction must not panic.
        let _executor = FlowExecutor::with_variables(variables);
    }

    #[test]
    fn test_flow_step_result_clone() {
        let original = FlowStepResult {
            response: Some(json!({"status": "ok"})),
            ..bare_step("step-1", true, 100)
        };
        let copy = original.clone();

        assert_eq!(original.step_id, copy.step_id);
        assert_eq!(original.success, copy.success);
    }

    #[test]
    fn test_flow_step_result_debug() {
        let rendered = format!("{:?}", bare_step("step-1", true, 150));
        assert!(rendered.contains("FlowStepResult"));
    }

    #[test]
    fn test_flow_step_result_serialization() {
        let result = FlowStepResult {
            response: Some(json!({"data": "test"})),
            extracted_variables: HashMap::from([("var1".to_string(), json!("value1"))]),
            ..bare_step("step-1", true, 200)
        };

        let serialized = serde_json::to_string(&result).unwrap();
        assert!(serialized.contains("step-1"));
        assert!(serialized.contains("true"));
    }

    #[test]
    fn test_flow_execution_result_clone() {
        let original = bare_flow("flow-1", true, 100);
        let copy = original.clone();

        assert_eq!(original.flow_id, copy.flow_id);
        assert_eq!(original.success, copy.success);
    }

    #[test]
    fn test_flow_execution_result_debug() {
        let result = FlowExecutionResult {
            error: Some("Error".to_string()),
            ..bare_flow("flow-123", false, 50)
        };

        let rendered = format!("{:?}", result);
        assert!(rendered.contains("FlowExecutionResult"));
    }

    #[test]
    fn test_flow_execution_result_serialization() {
        let step = FlowStepResult {
            response: Some(json!({"id": 1})),
            ..bare_step("step-1", true, 100)
        };
        let result = FlowExecutionResult {
            step_results: vec![step],
            final_variables: HashMap::from([("result".to_string(), json!("success"))]),
            ..bare_flow("flow-456", true, 200)
        };

        let serialized = serde_json::to_string(&result).unwrap();
        assert!(serialized.contains("flow-456"));
        assert!(serialized.contains("step-1"));
    }

    #[test]
    fn test_flow_step_result_with_all_fields() {
        let extracted = HashMap::from([
            ("user_id".to_string(), json!("123")),
            ("token".to_string(), json!("abc123")),
            ("expires_at".to_string(), json!("2024-01-01")),
        ]);

        let result = FlowStepResult {
            response: Some(json!({
                "user": {"id": 123, "name": "Alice"},
                "token": "abc123",
                "expires_at": "2024-01-01"
            })),
            extracted_variables: extracted,
            ..bare_step("step-auth", true, 250)
        };

        assert_eq!(result.extracted_variables.len(), 3);
        assert!(result.response.is_some());
        assert_eq!(result.duration_ms, 250);
    }

    #[test]
    fn test_flow_execution_result_with_multiple_steps() {
        let first = FlowStepResult {
            response: Some(json!({"id": 1})),
            extracted_variables: HashMap::from([("id".to_string(), json!(1))]),
            ..bare_step("step-1", true, 100)
        };
        let second = FlowStepResult {
            response: Some(json!({"status": "ok"})),
            ..bare_step("step-2", true, 150)
        };
        let third = bare_step("step-3", true, 50);

        let result = FlowExecutionResult {
            step_results: vec![first, second, third],
            final_variables: HashMap::from([
                ("id".to_string(), json!(1)),
                ("status".to_string(), json!("ok")),
            ]),
            ..bare_flow("flow-multi", true, 300)
        };

        assert_eq!(result.step_results.len(), 3);
        assert_eq!(result.final_variables.len(), 2);
        assert_eq!(result.total_duration_ms, 300);
    }

    #[test]
    fn test_flow_step_result_with_extracted_variables() {
        let extracted = HashMap::from([
            ("order_id".to_string(), json!("order-123")),
            ("total".to_string(), json!(99.99)),
            ("currency".to_string(), json!("USD")),
        ]);

        let result = FlowStepResult {
            response: Some(json!({
                "order": {"id": "order-123", "total": 99.99, "currency": "USD"}
            })),
            extracted_variables: extracted,
            ..bare_step("step-checkout", true, 300)
        };

        assert_eq!(result.extracted_variables.len(), 3);
        assert_eq!(result.extracted_variables.get("order_id"), Some(&json!("order-123")));
    }

    #[test]
    fn test_flow_execution_result_with_error_and_steps() {
        let succeeded = FlowStepResult {
            response: Some(json!({"id": 1})),
            ..bare_step("step-1", true, 100)
        };
        let timed_out = FlowStepResult {
            error: Some("Connection timeout".to_string()),
            ..bare_step("step-2", false, 5000)
        };

        let result = FlowExecutionResult {
            step_results: vec![succeeded, timed_out],
            error: Some("Flow failed at step-2: Connection timeout".to_string()),
            ..bare_flow("flow-error", false, 5100)
        };

        assert!(!result.success);
        assert_eq!(result.step_results.len(), 2);
        assert!(result.error.is_some());
        assert_eq!(result.total_duration_ms, 5100);
    }
}