use {
crate::*,
std::{
io::{
self,
BufRead,
BufReader,
},
process::{
Child,
Command,
},
thread,
time::Instant,
},
termimad::crossbeam::channel::{
self,
Receiver,
Sender,
},
};
/// Runs the command of a mission, one task at a time, and publishes what
/// the command prints on a channel.
///
/// Every task started with `start` gets its own thread and process; all of
/// them write to the same channel, read through `line_receiver`.
pub struct MissionExecutor {
    /// Builder of the command; it's cloned for every started task so that
    /// per-task settings don't leak into the next one.
    command_builder: CommandBuilder,
    /// Optional custom command used to stop the process. The process id is
    /// appended as last argument when the command is run.
    kill_command: Option<Vec<String>>,
    /// Sending side of the output channel, cloned into the worker threads.
    line_sender: Sender<CommandExecInfo>,
    /// Receiving side of the output channel: output lines, launch errors
    /// and end-of-process notifications arrive here.
    pub line_receiver: Receiver<CommandExecInfo>,
}
/// Handle on one running task, returned by `MissionExecutor::start`.
pub struct TaskExecutor {
    /// Thread which launches the process and waits for its end.
    child_thread: thread::JoinHandle<()>,
    /// Channel used to ask `child_thread` to stop the process.
    stop_sender: Sender<StopMessage>,
    /// Instant at which the task was started, when it has a non-zero grace
    /// period. Reset to `None` by `is_in_grace_period` once the period is
    /// over.
    grace_period_start: Option<Instant>,
    /// Delay the thread observes before launching the command.
    grace_period: Period,
}
/// Message telling the thread of a task how the task must end.
#[derive(Clone, Copy)]
enum StopMessage {
    /// The process closed its stderr: its exit status must be collected
    /// and sent to the consumer.
    SendStatus,
    /// The process must be killed.
    Kill,
}
impl TaskExecutor {
    /// Asks for the process to be killed, without waiting for it to end.
    ///
    /// Nothing is reported when the request can't be delivered.
    pub fn interrupt(self) {
        self.stop_sender.send(StopMessage::Kill).ok();
    }

    /// Asks for the process to be killed, then blocks until the thread of
    /// the task has finished.
    pub fn die(self) {
        let Self {
            child_thread,
            stop_sender,
            ..
        } = self;
        if let Err(e) = stop_sender.send(StopMessage::Kill) {
            debug!("failed to send 'die' signal: {e}");
        }
        // join() only fails when the thread panicked
        match child_thread.join() {
            Ok(()) => {}
            Err(_) => warn!("child_thread.join() failed"),
        }
    }

    /// Tells whether the grace period of the task is still running.
    ///
    /// The first call made after the end of the period forgets the start
    /// instant, so that later calls answer `false` without checking the
    /// clock.
    pub fn is_in_grace_period(&mut self) -> bool {
        let Some(started_at) = self.grace_period_start else {
            return false;
        };
        if started_at.elapsed() < self.grace_period.duration {
            return true;
        }
        self.grace_period_start = None;
        false
    }
}
impl MissionExecutor {
pub fn new(mission: &Mission) -> anyhow::Result<Self> {
let command_builder = mission.get_command()?;
let kill_command = mission.kill_command();
let (line_sender, line_receiver) = channel::unbounded();
Ok(Self {
command_builder,
kill_command,
line_sender,
line_receiver,
})
}
pub fn start(
&mut self,
task: Task,
) -> anyhow::Result<TaskExecutor> {
info!("start task {task:?}");
let grace_period = task.grace_period;
let grace_period_start = if grace_period.is_zero() {
None
} else {
Some(Instant::now())
};
let mut command_builder = self.command_builder.clone();
if let Some(backtrace) = task.backtrace {
command_builder.env("RUST_BACKTRACE", backtrace);
}
let kill_command = self.kill_command.clone();
let with_stdout = command_builder.is_with_stdout();
let line_sender = self.line_sender.clone();
let (stop_sender, stop_receiver) = channel::bounded(1);
let err_stop_sender = stop_sender.clone();
let child_thread = thread::spawn(move || {
if !grace_period.is_zero() {
thread::sleep(grace_period.duration);
}
let mut cmd = command_builder.build();
let mut child = match cmd.spawn() {
Ok(child) => child,
Err(e) => {
let _ = line_sender.send(CommandExecInfo::Error(
anyhow::anyhow!(e).context(format!("failed to spawn {cmd:?}")),
));
return;
}
};
if with_stdout {
let sender = line_sender.clone();
let Some(stdout) = child.stdout.take() else {
warn!("process has no stdout"); return;
};
let mut buf_reader = BufReader::new(stdout);
thread::spawn(move || {
let mut line = String::new();
loop {
match buf_reader.read_line(&mut line) {
Err(e) => {
warn!("error : {e}");
}
Ok(0) => {
break;
}
Ok(_) => {
let response = CommandExecInfo::Line(RawCommandOutputLine {
content: line.clone(),
origin: CommandStream::StdOut,
});
if sender.send(response).is_err() {
break; }
}
}
line.clear();
}
});
}
let err_line_sender = line_sender.clone();
let stderr = child
.stderr
.take()
.expect("MissionExecutor requires piped stderr");
let mut buf_reader = BufReader::new(stderr);
thread::spawn(move || {
let mut line = String::new();
loop {
match buf_reader.read_line(&mut line) {
Err(e) => {
warn!("error : {e}");
}
Ok(0) => {
if let Err(e) = err_stop_sender.send(StopMessage::SendStatus) {
warn!("sending stop message failed: {e}");
}
break;
}
Ok(_) => {
let response = CommandExecInfo::Line(RawCommandOutputLine {
content: line.clone(),
origin: CommandStream::StdErr,
});
if err_line_sender.send(response).is_err() {
break; }
}
}
line.clear();
}
});
match stop_receiver.recv() {
Ok(stop) => match stop {
StopMessage::SendStatus => {
let status = child.wait();
if let Ok(status) = status {
let _ = line_sender.send(CommandExecInfo::End { status });
}
}
StopMessage::Kill => {
debug!("explicit interrupt received");
kill(kill_command.as_deref(), &mut child);
}
},
Err(e) => {
debug!("recv error: {e}"); kill(kill_command.as_deref(), &mut child);
}
}
if let Err(e) = child.wait() {
warn!("waiting for child failed: {e}");
}
});
Ok(TaskExecutor {
child_thread,
stop_sender,
grace_period_start,
grace_period,
})
}
}
/// Kills the child process.
///
/// When a specific kill command is provided, it's tried first and nothing
/// more is done if it succeeds. When there's none, or when it fails, the
/// standard `Child::kill` is used.
///
/// # Panics
/// Panics when `Child::kill` returns an error.
fn kill(
    kill_command: Option<&[String]>,
    child: &mut Child,
) {
    if let Some(kill_command) = kill_command {
        info!("launch specific kill command {kill_command:?}");
        match run_kill_command(kill_command, child) {
            Ok(()) => return,
            Err(e) => warn!("specific kill command failed: {e}"),
        }
    }
    // fallback, and default way
    child.kill().expect("command couldn't be killed");
}
/// Runs the specific kill command, with the id of the child process
/// appended as last argument, then waits for the child process to end.
///
/// # Errors
/// Returns an error when
/// - `kill_command` is empty (`InvalidInput`),
/// - the kill command can't be launched or waited for,
/// - the kill command ends with a failure status,
/// - waiting for the child process fails.
fn run_kill_command(
    kill_command: &[String],
    child: &mut Child,
) -> io::Result<()> {
    let Some((exe, args)) = kill_command.split_first() else {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "empty kill command",
        ));
    };
    let target_pid = child.id().to_string();
    // status() launches the command and waits for its end
    let status = Command::new(exe).args(args).arg(target_pid).status()?;
    if status.success() {
        // the child was asked to stop: block until it's really gone
        child.wait()?;
        Ok(())
    } else {
        Err(io::Error::other(format!(
            "kill command returned nonzero status: {status}"
        )))
    }
}