use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex, mpsc};
use super::error::JobError;
use super::job::{Job, JobQueue, JobResult};
use super::message::WorkerMessage;
use super::progress::{Mode, ParProgress};
use super::worker::{Worker, WorkerId};
use crate::output;
use crate::pretty::PrettyMode;
use crate::util::term::{Stderr, Stdout, WriteMode};
use hurl_core::error::{DisplaySourceError, OutputFormat};
use hurl_core::types::{Count, Index};
pub struct ParallelRunner {
workers: Vec<(Worker, WorkerState)>,
tx: Option<Sender<Job>>,
rx: Receiver<WorkerMessage>,
progress: ParProgress,
output_type: OutputType,
repeat: Count,
}
#[allow(clippy::large_enum_variant)]
pub enum WorkerState {
Idle,
Running {
job: Job,
current_entry: Index,
last_entry: Index,
retry_count: usize,
},
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum OutputType {
ResponseBody {
include_headers: bool,
color: bool,
pretty: PrettyMode,
},
Json,
NoOutput,
}
const MAX_RUNNING_DISPLAYED: usize = 8;
impl ParallelRunner {
pub fn new(
workers_count: usize,
output_type: OutputType,
repeat: Count,
test: bool,
progress_bar: bool,
color_stderr: bool,
max_width: Option<usize>,
) -> Self {
let (tx_in, rx_in) = mpsc::channel();
let (tx_out, rx_out) = mpsc::channel();
let rx_out = Arc::new(Mutex::new(rx_out));
let workers = (0..workers_count)
.map(|i| {
let worker = Worker::new(WorkerId::from(i), &tx_in, &rx_out);
let state = WorkerState::Idle;
(worker, state)
})
.collect::<Vec<_>>();
let mode = Mode::new(test, progress_bar);
let progress = ParProgress::new(MAX_RUNNING_DISPLAYED, mode, color_stderr, max_width);
ParallelRunner {
workers,
tx: Some(tx_out),
rx: rx_in,
progress,
output_type,
repeat,
}
}
pub fn run(&mut self, jobs: &[Job]) -> Result<Vec<JobResult>, JobError> {
let mut stdout = Stdout::new(WriteMode::Immediate);
let mut stderr = Stderr::new(WriteMode::Immediate);
let mut queue = JobQueue::new(jobs, self.repeat);
let jobs_count = queue.jobs_count();
self.workers.iter().for_each(|_| {
if let Some(job) = queue.next() {
_ = self.tx.as_ref().unwrap().send(job);
}
});
let mut append = false;
let mut results = vec![];
for msg in self.rx.iter() {
match msg {
WorkerMessage::InputReadError(msg) => {
self.progress.clear_progress_bar(&mut stderr);
let filename = msg.job.filename;
let error = msg.error;
let message = format!("Issue reading from {filename}: {error}");
return Err(JobError::InputRead(message));
}
WorkerMessage::ParsingError(msg) => {
self.progress.clear_progress_bar(&mut stderr);
stderr.eprint(msg.stderr.buffer());
return Err(JobError::Parsing);
}
WorkerMessage::Running(msg) => {
self.workers[msg.worker_id.0].1 = WorkerState::Running {
job: msg.job,
current_entry: msg.current_entry,
last_entry: msg.last_entry,
retry_count: msg.retry_count,
};
if self.progress.can_update() {
self.progress.clear_progress_bar(&mut stderr);
self.progress.update_progress_bar(
&self.workers,
results.len(),
jobs_count,
&mut stderr,
);
}
}
WorkerMessage::Completed(msg) => {
self.progress.clear_progress_bar(&mut stderr);
self.workers[msg.worker_id.0].1 = WorkerState::Idle;
if !msg.stderr.buffer().is_empty() {
stderr.eprint(msg.stderr.buffer());
}
if !msg.stdout.buffer().is_empty() {
let ret = stdout.write_all(msg.stdout.buffer());
if ret.is_err() {
return Err(JobError::OutputWrite(
"Issue writing to stdout".to_string(),
));
}
}
self.print_output(&msg.result, &mut stdout, append)?;
append = true;
self.progress.print_completed(&msg.result, &mut stderr);
results.push(msg.result);
self.progress.update_progress_bar(
&self.workers,
results.len(),
jobs_count,
&mut stderr,
);
self.progress.force_next_update();
let job = queue.next();
match job {
Some(job) => {
_ = self.tx.as_ref().unwrap().send(job);
}
None => {
if let Count::Finite(jobs_count) = jobs_count
&& results.len() == jobs_count
{
break;
}
}
}
}
}
}
drop(self.tx.take());
for worker in &mut self.workers {
if let Some(thread) = worker.0.take_thread() {
thread.join().unwrap();
}
}
results.sort_unstable_by_key(|result| result.job.seq);
Ok(results)
}
fn print_output(
&self,
result: &JobResult,
stdout: &mut Stdout,
append: bool,
) -> Result<(), JobError> {
let job = &result.job;
let content = &result.content;
let hurl_result = &result.hurl_result;
let filename_in = &job.filename;
let filename_out = job.runner_options.output.as_ref();
match self.output_type {
OutputType::ResponseBody {
include_headers,
color,
pretty,
} => {
if hurl_result.success {
let result = output::write_last_body(
hurl_result,
include_headers,
color,
pretty,
filename_out,
stdout,
append,
);
if let Err(e) = result {
let message = e.render(
&filename_in.to_string(),
content,
None,
OutputFormat::Terminal(color),
);
return Err(JobError::OutputWrite(message));
}
}
}
OutputType::Json => {
let result = output::write_json(
hurl_result,
content,
filename_in,
filename_out,
stdout,
append,
);
if let Err(error) = result {
return Err(JobError::OutputWrite(error.to_string()));
}
}
OutputType::NoOutput => {}
}
Ok(())
}
}