1use crate::request_chaining::{
7 ChainConfig, ChainDefinition, ChainExecutionContext, ChainLink, ChainResponse,
8 ChainTemplatingContext, RequestChainRegistry,
9};
10use crate::request_scripting::{ScriptContext, ScriptEngine};
11use crate::templating::{expand_str_with_context, TemplatingContext};
12use crate::{Error, Result};
13use chrono::Utc;
14use futures::future::join_all;
15use reqwest::{
16 header::{HeaderMap, HeaderName, HeaderValue},
17 Client, Method,
18};
19use serde_json::Value;
20use std::collections::{HashMap, HashSet};
21use std::str::FromStr;
22use std::sync::Arc;
23use tokio::sync::Mutex;
24use tokio::time::{timeout, Duration};
25
26#[derive(Debug, Clone)]
28pub struct ExecutionRecord {
29 pub executed_at: String,
31 pub result: ChainExecutionResult,
33}
34
35#[derive(Debug)]
37pub struct ChainExecutionEngine {
38 http_client: Client,
40 registry: Arc<RequestChainRegistry>,
42 config: ChainConfig,
44 execution_history: Arc<Mutex<HashMap<String, Vec<ExecutionRecord>>>>,
46 script_engine: ScriptEngine,
48}
49
50impl ChainExecutionEngine {
51 pub fn new(registry: Arc<RequestChainRegistry>, config: ChainConfig) -> Self {
58 Self::try_new(registry, config)
59 .unwrap_or_else(|e| {
60 panic!(
61 "Failed to create HTTP client for chain execution engine: {}. \
62 This typically indicates a system configuration issue (e.g., invalid timeout value).",
63 e
64 )
65 })
66 }
67
68 pub fn try_new(registry: Arc<RequestChainRegistry>, config: ChainConfig) -> Result<Self> {
72 let http_client = Client::builder()
73 .timeout(Duration::from_secs(config.global_timeout_secs))
74 .build()
75 .map_err(|e| {
76 Error::generic(format!(
77 "Failed to create HTTP client: {}. \
78 Check that the timeout value ({}) is valid.",
79 e, config.global_timeout_secs
80 ))
81 })?;
82
83 Ok(Self {
84 http_client,
85 registry,
86 config,
87 execution_history: Arc::new(Mutex::new(HashMap::new())),
88 script_engine: ScriptEngine::new(),
89 })
90 }
91
92 pub async fn execute_chain(
94 &self,
95 chain_id: &str,
96 variables: Option<Value>,
97 ) -> Result<ChainExecutionResult> {
98 let chain = self
99 .registry
100 .get_chain(chain_id)
101 .await
102 .ok_or_else(|| Error::generic(format!("Chain '{}' not found", chain_id)))?;
103
104 let result = self.execute_chain_definition(&chain, variables).await?;
105
106 let record = ExecutionRecord {
108 executed_at: Utc::now().to_rfc3339(),
109 result: result.clone(),
110 };
111
112 let mut history = self.execution_history.lock().await;
113 history.entry(chain_id.to_string()).or_insert_with(Vec::new).push(record);
114
115 Ok(result)
116 }
117
118 pub async fn get_chain_history(&self, chain_id: &str) -> Vec<ExecutionRecord> {
120 let history = self.execution_history.lock().await;
121 history.get(chain_id).cloned().unwrap_or_default()
122 }
123
124 pub async fn execute_chain_definition(
126 &self,
127 chain_def: &ChainDefinition,
128 variables: Option<Value>,
129 ) -> Result<ChainExecutionResult> {
130 self.registry.validate_chain(chain_def).await?;
132
133 let start_time = std::time::Instant::now();
134 let mut execution_context = ChainExecutionContext::new(chain_def.clone());
135
136 for (key, value) in &chain_def.variables {
138 execution_context
139 .templating
140 .chain_context
141 .set_variable(key.clone(), value.clone());
142 }
143
144 if let Some(Value::Object(map)) = variables {
146 for (key, value) in map {
147 execution_context.templating.chain_context.set_variable(key, value);
148 }
149 }
150
151 if self.config.enable_parallel_execution {
152 self.execute_with_parallelism(&mut execution_context).await
153 } else {
154 self.execute_sequential(&mut execution_context).await
155 }
156 .map(|_| ChainExecutionResult {
157 chain_id: chain_def.id.clone(),
158 status: ChainExecutionStatus::Successful,
159 total_duration_ms: start_time.elapsed().as_millis() as u64,
160 request_results: execution_context.templating.chain_context.responses.clone(),
161 error_message: None,
162 })
163 }
164
165 async fn execute_with_parallelism(
167 &self,
168 execution_context: &mut ChainExecutionContext,
169 ) -> Result<()> {
170 let dep_graph = self.build_dependency_graph(&execution_context.definition.links);
171 let topo_order = self.topological_sort(&dep_graph)?;
172
173 let mut level_groups = vec![];
175 let mut processed = HashSet::new();
176
177 for request_id in topo_order {
178 if !processed.contains(&request_id) {
179 let mut level = vec![];
180 self.collect_dependency_level(request_id, &dep_graph, &mut level, &mut processed);
181 level_groups.push(level);
182 }
183 }
184
185 for level in level_groups {
187 if level.len() == 1 {
188 let request_id = &level[0];
190 let link = execution_context
191 .definition
192 .links
193 .iter()
194 .find(|l| l.request.id == *request_id)
195 .ok_or_else(|| {
196 Error::generic(format!(
197 "Chain link not found for request_id '{}' during parallel execution",
198 request_id
199 ))
200 })?;
201
202 let link_clone = link.clone();
203 self.execute_request(&link_clone, execution_context).await?;
204 } else {
205 let tasks = level
207 .into_iter()
208 .filter_map(|request_id| {
209 let link = execution_context
210 .definition
211 .links
212 .iter()
213 .find(|l| l.request.id == request_id);
214 let link = match link {
215 Some(l) => l.clone(),
216 None => {
217 tracing::error!(
218 "Chain link not found for request_id '{}' during parallel execution",
219 request_id
220 );
221 return None;
222 }
223 };
224 let parallel_context = ChainExecutionContext {
226 definition: execution_context.definition.clone(),
227 templating: execution_context.templating.clone(),
228 start_time: std::time::Instant::now(),
229 config: execution_context.config.clone(),
230 };
231
232 let context = Arc::new(Mutex::new(parallel_context));
233 let engine =
234 ChainExecutionEngine::new(self.registry.clone(), self.config.clone());
235
236 Some(tokio::spawn(async move {
237 let mut ctx = context.lock().await;
238 engine.execute_request(&link, &mut ctx).await
239 }))
240 })
241 .collect::<Vec<_>>();
242
243 let results = join_all(tasks).await;
244 for result in results {
245 result
246 .map_err(|e| Error::generic(format!("Task join error: {}", e)))?
247 .map_err(|e| Error::generic(format!("Request execution error: {}", e)))?;
248 }
249 }
250 }
251
252 Ok(())
253 }
254
255 async fn execute_sequential(
257 &self,
258 execution_context: &mut ChainExecutionContext,
259 ) -> Result<()> {
260 let links = execution_context.definition.links.clone();
261 for link in &links {
262 self.execute_request(link, execution_context).await?;
263 }
264 Ok(())
265 }
266
267 async fn execute_request(
269 &self,
270 link: &ChainLink,
271 execution_context: &mut ChainExecutionContext,
272 ) -> Result<()> {
273 let request_start = std::time::Instant::now();
274
275 execution_context.templating.set_current_request(link.request.clone());
277
278 let method = Method::from_bytes(link.request.method.as_bytes()).map_err(|e| {
279 Error::generic(format!("Invalid HTTP method '{}': {}", link.request.method, e))
280 })?;
281
282 let url = self.expand_template(&link.request.url, &execution_context.templating);
283
284 let mut headers = HeaderMap::new();
286 for (key, value) in &link.request.headers {
287 let expanded_value = self.expand_template(value, &execution_context.templating);
288 let header_name = HeaderName::from_str(key)
289 .map_err(|e| Error::generic(format!("Invalid header name '{}': {}", key, e)))?;
290 let header_value = HeaderValue::from_str(&expanded_value).map_err(|e| {
291 Error::generic(format!("Invalid header value for '{}': {}", key, e))
292 })?;
293 headers.insert(header_name, header_value);
294 }
295
296 let mut request_builder = self.http_client.request(method, &url).headers(headers.clone());
298
299 if let Some(body) = &link.request.body {
301 match body {
302 crate::request_chaining::RequestBody::Json(json_value) => {
303 let expanded_body =
304 self.expand_template_in_json(json_value, &execution_context.templating);
305 request_builder = request_builder.json(&expanded_body);
306 }
307 crate::request_chaining::RequestBody::BinaryFile { path, content_type } => {
308 let templating_context =
310 TemplatingContext::with_chain(execution_context.templating.clone());
311
312 let expanded_path = expand_str_with_context(path, &templating_context);
314
315 let binary_body = crate::request_chaining::RequestBody::binary_file(
317 expanded_path,
318 content_type.clone(),
319 );
320
321 match binary_body.to_bytes().await {
323 Ok(file_bytes) => {
324 request_builder = request_builder.body(file_bytes);
325
326 if let Some(ct) = content_type {
328 let mut headers = headers.clone();
329 headers.insert(
330 "content-type",
331 ct.parse().unwrap_or_else(|_| {
332 HeaderValue::from_static("application/octet-stream")
333 }),
334 );
335 request_builder = request_builder.headers(headers);
336 }
337 }
338 Err(e) => {
339 return Err(e);
340 }
341 }
342 }
343 }
344 }
345
346 if let Some(timeout_secs) = link.request.timeout_secs {
348 request_builder = request_builder.timeout(Duration::from_secs(timeout_secs));
349 }
350
351 if let Some(scripting) = &link.request.scripting {
353 if let Some(pre_script) = &scripting.pre_script {
354 let script_context = ScriptContext {
355 request: Some(link.request.clone()),
356 response: None,
357 chain_context: execution_context.templating.chain_context.variables.clone(),
358 variables: HashMap::new(),
359 env_vars: std::env::vars().collect(),
360 };
361
362 match self
363 .script_engine
364 .execute_script(pre_script, &script_context, scripting.timeout_ms)
365 .await
366 {
367 Ok(script_result) => {
368 for (key, value) in script_result.modified_variables {
370 execution_context.templating.chain_context.set_variable(key, value);
371 }
372 }
373 Err(e) => {
374 tracing::warn!(
375 "Pre-script execution failed for request '{}': {}",
376 link.request.id,
377 e
378 );
379 }
381 }
382 }
383 }
384
385 let response_result =
387 timeout(Duration::from_secs(self.config.global_timeout_secs), request_builder.send())
388 .await;
389
390 let response = match response_result {
391 Ok(Ok(resp)) => resp,
392 Ok(Err(e)) => {
393 return Err(Error::generic(format!("Request '{}' failed: {}", link.request.id, e)));
394 }
395 Err(_) => {
396 return Err(Error::generic(format!("Request '{}' timed out", link.request.id)));
397 }
398 };
399
400 let status = response.status();
401 let headers: HashMap<String, String> = response
402 .headers()
403 .iter()
404 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
405 .collect();
406
407 let body_text = response.text().await.unwrap_or_default();
408 let body_json: Option<Value> = serde_json::from_str(&body_text).ok();
409
410 let duration_ms = request_start.elapsed().as_millis() as u64;
411 let executed_at = Utc::now().to_rfc3339();
412
413 let chain_response = ChainResponse {
414 status: status.as_u16(),
415 headers,
416 body: body_json,
417 duration_ms,
418 executed_at,
419 error: None,
420 };
421
422 if let Some(expected) = &link.request.expected_status {
424 if !expected.contains(&status.as_u16()) {
425 let error_msg = format!(
426 "Request '{}' returned status {} but expected one of {:?}",
427 link.request.id,
428 status.as_u16(),
429 expected
430 );
431 return Err(Error::generic(error_msg));
432 }
433 }
434
435 if let Some(store_name) = &link.store_as {
437 execution_context
438 .templating
439 .chain_context
440 .store_response(store_name.clone(), chain_response.clone());
441 }
442
443 for (var_name, extraction_path) in &link.extract {
445 if let Some(value) = self.extract_from_response(&chain_response, extraction_path) {
446 execution_context.templating.chain_context.set_variable(var_name.clone(), value);
447 }
448 }
449
450 if let Some(scripting) = &link.request.scripting {
452 if let Some(post_script) = &scripting.post_script {
453 let script_context = ScriptContext {
454 request: Some(link.request.clone()),
455 response: Some(chain_response.clone()),
456 chain_context: execution_context.templating.chain_context.variables.clone(),
457 variables: HashMap::new(),
458 env_vars: std::env::vars().collect(),
459 };
460
461 match self
462 .script_engine
463 .execute_script(post_script, &script_context, scripting.timeout_ms)
464 .await
465 {
466 Ok(script_result) => {
467 for (key, value) in script_result.modified_variables {
469 execution_context.templating.chain_context.set_variable(key, value);
470 }
471 }
472 Err(e) => {
473 tracing::warn!(
474 "Post-script execution failed for request '{}': {}",
475 link.request.id,
476 e
477 );
478 }
480 }
481 }
482 }
483
484 execution_context
486 .templating
487 .chain_context
488 .store_response(link.request.id.clone(), chain_response);
489
490 Ok(())
491 }
492
493 fn build_dependency_graph(&self, links: &[ChainLink]) -> HashMap<String, Vec<String>> {
495 let mut graph = HashMap::new();
496
497 for link in links {
498 graph
499 .entry(link.request.id.clone())
500 .or_insert_with(Vec::new)
501 .extend(link.request.depends_on.iter().cloned());
502 }
503
504 graph
505 }
506
507 fn topological_sort(&self, graph: &HashMap<String, Vec<String>>) -> Result<Vec<String>> {
509 let mut visited = HashSet::new();
510 let mut rec_stack = HashSet::new();
511 let mut result = Vec::new();
512
513 for node in graph.keys() {
514 if !visited.contains(node) {
515 self.topo_sort_util(node, graph, &mut visited, &mut rec_stack, &mut result)?;
516 }
517 }
518
519 result.reverse();
520 Ok(result)
521 }
522
523 #[allow(clippy::only_used_in_recursion)]
525 fn topo_sort_util(
526 &self,
527 node: &str,
528 graph: &HashMap<String, Vec<String>>,
529 visited: &mut HashSet<String>,
530 rec_stack: &mut HashSet<String>,
531 result: &mut Vec<String>,
532 ) -> Result<()> {
533 visited.insert(node.to_string());
534 rec_stack.insert(node.to_string());
535
536 if let Some(dependencies) = graph.get(node) {
537 for dep in dependencies {
538 if !visited.contains(dep) {
539 self.topo_sort_util(dep, graph, visited, rec_stack, result)?;
540 } else if rec_stack.contains(dep) {
541 return Err(Error::generic(format!(
542 "Circular dependency detected involving '{}'",
543 node
544 )));
545 }
546 }
547 }
548
549 rec_stack.remove(node);
550 result.push(node.to_string());
551 Ok(())
552 }
553
554 fn collect_dependency_level(
556 &self,
557 request_id: String,
558 _graph: &HashMap<String, Vec<String>>,
559 level: &mut Vec<String>,
560 processed: &mut HashSet<String>,
561 ) {
562 level.push(request_id.clone());
563 processed.insert(request_id);
564 }
565
566 fn expand_template(&self, template: &str, context: &ChainTemplatingContext) -> String {
568 let templating_context = TemplatingContext {
569 chain_context: Some(context.clone()),
570 env_context: None,
571 virtual_clock: None,
572 };
573 expand_str_with_context(template, &templating_context)
574 }
575
576 fn expand_template_in_json(&self, value: &Value, context: &ChainTemplatingContext) -> Value {
578 match value {
579 Value::String(s) => Value::String(self.expand_template(s, context)),
580 Value::Array(arr) => {
581 Value::Array(arr.iter().map(|v| self.expand_template_in_json(v, context)).collect())
582 }
583 Value::Object(map) => {
584 let mut new_map = serde_json::Map::new();
585 for (k, v) in map {
586 new_map.insert(
587 self.expand_template(k, context),
588 self.expand_template_in_json(v, context),
589 );
590 }
591 Value::Object(new_map)
592 }
593 _ => value.clone(),
594 }
595 }
596
597 fn extract_from_response(&self, response: &ChainResponse, path: &str) -> Option<Value> {
599 let parts: Vec<&str> = path.split('.').collect();
600
601 if parts.is_empty() || parts[0] != "body" {
602 return None;
603 }
604
605 let mut current = response.body.as_ref()?;
606
607 for part in &parts[1..] {
608 match current {
609 Value::Object(map) => {
610 current = map.get(*part)?;
611 }
612 Value::Array(arr) => {
613 if part.starts_with('[') && part.ends_with(']') {
614 let index_str = &part[1..part.len() - 1];
615 if let Ok(index) = index_str.parse::<usize>() {
616 current = arr.get(index)?;
617 } else {
618 return None;
619 }
620 } else {
621 return None;
622 }
623 }
624 _ => return None,
625 }
626 }
627
628 Some(current.clone())
629 }
630}
631
632#[derive(Debug, Clone)]
634pub struct ChainExecutionResult {
635 pub chain_id: String,
637 pub status: ChainExecutionStatus,
639 pub total_duration_ms: u64,
641 pub request_results: HashMap<String, ChainResponse>,
643 pub error_message: Option<String>,
645}
646
/// Overall status of a chain execution.
///
/// Equality is total for this fieldless enum, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainExecutionStatus {
    /// Every request in the chain completed without error.
    Successful,
    /// Some requests succeeded and some failed.
    // NOTE(review): not produced by the engine code in this file — confirm who sets it.
    PartialSuccess,
    /// The chain failed.
    // NOTE(review): not produced by the engine code in this file — confirm who sets it.
    Failed,
}
657
#[cfg(test)]
mod tests {
    use super::*;
    use crate::request_chaining::{ChainContext, ChainRequest, ChainResponse};
    use serde_json::json;
    use std::sync::Arc;

    /// Timestamp shared by every hand-built fixture.
    const FIXTURE_TIMESTAMP: &str = "2024-01-15T10:00:00Z";

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Engine backed by an empty registry and the default configuration.
    fn create_test_engine() -> ChainExecutionEngine {
        let registry = RequestChainRegistry::new(ChainConfig::default());
        ChainExecutionEngine::new(Arc::new(registry), ChainConfig::default())
    }

    /// A 200 response with a nested JSON body (object, array, scalars).
    fn create_test_chain_response() -> ChainResponse {
        let headers: HashMap<String, String> =
            std::iter::once(("content-type".to_string(), "application/json".to_string()))
                .collect();

        let body = json!({
            "user": {
                "id": 123,
                "name": "test",
                "roles": ["admin", "user"]
            },
            "items": [
                {"id": 1, "value": "a"},
                {"id": 2, "value": "b"}
            ]
        });

        ChainResponse {
            status: 200,
            headers,
            body: Some(body),
            duration_ms: 50,
            executed_at: FIXTURE_TIMESTAMP.to_string(),
            error: None,
        }
    }

    /// A successful result for chain `test-chain` lasting 100 ms.
    fn sample_result(
        request_results: HashMap<String, ChainResponse>,
        error_message: Option<String>,
    ) -> ChainExecutionResult {
        ChainExecutionResult {
            chain_id: "test-chain".to_string(),
            status: ChainExecutionStatus::Successful,
            total_duration_ms: 100,
            request_results,
            error_message,
        }
    }

    /// A history record wrapping an empty `sample_result`.
    fn sample_record() -> ExecutionRecord {
        ExecutionRecord {
            executed_at: FIXTURE_TIMESTAMP.to_string(),
            result: sample_result(HashMap::new(), None),
        }
    }

    /// Builds a dependency graph from `(node, dependencies)` pairs.
    fn graph_of(edges: Vec<(&str, Vec<&str>)>) -> HashMap<String, Vec<String>> {
        edges
            .into_iter()
            .map(|(node, deps)| {
                (node.to_string(), deps.into_iter().map(String::from).collect())
            })
            .collect()
    }

    /// Index of `id` in a sorted order; panics if it is missing.
    fn position_of(order: &[String], id: &str) -> usize {
        order.iter().position(|entry| entry == id).unwrap()
    }

    /// A GET link with the given id, URL and dependencies and nothing else set.
    fn make_link(id: &str, url: &str, depends_on: Vec<String>) -> ChainLink {
        ChainLink {
            request: ChainRequest {
                id: id.to_string(),
                method: "GET".to_string(),
                url: url.to_string(),
                headers: HashMap::new(),
                body: None,
                depends_on,
                timeout_secs: None,
                expected_status: None,
                scripting: None,
            },
            store_as: None,
            extract: HashMap::new(),
        }
    }

    /// A templating context with no variables and no stored responses.
    fn empty_templating_context() -> ChainTemplatingContext {
        ChainTemplatingContext::new(ChainContext::new())
    }

    /// Runs `extract_from_response` on the standard fixture response.
    fn extract(path: &str) -> Option<Value> {
        let engine = create_test_engine();
        let response = create_test_chain_response();
        engine.extract_from_response(&response, path)
    }

    // ---------------------------------------------------------------------
    // ExecutionRecord / ChainExecutionResult / ChainExecutionStatus
    // ---------------------------------------------------------------------

    #[test]
    fn test_execution_record_debug() {
        let rendered = format!("{:?}", sample_record());
        assert!(rendered.contains("ExecutionRecord"));
        assert!(rendered.contains("executed_at"));
    }

    #[test]
    fn test_execution_record_clone() {
        let record = sample_record();
        let cloned = record.clone();
        assert_eq!(cloned.executed_at, record.executed_at);
        assert_eq!(cloned.result.chain_id, record.result.chain_id);
    }

    #[test]
    fn test_chain_execution_result_debug() {
        let rendered = format!("{:?}", sample_result(HashMap::new(), None));
        assert!(rendered.contains("ChainExecutionResult"));
        assert!(rendered.contains("chain_id"));
    }

    #[test]
    fn test_chain_execution_result_clone() {
        let mut request_results = HashMap::new();
        request_results.insert("req1".to_string(), create_test_chain_response());

        let result = sample_result(request_results, Some("test error".to_string()));
        let cloned = result.clone();

        assert_eq!(cloned.chain_id, result.chain_id);
        assert_eq!(cloned.total_duration_ms, result.total_duration_ms);
        assert_eq!(cloned.error_message, result.error_message);
    }

    #[test]
    fn test_chain_execution_status_debug() {
        let cases = vec![
            (ChainExecutionStatus::Successful, "Successful"),
            (ChainExecutionStatus::PartialSuccess, "PartialSuccess"),
            (ChainExecutionStatus::Failed, "Failed"),
        ];

        for (status, expected) in cases {
            let rendered = format!("{:?}", status);
            assert!(rendered.contains(expected));
        }
    }

    #[test]
    fn test_chain_execution_status_clone() {
        let status = ChainExecutionStatus::Successful;
        let cloned = status.clone();
        assert_eq!(cloned, ChainExecutionStatus::Successful);
    }

    #[test]
    fn test_chain_execution_status_eq() {
        assert_eq!(ChainExecutionStatus::Successful, ChainExecutionStatus::Successful);
        assert_eq!(ChainExecutionStatus::PartialSuccess, ChainExecutionStatus::PartialSuccess);
        assert_eq!(ChainExecutionStatus::Failed, ChainExecutionStatus::Failed);

        assert_ne!(ChainExecutionStatus::Successful, ChainExecutionStatus::Failed);
        assert_ne!(ChainExecutionStatus::PartialSuccess, ChainExecutionStatus::Successful);
    }

    // ---------------------------------------------------------------------
    // Engine construction
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_engine_creation() {
        // Construction must not panic with the default configuration.
        let _engine = create_test_engine();
    }

    #[tokio::test]
    async fn test_engine_try_new() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let outcome = ChainExecutionEngine::try_new(registry, ChainConfig::default());
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_engine_debug() {
        let rendered = format!("{:?}", create_test_engine());
        assert!(rendered.contains("ChainExecutionEngine"));
    }

    // ---------------------------------------------------------------------
    // Topological sort
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_topological_sort() {
        let engine = create_test_engine();
        let graph = graph_of(vec![
            ("A", vec![]),
            ("B", vec!["A"]),
            ("C", vec!["A"]),
            ("D", vec!["B", "C"]),
        ]);

        let topo_order = engine.topological_sort(&graph).unwrap();

        let d_pos = position_of(&topo_order, "D");
        let b_pos = position_of(&topo_order, "B");
        let c_pos = position_of(&topo_order, "C");
        let a_pos = position_of(&topo_order, "A");

        // The sort lists a node before the nodes it depends on.
        assert!(d_pos < b_pos, "D should come before B");
        assert!(d_pos < c_pos, "D should come before C");
        assert!(b_pos < a_pos, "B should come before A");
        assert!(c_pos < a_pos, "C should come before A");
        assert_eq!(topo_order.len(), 4, "Should have all 4 nodes");
    }

    #[tokio::test]
    async fn test_topological_sort_single_node() {
        let engine = create_test_engine();
        let graph = graph_of(vec![("A", vec![])]);

        let topo_order = engine.topological_sort(&graph).unwrap();
        assert_eq!(topo_order, vec!["A".to_string()]);
    }

    #[tokio::test]
    async fn test_topological_sort_linear_chain() {
        let engine = create_test_engine();
        let graph = graph_of(vec![("A", vec![]), ("B", vec!["A"]), ("C", vec!["B"])]);

        let topo_order = engine.topological_sort(&graph).unwrap();

        let c_pos = position_of(&topo_order, "C");
        let b_pos = position_of(&topo_order, "B");
        let a_pos = position_of(&topo_order, "A");

        assert!(c_pos < b_pos);
        assert!(b_pos < a_pos);
    }

    #[tokio::test]
    async fn test_topological_sort_empty_graph() {
        let engine = create_test_engine();
        let graph = HashMap::new();

        let topo_order = engine.topological_sort(&graph).unwrap();
        assert!(topo_order.is_empty());
    }

    #[tokio::test]
    async fn test_circular_dependency_detection() {
        let engine = create_test_engine();
        // A -> B -> A
        let graph = graph_of(vec![("A", vec!["B"]), ("B", vec!["A"])]);

        assert!(engine.topological_sort(&graph).is_err());
    }

    #[tokio::test]
    async fn test_circular_dependency_self_reference() {
        let engine = create_test_engine();
        // A -> A
        let graph = graph_of(vec![("A", vec!["A"])]);

        assert!(engine.topological_sort(&graph).is_err());
    }

    #[tokio::test]
    async fn test_circular_dependency_chain() {
        let engine = create_test_engine();
        // A -> C -> B -> A
        let graph = graph_of(vec![("A", vec!["C"]), ("B", vec!["A"]), ("C", vec!["B"])]);

        assert!(engine.topological_sort(&graph).is_err());
    }

    #[tokio::test]
    async fn test_build_dependency_graph() {
        let engine = create_test_engine();

        let links = vec![
            make_link("req1", "http://example.com/1", vec![]),
            make_link("req2", "http://example.com/2", vec!["req1".to_string()]),
            make_link(
                "req3",
                "http://example.com/3",
                vec!["req1".to_string(), "req2".to_string()],
            ),
        ];

        let graph = engine.build_dependency_graph(&links);

        assert!(graph.contains_key("req1"));
        assert!(graph.contains_key("req2"));
        assert!(graph.contains_key("req3"));
        assert_eq!(graph.get("req1").unwrap().len(), 0);
        assert_eq!(graph.get("req2").unwrap(), &vec!["req1".to_string()]);
        assert_eq!(graph.get("req3").unwrap(), &vec!["req1".to_string(), "req2".to_string()]);
    }

    // ---------------------------------------------------------------------
    // extract_from_response
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_extract_from_response_simple_field() {
        let value = extract("body.user.id");
        assert!(value.is_some());
        assert_eq!(value.unwrap(), json!(123));
    }

    #[tokio::test]
    async fn test_extract_from_response_nested_field() {
        let value = extract("body.user.name");
        assert!(value.is_some());
        assert_eq!(value.unwrap(), json!("test"));
    }

    #[tokio::test]
    async fn test_extract_from_response_array_element() {
        let value = extract("body.items.[0].value");
        assert!(value.is_some());
        assert_eq!(value.unwrap(), json!("a"));
    }

    #[tokio::test]
    async fn test_extract_from_response_array_element_second() {
        let value = extract("body.items.[1].id");
        assert!(value.is_some());
        assert_eq!(value.unwrap(), json!(2));
    }

    #[tokio::test]
    async fn test_extract_from_response_invalid_path() {
        assert!(extract("body.nonexistent").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_non_body_path() {
        // Only paths rooted at `body` are supported.
        assert!(extract("headers.content-type").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_empty_path() {
        assert!(extract("").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_invalid_array_index() {
        assert!(extract("body.items.[invalid].value").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_array_out_of_bounds() {
        assert!(extract("body.items.[100].value").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_no_body() {
        let engine = create_test_engine();
        let response = ChainResponse {
            status: 200,
            headers: HashMap::new(),
            body: None,
            duration_ms: 50,
            executed_at: FIXTURE_TIMESTAMP.to_string(),
            error: None,
        };

        let value = engine.extract_from_response(&response, "body.user.id");
        assert!(value.is_none());
    }

    // ---------------------------------------------------------------------
    // Template expansion
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_expand_template_simple() {
        let engine = create_test_engine();
        let context = empty_templating_context();

        let result = engine.expand_template("hello world", &context);
        assert_eq!(result, "hello world");
    }

    #[tokio::test]
    async fn test_expand_template_with_variable() {
        let engine = create_test_engine();
        let mut context = empty_templating_context();
        context.chain_context.set_variable("name".to_string(), json!("test"));

        let result = engine.expand_template("hello {{chain.name}}", &context);
        assert!(result.contains("hello"));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_string() {
        let engine = create_test_engine();
        let context = empty_templating_context();

        let input = json!("hello world");
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!("hello world"));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_number() {
        let engine = create_test_engine();
        let context = empty_templating_context();

        let input = json!(42);
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!(42));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_boolean() {
        let engine = create_test_engine();
        let context = empty_templating_context();

        let input = json!(true);
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!(true));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_null() {
        let engine = create_test_engine();
        let context = empty_templating_context();

        let input = json!(null);
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!(null));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_array() {
        let engine = create_test_engine();
        let context = empty_templating_context();

        let input = json!(["a", "b", "c"]);
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!(["a", "b", "c"]));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_object() {
        let engine = create_test_engine();
        let context = empty_templating_context();

        let input = json!({"key": "value", "nested": {"inner": "data"}});
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!({"key": "value", "nested": {"inner": "data"}}));
    }

    // ---------------------------------------------------------------------
    // History, level collection, chain lookup, configuration
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_get_chain_history_empty() {
        let engine = create_test_engine();

        let history = engine.get_chain_history("nonexistent").await;
        assert!(history.is_empty());
    }

    #[tokio::test]
    async fn test_collect_dependency_level() {
        let engine = create_test_engine();
        let graph = HashMap::new();
        let mut level = vec![];
        let mut processed = HashSet::new();

        engine.collect_dependency_level("req1".to_string(), &graph, &mut level, &mut processed);

        assert_eq!(level, vec!["req1".to_string()]);
        assert!(processed.contains("req1"));
    }

    #[tokio::test]
    async fn test_execute_chain_not_found() {
        let engine = create_test_engine();

        let outcome = engine.execute_chain("nonexistent", None).await;
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("not found"));
    }

    #[tokio::test]
    async fn test_engine_with_custom_config() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let config = ChainConfig {
            enabled: true,
            max_chain_length: 50,
            global_timeout_secs: 60,
            enable_parallel_execution: false,
        };

        let outcome = ChainExecutionEngine::try_new(registry, config);
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_engine_with_default_config() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let config = ChainConfig::default();

        let outcome = ChainExecutionEngine::try_new(registry, config);
        assert!(outcome.is_ok());
    }
}