use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::{fmt, thread};
use super::job::{Job, JobResult};
use super::message::{CompletedMsg, InputReadErrorMsg, ParsingErrorMsg, RunningMsg, WorkerMessage};
use crate::runner;
use crate::runner::EventListener;
use crate::util::logger::Logger;
use crate::util::term::{Stderr, Stdout, WriteMode};
use hurl_core::error::{DisplaySourceError, OutputFormat};
use hurl_core::parser;
use hurl_core::types::Index;
pub struct Worker {
worker_id: WorkerId,
thread: Option<thread::JoinHandle<()>>,
}
impl fmt::Display for Worker {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "id: {}", self.worker_id)
}
}
#[derive(Copy, Clone, Debug)]
pub struct WorkerId(pub usize);
impl From<usize> for WorkerId {
fn from(value: usize) -> Self {
WorkerId(value)
}
}
impl fmt::Display for WorkerId {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl Worker {
pub fn new(
worker_id: WorkerId,
tx: &Sender<WorkerMessage>,
rx: &Arc<Mutex<Receiver<Job>>>,
) -> Self {
let rx = Arc::clone(rx);
let tx = tx.clone();
let thread = thread::spawn(move || {
loop {
let Ok(job) = rx.lock().unwrap().recv() else {
return;
};
let mut stdout = Stdout::new(WriteMode::Buffered);
let stderr = Stderr::new(WriteMode::Buffered);
let secrets = job.variables.secrets();
let mut logger = Logger::new(&job.logger_options, stderr, &secrets);
let progress = WorkerProgress::new(worker_id, &job, &tx);
let content = job.filename.read_to_string();
let content = match content {
Ok(c) => c,
Err(e) => {
let msg = InputReadErrorMsg::new(worker_id, &job, e);
_ = tx.send(WorkerMessage::InputReadError(msg));
return;
}
};
let hurl_file = parser::parse_hurl_file(&content);
let hurl_file = match hurl_file {
Ok(h) => h,
Err(error) => {
let filename = job.filename.to_string();
let message = error.render(
&filename,
&content,
None,
OutputFormat::Terminal(logger.color),
);
logger.error_rich(&message);
let msg = ParsingErrorMsg::new(worker_id, &job, &logger.stderr);
_ = tx.send(WorkerMessage::ParsingError(msg));
return;
}
};
let result = runner::run_entries(
&hurl_file.entries,
&content,
Some(&job.filename),
&job.runner_options,
&job.variables,
&mut stdout,
Some(&progress),
&mut logger,
);
if result.success && result.entries.last().is_none() {
logger.warning(&format!(
"No entry have been executed for file {}",
job.filename
));
}
let job_result = JobResult::new(job, content, result);
let msg = CompletedMsg::new(worker_id, job_result, stdout, logger.stderr);
_ = tx.send(WorkerMessage::Completed(msg));
}
});
Worker {
worker_id,
thread: Some(thread),
}
}
pub fn take_thread(&mut self) -> Option<thread::JoinHandle<()>> {
self.thread.take()
}
}
struct WorkerProgress {
worker_id: WorkerId,
job: Job,
tx: Sender<WorkerMessage>,
}
impl WorkerProgress {
fn new(worker_id: WorkerId, job: &Job, tx: &Sender<WorkerMessage>) -> Self {
WorkerProgress {
worker_id,
job: job.clone(),
tx: tx.clone(),
}
}
}
impl EventListener for WorkerProgress {
fn on_entry_running(&self, current: Index, last: Index, retry_count: usize) {
let msg = RunningMsg::new(self.worker_id, &self.job, current, last, retry_count);
_ = self.tx.send(WorkerMessage::Running(msg));
}
}