use std::{
ffi::c_int,
io::{self, Read, Write},
os::unix::{net::UnixStream, process::CommandExt},
process::{exit, Command},
};
use crate::{
exec::terminate_process,
system::{
fork, getpgid, getpgrp,
interface::ProcessId,
kill, setpgid, setsid,
signal::SignalInfo,
term::{PtyFollower, Terminal},
wait::{Wait, WaitError, WaitOptions, WaitStatus},
ForkResult,
},
};
use crate::{
exec::{
event::StopReason,
use_pty::{SIGCONT_BG, SIGCONT_FG},
},
log::{dev_error, dev_info, dev_warn},
};
use signal_hook::consts::*;
use crate::exec::{
event::{EventClosure, EventDispatcher},
io_util::{retry_while_interrupted, was_interrupted},
use_pty::backchannel::{MonitorBackchannel, MonitorMessage, ParentMessage},
};
use crate::exec::{opt_fmt, signal_fmt};
/// Entry point of the monitor process: spawn `command` on the pty follower and supervise it.
///
/// The monitor
/// 1. starts a new session and makes `pty_follower` its controlling terminal,
/// 2. waits for a message from the parent over `backchannel` before forking,
/// 3. forks; the child calls [`exec_command`], the monitor keeps running,
/// 4. reports the command's PID to the parent and runs the event loop,
/// 5. once the loop stops, makes sure the command is gone, takes back the terminal's foreground
///    process group and reports the stop reason to the parent.
///
/// When `foreground` is `true` the command's process group is made the foreground process group
/// of the pty follower.
///
/// # Errors
///
/// Returns an error if the event dispatcher cannot be created, the new session or controlling
/// terminal cannot be set up, the error pipe cannot be created, the initial message from the
/// parent cannot be received, or `fork` fails.
///
/// This function never returns `Ok`: every path that gets past the setup phase ends in
/// `exit(1)`.
pub(super) fn exec_monitor(
    pty_follower: PtyFollower,
    command: Command,
    foreground: bool,
    backchannel: &mut MonitorBackchannel,
) -> io::Result<()> {
    let mut dispatcher = EventDispatcher::<MonitorClosure>::new()?;

    // Start a new session for the monitor.
    setsid().map_err(|err| {
        dev_warn!("cannot start a new session: {err}");
        err
    })?;

    // Make the follower side of the pty the controlling terminal.
    pty_follower.make_controlling_terminal().map_err(|err| {
        dev_warn!("cannot set the controlling terminal: {err}");
        err
    })?;

    // Socket pair used by the forked child to report the OS error code if executing the command
    // fails.
    let (mut errpipe_tx, errpipe_rx) = UnixStream::pair()?;

    // Do not fork until the parent says so.
    let event = retry_while_interrupted(|| backchannel.recv()).map_err(|err| {
        dev_warn!("cannot receive green light from parent: {err}");
        err
    })?;
    // The first message is expected to be `ExecCommand`; this is only checked in debug builds.
    debug_assert_eq!(event, MonitorMessage::ExecCommand);

    let ForkResult::Parent(command_pid) = fork().map_err(|err| {
        dev_warn!("unable to fork command process: {err}");
        err
    })? else {
        // Child process: it only ever writes to the error pipe.
        drop(errpipe_rx);
        // `exec_command` only comes back if the command could not be executed.
        let err = exec_command(command, foreground, pty_follower);
        dev_warn!("failed to execute command: {err}");
        // Forward the raw OS error code, in native byte order, to the monitor. Errors without an
        // OS error code are not reported.
        if let Some(error_code) = err.raw_os_error() {
            errpipe_tx.write_all(&error_code.to_ne_bytes()).ok();
        }
        drop(errpipe_tx);
        exit(1)
    };

    // NOTE(review): the monitor's own `errpipe_tx` is intentionally left untouched here; it stays
    // open for the rest of this function.

    // Tell the parent which PID the command got. A failure is only logged.
    if let Err(err) = backchannel.send(&ParentMessage::CommandPid(command_pid)) {
        dev_warn!("cannot send command PID to parent: {err}");
    }

    let mut closure = MonitorClosure::new(
        command_pid,
        pty_follower,
        errpipe_rx,
        backchannel,
        &mut dispatcher,
    );

    // Hand the terminal to the command's process group. `exec_command` waits for this before
    // executing the command when `foreground` is set.
    if foreground {
        if let Err(err) = closure.pty_follower.tcsetpgrp(closure.command_pgrp) {
            dev_error!(
                "cannot set foreground process group to command ({}): {err}",
                closure.command_pgrp
            );
        }
    }

    let reason = dispatcher.event_loop(&mut closure);

    // `command_pid` is still `Some` if the loop stopped without observing the command's
    // termination: terminate the command and reap it, retrying the wait when interrupted.
    if let Some(command_pid) = closure.command_pid {
        terminate_process(command_pid, true);
        loop {
            match command_pid.wait(WaitOptions::new()) {
                Err(WaitError::Io(err)) if was_interrupted(&err) => {}
                _ => break,
            }
        }
    }

    // Make the monitor's process group the foreground process group of the terminal again.
    if let Err(err) = closure.pty_follower.tcsetpgrp(closure.monitor_pgrp) {
        dev_error!(
            "cannot set foreground process group to monitor ({}): {err}",
            closure.monitor_pgrp
        );
    }

    // Report why the event loop stopped to the parent.
    match reason {
        StopReason::Break(err) => match err.try_into() {
            Ok(msg) => {
                if let Err(err) = closure.backchannel.send(&msg) {
                    dev_warn!("cannot send message over backchannel: {err}")
                }
            }
            Err(err) => {
                dev_warn!("socket error `{err:?}` cannot be converted to a message")
            }
        },
        StopReason::Exit(command_status) => {
            if let Err(err) = closure.backchannel.send(&command_status.into()) {
                dev_warn!("command status cannot be sent over backchannel: {err}")
            }
        }
    }

    // `exit` does not run destructors, so release the closure's resources explicitly first.
    drop(closure);
    exit(1)
}
/// Replace the current (forked) process with `command`.
///
/// The process first tries to move itself into a process group whose ID is its own PID; a
/// failure to do so is ignored. When `foreground` is set it then polls `pty_follower` until that
/// process group is reported as the terminal's foreground process group.
///
/// On success this function does not return. The returned value is the error that prevented the
/// command from being executed.
fn exec_command(mut command: Command, foreground: bool, pty_follower: PtyFollower) -> io::Error {
    let own_pid = std::process::id() as ProcessId;

    // Best effort: the result is deliberately discarded.
    let _ = setpgid(0, own_pid);

    if foreground {
        loop {
            // A failed query counts as "not in the foreground yet".
            let in_foreground = matches!(pty_follower.tcgetpgrp(), Ok(pgrp) if pgrp == own_pid);
            if in_foreground {
                break;
            }
            std::thread::sleep(std::time::Duration::from_micros(1));
        }
    }

    // The follower is not needed past this point.
    drop(pty_follower);

    command.exec()
}
/// State shared by the monitor's event-loop callbacks.
struct MonitorClosure<'a> {
/// PID of the command. Set to `None` by `handle_sigchld` once the command has exited or was
/// terminated by a signal.
command_pid: Option<ProcessId>,
/// Process group handed the terminal on `SIGCONT_FG`. Starts out equal to the command's PID and
/// is updated by `handle_sigchld` when the command stops while another, non-monitor process
/// group is in the foreground.
command_pgrp: ProcessId,
/// Process group of the monitor, as returned by `getpgrp` when the closure was created.
monitor_pgrp: ProcessId,
/// Follower side of the pty; used to query and change the foreground process group.
pty_follower: PtyFollower,
/// Receiving end of the error pipe; carries the OS error code if executing the command fails.
errpipe_rx: UnixStream,
/// Channel used to exchange messages with the parent process.
backchannel: &'a mut MonitorBackchannel,
}
impl<'a> MonitorClosure<'a> {
    /// Assemble the monitor state for the command identified by `command_pid`.
    ///
    /// Installs `read_errpipe` and `read_backchannel` on `dispatcher` as the read callbacks for
    /// `errpipe_rx` and `backchannel`, and requests a process group for the command whose ID is
    /// the command's own PID. If that request fails the failure is only logged.
    fn new(
        command_pid: ProcessId,
        pty_follower: PtyFollower,
        errpipe_rx: UnixStream,
        backchannel: &'a mut MonitorBackchannel,
        dispatcher: &mut EventDispatcher<Self>,
    ) -> Self {
        // Remember the monitor's own process group.
        let monitor_pgrp = getpgrp();

        dispatcher.set_read_callback(&errpipe_rx, |closure, events| closure.read_errpipe(events));
        dispatcher.set_read_callback(backchannel, |closure, events| {
            closure.read_backchannel(events)
        });

        // The command's process group ID is simply its PID.
        if let Err(err) = setpgid(command_pid, command_pid) {
            dev_warn!("cannot set process group ID for process: {err}");
        }

        Self {
            command_pid: Some(command_pid),
            command_pgrp: command_pid,
            monitor_pgrp,
            pty_follower,
            errpipe_rx,
            backchannel,
        }
    }

    /// Read callback for the backchannel: handle one message from the parent.
    ///
    /// An interrupted read is ignored, any other read error breaks the event loop. A `Signal`
    /// message is passed to `send_signal` as long as the command's PID is still known.
    ///
    /// # Panics
    ///
    /// Panics if an `ExecCommand` message is received.
    fn read_backchannel(&mut self, dispatcher: &mut EventDispatcher<Self>) {
        let message = match self.backchannel.recv() {
            Ok(message) => message,
            Err(err) => {
                // Interrupted reads are silently skipped; everything else stops the event loop.
                if !was_interrupted(&err) {
                    dev_warn!("monitor could not read from backchannel: {}", err);
                    dispatcher.set_break(err);
                }
                return;
            }
        };

        match message {
            MonitorMessage::ExecCommand => unreachable!(),
            MonitorMessage::Signal(signal) => {
                let Some(command_pid) = self.command_pid else {
                    return;
                };
                self.send_signal(signal, command_pid, true);
            }
        }
    }

    /// React to `SIGCHLD` by inspecting the command's wait status without blocking.
    ///
    /// - Exited or killed by a signal: forget the command's PID and stop the event loop with the
    ///   status.
    /// - Stopped: remember the terminal's foreground process group (unless it is the monitor's)
    ///   and forward the status to the parent.
    /// - Continued, or anything else: only logged.
    ///
    /// Returns without doing anything if waiting fails for a reason other than an interruption.
    fn handle_sigchld(&mut self, command_pid: ProcessId, dispatcher: &mut EventDispatcher<Self>) {
        let status = loop {
            match command_pid.wait(WaitOptions::new().untraced().no_hang()) {
                // Interrupted: try again.
                Err(WaitError::Io(err)) if was_interrupted(&err) => continue,
                Err(_) => return,
                Ok((_, status)) => break status,
            }
        };

        if let Some(exit_code) = status.exit_status() {
            dev_info!("command ({command_pid}) exited with status code {exit_code}");
            self.command_pid = None;
            dispatcher.set_exit(status);
            return;
        }

        if let Some(signal) = status.term_signal() {
            dev_info!(
                "command ({command_pid}) was terminated by {}",
                signal_fmt(signal),
            );
            self.command_pid = None;
            dispatcher.set_exit(status);
            return;
        }

        if let Some(signal) = status.stop_signal() {
            dev_info!(
                "command ({command_pid}) was stopped by {}",
                signal_fmt(signal),
            );
            // Keep track of whichever non-monitor group currently owns the terminal so that it
            // is the one used by a later `SIGCONT_FG`.
            match self.pty_follower.tcgetpgrp() {
                Ok(pgrp) if pgrp != self.monitor_pgrp => self.command_pgrp = pgrp,
                _ => {}
            }
            // Best effort: a send failure is ignored.
            self.backchannel
                .send(&ParentMessage::CommandStatus(status))
                .ok();
            return;
        }

        if status.did_continue() {
            dev_info!("command ({command_pid}) continued execution");
        } else {
            dev_warn!("unexpected wait status for command ({command_pid})")
        }
    }

    /// Read callback for the error pipe: forward the command's exec error code to the parent.
    ///
    /// Reads one native-endian `i32`. An interrupted read is ignored, any other read error breaks
    /// the event loop.
    fn read_errpipe(&mut self, dispatcher: &mut EventDispatcher<Self>) {
        let mut raw_code = [0_u8; std::mem::size_of::<i32>()];

        if let Err(err) = self.errpipe_rx.read_exact(&mut raw_code) {
            if !was_interrupted(&err) {
                dispatcher.set_break(err);
            }
            return;
        }

        let error_code = i32::from_ne_bytes(raw_code);
        // Best effort: a send failure is ignored.
        self.backchannel
            .send(&ParentMessage::IoError(error_code))
            .ok();
    }

    /// Deliver `signal` to the command.
    ///
    /// - `SIGALRM` is handled by `terminate_process` instead of being forwarded.
    /// - `SIGCONT_FG` / `SIGCONT_BG` first make the command's / the monitor's process group the
    ///   foreground process group of the terminal and then send `SIGCONT`.
    /// - Every other signal is forwarded unchanged.
    ///
    /// `from_parent` only affects the log message. Failures to deliver the signal are ignored.
    fn send_signal(&self, signal: c_int, command_pid: ProcessId, from_parent: bool) {
        dev_info!(
            "sending {}{} to command",
            signal_fmt(signal),
            opt_fmt(from_parent, " from parent"),
        );

        // Work out which real signal, if any, has to be sent with `kill`.
        let forwarded = match signal {
            SIGALRM => {
                terminate_process(command_pid, false);
                return;
            }
            SIGCONT_FG => {
                if let Err(err) = self.pty_follower.tcsetpgrp(self.command_pgrp) {
                    dev_error!(
                        "cannot set the foreground process group to command ({}): {err}",
                        self.command_pgrp
                    );
                }
                SIGCONT
            }
            SIGCONT_BG => {
                if let Err(err) = self.pty_follower.tcsetpgrp(self.monitor_pgrp) {
                    dev_error!(
                        "cannot set the foreground process group to monitor ({}): {err}",
                        self.monitor_pgrp
                    );
                }
                SIGCONT
            }
            other => other,
        };

        kill(command_pid, forwarded).ok();
    }
}
/// Tell whether a signal sent by the process `signaler_pid` originates from the command itself.
///
/// That is the case when `signaler_pid` is non-zero and either
/// - equals `command_pid`, or
/// - belongs to the process group `command_pgrp` according to `getpgid`.
///
/// A failing `getpgid` lookup counts as "not in the group".
fn is_self_terminating(
    signaler_pid: ProcessId,
    command_pid: ProcessId,
    command_pgrp: ProcessId,
) -> bool {
    // A PID of zero never identifies the command.
    if signaler_pid == 0 {
        return false;
    }

    // The group lookup is only performed when the PIDs differ.
    signaler_pid == command_pid
        || getpgid(signaler_pid).is_ok_and(|signaler_pgrp| signaler_pgrp == command_pgrp)
}
impl<'a> EventClosure for MonitorClosure<'a> {
type Break = io::Error;
type Exit = WaitStatus;
fn on_signal(&mut self, info: SignalInfo, dispatcher: &mut EventDispatcher<Self>) {
dev_info!(
"monitor received{} {} from {}",
opt_fmt(info.is_user_signaled(), " user signaled"),
signal_fmt(info.signal()),
info.pid()
);
let Some(command_pid) = self.command_pid else {
dev_info!("command was terminated, ignoring signal");
return;
};
match info.signal() {
SIGCHLD => self.handle_sigchld(command_pid, dispatcher),
_ if info.is_user_signaled()
&& is_self_terminating(info.pid(), command_pid, self.command_pgrp) => {}
signal => self.send_signal(signal, command_pid, false),
}
}
}