use simple_logger;
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};
use hen::{
automation, benchmark,
error::{HenError, HenErrorKind, HenResult},
parser, request,
};
use crate::{
cli::{BodyMode, CommandOutcome, RunArgs},
cli_interrupt::execute_plan_with_interrupt,
cli_load::{load_collection, resolve_execution_plan},
cli_output::{
print_body_preview, print_execution_trace, print_failure_body, print_failure_line,
print_machine_error, print_run_report, print_status_line, print_summary,
print_timing_line,
},
};
/// Scope guard for the prompt configuration used during a text-mode run.
///
/// Created by `PromptSession::configure`; when the value is dropped the prompt
/// inputs are cleared and the prompt mode saved here is reinstated. The guard
/// therefore has to be bound to a variable that lives for the whole run —
/// discarding it immediately would undo the configuration right away.
#[must_use = "dropping the session immediately restores the previous prompt state"]
struct PromptSession {
    /// Prompt mode that was active before the session was configured.
    previous_mode: parser::context::PromptMode,
}
impl PromptSession {
    /// Applies the prompt settings requested by `args` and remembers the
    /// prompt mode that was active beforehand.
    ///
    /// The mode becomes `NonInteractive` when `args.non_interactive` is set and
    /// `Interactive` otherwise; `args.inputs` is installed as the prompt inputs.
    fn configure(args: &RunArgs) -> Self {
        // Capture the current mode first so it can be restored on drop.
        let session = Self {
            previous_mode: parser::context::prompt_mode(),
        };

        let inputs = args
            .inputs
            .iter()
            .cloned()
            .collect::<HashMap<String, String>>();

        let requested_mode = if args.non_interactive {
            parser::context::PromptMode::NonInteractive
        } else {
            parser::context::PromptMode::Interactive
        };

        parser::context::set_prompt_mode(requested_mode);
        parser::context::set_prompt_inputs(inputs);

        session
    }
}
impl Drop for PromptSession {
    /// Clears the prompt inputs and reinstates the prompt mode that was saved
    /// when the session was configured.
    fn drop(&mut self) {
        let saved_mode = self.previous_mode;
        parser::context::set_prompt_inputs(HashMap::new());
        parser::context::set_prompt_mode(saved_mode);
    }
}
/// Entry point of the `run` command.
///
/// Text output is handled by `run_text`, whose errors are propagated to the
/// caller. Any other output format goes through `run_machine_output`, which
/// always yields a `CommandOutcome` (failures are encoded in its exit code).
pub(crate) async fn run(args: RunArgs) -> HenResult<CommandOutcome> {
    if !args.output.is_text() {
        return Ok(run_machine_output(args).await);
    }

    run_text(args).await.map(|()| CommandOutcome::success())
}
/// Executes a run in text mode, printing results to stdout as they arrive.
///
/// Steps, in order:
/// 1. Configure the prompt session (restored when this function returns) and,
///    with `--verbose`, the debug logger.
/// 2. Load the collection, build the request planner and resolve the plan.
/// 3. Handle the two short-circuit modes: `--export` prints each display
///    target as a curl command, `--benchmark` benchmarks the primary target.
/// 4. Execute the plan with an observer that streams one entry per completed
///    or failed request (and one line per passed assertion).
/// 5. If the observer printed nothing, print all records and failures from the
///    final result instead; then print the trace (verbose only) and summary.
///
/// # Errors
///
/// Returns a `HenError` when setup fails (logger, current directory, loading,
/// planning, plan resolution, prompt-input validation), when benchmarking
/// fails or has no specific target (exit code 2), when execution is
/// interrupted (exit code taken from the signal), when any request failed, or
/// when execution stopped before completing all requests.
async fn run_text(args: RunArgs) -> HenResult<()> {
    /// Remembers whether a streamed entry has already been printed.
    #[derive(Default)]
    struct StreamPrinterState {
        needs_separator: bool,
    }

    impl StreamPrinterState {
        /// Prints a blank separator line before every entry except the first.
        fn begin_entry(&mut self) {
            if std::mem::replace(&mut self.needs_separator, true) {
                println!();
            }
        }
    }

    // Must stay alive until the function returns: dropping it resets the prompt state.
    let _prompt_session = PromptSession::configure(&args);
    let verbose = args.verbose;
    let body_mode = args.body;

    if verbose {
        simple_logger::init_with_level(log::Level::Debug).map_err(|err| {
            HenError::new(HenErrorKind::Cli, "Failed to initialize logger")
                .with_detail(err.to_string())
        })?;
    }
    log::debug!("Starting hen with args {:?}", args);

    let cwd = std::env::current_dir().map_err(|err| {
        HenError::new(HenErrorKind::Io, "Failed to determine current directory")
            .with_detail(err.to_string())
    })?;
    // Parse and I/O errors are passed through untouched; everything else gets extra context.
    let collection = load_collection(&args, cwd).map_err(|err| match err.kind() {
        HenErrorKind::Parse | HenErrorKind::Io => err,
        _ => err.with_detail("While loading collection"),
    })?;
    log::debug!("PARSED COLLECTION\n{:#?}", collection);

    let planner = request::RequestPlanner::new(&collection.requests).map_err(|err| {
        HenError::new(
            HenErrorKind::Planner,
            "Failed to build request dependency graph",
        )
        .with_detail(err.to_string())
    })?;
    let (plan, display_targets, primary_target) =
        resolve_execution_plan(&collection, &planner, &args)?;
    automation::validate_plan_prompt_inputs(&collection.requests, &plan)?;

    // --export: print the curl form of every display target and stop.
    if args.export {
        for request in display_targets
            .iter()
            .filter_map(|idx| collection.requests.get(*idx))
        {
            println!("{}", request.as_curl());
        }
        return Ok(());
    }

    // --benchmark: only meaningful when a single primary target was resolved.
    if let Some(count) = args.benchmark {
        let Some(target) = primary_target else {
            return Err(HenError::new(
                HenErrorKind::Benchmark,
                "Benchmark requires selecting a specific request",
            )
            .with_exit_code(2));
        };
        benchmark::benchmark(&collection.requests, &planner, target, count)
            .await
            .map_err(|err| {
                HenError::new(HenErrorKind::Benchmark, "Benchmark execution failed")
                    .with_detail(err.to_string())
            })?;
        return Ok(());
    }

    // A positive --max-concurrency implies parallel execution; zero means "no limit given".
    let execution_options = request::ExecutionOptions {
        parallel: args.parallel || args.max_concurrency > 0,
        max_concurrency: match args.max_concurrency {
            0 => None,
            limit => Some(limit),
        },
        continue_on_error: args.continue_on_error,
    };
    log::debug!(
        "Execution options: parallel={}, max_concurrency={:?}, continue_on_error={}",
        execution_options.parallel,
        execution_options.max_concurrency,
        execution_options.continue_on_error
    );

    // One shared set serves both the observer and the fallback printing below.
    let display_set: Arc<HashSet<usize>> = Arc::new(display_targets.iter().copied().collect());
    let printer_state = Arc::new(Mutex::new(StreamPrinterState::default()));
    let events_emitted = Arc::new(AtomicBool::new(false));

    let observer: request::ExecutionObserver = Arc::new({
        let display_set = Arc::clone(&display_set);
        let printer_state = Arc::clone(&printer_state);
        let events_emitted = Arc::clone(&events_emitted);
        move |event: request::ExecutionEvent| match event {
            request::ExecutionEvent::RequestWaiting { .. }
            | request::ExecutionEvent::RequestStarted { .. } => {}
            request::ExecutionEvent::RequestCompleted { record } => {
                // The guard is held for the whole entry so its lines are not interleaved
                // with output from other observer invocations.
                let mut state = printer_state.lock().unwrap();
                events_emitted.store(true, Ordering::Relaxed);
                state.begin_entry();
                print_status_line(&record);
                if verbose {
                    print_timing_line(record.execution.artifact.timing_phases.as_slice());
                }
                if should_print_record_body(body_mode, record.index, &display_set) {
                    print_body_preview(&record, verbose);
                }
            }
            request::ExecutionEvent::RequestFailed { failure } => {
                let mut state = printer_state.lock().unwrap();
                events_emitted.store(true, Ordering::Relaxed);
                state.begin_entry();
                print_failure_line(&failure);
                if should_print_failure_body(body_mode, &failure, &display_set) {
                    print_failure_body(&failure, verbose);
                }
                if verbose {
                    if let Some(artifact) = failure.artifact() {
                        print_timing_line(artifact.timing_phases.as_slice());
                    }
                }
            }
            request::ExecutionEvent::AssertionPassed { request, assertion } => {
                let _guard = printer_state.lock().unwrap();
                events_emitted.store(true, Ordering::Relaxed);
                println!("✅ [{}] [{}]", request, assertion);
            }
        }
    });

    let execution_result = execute_plan_with_interrupt(
        &collection.requests,
        &plan,
        execution_options,
        Some(observer),
    )
    .await?;
    let records = execution_result.records;
    let failures = execution_result.failures;
    let trace = execution_result.trace;
    let execution_failed = execution_result.execution_failed;
    let interrupted = execution_result.interrupted;

    if events_emitted.load(Ordering::Relaxed) {
        // Results were already streamed; just terminate the streamed section.
        println!();
    } else {
        // Nothing was streamed, so print everything from the final result.
        for (position, record) in records.iter().enumerate() {
            if position > 0 {
                println!();
            }
            print_status_line(record);
            if verbose {
                print_timing_line(record.execution.artifact.timing_phases.as_slice());
            }
            if should_print_record_body(body_mode, record.index, &display_set) {
                print_body_preview(record, verbose);
            }
        }

        if !records.is_empty() && !failures.is_empty() {
            println!();
        }
        for failure in &failures {
            print_failure_line(failure);
            if should_print_failure_body(body_mode, failure, &display_set) {
                print_failure_body(failure, verbose);
            }
            if verbose {
                if let Some(artifact) = failure.artifact() {
                    print_timing_line(artifact.timing_phases.as_slice());
                }
            }
        }

        if !records.is_empty() || !failures.is_empty() {
            println!();
        }
    }

    if verbose && !trace.is_empty() {
        print_execution_trace(&trace);
        println!();
    }

    print_summary(
        &records,
        &failures,
        interrupted,
        plan.len(),
        collection.selected_environment.as_deref(),
    );

    // An interrupt takes precedence over request failures when choosing the error.
    if let Some(signal) = interrupted {
        return Err(HenError::new(
            HenErrorKind::Execution,
            format!("Execution interrupted by {}", signal.as_str()),
        )
        .with_exit_code(signal.exit_code()));
    }

    if !failures.is_empty() {
        let failure_details = failures
            .iter()
            .map(|failure| failure.to_string())
            .collect::<Vec<_>>()
            .join("\n");
        return Err(
            HenError::new(HenErrorKind::Execution, "One or more requests failed")
                .with_detail(failure_details),
        );
    }

    if execution_failed {
        return Err(HenError::new(
            HenErrorKind::Execution,
            "Execution terminated before completing all requests",
        ));
    }

    Ok(())
}
/// Decides whether the response body of a completed request is printed.
///
/// Returns `true` when `body_mode` is `BodyMode::All`, or when the mode shows
/// selected requests and `record_index` is contained in `display_set`.
fn should_print_record_body(
    body_mode: BodyMode,
    record_index: usize,
    display_set: &HashSet<usize>,
) -> bool {
    if body_mode == BodyMode::All {
        return true;
    }

    body_mode.shows_selected() && display_set.contains(&record_index)
}
/// Decides whether the body attached to a failed request is printed.
///
/// A failure without an artifact never prints a body. Otherwise the body is
/// printed when `body_mode` shows failed requests, or when it shows selected
/// requests and the failure's index (if it has one) is in `display_set`.
fn should_print_failure_body(
    body_mode: BodyMode,
    failure: &request::RequestFailure,
    display_set: &HashSet<usize>,
) -> bool {
    if failure.artifact().is_none() {
        return false;
    }
    if body_mode.shows_failed() {
        return true;
    }

    body_mode.shows_selected()
        && failure
            .index()
            .is_some_and(|idx| display_set.contains(&idx))
}
/// Executes a run for a non-text output format and reports it in that format.
///
/// On success the run report is printed and the exit code is the signal's exit
/// code if the run was interrupted, `0` if there were no failures and
/// execution did not fail, and `1` otherwise. On error a machine-readable
/// error is printed (using the run path, or `"hen run"` when no path was
/// given, as the suite name) and the error's own exit code is used.
async fn run_machine_output(args: RunArgs) -> CommandOutcome {
    let output = args.output;
    let suite_name = String::from(args.path.as_deref().unwrap_or("hen run"));

    let outcome = match build_machine_run_outcome(&args).await {
        Ok(outcome) => outcome,
        Err(err) => {
            print_machine_error(output, &suite_name, "run", &err);
            return CommandOutcome::with_exit_code(err.exit_code());
        }
    };

    print_run_report(output, &outcome);

    let exit_code = match outcome.interrupted {
        Some(signal) => signal.exit_code(),
        None if outcome.failures.is_empty() && !outcome.execution_failed => 0,
        None => 1,
    };
    CommandOutcome::with_exit_code(exit_code)
}
/// Prepares and executes a run without an observer and assembles the
/// `automation::RunOutcome` used for machine-readable reporting.
///
/// The run path is `args.path` when given, otherwise the current directory.
///
/// # Errors
///
/// Returns an `Input` error with exit code 2 when `--export` or `--benchmark`
/// is combined with this output format, an `Io` error when the current
/// directory cannot be determined, and any error produced while preparing or
/// executing the run.
async fn build_machine_run_outcome(args: &RunArgs) -> HenResult<automation::RunOutcome> {
    if args.export {
        let message = format!(
            "--output {} cannot be combined with --export",
            args.output.as_str()
        );
        return Err(HenError::new(HenErrorKind::Input, message).with_exit_code(2));
    }
    if args.benchmark.is_some() {
        let message = format!(
            "--output {} cannot be combined with --benchmark",
            args.output.as_str()
        );
        return Err(HenError::new(HenErrorKind::Input, message).with_exit_code(2));
    }

    let path = match &args.path {
        Some(explicit) => PathBuf::from(explicit),
        None => std::env::current_dir().map_err(|err| {
            HenError::new(HenErrorKind::Io, "Failed to determine current directory")
                .with_detail(err.to_string())
        })?,
    };

    // A positive --max-concurrency implies parallel execution; zero means "no limit given".
    let execution_options = request::ExecutionOptions {
        parallel: args.parallel || args.max_concurrency > 0,
        max_concurrency: match args.max_concurrency {
            0 => None,
            limit => Some(limit),
        },
        continue_on_error: args.continue_on_error,
    };

    let run_request = automation::RunRequest {
        path,
        selector: args.selector.clone(),
        environment: args.env.clone(),
        inputs: args.inputs.iter().cloned().collect(),
        execution_options: execution_options.clone(),
    };
    let prepared = automation::prepare_run_path(run_request).await?;

    let execution = execute_plan_with_interrupt(
        &prepared.requests,
        &prepared.plan,
        execution_options,
        None,
    )
    .await?;

    Ok(automation::finish_run(
        prepared,
        execution.records,
        execution.failures,
        execution.trace,
        execution.execution_failed,
        execution.interrupted,
    ))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the guard handed out by `parser::context::prompt_state_test_guard`;
    /// each test holds it while it touches the prompt state.
    fn test_guard() -> std::sync::MutexGuard<'static, ()> {
        parser::context::prompt_state_test_guard()
    }

    /// Forces `initial` as the prompt mode, configures a session from `args`,
    /// asserts that `expected` is now active, then drops the session and puts
    /// back the mode that was active before the test started.
    #[track_caller]
    fn assert_configured_mode(
        initial: parser::context::PromptMode,
        args: RunArgs,
        expected: parser::context::PromptMode,
    ) {
        let _guard = test_guard();
        let mode_before_test = parser::context::prompt_mode();
        parser::context::set_prompt_mode(initial);

        let session = PromptSession::configure(&args);
        assert_eq!(parser::context::prompt_mode(), expected);

        drop(session);
        parser::context::set_prompt_mode(mode_before_test);
    }

    #[test]
    fn prompt_session_is_interactive_for_text_runs_by_default() {
        assert_configured_mode(
            parser::context::PromptMode::NonInteractive,
            RunArgs::default(),
            parser::context::PromptMode::Interactive,
        );
    }

    #[test]
    fn prompt_session_remains_non_interactive_when_requested() {
        assert_configured_mode(
            parser::context::PromptMode::Interactive,
            RunArgs {
                non_interactive: true,
                ..RunArgs::default()
            },
            parser::context::PromptMode::NonInteractive,
        );
    }
}