1use crate::request_chaining::{
7 ChainConfig, ChainDefinition, ChainExecutionContext, ChainLink, ChainResponse,
8 ChainTemplatingContext, RequestChainRegistry,
9};
10#[cfg(feature = "scripting")]
11use crate::request_scripting::{ScriptContext, ScriptEngine};
12use crate::templating::{expand_str_with_context, TemplatingContext};
13use crate::{Error, Result};
14use chrono::Utc;
15use futures::future::join_all;
16use reqwest::{
17 header::{HeaderMap, HeaderName, HeaderValue},
18 Client, Method,
19};
20use serde_json::Value;
21use std::collections::{HashMap, HashSet};
22use std::str::FromStr;
23use std::sync::Arc;
24use tokio::sync::Mutex;
25use tokio::time::{timeout, Duration};
26
/// One entry in a chain's execution history.
#[derive(Debug, Clone)]
pub struct ExecutionRecord {
    /// Timestamp of the record as a string; `execute_chain` fills it with
    /// `Utc::now().to_rfc3339()`.
    pub executed_at: String,
    /// Result of the chain run this record describes.
    pub result: ChainExecutionResult,
}
35
/// Executes request chains: expands templates, sends the HTTP requests and
/// keeps an in-memory history of completed runs.
#[derive(Debug)]
pub struct ChainExecutionEngine {
    /// HTTP client used for every request; built in `try_new` with the
    /// configured global timeout.
    http_client: Client,
    /// Registry used to look chains up by id and to validate definitions.
    registry: Arc<RequestChainRegistry>,
    /// Engine configuration (global timeout, parallel-execution switch).
    config: ChainConfig,
    /// Execution history keyed by chain id; `execute_chain` appends to it.
    execution_history: Arc<Mutex<HashMap<String, Vec<ExecutionRecord>>>>,
    /// Script engine for pre/post request scripts (only compiled with the
    /// `scripting` feature).
    #[cfg(feature = "scripting")]
    script_engine: ScriptEngine,
}
51
52impl ChainExecutionEngine {
53 pub fn new(registry: Arc<RequestChainRegistry>, config: ChainConfig) -> Self {
60 Self::try_new(registry, config)
61 .unwrap_or_else(|e| {
62 panic!(
63 "Failed to create HTTP client for chain execution engine: {}. \
64 This typically indicates a system configuration issue (e.g., invalid timeout value).",
65 e
66 )
67 })
68 }
69
70 pub fn try_new(registry: Arc<RequestChainRegistry>, config: ChainConfig) -> Result<Self> {
74 let http_client = Client::builder()
75 .timeout(Duration::from_secs(config.global_timeout_secs))
76 .build()
77 .map_err(|e| {
78 Error::internal(format!(
79 "Failed to create HTTP client: {}. \
80 Check that the timeout value ({}) is valid.",
81 e, config.global_timeout_secs
82 ))
83 })?;
84
85 Ok(Self {
86 http_client,
87 registry,
88 config,
89 execution_history: Arc::new(Mutex::new(HashMap::new())),
90 #[cfg(feature = "scripting")]
91 script_engine: ScriptEngine::new(),
92 })
93 }
94
95 pub async fn execute_chain(
97 &self,
98 chain_id: &str,
99 variables: Option<Value>,
100 ) -> Result<ChainExecutionResult> {
101 let chain = self
102 .registry
103 .get_chain(chain_id)
104 .await
105 .ok_or_else(|| Error::internal(format!("Chain '{}' not found", chain_id)))?;
106
107 let result = self.execute_chain_definition(&chain, variables).await?;
108
109 let record = ExecutionRecord {
111 executed_at: Utc::now().to_rfc3339(),
112 result: result.clone(),
113 };
114
115 let mut history = self.execution_history.lock().await;
116 history.entry(chain_id.to_string()).or_insert_with(Vec::new).push(record);
117
118 Ok(result)
119 }
120
121 pub async fn get_chain_history(&self, chain_id: &str) -> Vec<ExecutionRecord> {
123 let history = self.execution_history.lock().await;
124 history.get(chain_id).cloned().unwrap_or_default()
125 }
126
127 pub async fn execute_chain_definition(
129 &self,
130 chain_def: &ChainDefinition,
131 variables: Option<Value>,
132 ) -> Result<ChainExecutionResult> {
133 self.registry.validate_chain(chain_def).await?;
135
136 let start_time = std::time::Instant::now();
137 let mut execution_context = ChainExecutionContext::new(chain_def.clone());
138
139 for (key, value) in &chain_def.variables {
141 execution_context
142 .templating
143 .chain_context
144 .set_variable(key.clone(), value.clone());
145 }
146
147 if let Some(Value::Object(map)) = variables {
149 for (key, value) in map {
150 execution_context.templating.chain_context.set_variable(key, value);
151 }
152 }
153
154 if self.config.enable_parallel_execution {
155 self.execute_with_parallelism(&mut execution_context).await
156 } else {
157 self.execute_sequential(&mut execution_context).await
158 }
159 .map(|_| ChainExecutionResult {
160 chain_id: chain_def.id.clone(),
161 status: ChainExecutionStatus::Successful,
162 total_duration_ms: start_time.elapsed().as_millis() as u64,
163 request_results: execution_context.templating.chain_context.responses.clone(),
164 error_message: None,
165 })
166 }
167
168 async fn execute_with_parallelism(
170 &self,
171 execution_context: &mut ChainExecutionContext,
172 ) -> Result<()> {
173 let dep_graph = self.build_dependency_graph(&execution_context.definition.links);
174 let topo_order = self.topological_sort(&dep_graph)?;
175
176 let mut level_groups = vec![];
178 let mut processed = HashSet::new();
179
180 for request_id in topo_order {
181 if !processed.contains(&request_id) {
182 let mut level = vec![];
183 self.collect_dependency_level(request_id, &dep_graph, &mut level, &mut processed);
184 level_groups.push(level);
185 }
186 }
187
188 for level in level_groups {
190 if level.len() == 1 {
191 let request_id = &level[0];
193 let link = execution_context
194 .definition
195 .links
196 .iter()
197 .find(|l| l.request.id == *request_id)
198 .ok_or_else(|| {
199 Error::internal(format!(
200 "Chain link not found for request_id '{}' during parallel execution",
201 request_id
202 ))
203 })?;
204
205 let link_clone = link.clone();
206 self.execute_request(&link_clone, execution_context).await?;
207 } else {
208 let tasks = level
210 .into_iter()
211 .filter_map(|request_id| {
212 let link = execution_context
213 .definition
214 .links
215 .iter()
216 .find(|l| l.request.id == request_id);
217 let link = match link {
218 Some(l) => l.clone(),
219 None => {
220 tracing::error!(
221 "Chain link not found for request_id '{}' during parallel execution",
222 request_id
223 );
224 return None;
225 }
226 };
227 let parallel_context = ChainExecutionContext {
229 definition: execution_context.definition.clone(),
230 templating: execution_context.templating.clone(),
231 start_time: std::time::Instant::now(),
232 config: execution_context.config.clone(),
233 };
234
235 let context = Arc::new(Mutex::new(parallel_context));
236 let engine =
237 ChainExecutionEngine::new(self.registry.clone(), self.config.clone());
238
239 Some(tokio::spawn(async move {
240 let mut ctx = context.lock().await;
241 engine.execute_request(&link, &mut ctx).await
242 }))
243 })
244 .collect::<Vec<_>>();
245
246 let results = join_all(tasks).await;
247 for result in results {
248 result
249 .map_err(|e| Error::internal(format!("Task join error: {}", e)))?
250 .map_err(|e| Error::internal(format!("Request execution error: {}", e)))?;
251 }
252 }
253 }
254
255 Ok(())
256 }
257
258 async fn execute_sequential(
260 &self,
261 execution_context: &mut ChainExecutionContext,
262 ) -> Result<()> {
263 let links = execution_context.definition.links.clone();
264 for link in &links {
265 self.execute_request(link, execution_context).await?;
266 }
267 Ok(())
268 }
269
/// Executes a single chain link and records its outcome in
/// `execution_context`.
///
/// Steps, in order:
/// 1. Mark the request as current in the templating context.
/// 2. Parse the HTTP method, expand templates in the URL, header values and
///    body, and build the request.
/// 3. Run the optional pre-script (`scripting` feature).
/// 4. Send the request under the global timeout and read the response.
/// 5. Check `expected_status`, store the response under `store_as`, apply
///    the `extract` rules, run the optional post-script, and finally store
///    the response under the request id.
///
/// # Errors
///
/// Returns an internal error for an invalid method or header, a body file
/// that cannot be read, a transport failure, a timeout, or a status that is
/// not listed in `expected_status`. Script failures are only logged.
async fn execute_request(
    &self,
    link: &ChainLink,
    execution_context: &mut ChainExecutionContext,
) -> Result<()> {
    let request_start = std::time::Instant::now();

    execution_context.templating.set_current_request(link.request.clone());

    let method = Method::from_bytes(link.request.method.as_bytes()).map_err(|e| {
        Error::internal(format!("Invalid HTTP method '{}': {}", link.request.method, e))
    })?;

    let url = self.expand_template(&link.request.url, &execution_context.templating);

    // Header names are used verbatim; only the values are template-expanded.
    let mut headers = HeaderMap::new();
    for (key, value) in &link.request.headers {
        let expanded_value = self.expand_template(value, &execution_context.templating);
        let header_name = HeaderName::from_str(key)
            .map_err(|e| Error::internal(format!("Invalid header name '{}': {}", key, e)))?;
        let header_value = HeaderValue::from_str(&expanded_value).map_err(|e| {
            Error::internal(format!("Invalid header value for '{}': {}", key, e))
        })?;
        headers.insert(header_name, header_value);
    }

    let mut request_builder = self.http_client.request(method, &url).headers(headers.clone());

    if let Some(body) = &link.request.body {
        match body {
            // JSON bodies are expanded recursively (keys and string values).
            crate::request_chaining::RequestBody::Json(json_value) => {
                let expanded_body =
                    self.expand_template_in_json(json_value, &execution_context.templating);
                request_builder = request_builder.json(&expanded_body);
            }
            // Binary bodies: the path is template-expanded, then the file is read.
            crate::request_chaining::RequestBody::BinaryFile { path, content_type } => {
                let templating_context =
                    TemplatingContext::with_chain(execution_context.templating.clone());

                let expanded_path = expand_str_with_context(path, &templating_context);

                let binary_body = crate::request_chaining::RequestBody::binary_file(
                    expanded_path,
                    content_type.clone(),
                );

                match binary_body.to_bytes().await {
                    Ok(file_bytes) => {
                        request_builder = request_builder.body(file_bytes);

                        if let Some(ct) = content_type {
                            // Re-apply the headers with the content type added;
                            // an unparsable value falls back to
                            // `application/octet-stream`.
                            let mut headers = headers.clone();
                            headers.insert(
                                "content-type",
                                ct.parse().unwrap_or_else(|_| {
                                    HeaderValue::from_static("application/octet-stream")
                                }),
                            );
                            request_builder = request_builder.headers(headers);
                        }
                    }
                    Err(e) => {
                        return Err(e);
                    }
                }
            }
        }
    }

    // Optional per-request timeout, set on the builder in addition to the
    // global timeout applied around `send()` below.
    if let Some(timeout_secs) = link.request.timeout_secs {
        request_builder = request_builder.timeout(Duration::from_secs(timeout_secs));
    }

    // NOTE(review): the pre-script runs after the URL, headers and body were
    // already expanded, so variables it sets cannot influence this request —
    // confirm this ordering is intended.
    #[cfg(feature = "scripting")]
    if let Some(scripting) = &link.request.scripting {
        if let Some(pre_script) = &scripting.pre_script {
            let script_context = ScriptContext {
                request: Some(link.request.clone()),
                response: None,
                chain_context: execution_context.templating.chain_context.variables.clone(),
                variables: HashMap::new(),
                env_vars: std::env::vars().collect(),
            };

            match self
                .script_engine
                .execute_script(pre_script, &script_context, scripting.timeout_ms)
                .await
            {
                Ok(script_result) => {
                    for (key, value) in script_result.modified_variables {
                        execution_context.templating.chain_context.set_variable(key, value);
                    }
                }
                // Best effort: a failing pre-script does not abort the request.
                Err(e) => {
                    tracing::warn!(
                        "Pre-script execution failed for request '{}': {}",
                        link.request.id,
                        e
                    );
                }
            }
        }
    }

    let response_result =
        timeout(Duration::from_secs(self.config.global_timeout_secs), request_builder.send())
            .await;

    // Outer `Err` is the global timeout elapsing; inner `Err` is a transport error.
    let response = match response_result {
        Ok(Ok(resp)) => resp,
        Ok(Err(e)) => {
            return Err(Error::internal(format!(
                "Request '{}' failed: {}",
                link.request.id, e
            )));
        }
        Err(_) => {
            return Err(Error::internal(format!("Request '{}' timed out", link.request.id)));
        }
    };

    let status = response.status();
    // Header values that are not valid strings are recorded as "".
    let headers: HashMap<String, String> = response
        .headers()
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
        .collect();

    // A body that cannot be read becomes ""; a body that is not JSON is
    // stored as `None`.
    let body_text = response.text().await.unwrap_or_default();
    let body_json: Option<Value> = serde_json::from_str(&body_text).ok();

    let duration_ms = request_start.elapsed().as_millis() as u64;
    let executed_at = Utc::now().to_rfc3339();

    let chain_response = ChainResponse {
        status: status.as_u16(),
        headers,
        body: body_json,
        duration_ms,
        executed_at,
        error: None,
    };

    // An unexpected status fails the request before anything is stored.
    if let Some(expected) = &link.request.expected_status {
        if !expected.contains(&status.as_u16()) {
            let error_msg = format!(
                "Request '{}' returned status {} but expected one of {:?}",
                link.request.id,
                status.as_u16(),
                expected
            );
            return Err(Error::internal(error_msg));
        }
    }

    if let Some(store_name) = &link.store_as {
        execution_context
            .templating
            .chain_context
            .store_response(store_name.clone(), chain_response.clone());
    }

    // Extraction rules that do not match are skipped silently.
    for (var_name, extraction_path) in &link.extract {
        if let Some(value) = self.extract_from_response(&chain_response, extraction_path) {
            execution_context.templating.chain_context.set_variable(var_name.clone(), value);
        }
    }

    #[cfg(feature = "scripting")]
    if let Some(scripting) = &link.request.scripting {
        if let Some(post_script) = &scripting.post_script {
            let script_context = ScriptContext {
                request: Some(link.request.clone()),
                response: Some(chain_response.clone()),
                chain_context: execution_context.templating.chain_context.variables.clone(),
                variables: HashMap::new(),
                env_vars: std::env::vars().collect(),
            };

            match self
                .script_engine
                .execute_script(post_script, &script_context, scripting.timeout_ms)
                .await
            {
                Ok(script_result) => {
                    for (key, value) in script_result.modified_variables {
                        execution_context.templating.chain_context.set_variable(key, value);
                    }
                }
                // Best effort: a failing post-script does not fail the request.
                Err(e) => {
                    tracing::warn!(
                        "Post-script execution failed for request '{}': {}",
                        link.request.id,
                        e
                    );
                }
            }
        }
    }

    // The response is always stored under the request id as well.
    execution_context
        .templating
        .chain_context
        .store_response(link.request.id.clone(), chain_response);

    Ok(())
}
500
501 fn build_dependency_graph(&self, links: &[ChainLink]) -> HashMap<String, Vec<String>> {
503 let mut graph = HashMap::new();
504
505 for link in links {
506 graph
507 .entry(link.request.id.clone())
508 .or_insert_with(Vec::new)
509 .extend(link.request.depends_on.iter().cloned());
510 }
511
512 graph
513 }
514
515 fn topological_sort(&self, graph: &HashMap<String, Vec<String>>) -> Result<Vec<String>> {
517 let mut visited = HashSet::new();
518 let mut rec_stack = HashSet::new();
519 let mut result = Vec::new();
520
521 for node in graph.keys() {
522 if !visited.contains(node) {
523 self.topo_sort_util(node, graph, &mut visited, &mut rec_stack, &mut result)?;
524 }
525 }
526
527 result.reverse();
528 Ok(result)
529 }
530
531 #[allow(clippy::only_used_in_recursion)]
533 fn topo_sort_util(
534 &self,
535 node: &str,
536 graph: &HashMap<String, Vec<String>>,
537 visited: &mut HashSet<String>,
538 rec_stack: &mut HashSet<String>,
539 result: &mut Vec<String>,
540 ) -> Result<()> {
541 visited.insert(node.to_string());
542 rec_stack.insert(node.to_string());
543
544 if let Some(dependencies) = graph.get(node) {
545 for dep in dependencies {
546 if !visited.contains(dep) {
547 self.topo_sort_util(dep, graph, visited, rec_stack, result)?;
548 } else if rec_stack.contains(dep) {
549 return Err(Error::internal(format!(
550 "Circular dependency detected involving '{}'",
551 node
552 )));
553 }
554 }
555 }
556
557 rec_stack.remove(node);
558 result.push(node.to_string());
559 Ok(())
560 }
561
562 fn collect_dependency_level(
564 &self,
565 request_id: String,
566 _graph: &HashMap<String, Vec<String>>,
567 level: &mut Vec<String>,
568 processed: &mut HashSet<String>,
569 ) {
570 level.push(request_id.clone());
571 processed.insert(request_id);
572 }
573
574 fn expand_template(&self, template: &str, context: &ChainTemplatingContext) -> String {
576 let templating_context = TemplatingContext {
577 chain_context: Some(context.clone()),
578 env_context: None,
579 virtual_clock: None,
580 };
581 expand_str_with_context(template, &templating_context)
582 }
583
584 fn expand_template_in_json(&self, value: &Value, context: &ChainTemplatingContext) -> Value {
586 match value {
587 Value::String(s) => Value::String(self.expand_template(s, context)),
588 Value::Array(arr) => {
589 Value::Array(arr.iter().map(|v| self.expand_template_in_json(v, context)).collect())
590 }
591 Value::Object(map) => {
592 let mut new_map = serde_json::Map::new();
593 for (k, v) in map {
594 new_map.insert(
595 self.expand_template(k, context),
596 self.expand_template_in_json(v, context),
597 );
598 }
599 Value::Object(new_map)
600 }
601 _ => value.clone(),
602 }
603 }
604
605 fn extract_from_response(&self, response: &ChainResponse, path: &str) -> Option<Value> {
607 let parts: Vec<&str> = path.split('.').collect();
608
609 if parts.is_empty() || parts[0] != "body" {
610 return None;
611 }
612
613 let mut current = response.body.as_ref()?;
614
615 for part in &parts[1..] {
616 match current {
617 Value::Object(map) => {
618 current = map.get(*part)?;
619 }
620 Value::Array(arr) => {
621 if part.starts_with('[') && part.ends_with(']') {
622 let index_str = &part[1..part.len() - 1];
623 if let Ok(index) = index_str.parse::<usize>() {
624 current = arr.get(index)?;
625 } else {
626 return None;
627 }
628 } else {
629 return None;
630 }
631 }
632 _ => return None,
633 }
634 }
635
636 Some(current.clone())
637 }
638}
639
/// Outcome of executing one chain.
#[derive(Debug, Clone)]
pub struct ChainExecutionResult {
    /// Id of the chain definition that was executed.
    pub chain_id: String,
    /// Overall status of the run.
    pub status: ChainExecutionStatus,
    /// Wall-clock duration of the whole run in milliseconds.
    pub total_duration_ms: u64,
    /// Responses collected during the run, keyed by the name they were stored
    /// under.
    pub request_results: HashMap<String, ChainResponse>,
    /// Error description, if any; the engine sets this to `None` on success.
    pub error_message: Option<String>,
}
654
/// Overall status of a chain execution.
///
/// NOTE(review): outside the tests, the code visible in this file only ever
/// constructs `Successful`; failures surface as `Err` instead — confirm
/// whether the other variants are produced elsewhere.
#[derive(Debug, Clone, PartialEq)]
pub enum ChainExecutionStatus {
    /// Every request completed.
    Successful,
    /// Presumably: some requests succeeded and some failed — TODO confirm.
    PartialSuccess,
    /// Presumably: the chain failed — TODO confirm.
    Failed,
}
665
// Unit tests for the engine's pure helpers (graph building, ordering,
// extraction, template expansion) and for construction. No HTTP request is
// sent by any of these tests.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::request_chaining::{ChainRequest, ChainResponse};
    use serde_json::json;
    use std::sync::Arc;

    /// Builds an engine with default configuration and an empty registry.
    fn create_test_engine() -> ChainExecutionEngine {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        ChainExecutionEngine::new(registry, ChainConfig::default())
    }

    /// Builds a 200 response whose JSON body has a nested `user` object and
    /// an `items` array, used by the extraction tests.
    fn create_test_chain_response() -> ChainResponse {
        ChainResponse {
            status: 200,
            headers: {
                let mut h = HashMap::new();
                h.insert("content-type".to_string(), "application/json".to_string());
                h
            },
            body: Some(json!({
                "user": {
                    "id": 123,
                    "name": "test",
                    "roles": ["admin", "user"]
                },
                "items": [
                    {"id": 1, "value": "a"},
                    {"id": 2, "value": "b"}
                ]
            })),
            duration_ms: 50,
            executed_at: "2024-01-15T10:00:00Z".to_string(),
            error: None,
        }
    }

    // --- ExecutionRecord ---

    #[test]
    fn test_execution_record_debug() {
        let record = ExecutionRecord {
            executed_at: "2024-01-15T10:00:00Z".to_string(),
            result: ChainExecutionResult {
                chain_id: "test-chain".to_string(),
                status: ChainExecutionStatus::Successful,
                total_duration_ms: 100,
                request_results: HashMap::new(),
                error_message: None,
            },
        };

        let debug = format!("{:?}", record);
        assert!(debug.contains("ExecutionRecord"));
        assert!(debug.contains("executed_at"));
    }

    #[test]
    fn test_execution_record_clone() {
        let record = ExecutionRecord {
            executed_at: "2024-01-15T10:00:00Z".to_string(),
            result: ChainExecutionResult {
                chain_id: "test-chain".to_string(),
                status: ChainExecutionStatus::Successful,
                total_duration_ms: 100,
                request_results: HashMap::new(),
                error_message: None,
            },
        };

        let cloned = record.clone();
        assert_eq!(cloned.executed_at, record.executed_at);
        assert_eq!(cloned.result.chain_id, record.result.chain_id);
    }

    // --- ChainExecutionResult ---

    #[test]
    fn test_chain_execution_result_debug() {
        let result = ChainExecutionResult {
            chain_id: "test-chain".to_string(),
            status: ChainExecutionStatus::Successful,
            total_duration_ms: 100,
            request_results: HashMap::new(),
            error_message: None,
        };

        let debug = format!("{:?}", result);
        assert!(debug.contains("ChainExecutionResult"));
        assert!(debug.contains("chain_id"));
    }

    #[test]
    fn test_chain_execution_result_clone() {
        let mut request_results = HashMap::new();
        request_results.insert("req1".to_string(), create_test_chain_response());

        let result = ChainExecutionResult {
            chain_id: "test-chain".to_string(),
            status: ChainExecutionStatus::Successful,
            total_duration_ms: 100,
            request_results,
            error_message: Some("test error".to_string()),
        };

        let cloned = result.clone();
        assert_eq!(cloned.chain_id, result.chain_id);
        assert_eq!(cloned.total_duration_ms, result.total_duration_ms);
        assert_eq!(cloned.error_message, result.error_message);
    }

    // --- ChainExecutionStatus ---

    #[test]
    fn test_chain_execution_status_debug() {
        let status = ChainExecutionStatus::Successful;
        let debug = format!("{:?}", status);
        assert!(debug.contains("Successful"));

        let status = ChainExecutionStatus::PartialSuccess;
        let debug = format!("{:?}", status);
        assert!(debug.contains("PartialSuccess"));

        let status = ChainExecutionStatus::Failed;
        let debug = format!("{:?}", status);
        assert!(debug.contains("Failed"));
    }

    #[test]
    fn test_chain_execution_status_clone() {
        let status = ChainExecutionStatus::Successful;
        let cloned = status.clone();
        assert_eq!(cloned, ChainExecutionStatus::Successful);
    }

    #[test]
    fn test_chain_execution_status_eq() {
        assert_eq!(ChainExecutionStatus::Successful, ChainExecutionStatus::Successful);
        assert_eq!(ChainExecutionStatus::PartialSuccess, ChainExecutionStatus::PartialSuccess);
        assert_eq!(ChainExecutionStatus::Failed, ChainExecutionStatus::Failed);

        assert_ne!(ChainExecutionStatus::Successful, ChainExecutionStatus::Failed);
        assert_ne!(ChainExecutionStatus::PartialSuccess, ChainExecutionStatus::Successful);
    }

    // --- Engine construction ---

    // Passes as long as `new` does not panic.
    #[tokio::test]
    async fn test_engine_creation() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let _engine = ChainExecutionEngine::new(registry, ChainConfig::default());
    }

    #[tokio::test]
    async fn test_engine_try_new() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let result = ChainExecutionEngine::try_new(registry, ChainConfig::default());
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_engine_debug() {
        let engine = create_test_engine();
        let debug = format!("{:?}", engine);
        assert!(debug.contains("ChainExecutionEngine"));
    }

    // --- Topological sort ---

    // Graph values are dependency lists: D depends on B and C, which both
    // depend on A. The assertions pin the order `topological_sort` returns:
    // dependents come before the nodes they depend on.
    #[tokio::test]
    async fn test_topological_sort() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let engine = ChainExecutionEngine::new(registry, ChainConfig::default());

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), vec![]);
        graph.insert("B".to_string(), vec!["A".to_string()]);
        graph.insert("C".to_string(), vec!["A".to_string()]);
        graph.insert("D".to_string(), vec!["B".to_string(), "C".to_string()]);

        let topo_order = engine.topological_sort(&graph).unwrap();

        let d_pos = topo_order.iter().position(|x| x == "D").unwrap();
        let b_pos = topo_order.iter().position(|x| x == "B").unwrap();
        let c_pos = topo_order.iter().position(|x| x == "C").unwrap();
        let a_pos = topo_order.iter().position(|x| x == "A").unwrap();

        assert!(d_pos < b_pos, "D should come before B");
        assert!(d_pos < c_pos, "D should come before C");
        assert!(b_pos < a_pos, "B should come before A");
        assert!(c_pos < a_pos, "C should come before A");
        assert_eq!(topo_order.len(), 4, "Should have all 4 nodes");
    }

    #[tokio::test]
    async fn test_topological_sort_single_node() {
        let engine = create_test_engine();

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), vec![]);

        let topo_order = engine.topological_sort(&graph).unwrap();
        assert_eq!(topo_order, vec!["A".to_string()]);
    }

    // C depends on B, which depends on A; same dependents-first order.
    #[tokio::test]
    async fn test_topological_sort_linear_chain() {
        let engine = create_test_engine();

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), vec![]);
        graph.insert("B".to_string(), vec!["A".to_string()]);
        graph.insert("C".to_string(), vec!["B".to_string()]);

        let topo_order = engine.topological_sort(&graph).unwrap();

        let c_pos = topo_order.iter().position(|x| x == "C").unwrap();
        let b_pos = topo_order.iter().position(|x| x == "B").unwrap();
        let a_pos = topo_order.iter().position(|x| x == "A").unwrap();

        assert!(c_pos < b_pos);
        assert!(b_pos < a_pos);
    }

    #[tokio::test]
    async fn test_topological_sort_empty_graph() {
        let engine = create_test_engine();
        let graph = HashMap::new();

        let topo_order = engine.topological_sort(&graph).unwrap();
        assert!(topo_order.is_empty());
    }

    // --- Cycle detection ---

    // Two-node cycle: A -> B -> A.
    #[tokio::test]
    async fn test_circular_dependency_detection() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let engine = ChainExecutionEngine::new(registry, ChainConfig::default());

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), vec!["B".to_string()]);
        graph.insert("B".to_string(), vec!["A".to_string()]);
        let result = engine.topological_sort(&graph);
        assert!(result.is_err());
    }

    // A node that lists itself as a dependency.
    #[tokio::test]
    async fn test_circular_dependency_self_reference() {
        let engine = create_test_engine();

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), vec!["A".to_string()]);
        let result = engine.topological_sort(&graph);
        assert!(result.is_err());
    }

    // Three-node cycle: A -> C -> B -> A.
    #[tokio::test]
    async fn test_circular_dependency_chain() {
        let engine = create_test_engine();

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), vec!["C".to_string()]);
        graph.insert("B".to_string(), vec!["A".to_string()]);
        graph.insert("C".to_string(), vec!["B".to_string()]);
        let result = engine.topological_sort(&graph);
        assert!(result.is_err());
    }

    // --- Dependency graph ---

    #[tokio::test]
    async fn test_build_dependency_graph() {
        let engine = create_test_engine();

        let links = vec![
            ChainLink {
                request: ChainRequest {
                    id: "req1".to_string(),
                    method: "GET".to_string(),
                    url: "http://example.com/1".to_string(),
                    headers: HashMap::new(),
                    body: None,
                    depends_on: vec![],
                    timeout_secs: None,
                    expected_status: None,
                    scripting: None,
                },
                store_as: None,
                extract: HashMap::new(),
            },
            ChainLink {
                request: ChainRequest {
                    id: "req2".to_string(),
                    method: "GET".to_string(),
                    url: "http://example.com/2".to_string(),
                    headers: HashMap::new(),
                    body: None,
                    depends_on: vec!["req1".to_string()],
                    timeout_secs: None,
                    expected_status: None,
                    scripting: None,
                },
                store_as: None,
                extract: HashMap::new(),
            },
            ChainLink {
                request: ChainRequest {
                    id: "req3".to_string(),
                    method: "GET".to_string(),
                    url: "http://example.com/3".to_string(),
                    headers: HashMap::new(),
                    body: None,
                    depends_on: vec!["req1".to_string(), "req2".to_string()],
                    timeout_secs: None,
                    expected_status: None,
                    scripting: None,
                },
                store_as: None,
                extract: HashMap::new(),
            },
        ];

        let graph = engine.build_dependency_graph(&links);

        assert!(graph.contains_key("req1"));
        assert!(graph.contains_key("req2"));
        assert!(graph.contains_key("req3"));
        assert_eq!(graph.get("req1").unwrap().len(), 0);
        assert_eq!(graph.get("req2").unwrap(), &vec!["req1".to_string()]);
        assert_eq!(graph.get("req3").unwrap(), &vec!["req1".to_string(), "req2".to_string()]);
    }

    // --- extract_from_response ---

    #[tokio::test]
    async fn test_extract_from_response_simple_field() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "body.user.id");
        assert!(value.is_some());
        assert_eq!(value.unwrap(), json!(123));
    }

    #[tokio::test]
    async fn test_extract_from_response_nested_field() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "body.user.name");
        assert!(value.is_some());
        assert_eq!(value.unwrap(), json!("test"));
    }

    // Array indices are written as their own `[N]` path segment.
    #[tokio::test]
    async fn test_extract_from_response_array_element() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "body.items.[0].value");
        assert!(value.is_some());
        assert_eq!(value.unwrap(), json!("a"));
    }

    #[tokio::test]
    async fn test_extract_from_response_array_element_second() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "body.items.[1].id");
        assert!(value.is_some());
        assert_eq!(value.unwrap(), json!(2));
    }

    #[tokio::test]
    async fn test_extract_from_response_invalid_path() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "body.nonexistent");
        assert!(value.is_none());
    }

    // Only paths starting with `body` are supported.
    #[tokio::test]
    async fn test_extract_from_response_non_body_path() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "headers.content-type");
        assert!(value.is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_empty_path() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "");
        assert!(value.is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_invalid_array_index() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "body.items.[invalid].value");
        assert!(value.is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_array_out_of_bounds() {
        let engine = create_test_engine();
        let response = create_test_chain_response();

        let value = engine.extract_from_response(&response, "body.items.[100].value");
        assert!(value.is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_no_body() {
        let engine = create_test_engine();
        let response = ChainResponse {
            status: 200,
            headers: HashMap::new(),
            body: None,
            duration_ms: 50,
            executed_at: "2024-01-15T10:00:00Z".to_string(),
            error: None,
        };

        let value = engine.extract_from_response(&response, "body.user.id");
        assert!(value.is_none());
    }

    // --- expand_template ---

    #[tokio::test]
    async fn test_expand_template_simple() {
        use crate::request_chaining::ChainContext;
        let engine = create_test_engine();
        let context = ChainTemplatingContext::new(ChainContext::new());

        let result = engine.expand_template("hello world", &context);
        assert_eq!(result, "hello world");
    }

    // Only checks that the literal prefix survives; the expanded value of the
    // `{{chain.name}}` placeholder is not asserted.
    #[tokio::test]
    async fn test_expand_template_with_variable() {
        use crate::request_chaining::ChainContext;
        let engine = create_test_engine();
        let mut context = ChainTemplatingContext::new(ChainContext::new());
        context.chain_context.set_variable("name".to_string(), json!("test"));

        let result = engine.expand_template("hello {{chain.name}}", &context);
        assert!(result.contains("hello"));
    }

    // --- expand_template_in_json (inputs without placeholders round-trip) ---

    #[tokio::test]
    async fn test_expand_template_in_json_string() {
        use crate::request_chaining::ChainContext;
        let engine = create_test_engine();
        let context = ChainTemplatingContext::new(ChainContext::new());

        let input = json!("hello world");
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!("hello world"));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_number() {
        use crate::request_chaining::ChainContext;
        let engine = create_test_engine();
        let context = ChainTemplatingContext::new(ChainContext::new());

        let input = json!(42);
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!(42));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_boolean() {
        use crate::request_chaining::ChainContext;
        let engine = create_test_engine();
        let context = ChainTemplatingContext::new(ChainContext::new());

        let input = json!(true);
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!(true));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_null() {
        use crate::request_chaining::ChainContext;
        let engine = create_test_engine();
        let context = ChainTemplatingContext::new(ChainContext::new());

        let input = json!(null);
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!(null));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_array() {
        use crate::request_chaining::ChainContext;
        let engine = create_test_engine();
        let context = ChainTemplatingContext::new(ChainContext::new());

        let input = json!(["a", "b", "c"]);
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!(["a", "b", "c"]));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_object() {
        use crate::request_chaining::ChainContext;
        let engine = create_test_engine();
        let context = ChainTemplatingContext::new(ChainContext::new());

        let input = json!({"key": "value", "nested": {"inner": "data"}});
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!({"key": "value", "nested": {"inner": "data"}}));
    }

    // --- History ---

    #[tokio::test]
    async fn test_get_chain_history_empty() {
        let engine = create_test_engine();

        let history = engine.get_chain_history("nonexistent").await;
        assert!(history.is_empty());
    }

    // --- collect_dependency_level ---

    #[tokio::test]
    async fn test_collect_dependency_level() {
        let engine = create_test_engine();
        let graph = HashMap::new();
        let mut level = vec![];
        let mut processed = HashSet::new();

        engine.collect_dependency_level("req1".to_string(), &graph, &mut level, &mut processed);

        assert_eq!(level, vec!["req1".to_string()]);
        assert!(processed.contains("req1"));
    }

    // --- execute_chain error path ---

    // The registry is empty, so the lookup fails before any request is built.
    #[tokio::test]
    async fn test_execute_chain_not_found() {
        let engine = create_test_engine();

        let result = engine.execute_chain("nonexistent", None).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("not found"));
    }

    // --- Configuration variants ---

    #[tokio::test]
    async fn test_engine_with_custom_config() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let config = ChainConfig {
            enabled: true,
            max_chain_length: 50,
            global_timeout_secs: 60,
            enable_parallel_execution: false,
        };

        let result = ChainExecutionEngine::try_new(registry, config);
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_engine_with_default_config() {
        let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
        let config = ChainConfig::default();

        let result = ChainExecutionEngine::try_new(registry, config);
        assert!(result.is_ok());
    }
}