use crate::request_chaining::{
ChainConfig, ChainDefinition, ChainExecutionContext, ChainLink, ChainResponse,
ChainTemplatingContext, RequestChainRegistry,
};
#[cfg(feature = "scripting")]
use crate::request_scripting::{ScriptContext, ScriptEngine};
use crate::templating::{expand_str_with_context, TemplatingContext};
use crate::{Error, Result};
use chrono::Utc;
use futures::future::join_all;
use reqwest::{
header::{HeaderMap, HeaderName, HeaderValue},
Client, Method,
};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{timeout, Duration};
#[derive(Debug, Clone)]
pub struct ExecutionRecord {
pub executed_at: String,
pub result: ChainExecutionResult,
}
#[derive(Debug)]
pub struct ChainExecutionEngine {
http_client: Client,
registry: Arc<RequestChainRegistry>,
config: ChainConfig,
execution_history: Arc<Mutex<HashMap<String, Vec<ExecutionRecord>>>>,
#[cfg(feature = "scripting")]
script_engine: ScriptEngine,
}
impl ChainExecutionEngine {
pub fn new(registry: Arc<RequestChainRegistry>, config: ChainConfig) -> Self {
Self::try_new(registry, config)
.unwrap_or_else(|e| {
panic!(
"Failed to create HTTP client for chain execution engine: {}. \
This typically indicates a system configuration issue (e.g., invalid timeout value).",
e
)
})
}
pub fn try_new(registry: Arc<RequestChainRegistry>, config: ChainConfig) -> Result<Self> {
let http_client = Client::builder()
.timeout(Duration::from_secs(config.global_timeout_secs))
.build()
.map_err(|e| {
Error::internal(format!(
"Failed to create HTTP client: {}. \
Check that the timeout value ({}) is valid.",
e, config.global_timeout_secs
))
})?;
Ok(Self {
http_client,
registry,
config,
execution_history: Arc::new(Mutex::new(HashMap::new())),
#[cfg(feature = "scripting")]
script_engine: ScriptEngine::new(),
})
}
pub async fn execute_chain(
&self,
chain_id: &str,
variables: Option<Value>,
) -> Result<ChainExecutionResult> {
let chain = self
.registry
.get_chain(chain_id)
.await
.ok_or_else(|| Error::internal(format!("Chain '{}' not found", chain_id)))?;
let result = self.execute_chain_definition(&chain, variables).await?;
let record = ExecutionRecord {
executed_at: Utc::now().to_rfc3339(),
result: result.clone(),
};
let mut history = self.execution_history.lock().await;
history.entry(chain_id.to_string()).or_insert_with(Vec::new).push(record);
Ok(result)
}
pub async fn get_chain_history(&self, chain_id: &str) -> Vec<ExecutionRecord> {
let history = self.execution_history.lock().await;
history.get(chain_id).cloned().unwrap_or_default()
}
pub async fn execute_chain_definition(
&self,
chain_def: &ChainDefinition,
variables: Option<Value>,
) -> Result<ChainExecutionResult> {
self.registry.validate_chain(chain_def).await?;
let start_time = std::time::Instant::now();
let mut execution_context = ChainExecutionContext::new(chain_def.clone());
for (key, value) in &chain_def.variables {
execution_context
.templating
.chain_context
.set_variable(key.clone(), value.clone());
}
if let Some(Value::Object(map)) = variables {
for (key, value) in map {
execution_context.templating.chain_context.set_variable(key, value);
}
}
if self.config.enable_parallel_execution {
self.execute_with_parallelism(&mut execution_context).await
} else {
self.execute_sequential(&mut execution_context).await
}
.map(|_| ChainExecutionResult {
chain_id: chain_def.id.clone(),
status: ChainExecutionStatus::Successful,
total_duration_ms: start_time.elapsed().as_millis() as u64,
request_results: execution_context.templating.chain_context.responses.clone(),
error_message: None,
})
}
async fn execute_with_parallelism(
&self,
execution_context: &mut ChainExecutionContext,
) -> Result<()> {
let dep_graph = self.build_dependency_graph(&execution_context.definition.links);
let topo_order = self.topological_sort(&dep_graph)?;
let mut level_groups = vec![];
let mut processed = HashSet::new();
for request_id in topo_order {
if !processed.contains(&request_id) {
let mut level = vec![];
self.collect_dependency_level(request_id, &dep_graph, &mut level, &mut processed);
level_groups.push(level);
}
}
for level in level_groups {
if level.len() == 1 {
let request_id = &level[0];
let link = execution_context
.definition
.links
.iter()
.find(|l| l.request.id == *request_id)
.ok_or_else(|| {
Error::internal(format!(
"Chain link not found for request_id '{}' during parallel execution",
request_id
))
})?;
let link_clone = link.clone();
self.execute_request(&link_clone, execution_context).await?;
} else {
let tasks = level
.into_iter()
.filter_map(|request_id| {
let link = execution_context
.definition
.links
.iter()
.find(|l| l.request.id == request_id);
let link = match link {
Some(l) => l.clone(),
None => {
tracing::error!(
"Chain link not found for request_id '{}' during parallel execution",
request_id
);
return None;
}
};
let parallel_context = ChainExecutionContext {
definition: execution_context.definition.clone(),
templating: execution_context.templating.clone(),
start_time: std::time::Instant::now(),
config: execution_context.config.clone(),
};
let context = Arc::new(Mutex::new(parallel_context));
let engine =
ChainExecutionEngine::new(self.registry.clone(), self.config.clone());
Some(tokio::spawn(async move {
let mut ctx = context.lock().await;
engine.execute_request(&link, &mut ctx).await
}))
})
.collect::<Vec<_>>();
let results = join_all(tasks).await;
for result in results {
result
.map_err(|e| Error::internal(format!("Task join error: {}", e)))?
.map_err(|e| Error::internal(format!("Request execution error: {}", e)))?;
}
}
}
Ok(())
}
async fn execute_sequential(
&self,
execution_context: &mut ChainExecutionContext,
) -> Result<()> {
let links = execution_context.definition.links.clone();
for link in &links {
self.execute_request(link, execution_context).await?;
}
Ok(())
}
async fn execute_request(
&self,
link: &ChainLink,
execution_context: &mut ChainExecutionContext,
) -> Result<()> {
let request_start = std::time::Instant::now();
execution_context.templating.set_current_request(link.request.clone());
let method = Method::from_bytes(link.request.method.as_bytes()).map_err(|e| {
Error::internal(format!("Invalid HTTP method '{}': {}", link.request.method, e))
})?;
let url = self.expand_template(&link.request.url, &execution_context.templating);
let mut headers = HeaderMap::new();
for (key, value) in &link.request.headers {
let expanded_value = self.expand_template(value, &execution_context.templating);
let header_name = HeaderName::from_str(key)
.map_err(|e| Error::internal(format!("Invalid header name '{}': {}", key, e)))?;
let header_value = HeaderValue::from_str(&expanded_value).map_err(|e| {
Error::internal(format!("Invalid header value for '{}': {}", key, e))
})?;
headers.insert(header_name, header_value);
}
let mut request_builder = self.http_client.request(method, &url).headers(headers.clone());
if let Some(body) = &link.request.body {
match body {
crate::request_chaining::RequestBody::Json(json_value) => {
let expanded_body =
self.expand_template_in_json(json_value, &execution_context.templating);
request_builder = request_builder.json(&expanded_body);
}
crate::request_chaining::RequestBody::BinaryFile { path, content_type } => {
let templating_context =
TemplatingContext::with_chain(execution_context.templating.clone());
let expanded_path = expand_str_with_context(path, &templating_context);
let binary_body = crate::request_chaining::RequestBody::binary_file(
expanded_path,
content_type.clone(),
);
match binary_body.to_bytes().await {
Ok(file_bytes) => {
request_builder = request_builder.body(file_bytes);
if let Some(ct) = content_type {
let mut headers = headers.clone();
headers.insert(
"content-type",
ct.parse().unwrap_or_else(|_| {
HeaderValue::from_static("application/octet-stream")
}),
);
request_builder = request_builder.headers(headers);
}
}
Err(e) => {
return Err(e);
}
}
}
}
}
if let Some(timeout_secs) = link.request.timeout_secs {
request_builder = request_builder.timeout(Duration::from_secs(timeout_secs));
}
#[cfg(feature = "scripting")]
if let Some(scripting) = &link.request.scripting {
if let Some(pre_script) = &scripting.pre_script {
let script_context = ScriptContext {
request: Some(link.request.clone()),
response: None,
chain_context: execution_context.templating.chain_context.variables.clone(),
variables: HashMap::new(),
env_vars: std::env::vars().collect(),
};
match self
.script_engine
.execute_script(pre_script, &script_context, scripting.timeout_ms)
.await
{
Ok(script_result) => {
for (key, value) in script_result.modified_variables {
execution_context.templating.chain_context.set_variable(key, value);
}
}
Err(e) => {
tracing::warn!(
"Pre-script execution failed for request '{}': {}",
link.request.id,
e
);
}
}
}
}
let response_result =
timeout(Duration::from_secs(self.config.global_timeout_secs), request_builder.send())
.await;
let response = match response_result {
Ok(Ok(resp)) => resp,
Ok(Err(e)) => {
return Err(Error::internal(format!(
"Request '{}' failed: {}",
link.request.id, e
)));
}
Err(_) => {
return Err(Error::internal(format!("Request '{}' timed out", link.request.id)));
}
};
let status = response.status();
let headers: HashMap<String, String> = response
.headers()
.iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
.collect();
let body_text = response.text().await.unwrap_or_default();
let body_json: Option<Value> = serde_json::from_str(&body_text).ok();
let duration_ms = request_start.elapsed().as_millis() as u64;
let executed_at = Utc::now().to_rfc3339();
let chain_response = ChainResponse {
status: status.as_u16(),
headers,
body: body_json,
duration_ms,
executed_at,
error: None,
};
if let Some(expected) = &link.request.expected_status {
if !expected.contains(&status.as_u16()) {
let error_msg = format!(
"Request '{}' returned status {} but expected one of {:?}",
link.request.id,
status.as_u16(),
expected
);
return Err(Error::internal(error_msg));
}
}
if let Some(store_name) = &link.store_as {
execution_context
.templating
.chain_context
.store_response(store_name.clone(), chain_response.clone());
}
for (var_name, extraction_path) in &link.extract {
if let Some(value) = self.extract_from_response(&chain_response, extraction_path) {
execution_context.templating.chain_context.set_variable(var_name.clone(), value);
}
}
#[cfg(feature = "scripting")]
if let Some(scripting) = &link.request.scripting {
if let Some(post_script) = &scripting.post_script {
let script_context = ScriptContext {
request: Some(link.request.clone()),
response: Some(chain_response.clone()),
chain_context: execution_context.templating.chain_context.variables.clone(),
variables: HashMap::new(),
env_vars: std::env::vars().collect(),
};
match self
.script_engine
.execute_script(post_script, &script_context, scripting.timeout_ms)
.await
{
Ok(script_result) => {
for (key, value) in script_result.modified_variables {
execution_context.templating.chain_context.set_variable(key, value);
}
}
Err(e) => {
tracing::warn!(
"Post-script execution failed for request '{}': {}",
link.request.id,
e
);
}
}
}
}
execution_context
.templating
.chain_context
.store_response(link.request.id.clone(), chain_response);
Ok(())
}
fn build_dependency_graph(&self, links: &[ChainLink]) -> HashMap<String, Vec<String>> {
let mut graph = HashMap::new();
for link in links {
graph
.entry(link.request.id.clone())
.or_insert_with(Vec::new)
.extend(link.request.depends_on.iter().cloned());
}
graph
}
fn topological_sort(&self, graph: &HashMap<String, Vec<String>>) -> Result<Vec<String>> {
let mut visited = HashSet::new();
let mut rec_stack = HashSet::new();
let mut result = Vec::new();
for node in graph.keys() {
if !visited.contains(node) {
self.topo_sort_util(node, graph, &mut visited, &mut rec_stack, &mut result)?;
}
}
result.reverse();
Ok(result)
}
#[allow(clippy::only_used_in_recursion)]
fn topo_sort_util(
&self,
node: &str,
graph: &HashMap<String, Vec<String>>,
visited: &mut HashSet<String>,
rec_stack: &mut HashSet<String>,
result: &mut Vec<String>,
) -> Result<()> {
visited.insert(node.to_string());
rec_stack.insert(node.to_string());
if let Some(dependencies) = graph.get(node) {
for dep in dependencies {
if !visited.contains(dep) {
self.topo_sort_util(dep, graph, visited, rec_stack, result)?;
} else if rec_stack.contains(dep) {
return Err(Error::internal(format!(
"Circular dependency detected involving '{}'",
node
)));
}
}
}
rec_stack.remove(node);
result.push(node.to_string());
Ok(())
}
fn collect_dependency_level(
&self,
request_id: String,
_graph: &HashMap<String, Vec<String>>,
level: &mut Vec<String>,
processed: &mut HashSet<String>,
) {
level.push(request_id.clone());
processed.insert(request_id);
}
fn expand_template(&self, template: &str, context: &ChainTemplatingContext) -> String {
let templating_context = TemplatingContext {
chain_context: Some(context.clone()),
env_context: None,
virtual_clock: None,
};
expand_str_with_context(template, &templating_context)
}
fn expand_template_in_json(&self, value: &Value, context: &ChainTemplatingContext) -> Value {
match value {
Value::String(s) => Value::String(self.expand_template(s, context)),
Value::Array(arr) => {
Value::Array(arr.iter().map(|v| self.expand_template_in_json(v, context)).collect())
}
Value::Object(map) => {
let mut new_map = serde_json::Map::new();
for (k, v) in map {
new_map.insert(
self.expand_template(k, context),
self.expand_template_in_json(v, context),
);
}
Value::Object(new_map)
}
_ => value.clone(),
}
}
fn extract_from_response(&self, response: &ChainResponse, path: &str) -> Option<Value> {
let parts: Vec<&str> = path.split('.').collect();
if parts.is_empty() || parts[0] != "body" {
return None;
}
let mut current = response.body.as_ref()?;
for part in &parts[1..] {
match current {
Value::Object(map) => {
current = map.get(*part)?;
}
Value::Array(arr) => {
if part.starts_with('[') && part.ends_with(']') {
let index_str = &part[1..part.len() - 1];
if let Ok(index) = index_str.parse::<usize>() {
current = arr.get(index)?;
} else {
return None;
}
} else {
return None;
}
}
_ => return None,
}
}
Some(current.clone())
}
}
#[derive(Debug, Clone)]
pub struct ChainExecutionResult {
pub chain_id: String,
pub status: ChainExecutionStatus,
pub total_duration_ms: u64,
pub request_results: HashMap<String, ChainResponse>,
pub error_message: Option<String>,
}
#[derive(Debug, Clone, PartialEq)]
pub enum ChainExecutionStatus {
Successful,
PartialSuccess,
Failed,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::request_chaining::{ChainRequest, ChainResponse};
use serde_json::json;
use std::sync::Arc;
fn create_test_engine() -> ChainExecutionEngine {
let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
ChainExecutionEngine::new(registry, ChainConfig::default())
}
fn create_test_chain_response() -> ChainResponse {
ChainResponse {
status: 200,
headers: {
let mut h = HashMap::new();
h.insert("content-type".to_string(), "application/json".to_string());
h
},
body: Some(json!({
"user": {
"id": 123,
"name": "test",
"roles": ["admin", "user"]
},
"items": [
{"id": 1, "value": "a"},
{"id": 2, "value": "b"}
]
})),
duration_ms: 50,
executed_at: "2024-01-15T10:00:00Z".to_string(),
error: None,
}
}
#[test]
fn test_execution_record_debug() {
let record = ExecutionRecord {
executed_at: "2024-01-15T10:00:00Z".to_string(),
result: ChainExecutionResult {
chain_id: "test-chain".to_string(),
status: ChainExecutionStatus::Successful,
total_duration_ms: 100,
request_results: HashMap::new(),
error_message: None,
},
};
let debug = format!("{:?}", record);
assert!(debug.contains("ExecutionRecord"));
assert!(debug.contains("executed_at"));
}
#[test]
fn test_execution_record_clone() {
let record = ExecutionRecord {
executed_at: "2024-01-15T10:00:00Z".to_string(),
result: ChainExecutionResult {
chain_id: "test-chain".to_string(),
status: ChainExecutionStatus::Successful,
total_duration_ms: 100,
request_results: HashMap::new(),
error_message: None,
},
};
let cloned = record.clone();
assert_eq!(cloned.executed_at, record.executed_at);
assert_eq!(cloned.result.chain_id, record.result.chain_id);
}
#[test]
fn test_chain_execution_result_debug() {
let result = ChainExecutionResult {
chain_id: "test-chain".to_string(),
status: ChainExecutionStatus::Successful,
total_duration_ms: 100,
request_results: HashMap::new(),
error_message: None,
};
let debug = format!("{:?}", result);
assert!(debug.contains("ChainExecutionResult"));
assert!(debug.contains("chain_id"));
}
#[test]
fn test_chain_execution_result_clone() {
let mut request_results = HashMap::new();
request_results.insert("req1".to_string(), create_test_chain_response());
let result = ChainExecutionResult {
chain_id: "test-chain".to_string(),
status: ChainExecutionStatus::Successful,
total_duration_ms: 100,
request_results,
error_message: Some("test error".to_string()),
};
let cloned = result.clone();
assert_eq!(cloned.chain_id, result.chain_id);
assert_eq!(cloned.total_duration_ms, result.total_duration_ms);
assert_eq!(cloned.error_message, result.error_message);
}
#[test]
fn test_chain_execution_status_debug() {
let status = ChainExecutionStatus::Successful;
let debug = format!("{:?}", status);
assert!(debug.contains("Successful"));
let status = ChainExecutionStatus::PartialSuccess;
let debug = format!("{:?}", status);
assert!(debug.contains("PartialSuccess"));
let status = ChainExecutionStatus::Failed;
let debug = format!("{:?}", status);
assert!(debug.contains("Failed"));
}
#[test]
fn test_chain_execution_status_clone() {
let status = ChainExecutionStatus::Successful;
let cloned = status.clone();
assert_eq!(cloned, ChainExecutionStatus::Successful);
}
#[test]
fn test_chain_execution_status_eq() {
assert_eq!(ChainExecutionStatus::Successful, ChainExecutionStatus::Successful);
assert_eq!(ChainExecutionStatus::PartialSuccess, ChainExecutionStatus::PartialSuccess);
assert_eq!(ChainExecutionStatus::Failed, ChainExecutionStatus::Failed);
assert_ne!(ChainExecutionStatus::Successful, ChainExecutionStatus::Failed);
assert_ne!(ChainExecutionStatus::PartialSuccess, ChainExecutionStatus::Successful);
}
#[tokio::test]
async fn test_engine_creation() {
let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
let _engine = ChainExecutionEngine::new(registry, ChainConfig::default());
}
#[tokio::test]
async fn test_engine_try_new() {
let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
let result = ChainExecutionEngine::try_new(registry, ChainConfig::default());
assert!(result.is_ok());
}
#[tokio::test]
async fn test_engine_debug() {
let engine = create_test_engine();
let debug = format!("{:?}", engine);
assert!(debug.contains("ChainExecutionEngine"));
}
#[tokio::test]
async fn test_topological_sort() {
let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
let engine = ChainExecutionEngine::new(registry, ChainConfig::default());
let mut graph = HashMap::new();
graph.insert("A".to_string(), vec![]);
graph.insert("B".to_string(), vec!["A".to_string()]);
graph.insert("C".to_string(), vec!["A".to_string()]);
graph.insert("D".to_string(), vec!["B".to_string(), "C".to_string()]);
let topo_order = engine.topological_sort(&graph).unwrap();
let d_pos = topo_order.iter().position(|x| x == "D").unwrap();
let b_pos = topo_order.iter().position(|x| x == "B").unwrap();
let c_pos = topo_order.iter().position(|x| x == "C").unwrap();
let a_pos = topo_order.iter().position(|x| x == "A").unwrap();
assert!(d_pos < b_pos, "D should come before B");
assert!(d_pos < c_pos, "D should come before C");
assert!(b_pos < a_pos, "B should come before A");
assert!(c_pos < a_pos, "C should come before A");
assert_eq!(topo_order.len(), 4, "Should have all 4 nodes");
}
#[tokio::test]
async fn test_topological_sort_single_node() {
let engine = create_test_engine();
let mut graph = HashMap::new();
graph.insert("A".to_string(), vec![]);
let topo_order = engine.topological_sort(&graph).unwrap();
assert_eq!(topo_order, vec!["A".to_string()]);
}
#[tokio::test]
async fn test_topological_sort_linear_chain() {
let engine = create_test_engine();
let mut graph = HashMap::new();
graph.insert("A".to_string(), vec![]);
graph.insert("B".to_string(), vec!["A".to_string()]);
graph.insert("C".to_string(), vec!["B".to_string()]);
let topo_order = engine.topological_sort(&graph).unwrap();
let c_pos = topo_order.iter().position(|x| x == "C").unwrap();
let b_pos = topo_order.iter().position(|x| x == "B").unwrap();
let a_pos = topo_order.iter().position(|x| x == "A").unwrap();
assert!(c_pos < b_pos);
assert!(b_pos < a_pos);
}
#[tokio::test]
async fn test_topological_sort_empty_graph() {
let engine = create_test_engine();
let graph = HashMap::new();
let topo_order = engine.topological_sort(&graph).unwrap();
assert!(topo_order.is_empty());
}
#[tokio::test]
async fn test_circular_dependency_detection() {
let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
let engine = ChainExecutionEngine::new(registry, ChainConfig::default());
let mut graph = HashMap::new();
graph.insert("A".to_string(), vec!["B".to_string()]);
graph.insert("B".to_string(), vec!["A".to_string()]);
let result = engine.topological_sort(&graph);
assert!(result.is_err());
}
#[tokio::test]
async fn test_circular_dependency_self_reference() {
let engine = create_test_engine();
let mut graph = HashMap::new();
graph.insert("A".to_string(), vec!["A".to_string()]);
let result = engine.topological_sort(&graph);
assert!(result.is_err());
}
#[tokio::test]
async fn test_circular_dependency_chain() {
let engine = create_test_engine();
let mut graph = HashMap::new();
graph.insert("A".to_string(), vec!["C".to_string()]);
graph.insert("B".to_string(), vec!["A".to_string()]);
graph.insert("C".to_string(), vec!["B".to_string()]);
let result = engine.topological_sort(&graph);
assert!(result.is_err());
}
#[tokio::test]
async fn test_build_dependency_graph() {
let engine = create_test_engine();
let links = vec![
ChainLink {
request: ChainRequest {
id: "req1".to_string(),
method: "GET".to_string(),
url: "http://example.com/1".to_string(),
headers: HashMap::new(),
body: None,
depends_on: vec![],
timeout_secs: None,
expected_status: None,
scripting: None,
},
store_as: None,
extract: HashMap::new(),
},
ChainLink {
request: ChainRequest {
id: "req2".to_string(),
method: "GET".to_string(),
url: "http://example.com/2".to_string(),
headers: HashMap::new(),
body: None,
depends_on: vec!["req1".to_string()],
timeout_secs: None,
expected_status: None,
scripting: None,
},
store_as: None,
extract: HashMap::new(),
},
ChainLink {
request: ChainRequest {
id: "req3".to_string(),
method: "GET".to_string(),
url: "http://example.com/3".to_string(),
headers: HashMap::new(),
body: None,
depends_on: vec!["req1".to_string(), "req2".to_string()],
timeout_secs: None,
expected_status: None,
scripting: None,
},
store_as: None,
extract: HashMap::new(),
},
];
let graph = engine.build_dependency_graph(&links);
assert!(graph.contains_key("req1"));
assert!(graph.contains_key("req2"));
assert!(graph.contains_key("req3"));
assert_eq!(graph.get("req1").unwrap().len(), 0);
assert_eq!(graph.get("req2").unwrap(), &vec!["req1".to_string()]);
assert_eq!(graph.get("req3").unwrap(), &vec!["req1".to_string(), "req2".to_string()]);
}
#[tokio::test]
async fn test_extract_from_response_simple_field() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "body.user.id");
assert!(value.is_some());
assert_eq!(value.unwrap(), json!(123));
}
#[tokio::test]
async fn test_extract_from_response_nested_field() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "body.user.name");
assert!(value.is_some());
assert_eq!(value.unwrap(), json!("test"));
}
#[tokio::test]
async fn test_extract_from_response_array_element() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "body.items.[0].value");
assert!(value.is_some());
assert_eq!(value.unwrap(), json!("a"));
}
#[tokio::test]
async fn test_extract_from_response_array_element_second() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "body.items.[1].id");
assert!(value.is_some());
assert_eq!(value.unwrap(), json!(2));
}
#[tokio::test]
async fn test_extract_from_response_invalid_path() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "body.nonexistent");
assert!(value.is_none());
}
#[tokio::test]
async fn test_extract_from_response_non_body_path() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "headers.content-type");
assert!(value.is_none()); }
#[tokio::test]
async fn test_extract_from_response_empty_path() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "");
assert!(value.is_none());
}
#[tokio::test]
async fn test_extract_from_response_invalid_array_index() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "body.items.[invalid].value");
assert!(value.is_none());
}
#[tokio::test]
async fn test_extract_from_response_array_out_of_bounds() {
let engine = create_test_engine();
let response = create_test_chain_response();
let value = engine.extract_from_response(&response, "body.items.[100].value");
assert!(value.is_none());
}
#[tokio::test]
async fn test_extract_from_response_no_body() {
let engine = create_test_engine();
let response = ChainResponse {
status: 200,
headers: HashMap::new(),
body: None,
duration_ms: 50,
executed_at: "2024-01-15T10:00:00Z".to_string(),
error: None,
};
let value = engine.extract_from_response(&response, "body.user.id");
assert!(value.is_none());
}
#[tokio::test]
async fn test_expand_template_simple() {
use crate::request_chaining::ChainContext;
let engine = create_test_engine();
let context = ChainTemplatingContext::new(ChainContext::new());
let result = engine.expand_template("hello world", &context);
assert_eq!(result, "hello world");
}
#[tokio::test]
async fn test_expand_template_with_variable() {
use crate::request_chaining::ChainContext;
let engine = create_test_engine();
let mut context = ChainTemplatingContext::new(ChainContext::new());
context.chain_context.set_variable("name".to_string(), json!("test"));
let result = engine.expand_template("hello {{chain.name}}", &context);
assert!(result.contains("hello"));
}
#[tokio::test]
async fn test_expand_template_in_json_string() {
use crate::request_chaining::ChainContext;
let engine = create_test_engine();
let context = ChainTemplatingContext::new(ChainContext::new());
let input = json!("hello world");
let result = engine.expand_template_in_json(&input, &context);
assert_eq!(result, json!("hello world"));
}
#[tokio::test]
async fn test_expand_template_in_json_number() {
use crate::request_chaining::ChainContext;
let engine = create_test_engine();
let context = ChainTemplatingContext::new(ChainContext::new());
let input = json!(42);
let result = engine.expand_template_in_json(&input, &context);
assert_eq!(result, json!(42));
}
#[tokio::test]
async fn test_expand_template_in_json_boolean() {
use crate::request_chaining::ChainContext;
let engine = create_test_engine();
let context = ChainTemplatingContext::new(ChainContext::new());
let input = json!(true);
let result = engine.expand_template_in_json(&input, &context);
assert_eq!(result, json!(true));
}
#[tokio::test]
async fn test_expand_template_in_json_null() {
use crate::request_chaining::ChainContext;
let engine = create_test_engine();
let context = ChainTemplatingContext::new(ChainContext::new());
let input = json!(null);
let result = engine.expand_template_in_json(&input, &context);
assert_eq!(result, json!(null));
}
#[tokio::test]
async fn test_expand_template_in_json_array() {
use crate::request_chaining::ChainContext;
let engine = create_test_engine();
let context = ChainTemplatingContext::new(ChainContext::new());
let input = json!(["a", "b", "c"]);
let result = engine.expand_template_in_json(&input, &context);
assert_eq!(result, json!(["a", "b", "c"]));
}
#[tokio::test]
async fn test_expand_template_in_json_object() {
use crate::request_chaining::ChainContext;
let engine = create_test_engine();
let context = ChainTemplatingContext::new(ChainContext::new());
let input = json!({"key": "value", "nested": {"inner": "data"}});
let result = engine.expand_template_in_json(&input, &context);
assert_eq!(result, json!({"key": "value", "nested": {"inner": "data"}}));
}
#[tokio::test]
async fn test_get_chain_history_empty() {
let engine = create_test_engine();
let history = engine.get_chain_history("nonexistent").await;
assert!(history.is_empty());
}
#[tokio::test]
async fn test_collect_dependency_level() {
let engine = create_test_engine();
let graph = HashMap::new();
let mut level = vec![];
let mut processed = HashSet::new();
engine.collect_dependency_level("req1".to_string(), &graph, &mut level, &mut processed);
assert_eq!(level, vec!["req1".to_string()]);
assert!(processed.contains("req1"));
}
#[tokio::test]
async fn test_execute_chain_not_found() {
let engine = create_test_engine();
let result = engine.execute_chain("nonexistent", None).await;
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("not found"));
}
#[tokio::test]
async fn test_engine_with_custom_config() {
let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
let config = ChainConfig {
enabled: true,
max_chain_length: 50,
global_timeout_secs: 60,
enable_parallel_execution: false,
};
let result = ChainExecutionEngine::try_new(registry, config);
assert!(result.is_ok());
}
#[tokio::test]
async fn test_engine_with_default_config() {
let registry = Arc::new(RequestChainRegistry::new(ChainConfig::default()));
let config = ChainConfig::default();
let result = ChainExecutionEngine::try_new(registry, config);
assert!(result.is_ok());
}
}