1use crate::request_chaining::{
7 ChainConfig, ChainDefinition, ChainExecutionContext, ChainLink, ChainResponse,
8 ChainTemplatingContext, RequestChainRegistry,
9};
10#[cfg(feature = "scripting")]
11use crate::request_scripting::{ScriptContext, ScriptEngine};
12use crate::templating::{expand_str_with_context, TemplatingContext};
13use crate::{Error, Result};
14use chrono::Utc;
15use futures::future::join_all;
16use reqwest::{
17 header::{HeaderMap, HeaderName, HeaderValue},
18 Client, Method,
19};
20use serde_json::Value;
21use std::collections::{HashMap, HashSet};
22use std::str::FromStr;
23use std::sync::Arc;
24use tokio::sync::Mutex;
25use tokio::time::{timeout, Duration};
26
/// One entry in a chain's execution history.
#[derive(Debug, Clone)]
pub struct ExecutionRecord {
    /// Timestamp string for the execution; `execute_chain` fills it with the
    /// current UTC time formatted as RFC 3339.
    pub executed_at: String,
    /// Copy of the result that was returned to the caller for that execution.
    pub result: ChainExecutionResult,
}
35
/// Runs request chains looked up in a [`RequestChainRegistry`] and keeps a
/// per-chain history of the results.
#[derive(Debug)]
pub struct ChainExecutionEngine {
    /// HTTP client used to send every chain request; `try_new` builds it with
    /// the configured global timeout.
    http_client: Client,
    /// Registry used to look chains up by id and to validate chain definitions.
    registry: Arc<RequestChainRegistry>,
    /// Engine configuration (global timeout, parallel-execution switch).
    config: ChainConfig,
    /// Execution records keyed by chain id; `execute_chain` appends to it.
    execution_history: Arc<Mutex<HashMap<String, Vec<ExecutionRecord>>>>,
    /// Runs pre- and post-request scripts (only with the `scripting` feature).
    #[cfg(feature = "scripting")]
    script_engine: ScriptEngine,
}
51
52impl ChainExecutionEngine {
53 pub fn new(registry: Arc<RequestChainRegistry>, config: ChainConfig) -> Self {
60 Self::try_new(registry, config)
61 .unwrap_or_else(|e| {
62 panic!(
63 "Failed to create HTTP client for chain execution engine: {}. \
64 This typically indicates a system configuration issue (e.g., invalid timeout value).",
65 e
66 )
67 })
68 }
69
70 pub fn try_new(registry: Arc<RequestChainRegistry>, config: ChainConfig) -> Result<Self> {
74 let http_client = Client::builder()
75 .timeout(Duration::from_secs(config.global_timeout_secs))
76 .build()
77 .map_err(|e| {
78 Error::generic(format!(
79 "Failed to create HTTP client: {}. \
80 Check that the timeout value ({}) is valid.",
81 e, config.global_timeout_secs
82 ))
83 })?;
84
85 Ok(Self {
86 http_client,
87 registry,
88 config,
89 execution_history: Arc::new(Mutex::new(HashMap::new())),
90 #[cfg(feature = "scripting")]
91 script_engine: ScriptEngine::new(),
92 })
93 }
94
95 pub async fn execute_chain(
97 &self,
98 chain_id: &str,
99 variables: Option<Value>,
100 ) -> Result<ChainExecutionResult> {
101 let chain = self
102 .registry
103 .get_chain(chain_id)
104 .await
105 .ok_or_else(|| Error::generic(format!("Chain '{}' not found", chain_id)))?;
106
107 let result = self.execute_chain_definition(&chain, variables).await?;
108
109 let record = ExecutionRecord {
111 executed_at: Utc::now().to_rfc3339(),
112 result: result.clone(),
113 };
114
115 let mut history = self.execution_history.lock().await;
116 history.entry(chain_id.to_string()).or_insert_with(Vec::new).push(record);
117
118 Ok(result)
119 }
120
121 pub async fn get_chain_history(&self, chain_id: &str) -> Vec<ExecutionRecord> {
123 let history = self.execution_history.lock().await;
124 history.get(chain_id).cloned().unwrap_or_default()
125 }
126
127 pub async fn execute_chain_definition(
129 &self,
130 chain_def: &ChainDefinition,
131 variables: Option<Value>,
132 ) -> Result<ChainExecutionResult> {
133 self.registry.validate_chain(chain_def).await?;
135
136 let start_time = std::time::Instant::now();
137 let mut execution_context = ChainExecutionContext::new(chain_def.clone());
138
139 for (key, value) in &chain_def.variables {
141 execution_context
142 .templating
143 .chain_context
144 .set_variable(key.clone(), value.clone());
145 }
146
147 if let Some(Value::Object(map)) = variables {
149 for (key, value) in map {
150 execution_context.templating.chain_context.set_variable(key, value);
151 }
152 }
153
154 if self.config.enable_parallel_execution {
155 self.execute_with_parallelism(&mut execution_context).await
156 } else {
157 self.execute_sequential(&mut execution_context).await
158 }
159 .map(|_| ChainExecutionResult {
160 chain_id: chain_def.id.clone(),
161 status: ChainExecutionStatus::Successful,
162 total_duration_ms: start_time.elapsed().as_millis() as u64,
163 request_results: execution_context.templating.chain_context.responses.clone(),
164 error_message: None,
165 })
166 }
167
168 async fn execute_with_parallelism(
170 &self,
171 execution_context: &mut ChainExecutionContext,
172 ) -> Result<()> {
173 let dep_graph = self.build_dependency_graph(&execution_context.definition.links);
174 let topo_order = self.topological_sort(&dep_graph)?;
175
176 let mut level_groups = vec![];
178 let mut processed = HashSet::new();
179
180 for request_id in topo_order {
181 if !processed.contains(&request_id) {
182 let mut level = vec![];
183 self.collect_dependency_level(request_id, &dep_graph, &mut level, &mut processed);
184 level_groups.push(level);
185 }
186 }
187
188 for level in level_groups {
190 if level.len() == 1 {
191 let request_id = &level[0];
193 let link = execution_context
194 .definition
195 .links
196 .iter()
197 .find(|l| l.request.id == *request_id)
198 .ok_or_else(|| {
199 Error::generic(format!(
200 "Chain link not found for request_id '{}' during parallel execution",
201 request_id
202 ))
203 })?;
204
205 let link_clone = link.clone();
206 self.execute_request(&link_clone, execution_context).await?;
207 } else {
208 let tasks = level
210 .into_iter()
211 .filter_map(|request_id| {
212 let link = execution_context
213 .definition
214 .links
215 .iter()
216 .find(|l| l.request.id == request_id);
217 let link = match link {
218 Some(l) => l.clone(),
219 None => {
220 tracing::error!(
221 "Chain link not found for request_id '{}' during parallel execution",
222 request_id
223 );
224 return None;
225 }
226 };
227 let parallel_context = ChainExecutionContext {
229 definition: execution_context.definition.clone(),
230 templating: execution_context.templating.clone(),
231 start_time: std::time::Instant::now(),
232 config: execution_context.config.clone(),
233 };
234
235 let context = Arc::new(Mutex::new(parallel_context));
236 let engine =
237 ChainExecutionEngine::new(self.registry.clone(), self.config.clone());
238
239 Some(tokio::spawn(async move {
240 let mut ctx = context.lock().await;
241 engine.execute_request(&link, &mut ctx).await
242 }))
243 })
244 .collect::<Vec<_>>();
245
246 let results = join_all(tasks).await;
247 for result in results {
248 result
249 .map_err(|e| Error::generic(format!("Task join error: {}", e)))?
250 .map_err(|e| Error::generic(format!("Request execution error: {}", e)))?;
251 }
252 }
253 }
254
255 Ok(())
256 }
257
258 async fn execute_sequential(
260 &self,
261 execution_context: &mut ChainExecutionContext,
262 ) -> Result<()> {
263 let links = execution_context.definition.links.clone();
264 for link in &links {
265 self.execute_request(link, execution_context).await?;
266 }
267 Ok(())
268 }
269
270 async fn execute_request(
272 &self,
273 link: &ChainLink,
274 execution_context: &mut ChainExecutionContext,
275 ) -> Result<()> {
276 let request_start = std::time::Instant::now();
277
278 execution_context.templating.set_current_request(link.request.clone());
280
281 let method = Method::from_bytes(link.request.method.as_bytes()).map_err(|e| {
282 Error::generic(format!("Invalid HTTP method '{}': {}", link.request.method, e))
283 })?;
284
285 let url = self.expand_template(&link.request.url, &execution_context.templating);
286
287 let mut headers = HeaderMap::new();
289 for (key, value) in &link.request.headers {
290 let expanded_value = self.expand_template(value, &execution_context.templating);
291 let header_name = HeaderName::from_str(key)
292 .map_err(|e| Error::generic(format!("Invalid header name '{}': {}", key, e)))?;
293 let header_value = HeaderValue::from_str(&expanded_value).map_err(|e| {
294 Error::generic(format!("Invalid header value for '{}': {}", key, e))
295 })?;
296 headers.insert(header_name, header_value);
297 }
298
299 let mut request_builder = self.http_client.request(method, &url).headers(headers.clone());
301
302 if let Some(body) = &link.request.body {
304 match body {
305 crate::request_chaining::RequestBody::Json(json_value) => {
306 let expanded_body =
307 self.expand_template_in_json(json_value, &execution_context.templating);
308 request_builder = request_builder.json(&expanded_body);
309 }
310 crate::request_chaining::RequestBody::BinaryFile { path, content_type } => {
311 let templating_context =
313 TemplatingContext::with_chain(execution_context.templating.clone());
314
315 let expanded_path = expand_str_with_context(path, &templating_context);
317
318 let binary_body = crate::request_chaining::RequestBody::binary_file(
320 expanded_path,
321 content_type.clone(),
322 );
323
324 match binary_body.to_bytes().await {
326 Ok(file_bytes) => {
327 request_builder = request_builder.body(file_bytes);
328
329 if let Some(ct) = content_type {
331 let mut headers = headers.clone();
332 headers.insert(
333 "content-type",
334 ct.parse().unwrap_or_else(|_| {
335 HeaderValue::from_static("application/octet-stream")
336 }),
337 );
338 request_builder = request_builder.headers(headers);
339 }
340 }
341 Err(e) => {
342 return Err(e);
343 }
344 }
345 }
346 }
347 }
348
349 if let Some(timeout_secs) = link.request.timeout_secs {
351 request_builder = request_builder.timeout(Duration::from_secs(timeout_secs));
352 }
353
354 #[cfg(feature = "scripting")]
356 if let Some(scripting) = &link.request.scripting {
357 if let Some(pre_script) = &scripting.pre_script {
358 let script_context = ScriptContext {
359 request: Some(link.request.clone()),
360 response: None,
361 chain_context: execution_context.templating.chain_context.variables.clone(),
362 variables: HashMap::new(),
363 env_vars: std::env::vars().collect(),
364 };
365
366 match self
367 .script_engine
368 .execute_script(pre_script, &script_context, scripting.timeout_ms)
369 .await
370 {
371 Ok(script_result) => {
372 for (key, value) in script_result.modified_variables {
374 execution_context.templating.chain_context.set_variable(key, value);
375 }
376 }
377 Err(e) => {
378 tracing::warn!(
379 "Pre-script execution failed for request '{}': {}",
380 link.request.id,
381 e
382 );
383 }
385 }
386 }
387 }
388
389 let response_result =
391 timeout(Duration::from_secs(self.config.global_timeout_secs), request_builder.send())
392 .await;
393
394 let response = match response_result {
395 Ok(Ok(resp)) => resp,
396 Ok(Err(e)) => {
397 return Err(Error::generic(format!("Request '{}' failed: {}", link.request.id, e)));
398 }
399 Err(_) => {
400 return Err(Error::generic(format!("Request '{}' timed out", link.request.id)));
401 }
402 };
403
404 let status = response.status();
405 let headers: HashMap<String, String> = response
406 .headers()
407 .iter()
408 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
409 .collect();
410
411 let body_text = response.text().await.unwrap_or_default();
412 let body_json: Option<Value> = serde_json::from_str(&body_text).ok();
413
414 let duration_ms = request_start.elapsed().as_millis() as u64;
415 let executed_at = Utc::now().to_rfc3339();
416
417 let chain_response = ChainResponse {
418 status: status.as_u16(),
419 headers,
420 body: body_json,
421 duration_ms,
422 executed_at,
423 error: None,
424 };
425
426 if let Some(expected) = &link.request.expected_status {
428 if !expected.contains(&status.as_u16()) {
429 let error_msg = format!(
430 "Request '{}' returned status {} but expected one of {:?}",
431 link.request.id,
432 status.as_u16(),
433 expected
434 );
435 return Err(Error::generic(error_msg));
436 }
437 }
438
439 if let Some(store_name) = &link.store_as {
441 execution_context
442 .templating
443 .chain_context
444 .store_response(store_name.clone(), chain_response.clone());
445 }
446
447 for (var_name, extraction_path) in &link.extract {
449 if let Some(value) = self.extract_from_response(&chain_response, extraction_path) {
450 execution_context.templating.chain_context.set_variable(var_name.clone(), value);
451 }
452 }
453
454 #[cfg(feature = "scripting")]
456 if let Some(scripting) = &link.request.scripting {
457 if let Some(post_script) = &scripting.post_script {
458 let script_context = ScriptContext {
459 request: Some(link.request.clone()),
460 response: Some(chain_response.clone()),
461 chain_context: execution_context.templating.chain_context.variables.clone(),
462 variables: HashMap::new(),
463 env_vars: std::env::vars().collect(),
464 };
465
466 match self
467 .script_engine
468 .execute_script(post_script, &script_context, scripting.timeout_ms)
469 .await
470 {
471 Ok(script_result) => {
472 for (key, value) in script_result.modified_variables {
474 execution_context.templating.chain_context.set_variable(key, value);
475 }
476 }
477 Err(e) => {
478 tracing::warn!(
479 "Post-script execution failed for request '{}': {}",
480 link.request.id,
481 e
482 );
483 }
485 }
486 }
487 }
488
489 execution_context
491 .templating
492 .chain_context
493 .store_response(link.request.id.clone(), chain_response);
494
495 Ok(())
496 }
497
498 fn build_dependency_graph(&self, links: &[ChainLink]) -> HashMap<String, Vec<String>> {
500 let mut graph = HashMap::new();
501
502 for link in links {
503 graph
504 .entry(link.request.id.clone())
505 .or_insert_with(Vec::new)
506 .extend(link.request.depends_on.iter().cloned());
507 }
508
509 graph
510 }
511
512 fn topological_sort(&self, graph: &HashMap<String, Vec<String>>) -> Result<Vec<String>> {
514 let mut visited = HashSet::new();
515 let mut rec_stack = HashSet::new();
516 let mut result = Vec::new();
517
518 for node in graph.keys() {
519 if !visited.contains(node) {
520 self.topo_sort_util(node, graph, &mut visited, &mut rec_stack, &mut result)?;
521 }
522 }
523
524 result.reverse();
525 Ok(result)
526 }
527
528 #[allow(clippy::only_used_in_recursion)]
530 fn topo_sort_util(
531 &self,
532 node: &str,
533 graph: &HashMap<String, Vec<String>>,
534 visited: &mut HashSet<String>,
535 rec_stack: &mut HashSet<String>,
536 result: &mut Vec<String>,
537 ) -> Result<()> {
538 visited.insert(node.to_string());
539 rec_stack.insert(node.to_string());
540
541 if let Some(dependencies) = graph.get(node) {
542 for dep in dependencies {
543 if !visited.contains(dep) {
544 self.topo_sort_util(dep, graph, visited, rec_stack, result)?;
545 } else if rec_stack.contains(dep) {
546 return Err(Error::generic(format!(
547 "Circular dependency detected involving '{}'",
548 node
549 )));
550 }
551 }
552 }
553
554 rec_stack.remove(node);
555 result.push(node.to_string());
556 Ok(())
557 }
558
559 fn collect_dependency_level(
561 &self,
562 request_id: String,
563 _graph: &HashMap<String, Vec<String>>,
564 level: &mut Vec<String>,
565 processed: &mut HashSet<String>,
566 ) {
567 level.push(request_id.clone());
568 processed.insert(request_id);
569 }
570
571 fn expand_template(&self, template: &str, context: &ChainTemplatingContext) -> String {
573 let templating_context = TemplatingContext {
574 chain_context: Some(context.clone()),
575 env_context: None,
576 virtual_clock: None,
577 };
578 expand_str_with_context(template, &templating_context)
579 }
580
581 fn expand_template_in_json(&self, value: &Value, context: &ChainTemplatingContext) -> Value {
583 match value {
584 Value::String(s) => Value::String(self.expand_template(s, context)),
585 Value::Array(arr) => {
586 Value::Array(arr.iter().map(|v| self.expand_template_in_json(v, context)).collect())
587 }
588 Value::Object(map) => {
589 let mut new_map = serde_json::Map::new();
590 for (k, v) in map {
591 new_map.insert(
592 self.expand_template(k, context),
593 self.expand_template_in_json(v, context),
594 );
595 }
596 Value::Object(new_map)
597 }
598 _ => value.clone(),
599 }
600 }
601
602 fn extract_from_response(&self, response: &ChainResponse, path: &str) -> Option<Value> {
604 let parts: Vec<&str> = path.split('.').collect();
605
606 if parts.is_empty() || parts[0] != "body" {
607 return None;
608 }
609
610 let mut current = response.body.as_ref()?;
611
612 for part in &parts[1..] {
613 match current {
614 Value::Object(map) => {
615 current = map.get(*part)?;
616 }
617 Value::Array(arr) => {
618 if part.starts_with('[') && part.ends_with(']') {
619 let index_str = &part[1..part.len() - 1];
620 if let Ok(index) = index_str.parse::<usize>() {
621 current = arr.get(index)?;
622 } else {
623 return None;
624 }
625 } else {
626 return None;
627 }
628 }
629 _ => return None,
630 }
631 }
632
633 Some(current.clone())
634 }
635}
636
/// Outcome of executing one chain.
#[derive(Debug, Clone)]
pub struct ChainExecutionResult {
    /// Id of the chain definition that was executed.
    pub chain_id: String,
    /// Overall status of the execution.
    pub status: ChainExecutionStatus,
    /// Wall-clock duration of the whole execution, in milliseconds.
    pub total_duration_ms: u64,
    /// Stored responses keyed by name; `execute_chain_definition` copies them
    /// from the chain context's response map.
    pub request_results: HashMap<String, ChainResponse>,
    /// Optional error description; `execute_chain_definition` always sets `None`.
    pub error_message: Option<String>,
}
651
/// Overall status of a chain execution.
///
/// Equality is total, so the type derives `Eq` in addition to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChainExecutionStatus {
    /// The execution completed successfully.
    Successful,
    /// The execution completed only in part.
    PartialSuccess,
    /// The execution failed.
    Failed,
}
662
#[cfg(test)]
mod tests {
    use super::*;
    use crate::request_chaining::{ChainContext, ChainRequest, ChainResponse};
    use serde_json::json;
    use std::sync::Arc;

    // ---- shared fixtures -------------------------------------------------

    /// Registry built from the default configuration.
    fn default_registry() -> Arc<RequestChainRegistry> {
        Arc::new(RequestChainRegistry::new(ChainConfig::default()))
    }

    /// Engine built from the default registry and configuration.
    fn create_test_engine() -> ChainExecutionEngine {
        ChainExecutionEngine::new(default_registry(), ChainConfig::default())
    }

    /// A 200 response with a small JSON body containing an object and an array.
    fn create_test_chain_response() -> ChainResponse {
        let mut headers = HashMap::new();
        headers.insert("content-type".to_string(), "application/json".to_string());

        ChainResponse {
            status: 200,
            headers,
            body: Some(json!({
                "user": {
                    "id": 123,
                    "name": "test",
                    "roles": ["admin", "user"]
                },
                "items": [
                    {"id": 1, "value": "a"},
                    {"id": 2, "value": "b"}
                ]
            })),
            duration_ms: 50,
            executed_at: "2024-01-15T10:00:00Z".to_string(),
            error: None,
        }
    }

    /// A successful result without responses or an error message.
    fn sample_result() -> ChainExecutionResult {
        ChainExecutionResult {
            chain_id: "test-chain".to_string(),
            status: ChainExecutionStatus::Successful,
            total_duration_ms: 100,
            request_results: HashMap::new(),
            error_message: None,
        }
    }

    /// A history record wrapping `sample_result`.
    fn sample_record() -> ExecutionRecord {
        ExecutionRecord {
            executed_at: "2024-01-15T10:00:00Z".to_string(),
            result: sample_result(),
        }
    }

    /// Templating context with no variables or responses.
    fn empty_context() -> ChainTemplatingContext {
        ChainTemplatingContext::new(ChainContext::new())
    }

    /// Owned dependency list from string literals.
    fn deps(ids: &[&str]) -> Vec<String> {
        ids.iter().map(|id| id.to_string()).collect()
    }

    /// Position of `id` in a topological order; panics when it is absent.
    fn index_of(order: &[String], id: &str) -> usize {
        order.iter().position(|x| x == id).unwrap()
    }

    /// A GET link with the given id, URL and dependencies and nothing else set.
    fn get_link(id: &str, url: &str, depends_on: &[&str]) -> ChainLink {
        ChainLink {
            request: ChainRequest {
                id: id.to_string(),
                method: "GET".to_string(),
                url: url.to_string(),
                headers: HashMap::new(),
                body: None,
                depends_on: deps(depends_on),
                timeout_secs: None,
                expected_status: None,
                scripting: None,
            },
            store_as: None,
            extract: HashMap::new(),
        }
    }

    /// Runs `extract_from_response` against the shared test response.
    fn extract(path: &str) -> Option<Value> {
        create_test_engine().extract_from_response(&create_test_chain_response(), path)
    }

    // ---- ExecutionRecord -------------------------------------------------

    #[test]
    fn test_execution_record_debug() {
        let debug = format!("{:?}", sample_record());
        assert!(debug.contains("ExecutionRecord"));
        assert!(debug.contains("executed_at"));
    }

    #[test]
    fn test_execution_record_clone() {
        let record = sample_record();
        let cloned = record.clone();
        assert_eq!(cloned.executed_at, record.executed_at);
        assert_eq!(cloned.result.chain_id, record.result.chain_id);
    }

    // ---- ChainExecutionResult --------------------------------------------

    #[test]
    fn test_chain_execution_result_debug() {
        let debug = format!("{:?}", sample_result());
        assert!(debug.contains("ChainExecutionResult"));
        assert!(debug.contains("chain_id"));
    }

    #[test]
    fn test_chain_execution_result_clone() {
        let mut request_results = HashMap::new();
        request_results.insert("req1".to_string(), create_test_chain_response());

        let result = ChainExecutionResult {
            request_results,
            error_message: Some("test error".to_string()),
            ..sample_result()
        };

        let cloned = result.clone();
        assert_eq!(cloned.chain_id, result.chain_id);
        assert_eq!(cloned.total_duration_ms, result.total_duration_ms);
        assert_eq!(cloned.error_message, result.error_message);
    }

    // ---- ChainExecutionStatus --------------------------------------------

    #[test]
    fn test_chain_execution_status_debug() {
        let cases = [
            (ChainExecutionStatus::Successful, "Successful"),
            (ChainExecutionStatus::PartialSuccess, "PartialSuccess"),
            (ChainExecutionStatus::Failed, "Failed"),
        ];

        for (status, expected) in cases.iter() {
            let debug = format!("{:?}", status);
            assert!(debug.contains(expected));
        }
    }

    #[test]
    fn test_chain_execution_status_clone() {
        let status = ChainExecutionStatus::Successful;
        let cloned = status.clone();
        assert_eq!(cloned, ChainExecutionStatus::Successful);
    }

    #[test]
    fn test_chain_execution_status_eq() {
        assert_eq!(ChainExecutionStatus::Successful, ChainExecutionStatus::Successful);
        assert_eq!(ChainExecutionStatus::PartialSuccess, ChainExecutionStatus::PartialSuccess);
        assert_eq!(ChainExecutionStatus::Failed, ChainExecutionStatus::Failed);

        assert_ne!(ChainExecutionStatus::Successful, ChainExecutionStatus::Failed);
        assert_ne!(ChainExecutionStatus::PartialSuccess, ChainExecutionStatus::Successful);
    }

    // ---- engine construction ---------------------------------------------

    #[tokio::test]
    async fn test_engine_creation() {
        // Smoke test: construction must not panic.
        let _engine = ChainExecutionEngine::new(default_registry(), ChainConfig::default());
    }

    #[tokio::test]
    async fn test_engine_try_new() {
        let result = ChainExecutionEngine::try_new(default_registry(), ChainConfig::default());
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_engine_debug() {
        let debug = format!("{:?}", create_test_engine());
        assert!(debug.contains("ChainExecutionEngine"));
    }

    // ---- topological sort ------------------------------------------------

    #[tokio::test]
    async fn test_topological_sort() {
        let engine = ChainExecutionEngine::new(default_registry(), ChainConfig::default());

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), deps(&[]));
        graph.insert("B".to_string(), deps(&["A"]));
        graph.insert("C".to_string(), deps(&["A"]));
        graph.insert("D".to_string(), deps(&["B", "C"]));

        let topo_order = engine.topological_sort(&graph).unwrap();

        // The sort lists dependents ahead of their dependencies.
        let d_pos = index_of(&topo_order, "D");
        let b_pos = index_of(&topo_order, "B");
        let c_pos = index_of(&topo_order, "C");
        let a_pos = index_of(&topo_order, "A");

        assert!(d_pos < b_pos, "D should come before B");
        assert!(d_pos < c_pos, "D should come before C");
        assert!(b_pos < a_pos, "B should come before A");
        assert!(c_pos < a_pos, "C should come before A");
        assert_eq!(topo_order.len(), 4, "Should have all 4 nodes");
    }

    #[tokio::test]
    async fn test_topological_sort_single_node() {
        let engine = create_test_engine();

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), deps(&[]));

        let topo_order = engine.topological_sort(&graph).unwrap();
        assert_eq!(topo_order, vec!["A".to_string()]);
    }

    #[tokio::test]
    async fn test_topological_sort_linear_chain() {
        let engine = create_test_engine();

        let mut graph = HashMap::new();
        graph.insert("A".to_string(), deps(&[]));
        graph.insert("B".to_string(), deps(&["A"]));
        graph.insert("C".to_string(), deps(&["B"]));

        let topo_order = engine.topological_sort(&graph).unwrap();

        let c_pos = index_of(&topo_order, "C");
        let b_pos = index_of(&topo_order, "B");
        let a_pos = index_of(&topo_order, "A");

        assert!(c_pos < b_pos);
        assert!(b_pos < a_pos);
    }

    #[tokio::test]
    async fn test_topological_sort_empty_graph() {
        let engine = create_test_engine();
        let graph = HashMap::new();

        let topo_order = engine.topological_sort(&graph).unwrap();
        assert!(topo_order.is_empty());
    }

    #[tokio::test]
    async fn test_circular_dependency_detection() {
        let engine = ChainExecutionEngine::new(default_registry(), ChainConfig::default());

        // A -> B -> A
        let mut graph = HashMap::new();
        graph.insert("A".to_string(), deps(&["B"]));
        graph.insert("B".to_string(), deps(&["A"]));

        let result = engine.topological_sort(&graph);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_circular_dependency_self_reference() {
        let engine = create_test_engine();

        // A -> A
        let mut graph = HashMap::new();
        graph.insert("A".to_string(), deps(&["A"]));

        let result = engine.topological_sort(&graph);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_circular_dependency_chain() {
        let engine = create_test_engine();

        // A -> C -> B -> A
        let mut graph = HashMap::new();
        graph.insert("A".to_string(), deps(&["C"]));
        graph.insert("B".to_string(), deps(&["A"]));
        graph.insert("C".to_string(), deps(&["B"]));

        let result = engine.topological_sort(&graph);
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_build_dependency_graph() {
        let engine = create_test_engine();

        let links = vec![
            get_link("req1", "http://example.com/1", &[]),
            get_link("req2", "http://example.com/2", &["req1"]),
            get_link("req3", "http://example.com/3", &["req1", "req2"]),
        ];

        let graph = engine.build_dependency_graph(&links);

        assert!(graph.contains_key("req1"));
        assert!(graph.contains_key("req2"));
        assert!(graph.contains_key("req3"));
        assert_eq!(graph.get("req1").unwrap().len(), 0);
        assert_eq!(graph.get("req2").unwrap(), &vec!["req1".to_string()]);
        assert_eq!(graph.get("req3").unwrap(), &vec!["req1".to_string(), "req2".to_string()]);
    }

    // ---- extract_from_response -------------------------------------------

    #[tokio::test]
    async fn test_extract_from_response_simple_field() {
        assert_eq!(extract("body.user.id"), Some(json!(123)));
    }

    #[tokio::test]
    async fn test_extract_from_response_nested_field() {
        assert_eq!(extract("body.user.name"), Some(json!("test")));
    }

    #[tokio::test]
    async fn test_extract_from_response_array_element() {
        assert_eq!(extract("body.items.[0].value"), Some(json!("a")));
    }

    #[tokio::test]
    async fn test_extract_from_response_array_element_second() {
        assert_eq!(extract("body.items.[1].id"), Some(json!(2)));
    }

    #[tokio::test]
    async fn test_extract_from_response_invalid_path() {
        assert!(extract("body.nonexistent").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_non_body_path() {
        // Only paths rooted at `body` are supported.
        assert!(extract("headers.content-type").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_empty_path() {
        assert!(extract("").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_invalid_array_index() {
        assert!(extract("body.items.[invalid].value").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_array_out_of_bounds() {
        assert!(extract("body.items.[100].value").is_none());
    }

    #[tokio::test]
    async fn test_extract_from_response_no_body() {
        let engine = create_test_engine();
        let response = ChainResponse {
            status: 200,
            headers: HashMap::new(),
            body: None,
            duration_ms: 50,
            executed_at: "2024-01-15T10:00:00Z".to_string(),
            error: None,
        };

        let value = engine.extract_from_response(&response, "body.user.id");
        assert!(value.is_none());
    }

    // ---- template expansion ----------------------------------------------

    #[tokio::test]
    async fn test_expand_template_simple() {
        let engine = create_test_engine();
        let context = empty_context();

        let result = engine.expand_template("hello world", &context);
        assert_eq!(result, "hello world");
    }

    #[tokio::test]
    async fn test_expand_template_with_variable() {
        let engine = create_test_engine();
        let mut context = empty_context();
        context.chain_context.set_variable("name".to_string(), json!("test"));

        // Only the literal prefix is checked; the expanded form of the
        // placeholder is left to the templating module's own tests.
        let result = engine.expand_template("hello {{chain.name}}", &context);
        assert!(result.contains("hello"));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_string() {
        let engine = create_test_engine();
        let context = empty_context();

        let result = engine.expand_template_in_json(&json!("hello world"), &context);
        assert_eq!(result, json!("hello world"));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_number() {
        let engine = create_test_engine();
        let context = empty_context();

        let result = engine.expand_template_in_json(&json!(42), &context);
        assert_eq!(result, json!(42));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_boolean() {
        let engine = create_test_engine();
        let context = empty_context();

        let result = engine.expand_template_in_json(&json!(true), &context);
        assert_eq!(result, json!(true));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_null() {
        let engine = create_test_engine();
        let context = empty_context();

        let result = engine.expand_template_in_json(&json!(null), &context);
        assert_eq!(result, json!(null));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_array() {
        let engine = create_test_engine();
        let context = empty_context();

        let result = engine.expand_template_in_json(&json!(["a", "b", "c"]), &context);
        assert_eq!(result, json!(["a", "b", "c"]));
    }

    #[tokio::test]
    async fn test_expand_template_in_json_object() {
        let engine = create_test_engine();
        let context = empty_context();

        let input = json!({"key": "value", "nested": {"inner": "data"}});
        let result = engine.expand_template_in_json(&input, &context);
        assert_eq!(result, json!({"key": "value", "nested": {"inner": "data"}}));
    }

    // ---- history, levels and lookup --------------------------------------

    #[tokio::test]
    async fn test_get_chain_history_empty() {
        let engine = create_test_engine();

        let history = engine.get_chain_history("nonexistent").await;
        assert!(history.is_empty());
    }

    #[tokio::test]
    async fn test_collect_dependency_level() {
        let engine = create_test_engine();
        let graph = HashMap::new();
        let mut level = vec![];
        let mut processed = HashSet::new();

        engine.collect_dependency_level("req1".to_string(), &graph, &mut level, &mut processed);

        assert_eq!(level, vec!["req1".to_string()]);
        assert!(processed.contains("req1"));
    }

    #[tokio::test]
    async fn test_execute_chain_not_found() {
        let engine = create_test_engine();

        let result = engine.execute_chain("nonexistent", None).await;
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("not found"));
    }

    // ---- configuration ---------------------------------------------------

    #[tokio::test]
    async fn test_engine_with_custom_config() {
        let config = ChainConfig {
            enabled: true,
            max_chain_length: 50,
            global_timeout_secs: 60,
            enable_parallel_execution: false,
        };

        let result = ChainExecutionEngine::try_new(default_registry(), config);
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_engine_with_default_config() {
        let config = ChainConfig::default();

        let result = ChainExecutionEngine::try_new(default_registry(), config);
        assert!(result.is_ok());
    }
}