use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use tokio::task::JoinSet;
use http::Method;
use serde_json::Value;
use super::{
AssertionOutcome, ExecutionArtifact, Request, RequestExecution, RequestExecutionError,
RequestProtocol, SessionRegistry,
};
/// Outcome of one request that executed successfully, as produced by the plan executors.
#[derive(Debug, Clone)]
pub struct ExecutionRecord {
/// Index of the request within the `requests` slice handed to the executor.
pub index: usize,
/// The request's `description`; the executors also use it as the key for dependency lookups.
pub description: String,
/// HTTP method taken from the request's `http_operation()`.
pub method: Method,
/// URL taken from the request's `http_operation()`.
pub url: String,
/// Sensitive values copied from the finished `RequestExecution`.
pub sensitive_values: Vec<String>,
/// Additional header names taken from the request's redaction rules.
pub sensitive_header_names: Vec<String>,
/// The execution result returned by `Request::exec`.
pub execution: RequestExecution,
/// Wall-clock time captured just before `exec` was invoked.
pub started_at: SystemTime,
/// Elapsed time measured around the `exec` call with an `Instant`.
pub duration: Duration,
}
/// Knobs that control how a plan is executed.
#[derive(Debug, Clone, Default)]
pub struct ExecutionOptions {
    /// Run requests through the parallel scheduler.
    pub parallel: bool,
    /// Upper bound on requests in flight. Setting it also selects the parallel scheduler.
    pub max_concurrency: Option<usize>,
    /// Keep going after a failure and report every failure at the end.
    pub continue_on_error: bool,
}

impl ExecutionOptions {
    /// Returns `true` when the parallel scheduler should be used: either it was requested
    /// explicitly or a concurrency bound was supplied.
    fn parallel_enabled(&self) -> bool {
        self.parallel || self.max_concurrency.is_some()
    }

    /// Returns the maximum number of requests allowed in flight at once.
    ///
    /// No bound means `usize::MAX`. A configured bound of `0` is clamped to `1`, because a
    /// limit of zero would never let the scheduler start anything.
    fn concurrency_limit(&self) -> usize {
        self.max_concurrency
            .map_or(usize::MAX, |limit| limit.max(1))
    }
}
/// Signals that can interrupt a running plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterruptSignal {
    Sigint,
    Sigterm,
}

impl InterruptSignal {
    /// Conventional upper-case name of the signal.
    pub fn as_str(self) -> &'static str {
        let name = match self {
            InterruptSignal::Sigint => "SIGINT",
            InterruptSignal::Sigterm => "SIGTERM",
        };
        name
    }

    /// Process exit code to use after this signal: 128 plus the signal number
    /// (130 for SIGINT, 143 for SIGTERM).
    pub fn exit_code(self) -> i32 {
        128 + self.signal_number()
    }

    /// Numeric value of the signal (2 for SIGINT, 15 for SIGTERM).
    fn signal_number(self) -> i32 {
        match self {
            InterruptSignal::Sigint => 2,
            InterruptSignal::Sigterm => 15,
        }
    }
}
/// Shared callback that receives every `ExecutionEvent` emitted while a plan runs.
/// It is `Send + Sync` because the parallel executor clones it into each spawned task.
pub type ExecutionObserver = Arc<dyn Fn(ExecutionEvent) + Send + Sync>;
/// Progress notifications delivered to an `ExecutionObserver`.
#[derive(Debug, Clone)]
pub enum ExecutionEvent {
/// Emitted by the parallel executor the first time it finds a request whose dependencies
/// do not all have a result yet.
RequestWaiting {
index: usize,
request: String,
dependencies: Vec<String>,
// Subset of `dependencies` that had no result when the event was emitted.
waiting_on: Vec<String>,
protocol: RequestProtocol,
protocol_context: Option<Value>,
sensitive_values: Vec<String>,
},
/// Emitted right before a request is executed (sequential) or its task is spawned (parallel).
RequestStarted {
index: usize,
request: String,
dependencies: Vec<String>,
protocol: RequestProtocol,
protocol_context: Option<Value>,
sensitive_values: Vec<String>,
},
/// Emitted after a request executed successfully.
RequestCompleted { record: ExecutionRecord },
/// Emitted for every failure, including requests that were skipped without running.
RequestFailed { failure: RequestFailure },
/// Not emitted by the executors in this file and ignored by `ExecutionTraceCollector`.
// NOTE(review): presumably raised from inside `Request::exec`, which receives the observer — confirm.
AssertionPassed { request: String, assertion: String },
}
/// Category of a single entry in the execution trace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionTraceKind {
    Waiting,
    Started,
    Completed,
    Failed,
    Skipped,
    Interrupted,
}

impl ExecutionTraceKind {
    /// Lower-case label for this kind.
    pub fn as_str(self) -> &'static str {
        let label = match self {
            ExecutionTraceKind::Waiting => "waiting",
            ExecutionTraceKind::Started => "started",
            ExecutionTraceKind::Completed => "completed",
            ExecutionTraceKind::Failed => "failed",
            ExecutionTraceKind::Skipped => "skipped",
            ExecutionTraceKind::Interrupted => "interrupted",
        };
        label
    }
}
/// One flattened row of the execution trace; fields that do not apply to a given
/// `kind` are left as `None` or empty.
#[derive(Debug, Clone, PartialEq)]
pub struct ExecutionTraceEntry {
/// Sequence number assigned by `ExecutionTraceCollector`, in recording order.
pub seq: usize,
pub kind: ExecutionTraceKind,
/// Index of the request concerned; `None` for interrupts and for failures without a request.
pub request_index: Option<usize>,
/// Description of the request concerned, when there is one.
pub request: Option<String>,
/// Declared dependencies; only filled for waiting and started entries.
pub dependencies: Vec<String>,
/// Dependencies that had no result yet; only filled for waiting entries.
pub waiting_on: Vec<String>,
pub protocol: Option<RequestProtocol>,
pub protocol_context: Option<Value>,
pub sensitive_values: Vec<String>,
pub sensitive_header_names: Vec<String>,
/// Execution time, when the request actually ran.
pub duration: Option<Duration>,
/// Machine-readable reason such as `"dependencies_pending"`, `"execution_failed"`,
/// `"dependency_failed"`, or the signal name for interrupts.
pub reason: Option<String>,
/// The dependency involved in a `dependency_failed` / `missing_dependency` skip.
pub related_request: Option<String>,
/// Map group name for `map_aborted` skips.
pub group: Option<String>,
/// Request that caused the map group to abort, for `map_aborted` skips.
pub cause: Option<String>,
/// Error message for execution and join failures.
pub message: Option<String>,
/// Signal that interrupted the run; only set on interrupted entries.
pub signal: Option<InterruptSignal>,
}
/// Accumulates `ExecutionTraceEntry` rows and numbers them in the order they are recorded.
#[derive(Debug, Clone, Default)]
pub struct ExecutionTraceCollector {
// Sequence number that the next recorded entry will receive.
next_seq: usize,
// Entries recorded so far, in recording order.
entries: Vec<ExecutionTraceEntry>,
}
impl ExecutionTraceCollector {
pub fn record_event(&mut self, event: &ExecutionEvent) {
let mut entry = match event {
ExecutionEvent::RequestWaiting {
index,
request,
dependencies,
waiting_on,
protocol,
protocol_context,
sensitive_values,
} => ExecutionTraceEntry {
seq: 0,
kind: ExecutionTraceKind::Waiting,
request_index: Some(*index),
request: Some(request.clone()),
dependencies: dependencies.clone(),
waiting_on: waiting_on.clone(),
protocol: Some(*protocol),
protocol_context: protocol_context.clone(),
sensitive_values: sensitive_values.clone(),
sensitive_header_names: Vec::new(),
duration: None,
reason: Some("dependencies_pending".to_string()),
related_request: None,
group: None,
cause: None,
message: None,
signal: None,
},
ExecutionEvent::RequestStarted {
index,
request,
dependencies,
protocol,
protocol_context,
sensitive_values,
} => ExecutionTraceEntry {
seq: 0,
kind: ExecutionTraceKind::Started,
request_index: Some(*index),
request: Some(request.clone()),
dependencies: dependencies.clone(),
waiting_on: Vec::new(),
protocol: Some(*protocol),
protocol_context: protocol_context.clone(),
sensitive_values: sensitive_values.clone(),
sensitive_header_names: Vec::new(),
duration: None,
reason: None,
related_request: None,
group: None,
cause: None,
message: None,
signal: None,
},
ExecutionEvent::RequestCompleted { record } => ExecutionTraceEntry {
seq: 0,
kind: ExecutionTraceKind::Completed,
request_index: Some(record.index),
request: Some(record.description.clone()),
dependencies: Vec::new(),
waiting_on: Vec::new(),
protocol: Some(record.execution.artifact.protocol),
protocol_context: None,
sensitive_values: record.sensitive_values.clone(),
sensitive_header_names: record.sensitive_header_names.clone(),
duration: Some(record.duration),
reason: None,
related_request: None,
group: None,
cause: None,
message: None,
signal: None,
},
ExecutionEvent::RequestFailed { failure } => trace_entry_from_failure(failure),
ExecutionEvent::AssertionPassed { .. } => return,
};
entry.seq = self.next_seq;
self.next_seq += 1;
self.entries.push(entry);
}
pub fn record_interrupt(&mut self, signal: InterruptSignal) {
let entry = ExecutionTraceEntry {
seq: self.next_seq,
kind: ExecutionTraceKind::Interrupted,
request_index: None,
request: None,
dependencies: Vec::new(),
waiting_on: Vec::new(),
protocol: None,
protocol_context: None,
sensitive_values: Vec::new(),
sensitive_header_names: Vec::new(),
duration: None,
reason: Some(signal.as_str().to_string()),
related_request: None,
group: None,
cause: None,
message: None,
signal: Some(signal),
};
self.next_seq += 1;
self.entries.push(entry);
}
pub fn snapshot(&self) -> Vec<ExecutionTraceEntry> {
self.entries.clone()
}
pub fn into_entries(self) -> Vec<ExecutionTraceEntry> {
self.entries
}
}
fn trace_entry_from_failure(failure: &RequestFailure) -> ExecutionTraceEntry {
match failure.kind() {
RequestFailureKind::Execution { message, .. } => ExecutionTraceEntry {
seq: 0,
kind: ExecutionTraceKind::Failed,
request_index: failure.index(),
request: Some(failure.request().to_string()),
dependencies: Vec::new(),
waiting_on: Vec::new(),
protocol: failure.protocol(),
protocol_context: failure.protocol_context().cloned(),
sensitive_values: failure.sensitive_values().to_vec(),
sensitive_header_names: failure.sensitive_header_names().to_vec(),
duration: failure.duration(),
reason: Some("execution_failed".to_string()),
related_request: None,
group: None,
cause: None,
message: Some(message.clone()),
signal: None,
},
RequestFailureKind::Dependency { dependency } => ExecutionTraceEntry {
seq: 0,
kind: ExecutionTraceKind::Skipped,
request_index: failure.index(),
request: Some(failure.request().to_string()),
dependencies: Vec::new(),
waiting_on: Vec::new(),
protocol: failure.protocol(),
protocol_context: failure.protocol_context().cloned(),
sensitive_values: failure.sensitive_values().to_vec(),
sensitive_header_names: failure.sensitive_header_names().to_vec(),
duration: failure.duration(),
reason: Some("dependency_failed".to_string()),
related_request: Some(dependency.clone()),
group: None,
cause: None,
message: None,
signal: None,
},
RequestFailureKind::MissingDependency { dependency } => ExecutionTraceEntry {
seq: 0,
kind: ExecutionTraceKind::Skipped,
request_index: failure.index(),
request: Some(failure.request().to_string()),
dependencies: Vec::new(),
waiting_on: Vec::new(),
protocol: failure.protocol(),
protocol_context: failure.protocol_context().cloned(),
sensitive_values: failure.sensitive_values().to_vec(),
sensitive_header_names: failure.sensitive_header_names().to_vec(),
duration: failure.duration(),
reason: Some("missing_dependency".to_string()),
related_request: Some(dependency.clone()),
group: None,
cause: None,
message: None,
signal: None,
},
RequestFailureKind::Join { message } => ExecutionTraceEntry {
seq: 0,
kind: ExecutionTraceKind::Failed,
request_index: failure.index(),
request: Some(failure.request().to_string()),
dependencies: Vec::new(),
waiting_on: Vec::new(),
protocol: failure.protocol(),
protocol_context: failure.protocol_context().cloned(),
sensitive_values: failure.sensitive_values().to_vec(),
sensitive_header_names: failure.sensitive_header_names().to_vec(),
duration: failure.duration(),
reason: Some("join_error".to_string()),
related_request: None,
group: None,
cause: None,
message: Some(message.clone()),
signal: None,
},
RequestFailureKind::MapAborted { group, cause } => ExecutionTraceEntry {
seq: 0,
kind: ExecutionTraceKind::Skipped,
request_index: failure.index(),
request: Some(failure.request().to_string()),
dependencies: Vec::new(),
waiting_on: Vec::new(),
protocol: failure.protocol(),
protocol_context: failure.protocol_context().cloned(),
sensitive_values: failure.sensitive_values().to_vec(),
sensitive_header_names: failure.sensitive_header_names().to_vec(),
duration: failure.duration(),
reason: Some("map_aborted".to_string()),
related_request: None,
group: Some(group.clone()),
cause: Some(cause.clone()),
message: None,
signal: None,
},
}
}
/// Error returned when a plan did not finish cleanly.
#[derive(Debug)]
pub struct ExecutionError {
// Every failure that was recorded before the executor returned.
failures: Vec<RequestFailure>,
// Records of the requests that did complete; the executors sort these by request index.
completed: Vec<ExecutionRecord>,
}
impl ExecutionError {
pub fn new(failures: Vec<RequestFailure>, completed: Vec<ExecutionRecord>) -> Self {
Self {
failures,
completed,
}
}
pub fn from_failure(failure: RequestFailure, completed: Vec<ExecutionRecord>) -> Self {
Self::new(vec![failure], completed)
}
pub fn into_parts(self) -> (Vec<RequestFailure>, Vec<ExecutionRecord>) {
(self.failures, self.completed)
}
}
impl std::fmt::Display for ExecutionError {
    /// Writes the first failure on its own line, then each further failure as a
    /// ` - `-prefixed line. With no failures, writes a generic message instead.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.failures.split_first() {
            None => write!(f, "Execution failed for unknown reason"),
            Some((headline, others)) => {
                writeln!(f, "{}", headline)?;
                for failure in others {
                    writeln!(f, " - {}", failure)?;
                }
                Ok(())
            }
        }
    }
}

impl std::error::Error for ExecutionError {}
/// Describes why one request failed or was skipped.
#[derive(Debug, Clone)]
pub struct RequestFailure {
// Index into the `requests` slice; `usize::MAX` marks a failure with no associated
// request (join errors), which `index()` reports as `None`.
index: usize,
// Description of the request, or "<unknown>" for join errors.
request: String,
protocol: Option<RequestProtocol>,
protocol_context: Option<Value>,
sensitive_values: Vec<String>,
sensitive_header_names: Vec<String>,
// Only ever populated for execution failures; the other constructors set it to `None`.
artifact: Option<ExecutionArtifact>,
// Set only when the request actually started executing.
started_at: Option<SystemTime>,
// Set only when the request actually started executing.
duration: Option<Duration>,
kind: RequestFailureKind,
}
impl RequestFailure {
fn execution(
index: usize,
request: &Request,
message: String,
assertions: Vec<AssertionOutcome>,
artifact: Option<ExecutionArtifact>,
sensitive_values: Vec<String>,
started_at: SystemTime,
duration: Duration,
) -> Self {
Self {
index,
request: request.description.clone(),
protocol: Some(request.protocol()),
protocol_context: request.protocol_context_json(),
sensitive_values,
sensitive_header_names: request.redaction_rules.additional_header_names().to_vec(),
artifact,
started_at: Some(started_at),
duration: Some(duration),
kind: RequestFailureKind::Execution {
message,
assertions,
},
}
}
fn dependency(index: usize, request: &Request, dependency: String) -> Self {
Self {
index,
request: request.description.clone(),
protocol: Some(request.protocol()),
protocol_context: request.protocol_context_json(),
sensitive_values: request.sensitive_values.clone(),
sensitive_header_names: request.redaction_rules.additional_header_names().to_vec(),
artifact: None,
started_at: None,
duration: None,
kind: RequestFailureKind::Dependency { dependency },
}
}
fn missing_dependency(index: usize, request: &Request, dependency: String) -> Self {
Self {
index,
request: request.description.clone(),
protocol: Some(request.protocol()),
protocol_context: request.protocol_context_json(),
sensitive_values: request.sensitive_values.clone(),
sensitive_header_names: request.redaction_rules.additional_header_names().to_vec(),
artifact: None,
started_at: None,
duration: None,
kind: RequestFailureKind::MissingDependency { dependency },
}
}
fn join_error(message: String) -> Self {
Self {
index: usize::MAX,
request: "<unknown>".to_string(),
protocol: None,
protocol_context: None,
sensitive_values: Vec::new(),
sensitive_header_names: Vec::new(),
artifact: None,
started_at: None,
duration: None,
kind: RequestFailureKind::Join { message },
}
}
fn map_aborted(index: usize, request: &Request, group: String, cause: String) -> Self {
Self {
index,
request: request.description.clone(),
protocol: Some(request.protocol()),
protocol_context: request.protocol_context_json(),
sensitive_values: request.sensitive_values.clone(),
sensitive_header_names: request.redaction_rules.additional_header_names().to_vec(),
artifact: None,
started_at: None,
duration: None,
kind: RequestFailureKind::MapAborted { group, cause },
}
}
pub fn index(&self) -> Option<usize> {
if self.index == usize::MAX {
None
} else {
Some(self.index)
}
}
pub fn request(&self) -> &str {
&self.request
}
pub fn kind(&self) -> &RequestFailureKind {
&self.kind
}
pub fn assertions(&self) -> &[AssertionOutcome] {
match &self.kind {
RequestFailureKind::Execution { assertions, .. } => assertions,
_ => &[],
}
}
pub fn protocol(&self) -> Option<RequestProtocol> {
self.protocol
}
pub fn protocol_context(&self) -> Option<&Value> {
self.protocol_context.as_ref()
}
pub fn artifact(&self) -> Option<&ExecutionArtifact> {
self.artifact.as_ref()
}
pub fn sensitive_values(&self) -> &[String] {
&self.sensitive_values
}
pub fn sensitive_header_names(&self) -> &[String] {
&self.sensitive_header_names
}
pub fn started_at(&self) -> Option<SystemTime> {
self.started_at
}
pub fn duration(&self) -> Option<Duration> {
self.duration
}
}
impl std::fmt::Display for RequestFailure {
    /// Renders a one-line, human-readable description of the failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = &self.request;
        let position = self.index;
        match &self.kind {
            // Join errors have no request attached, so neither name nor index is printed.
            RequestFailureKind::Join { message } => {
                write!(f, "Background task failed unexpectedly: {}", message)
            }
            RequestFailureKind::Execution { message, .. } => write!(
                f,
                "Request '{}' (index {}) failed during execution: {}",
                name, position, message
            ),
            RequestFailureKind::Dependency { dependency } => write!(
                f,
                "Request '{}' (index {}) skipped because dependency '{}' failed",
                name, position, dependency
            ),
            RequestFailureKind::MissingDependency { dependency } => write!(
                f,
                "Request '{}' (index {}) expected dependency '{}' to complete, but no execution record was found",
                name, position, dependency
            ),
            RequestFailureKind::MapAborted { group, cause } => write!(
                f,
                "Request '{}' (index {}) skipped because '{}' aborted after failure in '{}'",
                name, position, group, cause
            ),
        }
    }
}
/// The reason a request failed or was skipped.
#[derive(Debug, Clone)]
pub enum RequestFailureKind {
/// The request ran and `exec` returned an error.
Execution {
message: String,
assertions: Vec<AssertionOutcome>,
},
/// The request was skipped because the named dependency failed.
Dependency { dependency: String },
/// The request was skipped because the named dependency has no execution result.
MissingDependency { dependency: String },
/// A spawned task could not be joined; `message` is the join error text.
Join { message: String },
/// A map iteration was skipped because map `group` was aborted after request `cause` failed.
MapAborted { group: String, cause: String },
}
pub async fn execute_plan(
requests: &[Request],
order: &[usize],
options: ExecutionOptions,
) -> Result<Vec<ExecutionRecord>, ExecutionError> {
execute_plan_with_observer(requests, order, options, None).await
}
/// Executes the requests selected by `order`, reporting progress to `observer` if given.
///
/// The parallel scheduler is used when `options` enables it, otherwise requests run one at a
/// time. A fresh `SessionRegistry` is created for the run and cleared before returning.
///
/// Returns the completed records on success, or an `ExecutionError` holding the failures
/// together with whatever did complete.
pub async fn execute_plan_with_observer(
    requests: &[Request],
    order: &[usize],
    options: ExecutionOptions,
    observer: Option<ExecutionObserver>,
) -> Result<Vec<ExecutionRecord>, ExecutionError> {
    let sessions = SessionRegistry::new();
    let callback = observer.as_ref();
    let outcome = match options.parallel_enabled() {
        true => execute_parallel(requests, order, &options, callback, &sessions).await,
        false => execute_sequential(requests, order, &options, callback, &sessions).await,
    };
    sessions.clear();
    outcome
}
/// Runs the plan one request at a time, in the order given by `order`.
///
/// A request is skipped (and recorded as a failure) when its map group was already aborted,
/// when one of its dependencies failed, or when a dependency has no result. With
/// `continue_on_error` all failures are collected; otherwise the first failure ends the run.
/// Completed records are returned sorted by request index.
///
/// # Panics
/// Panics if `order` contains an index that is out of range for `requests`.
async fn execute_sequential(
    requests: &[Request],
    order: &[usize],
    options: &ExecutionOptions,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<Vec<ExecutionRecord>, ExecutionError> {
    let mut results: HashMap<String, RequestExecution> = HashMap::new();
    let mut records: Vec<ExecutionRecord> = Vec::new();
    let mut failures: Vec<RequestFailure> = Vec::new();
    let mut failed_requests: HashSet<String> = HashSet::new();
    let mut aborted_maps: HashMap<String, String> = HashMap::new();

    'plan: for &idx in order {
        let request = requests
            .get(idx)
            .expect("planner produced invalid request index");
        let in_map = request.map_iteration.is_some();

        // Every way a request can fail funnels out of this block as
        // `(failure, aborts_map)`; a successful request jumps to the next iteration.
        let (failure, aborts_map): (RequestFailure, bool) = 'attempt: {
            if in_map {
                if let Some(cause) = aborted_maps.get(&request.base_description) {
                    // The group is already marked as aborted, so it is not marked again.
                    break 'attempt (
                        RequestFailure::map_aborted(
                            idx,
                            request,
                            request.base_description.clone(),
                            cause.clone(),
                        ),
                        false,
                    );
                }
            }

            let failed_dependency = request
                .dependencies
                .iter()
                .find(|dep| failed_requests.contains(*dep));
            if let Some(dep) = failed_dependency {
                break 'attempt (
                    RequestFailure::dependency(idx, request, dep.clone()),
                    in_map,
                );
            }

            let (inherited_context, dependency_snapshots, inherited_sensitive_values) =
                match build_dependency_context(request, &results) {
                    Ok(ctx) => ctx,
                    Err(missing) => {
                        break 'attempt (
                            RequestFailure::missing_dependency(idx, request, missing),
                            in_map,
                        );
                    }
                };

            let started_at = SystemTime::now();
            let timer = Instant::now();
            emit_event(
                observer,
                ExecutionEvent::RequestStarted {
                    index: idx,
                    request: request.description.clone(),
                    dependencies: request.dependencies.clone(),
                    protocol: request.protocol(),
                    protocol_context: request.protocol_context_json(),
                    sensitive_values: request.sensitive_values.clone(),
                },
            );
            let outcome = request
                .exec(
                    &inherited_context,
                    &inherited_sensitive_values,
                    &dependency_snapshots,
                    sessions,
                    observer,
                )
                .await;
            let duration = timer.elapsed();

            match outcome {
                Ok(execution) => {
                    results.insert(request.description.clone(), execution.clone());
                    let http = request.http_operation();
                    let record = ExecutionRecord {
                        index: idx,
                        description: request.description.clone(),
                        method: http.method.clone(),
                        url: http.url.clone(),
                        sensitive_values: execution.sensitive_values.clone(),
                        sensitive_header_names: request
                            .redaction_rules
                            .additional_header_names()
                            .to_vec(),
                        execution,
                        started_at,
                        duration,
                    };
                    emit_event(
                        observer,
                        ExecutionEvent::RequestCompleted {
                            record: record.clone(),
                        },
                    );
                    records.push(record);
                    continue 'plan;
                }
                Err(err) => {
                    let (message, assertions, artifact, sensitive_values) = err.into_parts();
                    let failure = RequestFailure::execution(
                        idx,
                        request,
                        message,
                        assertions,
                        artifact,
                        sensitive_values,
                        started_at,
                        duration,
                    );
                    (failure, in_map)
                }
            }
        };

        emit_event(
            observer,
            ExecutionEvent::RequestFailed {
                failure: failure.clone(),
            },
        );
        if aborts_map {
            // Remember which iteration took the map group down so later iterations are skipped.
            aborted_maps.insert(
                request.base_description.clone(),
                request.description.clone(),
            );
        }
        failed_requests.insert(request.description.clone());
        if !options.continue_on_error {
            records.sort_by_key(|record| record.index);
            return Err(ExecutionError::from_failure(failure, records));
        }
        failures.push(failure);
    }

    records.sort_by_key(|record| record.index);
    if failures.is_empty() {
        Ok(records)
    } else {
        Err(ExecutionError::new(failures, records))
    }
}
/// Runs the plan concurrently, starting each request as soon as all of its dependencies have
/// results and the concurrency limit allows.
///
/// Each scheduling round walks `order`: requests in an aborted map group or with a failed
/// dependency are recorded as skipped, requests with pending dependencies wait, and ready
/// requests are spawned onto a `JoinSet`. Iterations of the same map group never run at the
/// same time. After each round one finished task is collected.
///
/// If the scheduler stalls (nothing in flight and nothing that can be started), every request
/// that never ran is reported as a dependency or missing-dependency failure instead of being
/// dropped silently, matching what the sequential executor reports for the same plan.
///
/// With `continue_on_error` all failures are collected; otherwise the first failure aborts the
/// in-flight tasks and ends the run. Completed records are returned sorted by request index.
///
/// # Panics
/// Panics if `order` contains an index that is out of range for `requests`.
async fn execute_parallel(
    requests: &[Request],
    order: &[usize],
    options: &ExecutionOptions,
    observer: Option<&ExecutionObserver>,
    sessions: &SessionRegistry,
) -> Result<Vec<ExecutionRecord>, ExecutionError> {
    let mut results: HashMap<String, RequestExecution> = HashMap::new();
    let mut records: Vec<ExecutionRecord> = Vec::new();
    let mut failures: Vec<RequestFailure> = Vec::new();
    let mut failed_requests: HashSet<String> = HashSet::new();
    let mut completed: HashSet<usize> = HashSet::new();
    let mut in_flight: HashSet<usize> = HashSet::new();
    let mut waiting_emitted: HashSet<usize> = HashSet::new();
    let mut map_failures: HashMap<String, String> = HashMap::new();
    let mut map_in_progress: HashSet<String> = HashSet::new();
    let total = order.len();
    let limit = options.concurrency_limit().max(1);
    let mut join_set: JoinSet<(
        usize,
        String,
        Method,
        String,
        SystemTime,
        Duration,
        Result<RequestExecution, RequestExecutionError>,
    )> = JoinSet::new();

    while completed.len() < total {
        // True when this round either spawned a task or settled a request as skipped.
        // Counting skips matters: a skip can make further requests settle on the next
        // pass even though nothing is in flight.
        let mut progressed_this_round = false;

        for &idx in order {
            if completed.contains(&idx) || in_flight.contains(&idx) {
                continue;
            }
            let request = requests
                .get(idx)
                .expect("planner produced invalid request index");

            if request.map_iteration.is_some() {
                if let Some(cause) = map_failures.get(&request.base_description) {
                    let failure = RequestFailure::map_aborted(
                        idx,
                        request,
                        request.base_description.clone(),
                        cause.clone(),
                    );
                    emit_event(
                        observer,
                        ExecutionEvent::RequestFailed {
                            failure: failure.clone(),
                        },
                    );
                    failed_requests.insert(request.description.clone());
                    completed.insert(idx);
                    failures.push(failure);
                    if !options.continue_on_error {
                        records.sort_by_key(|record| record.index);
                        join_set.abort_all();
                        return Err(ExecutionError::new(failures, records));
                    }
                    progressed_this_round = true;
                    continue;
                }
                // Only one iteration of a map group runs at a time.
                if map_in_progress.contains(&request.base_description) {
                    continue;
                }
            }

            if let Some(dep) = request
                .dependencies
                .iter()
                .find(|dep| failed_requests.contains(*dep))
            {
                let failure = RequestFailure::dependency(idx, request, dep.clone());
                emit_event(
                    observer,
                    ExecutionEvent::RequestFailed {
                        failure: failure.clone(),
                    },
                );
                failed_requests.insert(request.description.clone());
                completed.insert(idx);
                if request.map_iteration.is_some() {
                    map_failures.insert(
                        request.base_description.clone(),
                        request.description.clone(),
                    );
                }
                failures.push(failure);
                if !options.continue_on_error {
                    records.sort_by_key(|record| record.index);
                    join_set.abort_all();
                    return Err(ExecutionError::new(failures, records));
                }
                progressed_this_round = true;
                continue;
            }

            if !request
                .dependencies
                .iter()
                .all(|dep| results.contains_key(dep))
            {
                // Announce the wait only once per request.
                if waiting_emitted.insert(idx) {
                    let waiting_on = request
                        .dependencies
                        .iter()
                        .filter(|dep| !results.contains_key(*dep))
                        .cloned()
                        .collect::<Vec<_>>();
                    emit_event(
                        observer,
                        ExecutionEvent::RequestWaiting {
                            index: idx,
                            request: request.description.clone(),
                            dependencies: request.dependencies.clone(),
                            waiting_on,
                            protocol: request.protocol(),
                            protocol_context: request.protocol_context_json(),
                            sensitive_values: request.sensitive_values.clone(),
                        },
                    );
                }
                continue;
            }

            if in_flight.len() >= limit {
                break;
            }

            let (inherited_context, dependency_snapshots, inherited_sensitive_values) =
                match build_dependency_context(request, &results) {
                    Ok(ctx) => ctx,
                    // Defensive: every dependency was just confirmed to have a result, so
                    // this arm is not expected to run.
                    Err(missing) => {
                        let failure = RequestFailure::missing_dependency(idx, request, missing);
                        emit_event(
                            observer,
                            ExecutionEvent::RequestFailed {
                                failure: failure.clone(),
                            },
                        );
                        failed_requests.insert(request.description.clone());
                        completed.insert(idx);
                        if request.map_iteration.is_some() {
                            map_failures.insert(
                                request.base_description.clone(),
                                request.description.clone(),
                            );
                        }
                        failures.push(failure);
                        records.sort_by_key(|record| record.index);
                        join_set.abort_all();
                        return Err(ExecutionError::new(failures, records));
                    }
                };

            if request.map_iteration.is_some() {
                map_in_progress.insert(request.base_description.clone());
            }
            waiting_emitted.remove(&idx);
            emit_event(
                observer,
                ExecutionEvent::RequestStarted {
                    index: idx,
                    request: request.description.clone(),
                    dependencies: request.dependencies.clone(),
                    protocol: request.protocol(),
                    protocol_context: request.protocol_context_json(),
                    sensitive_values: request.sensitive_values.clone(),
                },
            );

            // The spawned task must own everything it touches.
            let observer_for_exec = observer.cloned();
            let req_clone = request.clone();
            let sessions = sessions.clone();
            join_set.spawn(async move {
                let started_at = SystemTime::now();
                let timer = Instant::now();
                let desc = req_clone.description.clone();
                let http = req_clone.http_operation();
                let method = http.method.clone();
                let url = http.url.clone();
                let result = req_clone
                    .exec(
                        &inherited_context,
                        &inherited_sensitive_values,
                        &dependency_snapshots,
                        &sessions,
                        observer_for_exec.as_ref(),
                    )
                    .await;
                let duration = timer.elapsed();
                (idx, desc, method, url, started_at, duration, result)
            });
            in_flight.insert(idx);
            progressed_this_round = true;
        }

        if in_flight.is_empty() {
            if !progressed_this_round {
                // Stalled: nothing is running and nothing can be started or settled.
                break;
            }
            // Something was settled as skipped; take another pass.
            continue;
        }

        if let Some(join_result) = join_set.join_next().await {
            match join_result {
                Ok((idx, desc, method, url, started_at, duration, outcome)) => {
                    in_flight.remove(&idx);
                    completed.insert(idx);
                    let finished = requests.get(idx);
                    let base = finished
                        .map(|req| req.base_description.clone())
                        .unwrap_or_default();
                    let is_map_iteration = finished
                        .and_then(|req| req.map_iteration.as_ref())
                        .is_some();
                    if is_map_iteration {
                        map_in_progress.remove(&base);
                    }
                    match outcome {
                        Ok(execution) => {
                            results.insert(desc.clone(), execution.clone());
                            let record = ExecutionRecord {
                                index: idx,
                                description: desc,
                                method,
                                url,
                                sensitive_values: execution.sensitive_values.clone(),
                                sensitive_header_names: finished
                                    .map(|request| {
                                        request.redaction_rules.additional_header_names().to_vec()
                                    })
                                    .unwrap_or_default(),
                                execution,
                                started_at,
                                duration,
                            };
                            emit_event(
                                observer,
                                ExecutionEvent::RequestCompleted {
                                    record: record.clone(),
                                },
                            );
                            records.push(record);
                        }
                        Err(err) => {
                            failed_requests.insert(desc.clone());
                            let (message, assertions, artifact, sensitive_values) =
                                err.into_parts();
                            let failure = RequestFailure::execution(
                                idx,
                                finished.expect("parallel failure should reference request"),
                                message,
                                assertions,
                                artifact,
                                sensitive_values,
                                started_at,
                                duration,
                            );
                            emit_event(
                                observer,
                                ExecutionEvent::RequestFailed {
                                    failure: failure.clone(),
                                },
                            );
                            if is_map_iteration {
                                map_failures.insert(base.clone(), desc.clone());
                            }
                            failures.push(failure);
                            if !options.continue_on_error {
                                records.sort_by_key(|record| record.index);
                                join_set.abort_all();
                                return Err(ExecutionError::new(failures, records));
                            }
                        }
                    }
                }
                Err(err) => {
                    // The task could not be joined, so the request it belonged to is
                    // unknown here; this always ends the run.
                    let failure = RequestFailure::join_error(err.to_string());
                    emit_event(
                        observer,
                        ExecutionEvent::RequestFailed {
                            failure: failure.clone(),
                        },
                    );
                    failures.push(failure);
                    records.sort_by_key(|record| record.index);
                    join_set.abort_all();
                    return Err(ExecutionError::new(failures, records));
                }
            }
        }
    }

    // Anything still unsettled here can never run: its dependencies produced no result and
    // nothing is left in flight. Report those requests rather than dropping them.
    for &idx in order {
        if completed.contains(&idx) {
            continue;
        }
        let request = requests
            .get(idx)
            .expect("planner produced invalid request index");
        let failure = if let Some(dep) = request
            .dependencies
            .iter()
            .find(|dep| failed_requests.contains(*dep))
        {
            RequestFailure::dependency(idx, request, dep.clone())
        } else if let Some(dep) = request
            .dependencies
            .iter()
            .find(|dep| !results.contains_key(*dep))
        {
            RequestFailure::missing_dependency(idx, request, dep.clone())
        } else {
            continue;
        };
        emit_event(
            observer,
            ExecutionEvent::RequestFailed {
                failure: failure.clone(),
            },
        );
        failed_requests.insert(request.description.clone());
        completed.insert(idx);
        failures.push(failure);
        if !options.continue_on_error {
            break;
        }
    }

    records.sort_by_key(|record| record.index);
    if failures.is_empty() {
        Ok(records)
    } else {
        Err(ExecutionError::new(failures, records))
    }
}
/// Forwards `event` to the observer when one is installed; otherwise the event is dropped.
fn emit_event(observer: Option<&ExecutionObserver>, event: ExecutionEvent) {
    match observer {
        Some(notify) => notify(event),
        None => {}
    }
}
fn build_dependency_context(
request: &Request,
results: &HashMap<String, RequestExecution>,
) -> Result<(
HashMap<String, String>,
HashMap<String, ExecutionArtifact>,
Vec<String>,
), String> {
let mut inherited_context: HashMap<String, String> = HashMap::new();
let mut dependency_snapshots: HashMap<String, ExecutionArtifact> = HashMap::new();
let mut inherited_sensitive_values = Vec::new();
for dependency in &request.dependencies {
let dep_exec = results.get(dependency).ok_or_else(|| dependency.clone())?;
for (key, value) in &dep_exec.export_env {
inherited_context.insert(key.clone(), value.clone());
}
dependency_snapshots.insert(dependency.clone(), dep_exec.artifact.clone());
inherited_sensitive_values.extend(dep_exec.sensitive_export_values.iter().cloned());
}
inherited_sensitive_values.sort_by(|left, right| {
right
.len()
.cmp(&left.len())
.then_with(|| left.cmp(right))
});
inherited_sensitive_values.dedup();
Ok((
inherited_context,
dependency_snapshots,
inherited_sensitive_values,
))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::context::{self, PromptMode};
    use crate::request::{HttpOperation, RequestOperation};
    use std::{
        collections::HashMap,
        path::PathBuf,
        sync::{Arc, Mutex, MutexGuard, OnceLock},
    };

    /// Serializes tests that touch the process-wide prompt configuration.
    ///
    /// A poisoned lock is recovered rather than unwrapped, so one failing test does not
    /// cascade into failures of every other test that takes the guard.
    fn test_guard() -> MutexGuard<'static, ()> {
        static TEST_GUARD: OnceLock<Mutex<()>> = OnceLock::new();
        TEST_GUARD
            .get_or_init(|| Mutex::new(()))
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Holds the test lock with prompts in non-interactive mode and no canned inputs.
    /// Dropping it restores the defaults, even when the test body panics.
    struct PromptScope {
        _guard: MutexGuard<'static, ()>,
    }

    impl PromptScope {
        fn non_interactive() -> Self {
            let guard = test_guard();
            context::set_prompt_mode(PromptMode::NonInteractive);
            context::set_prompt_inputs(HashMap::new());
            Self { _guard: guard }
        }
    }

    impl Drop for PromptScope {
        fn drop(&mut self) {
            // Runs before `_guard` is released, so the reset happens under the lock.
            context::set_prompt_inputs(HashMap::new());
            context::set_prompt_mode(PromptMode::Interactive);
        }
    }

    /// Builds a bare GET request with the given name, URL and dependencies.
    fn http_get(name: &str, url: &str, dependencies: Vec<String>) -> Request {
        Request {
            description: name.to_string(),
            base_description: name.to_string(),
            operation: RequestOperation::Http(HttpOperation {
                method: Method::GET,
                url: url.to_string(),
                headers: HashMap::new(),
                query_params: HashMap::new(),
                form_data: HashMap::new(),
                body: None,
                body_content_type: None,
            }),
            session_name: None,
            auth_profile_name: None,
            auth_profile: None,
            callback_src: vec![],
            response_captures: vec![],
            assertions: vec![],
            declared_dependencies: dependencies.clone(),
            dependencies,
            context: HashMap::new(),
            redaction_rules: crate::request::RedactionRules::default(),
            sensitive_values: vec![],
            working_dir: PathBuf::new(),
            map_iteration: None,
        }
    }

    /// A request whose URL contains a `[[ missing ]]` placeholder. The tests run it in
    /// non-interactive mode with no prompt inputs and expect it to fail during execution.
    fn request_with_missing_prompt(name: &str) -> Request {
        http_get(name, "https://example.com/[[ missing ]]", vec![])
    }

    /// A request that depends on `dependency`.
    fn request_with_dependency(name: &str, dependency: &str) -> Request {
        http_get(name, "https://example.com", vec![dependency.to_string()])
    }

    /// Plan shared by all tests: A and C fail on their own, B depends on A.
    fn failing_plan() -> Vec<Request> {
        vec![
            request_with_missing_prompt("A"),
            request_with_dependency("B", "A"),
            request_with_missing_prompt("C"),
        ]
    }

    #[tokio::test]
    async fn sequential_execution_stops_after_first_failure() {
        let _prompts = PromptScope::non_interactive();
        let requests = failing_plan();

        let err = execute_plan(&requests, &[0, 1, 2], ExecutionOptions::default())
            .await
            .expect_err("execution should fail");

        let (failures, completed) = err.into_parts();
        assert!(completed.is_empty());
        assert_eq!(failures.len(), 1);
        assert_eq!(failures[0].request(), "A");
        assert!(matches!(
            failures[0].kind(),
            RequestFailureKind::Execution { .. }
        ));
    }

    #[tokio::test]
    async fn sequential_continue_on_error_collects_dependency_failures() {
        let _prompts = PromptScope::non_interactive();
        let requests = failing_plan();

        let err = execute_plan(
            &requests,
            &[0, 1, 2],
            ExecutionOptions {
                continue_on_error: true,
                ..ExecutionOptions::default()
            },
        )
        .await
        .expect_err("execution should fail");

        let (failures, completed) = err.into_parts();
        assert!(completed.is_empty());
        assert_eq!(failures.len(), 3);
        assert!(failures.iter().any(|failure| matches!(
            failure.kind(),
            RequestFailureKind::Dependency { dependency } if dependency == "A"
        )));
    }

    #[tokio::test]
    async fn parallel_continue_on_error_collects_independent_failures() {
        let _prompts = PromptScope::non_interactive();
        let requests = failing_plan();

        let err = execute_plan(
            &requests,
            &[0, 1, 2],
            ExecutionOptions {
                parallel: true,
                continue_on_error: true,
                ..ExecutionOptions::default()
            },
        )
        .await
        .expect_err("execution should fail");

        let (failures, completed) = err.into_parts();
        assert!(completed.is_empty());
        assert_eq!(failures.len(), 3);
        assert!(failures.iter().any(|failure| failure.request() == "A"));
        assert!(failures.iter().any(|failure| failure.request() == "B"));
        assert!(failures.iter().any(|failure| failure.request() == "C"));
    }

    #[tokio::test]
    async fn parallel_observer_emits_waiting_and_skip_trace_events() {
        let _prompts = PromptScope::non_interactive();
        let requests = failing_plan();

        let trace = Arc::new(Mutex::new(ExecutionTraceCollector::default()));
        let observer: ExecutionObserver = {
            let trace = Arc::clone(&trace);
            Arc::new(move |event| {
                trace.lock().unwrap().record_event(&event);
            })
        };

        let _ = execute_plan_with_observer(
            &requests,
            &[0, 1, 2],
            ExecutionOptions {
                parallel: true,
                continue_on_error: true,
                ..ExecutionOptions::default()
            },
            Some(observer),
        )
        .await;

        let entries = trace.lock().unwrap().snapshot();
        assert!(entries.iter().any(|entry| {
            entry.kind == ExecutionTraceKind::Waiting
                && entry.request.as_deref() == Some("B")
                && entry.waiting_on == vec!["A"]
        }));
        assert!(entries.iter().any(|entry| {
            entry.kind == ExecutionTraceKind::Skipped
                && entry.request.as_deref() == Some("B")
                && entry.reason.as_deref() == Some("dependency_failed")
                && entry.related_request.as_deref() == Some("A")
        }));
    }
}