use crate::{
ast,
common::{is_windows_subsystem_for_linux, WSL},
env::Statuses,
event::{self, Event},
flog::{flog, flogf},
global_safety::RelaxedAtomicBool,
io::IoChain,
job_group::{JobGroup, MaybeJobId},
parse_tree::NodeRef,
parser::{Block, Parser},
portable_atomic::AtomicU64,
prelude::*,
reader::{fish_is_unwinding_for_exit, reader_schedule_prompt_repaint},
redirection::RedirectionSpecList,
signal::{signal_set_handlers_once, Signal},
topic_monitor::{topic_monitor_principal, GenerationsList, Topic},
wait_handle::{InternalJobId, WaitHandle, WaitHandleRef, WaitHandleStore},
wutil::{perror_nix, wbasename},
};
use cfg_if::cfg_if;
use fish_common::{escape, timef, Timepoint};
use fish_widestring::ToWString;
use libc::{
_SC_CLK_TCK, EXIT_SUCCESS, SIGABRT, SIGBUS, SIGFPE, SIGILL, SIGINT, SIGPIPE, SIGQUIT, SIGSEGV,
SIGSYS, SIGTTOU, SIG_IGN, STDOUT_FILENO, WCONTINUED, WEXITSTATUS, WIFCONTINUED, WIFEXITED,
WIFSIGNALED, WIFSTOPPED, WNOHANG, WSTOPSIG, WTERMSIG, WUNTRACED,
};
use nix::{
sys::{
signal::{kill, killpg, SaFlags, SigAction, SigHandler, SigSet, Signal as NixSignal},
wait::{waitpid, WaitPidFlag, WaitStatus},
},
unistd::getpgrp,
};
use std::{
cell::{Cell, Ref, RefCell, RefMut},
fs,
io::{Read as _, Write as _},
num::NonZeroU32,
os::fd::RawFd,
rc::Rc,
sync::{
atomic::{AtomicU8, Ordering},
Arc, LazyLock, Mutex, OnceLock,
},
};
#[derive(Default)]
pub enum ProcessType {
#[default]
External,
Builtin,
Function,
BlockNode(NodeRef<ast::Statement>),
Exec,
}
#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum JobControl {
All,
Interactive,
None,
}
impl TryFrom<&wstr> for JobControl {
type Error = ();
fn try_from(value: &wstr) -> Result<Self, Self::Error> {
if value == "full" {
Ok(JobControl::All)
} else if value == "interactive" {
Ok(JobControl::Interactive)
} else if value == "none" {
Ok(JobControl::None)
} else {
Err(())
}
}
}
pub type ClockTicks = u64;
pub fn clock_ticks_to_seconds(ticks: ClockTicks) -> f64 {
let clock_ticks_per_sec = unsafe { libc::sysconf(_SC_CLK_TCK) };
if clock_ticks_per_sec > 0 {
return ticks as f64 / clock_ticks_per_sec as f64;
}
0.0
}
pub type JobGroupRef = Arc<JobGroup>;
#[derive(Default, Debug, Copy, Clone)]
pub struct ProcStatus(Option<i32>);
impl ProcStatus {
fn new(status: Option<i32>) -> Self {
ProcStatus(status)
}
fn status(&self) -> i32 {
self.0.unwrap_or(0)
}
pub fn is_empty(&self) -> bool {
self.0.is_none()
}
const fn w_exitcode(ret: i32, sig: i32) -> i32 {
cfg_if! {
if #[cfg(waitstatus_signal_ret)] {
(sig << 8) | ret
} else {
(ret << 8) | sig
}
}
}
pub fn from_waitpid(status: i32) -> ProcStatus {
ProcStatus::new(Some(status))
}
pub fn from_exit_code(ret: i32) -> ProcStatus {
assert!(
ret >= 0,
"trying to create proc_status_t from failed waitid()/waitpid() call \
or invalid builtin exit code!"
);
const {
let _zerocode = ProcStatus::w_exitcode(0, 0);
assert!(
WIFEXITED(_zerocode),
"Synthetic exit status not reported as exited"
);
}
assert!(ret < 256);
ProcStatus::new(Some(Self::w_exitcode(ret, 0 )))
}
pub fn from_signal(signal: Signal) -> ProcStatus {
ProcStatus::new(Some(Self::w_exitcode(0 , signal.code())))
}
pub fn empty() -> ProcStatus {
ProcStatus::new(None)
}
pub fn stopped(&self) -> bool {
WIFSTOPPED(self.status())
}
pub fn continued(&self) -> bool {
WIFCONTINUED(self.status())
}
pub fn normal_exited(&self) -> bool {
WIFEXITED(self.status())
}
pub fn signal_exited(&self) -> bool {
WIFSIGNALED(self.status())
}
pub fn stop_signal(&self) -> libc::c_int {
assert!(self.stopped(), "Process is not signal stopped");
WSTOPSIG(self.status())
}
pub fn signal_code(&self) -> libc::c_int {
assert!(self.signal_exited(), "Process is not signal exited");
WTERMSIG(self.status())
}
pub fn exit_code(&self) -> u8 {
assert!(self.normal_exited(), "Process is not normal exited");
u8::try_from(WEXITSTATUS(self.status())).unwrap()
}
pub fn is_success(&self) -> bool {
self.normal_exited() && i32::from(self.exit_code()) == EXIT_SUCCESS
}
pub fn status_value(&self) -> i32 {
if self.signal_exited() {
128 + self.signal_code()
} else if self.normal_exited() {
i32::from(self.exit_code())
} else if self.stopped() {
128 + self.stop_signal()
} else {
panic!("Unsupported status value")
}
}
}
pub struct InternalProc {
internal_proc_id: u64,
status: OnceLock<ProcStatus>,
}
impl InternalProc {
pub fn new() -> Self {
static NEXT_PROC_ID: AtomicU64 = AtomicU64::new(0);
Self {
internal_proc_id: NEXT_PROC_ID.fetch_add(1, Ordering::SeqCst),
status: OnceLock::new(),
}
}
pub fn exited(&self) -> bool {
self.status.get().is_some()
}
pub fn mark_exited(&self, status: ProcStatus) {
self.status.set(status).expect("Status already set");
topic_monitor_principal().post(Topic::InternalExit);
flog!(
proc_internal_proc,
"Internal proc",
self.internal_proc_id,
"exited with status",
status.status_value()
);
}
pub fn get_status(&self) -> ProcStatus {
*self.status.get().expect("Process has not exited")
}
pub fn get_id(&self) -> u64 {
self.internal_proc_id
}
}
#[repr(transparent)]
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
pub struct Pid(NonZeroU32);
impl Pid {
#[inline(always)]
pub fn new(pid: i32) -> Self {
Self(
u32::try_from(pid)
.ok()
.and_then(NonZeroU32::new)
.expect("PID must be greater than zero"),
)
}
#[inline(always)]
pub fn get(&self) -> i32 {
self.0.get() as i32
}
#[inline(always)]
pub fn as_pid_t(&self) -> libc::pid_t {
#[allow(clippy::useless_conversion)]
self.get().into()
}
#[inline(always)]
pub fn as_nix_pid(&self) -> nix::unistd::Pid {
nix::unistd::Pid::from_raw(self.as_pid_t())
}
#[inline(always)]
pub fn from_nix_pid_unchecked(pid: nix::unistd::Pid) -> Self {
Self::new(pid.as_raw())
}
}
impl std::fmt::Display for Pid {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.get(), f)
}
}
impl ToWString for Pid {
fn to_wstring(&self) -> WString {
self.get().to_wstring()
}
}
impl fish_printf::ToArg<'static> for Pid {
fn to_arg(self) -> fish_printf::Arg<'static> {
self.get().to_arg()
}
}
#[derive(Default)]
pub struct Process {
pub is_first_in_job: bool,
pub is_last_in_job: bool,
pub typ: ProcessType,
pub variable_assignments: Vec<ConcreteAssignment>,
pub actual_cmd: WString,
pub gens: GenerationsList,
pub pid: OnceLock<Pid>,
pub internal_proc: RefCell<Option<Arc<InternalProc>>>,
pub pipe_write_fd: RawFd,
pub completed: RelaxedAtomicBool,
pub stopped: RelaxedAtomicBool,
pub leads_pgrp: bool,
pub posted_proc_exit: RelaxedAtomicBool,
pub status: Cell<ProcStatus>,
pub last_times: Cell<ProcTimes>,
argv: Vec<WString>,
proc_redirection_specs: RedirectionSpecList,
wait_handle: RefCell<Option<WaitHandleRef>>,
}
#[derive(Default, Clone, Copy)]
pub struct ProcTimes {
pub time: Timepoint,
pub jiffies: ClockTicks,
}
pub struct ConcreteAssignment {
pub variable_name: WString,
pub values: Vec<WString>,
}
impl ConcreteAssignment {
pub fn new(variable_name: WString, values: Vec<WString>) -> Self {
Self {
variable_name,
values,
}
}
}
impl Process {
pub fn new() -> Self {
Default::default()
}
#[inline(always)]
pub fn pid(&self) -> Option<Pid> {
self.pid.get().copied()
}
#[inline(always)]
pub fn has_pid(&self) -> bool {
self.pid().is_some()
}
pub fn set_pid(&self, pid: Pid) {
self.pid
.set(pid)
.expect("Process::set_pid() called more than once!");
}
pub fn set_argv(&mut self, argv: Vec<WString>) {
self.argv = argv;
}
pub fn argv(&self) -> &Vec<WString> {
&self.argv
}
pub fn argv0(&self) -> Option<&wstr> {
self.argv.first().map(|s| s.as_utfstr())
}
#[inline]
pub fn status(&self) -> ProcStatus {
self.status.get()
}
pub fn redirection_specs(&self) -> &RedirectionSpecList {
&self.proc_redirection_specs
}
pub fn redirection_specs_mut(&mut self) -> &mut RedirectionSpecList {
&mut self.proc_redirection_specs
}
pub fn set_redirection_specs(&mut self, specs: RedirectionSpecList) {
self.proc_redirection_specs = specs;
}
pub fn check_generations_before_launch(&self) {
self.gens
.update(&topic_monitor_principal().current_generations());
}
pub fn mark_aborted_before_launch(&self) {
self.completed.store(true);
if self.status().is_success() {
self.status
.set(ProcStatus::from_exit_code(libc::EXIT_FAILURE));
}
}
pub fn is_internal(&self) -> bool {
match self.typ {
ProcessType::Builtin | ProcessType::Function | ProcessType::BlockNode(_) => true,
ProcessType::External | ProcessType::Exec => false,
}
}
pub fn is_builtin(&self) -> bool {
matches!(self.typ, ProcessType::Builtin)
}
pub fn is_function(&self) -> bool {
matches!(self.typ, ProcessType::Function)
}
pub fn is_block_node(&self) -> bool {
matches!(self.typ, ProcessType::BlockNode(_))
}
pub fn is_external(&self) -> bool {
matches!(self.typ, ProcessType::External)
}
pub fn is_exec(&self) -> bool {
matches!(self.typ, ProcessType::Exec)
}
pub fn get_wait_handle(&self) -> Option<WaitHandleRef> {
self.wait_handle.borrow().clone()
}
pub fn is_stopped(&self) -> bool {
self.stopped.load()
}
pub fn is_completed(&self) -> bool {
self.completed.load()
}
pub fn make_wait_handle(&self, jid: InternalJobId) -> Option<WaitHandleRef> {
let pid = self.pid()?;
if self.wait_handle.borrow().is_none() {
self.wait_handle.replace(Some(WaitHandle::new(
pid,
jid,
wbasename(&self.actual_cmd.clone()).to_owned(),
)));
}
self.get_wait_handle()
}
}
#[derive(Default, Clone, Copy)]
pub struct JobProperties {
pub skip_notification: bool,
pub initial_background: bool,
pub from_event_handler: bool,
}
#[derive(Default)]
pub struct JobFlags {
pub constructed: bool,
pub notified_of_stop: bool,
pub negate: bool,
pub disown_requested: bool,
pub is_group_root: bool,
}
#[derive(Default)]
pub struct Job {
properties: JobProperties,
command_str: WString,
pub processes: Box<[Process]>,
pub group: Option<JobGroupRef>,
pub internal_job_id: InternalJobId,
pub job_flags: RefCell<JobFlags>,
}
impl Job {
pub fn new(properties: JobProperties, command_str: WString) -> Self {
static NEXT_INTERNAL_JOB_ID: AtomicU64 = AtomicU64::new(0);
Job {
properties,
command_str,
internal_job_id: NEXT_INTERNAL_JOB_ID.fetch_add(1, Ordering::Relaxed),
..Default::default()
}
}
pub fn group(&self) -> &JobGroup {
self.group.as_ref().unwrap()
}
pub fn command(&self) -> &wstr {
&self.command_str
}
pub fn processes(&self) -> &[Process] {
&self.processes
}
pub fn processes_mut(&mut self) -> &mut Box<[Process]> {
&mut self.processes
}
#[inline(always)]
pub fn external_procs(&self) -> impl Iterator<Item = &Process> {
self.processes.iter().filter(|p| p.pid().is_some())
}
pub fn can_reap(&self, p: &Process) -> bool {
!(
p.is_completed() ||
(!self.is_constructed() && self.get_pgid() == p.pid())
)
}
pub fn preview(&self) -> WString {
if self.processes().is_empty() {
return L!("").to_owned();
}
let procs = self.processes();
let result = procs.first().unwrap().argv0().unwrap_or(L!("null"));
result.to_owned() + L!("...")
}
pub fn get_pgid(&self) -> Option<Pid> {
self.group().get_pgid()
}
pub fn get_last_pid(&self) -> Option<Pid> {
self.external_procs().last().and_then(|proc| proc.pid())
}
pub fn job_id(&self) -> MaybeJobId {
self.group().job_id
}
pub fn flags(&self) -> Ref<'_, JobFlags> {
self.job_flags.borrow()
}
pub fn mut_flags(&self) -> RefMut<'_, JobFlags> {
self.job_flags.borrow_mut()
}
pub fn wants_job_control(&self) -> bool {
self.group().wants_job_control()
}
pub fn entitled_to_terminal(&self) -> bool {
self.group().is_foreground() && self.processes().iter().any(|p| !p.is_internal())
}
pub fn is_initially_background(&self) -> bool {
self.properties.initial_background
}
pub fn mark_constructed(&self) {
assert!(!self.is_constructed(), "Job was already constructed");
self.mut_flags().constructed = true;
}
pub fn has_external_proc(&self) -> bool {
self.processes().iter().any(|p| !p.is_internal())
}
pub fn wants_job_id(&self) -> bool {
self.processes().len() > 1
|| !self.processes()[0].is_internal()
|| self.is_initially_background()
}
pub fn is_constructed(&self) -> bool {
self.flags().constructed
}
pub fn is_completed(&self) -> bool {
assert!(!self.processes().is_empty());
self.processes().iter().all(|p| p.is_completed())
}
pub fn is_stopped(&self) -> bool {
let mut has_stopped = false;
for p in self.processes().iter() {
if !p.is_completed() && !p.is_stopped() {
return false;
}
has_stopped |= p.is_stopped();
}
has_stopped
}
pub fn is_visible(&self) -> bool {
!self.is_completed() && self.is_constructed() && !self.flags().disown_requested
}
pub fn skip_notification(&self) -> bool {
self.properties.skip_notification
}
#[allow(clippy::wrong_self_convention)]
pub fn from_event_handler(&self) -> bool {
self.properties.from_event_handler
}
pub fn is_foreground(&self) -> bool {
self.group().is_foreground()
}
pub fn posts_job_exit_events(&self) -> bool {
if !self.flags().is_group_root {
return false;
}
self.has_external_proc()
}
pub fn continue_job(&self, parser: &Parser, block_io: Option<&IoChain>) {
flogf!(
proc_job_run,
"Run job %d (%s), %s, %s",
self.job_id(),
self.command(),
if self.is_completed() {
"COMPLETED"
} else {
"UNCOMPLETED"
},
if parser.scope().is_interactive {
"INTERACTIVE"
} else {
"NON-INTERACTIVE"
}
);
while !fish_is_unwinding_for_exit() && !self.is_stopped() && !self.is_completed() {
process_mark_finished_children(parser, true, block_io);
}
if self.is_completed() {
let procs = self.processes();
let p = procs.last().unwrap();
if p.status().normal_exited() || p.status().signal_exited() {
if let Some(statuses) = self.get_statuses() {
parser.set_last_statuses(statuses);
parser.libdata_mut().status_count += 1;
}
}
}
}
pub fn resume(&self) -> bool {
self.mut_flags().notified_of_stop = false;
if !self.signal(NixSignal::SIGCONT) {
flogf!(
proc_pgroup,
"Failed to send SIGCONT to procs in job %s",
self.command()
);
return false;
}
for p in self.processes.iter() {
p.stopped.store(false);
}
true
}
pub fn signal(&self, signal: NixSignal) -> bool {
if let Some(pgid) = self.group().get_pgid() {
if let Err(err) = killpg(pgid.as_nix_pid(), signal) {
perror_nix(&format!("killpg({pgid}, {})", signal.as_str()), err);
return false;
}
} else {
for p in self.external_procs() {
if !p.is_completed() && kill(p.pid().unwrap().as_nix_pid(), signal).is_err() {
return false;
}
}
}
true
}
pub fn get_statuses(&self) -> Option<Statuses> {
let mut st = Statuses::default();
let mut has_status = false;
let mut laststatus = 0;
st.pipestatus.resize(self.processes().len(), 0);
for (i, p) in self.processes().iter().enumerate() {
let status = p.status();
if status.is_empty() {
st.pipestatus[i] = laststatus;
continue;
}
if status.signal_exited() {
st.kill_signal = Some(Signal::new(status.signal_code()));
}
laststatus = status.status_value();
has_status = true;
st.pipestatus[i] = status.status_value();
}
if !has_status {
return None;
}
st.status = if self.flags().negate {
if laststatus == 0 {
1
} else {
0
}
} else {
laststatus
};
Some(st)
}
}
pub type JobRef = Rc<Job>;
pub fn is_interactive_session() -> bool {
IS_INTERACTIVE_SESSION.load()
}
pub fn set_interactive_session(flag: bool) {
IS_INTERACTIVE_SESSION.store(flag);
}
static IS_INTERACTIVE_SESSION: RelaxedAtomicBool = RelaxedAtomicBool::new(false);
pub fn get_login() -> bool {
IS_LOGIN.load()
}
pub fn mark_login() {
IS_LOGIN.store(true);
}
static IS_LOGIN: RelaxedAtomicBool = RelaxedAtomicBool::new(false);
pub fn no_exec() -> bool {
IS_NO_EXEC.load()
}
pub fn mark_no_exec() {
IS_NO_EXEC.store(true);
}
static IS_NO_EXEC: RelaxedAtomicBool = RelaxedAtomicBool::new(false);
pub type JobList = Vec<JobRef>;
pub fn get_job_control_mode() -> JobControl {
unsafe { std::mem::transmute(JOB_CONTROL_MODE.load(Ordering::Relaxed)) }
}
pub fn set_job_control_mode(mode: JobControl) {
JOB_CONTROL_MODE.store(mode as u8, Ordering::Relaxed);
if mode == JobControl::All {
unsafe {
libc::signal(SIGTTOU, SIG_IGN);
}
}
}
static JOB_CONTROL_MODE: AtomicU8 = AtomicU8::new(JobControl::Interactive as u8);
pub fn job_reap(parser: &Parser, interactive: bool, block_io: Option<&IoChain>) -> bool {
if parser.jobs().is_empty() {
return false;
}
process_mark_finished_children(parser, false, block_io);
process_clean_after_marking(parser, interactive)
}
pub fn jobs_requiring_warning_on_exit(parser: &Parser) -> JobList {
let mut result = vec![];
for job in parser.jobs().iter() {
if !job.is_foreground() && job.is_constructed() && !job.is_completed() {
result.push(job.clone());
}
}
result
}
pub fn print_exit_warning_for_jobs(jobs: &JobList) {
printf!("%s\n", wgettext!("There are still jobs active:"));
printf!("\n PID %s\n", wgettext!("Command"));
for j in jobs {
printf!(
"%6d %s\n",
j.external_procs().next().and_then(|p| p.pid()).unwrap(),
j.command()
);
}
printf!(
"\n%s\n",
wgettext!("A second attempt to exit will terminate them."),
);
printf!(
"%s\n",
wgettext!("Use 'disown PID' to remove jobs from the list without terminating them."),
);
reader_schedule_prompt_repaint();
}
pub fn proc_get_jiffies(inpid: Pid) -> ClockTicks {
if !*HAVE_PROC_STAT {
return 0;
}
let filename = format!("/proc/{}/stat", inpid);
let Ok(mut f) = fs::File::open(filename) else {
return 0;
};
let mut buf = vec![];
if f.read_to_end(&mut buf).is_err() {
return 0;
}
let mut timesstrs = buf.split(|c| *c == b' ').skip(13);
let mut sum = 0;
for _ in 0..4 {
let Some(timestr) = timesstrs.next() else {
return 0;
};
let Ok(timestr) = std::str::from_utf8(timestr) else {
return 0;
};
let Ok(time) = str::parse::<u64>(timestr) else {
return 0;
};
sum += time;
}
sum
}
pub fn proc_update_jiffies(parser: &Parser) {
for job in parser.jobs().iter() {
for p in job.external_procs() {
p.last_times.replace(ProcTimes {
time: timef(),
jiffies: proc_get_jiffies(p.pid().unwrap()),
});
}
}
}
pub fn proc_init() {
signal_set_handlers_once(false);
}
fn handle_child_status(job: &Job, proc: &Process, status: ProcStatus) {
proc.status.set(status);
if status.stopped() {
proc.stopped.store(true);
} else if status.continued() {
proc.stopped.store(false);
} else {
proc.completed.store(true);
}
if status.signal_exited() {
let sig = status.signal_code();
if [SIGINT, SIGQUIT].contains(&sig) {
if is_interactive_session() {
job.group().cancel_with_signal(Signal::new(sig));
} else if !event::is_signal_observed(sig) {
let act =
SigAction::new(SigHandler::SigDfl, SaFlags::empty(), SigSet::empty()).into();
unsafe {
libc::sigaction(sig, &act, std::ptr::null_mut());
libc::kill(libc::getpid(), sig);
}
}
}
}
}
pub fn proc_wait_any(parser: &Parser) {
process_mark_finished_children(parser, true, None);
let is_interactive = parser.scope().is_interactive;
process_clean_after_marking(parser, is_interactive);
}
pub fn hup_jobs(jobs: &JobList) {
let fish_pgrp = getpgrp();
let mut kill_list = Vec::new();
for j in jobs {
let Some(pgid) = j.get_pgid() else { continue };
if pgid.as_nix_pid() != fish_pgrp && !j.is_completed() {
j.signal(NixSignal::SIGHUP);
if j.is_stopped() {
j.signal(NixSignal::SIGCONT);
}
if is_windows_subsystem_for_linux(WSL::V1) {
kill_list.push(j);
}
}
}
if !kill_list.is_empty() {
std::thread::sleep(std::time::Duration::from_millis(50));
for j in kill_list.drain(..) {
j.signal(NixSignal::SIGKILL);
}
}
}
pub fn add_disowned_job(j: &Job) {
let mut disowned_pids = DISOWNED_PIDS.lock().unwrap();
for process in j.external_procs() {
disowned_pids.push(process.pid().unwrap());
}
}
fn reap_disowned_pids() {
let mut disowned_pids = DISOWNED_PIDS.lock().unwrap();
disowned_pids.retain(
|pid| match waitpid(pid.as_nix_pid(), Some(WaitPidFlag::WNOHANG)) {
Ok(wait_status) => match wait_status {
WaitStatus::Exited(_, _) | WaitStatus::Signaled(_, _, _) => {
flogf!(proc_reap_external, "Reaped disowned PID or PGID %d", pid);
false
}
_ => true,
},
Err(_) => false,
},
);
}
static DISOWNED_PIDS: Mutex<Vec<Pid>> = Mutex::new(Vec::new());
fn process_mark_finished_children(parser: &Parser, block_ok: bool, block_io: Option<&IoChain>) {
let mut reapgens = GenerationsList::invalid();
for j in parser.jobs().iter() {
for proc in j.processes().iter() {
if !j.can_reap(proc) {
continue;
}
if proc.has_pid() {
reapgens.set_min_from(Topic::SigChld, &proc.gens);
reapgens.set_min_from(Topic::SigHupIntTerm, &proc.gens);
}
if proc.internal_proc.borrow().is_some() {
reapgens.set_min_from(Topic::InternalExit, &proc.gens);
reapgens.set_min_from(Topic::SigHupIntTerm, &proc.gens);
}
}
}
if !topic_monitor_principal().check(&reapgens, block_ok) {
return;
}
for j in parser.jobs().iter() {
for proc in j.external_procs() {
if !j.can_reap(proc) {
continue;
}
proc.gens.sighupintterm.set(reapgens.sighupintterm.get());
if proc.gens.sigchld == reapgens.sigchld {
continue;
}
proc.gens.sigchld.set(reapgens.sigchld.get());
let mut statusv: libc::c_int = -1;
let pid = unsafe {
libc::waitpid(
proc.pid().unwrap().as_pid_t(),
&mut statusv,
WNOHANG | WUNTRACED | WCONTINUED,
)
};
if pid == 0 {
continue;
}
let pid = Pid::new(pid);
assert_eq!(pid, proc.pid().unwrap(), "Unexpected waitpid() return");
let status = ProcStatus::from_waitpid(statusv);
handle_child_status(j, proc, status);
if status.stopped() {
j.group().set_is_foreground(false);
}
if status.continued() {
j.mut_flags().notified_of_stop = false;
}
if status.normal_exited() || status.signal_exited() {
flogf!(
proc_reap_external,
"Reaped external process '%s' (pid %d, status %d)",
proc.argv0().unwrap(),
pid,
proc.status().status_value()
);
block_io.map(bufferfill_read_finished_process_output);
} else {
assert!(status.stopped() || status.continued());
flogf!(
proc_reap_external,
"External process '%s' (pid %d, %s)",
proc.argv0().unwrap(),
proc.pid().unwrap(),
if proc.status().stopped() {
"stopped"
} else {
"continued"
}
);
}
}
}
for j in parser.jobs().iter() {
for proc in j.processes.iter() {
if proc.internal_proc.borrow().is_none() || !j.can_reap(proc) {
continue;
}
proc.gens.sighupintterm.set(reapgens.sighupintterm.get());
if proc.gens.internal_exit == reapgens.internal_exit {
continue;
}
proc.gens.internal_exit.set(reapgens.internal_exit.get());
let borrow = proc.internal_proc.borrow();
let internal_proc = borrow.as_ref().unwrap();
if !internal_proc.exited() {
continue;
}
let status = internal_proc.get_status();
handle_child_status(j, proc, status);
flogf!(
proc_reap_internal,
"Reaped internal process '%s' (id %u, status %d)",
proc.argv0().unwrap(),
internal_proc.get_id(),
proc.status().status_value(),
);
}
}
reap_disowned_pids();
}
fn bufferfill_read_finished_process_output(block_io: &IoChain) {
let Some(stdout) = block_io.io_for_fd(STDOUT_FILENO) else {
return;
};
let Some(stdout) = stdout.as_bufferfill() else {
return;
};
stdout.read_all_available();
}
fn generate_process_exit_events(j: &Job, out_evts: &mut Vec<Event>) {
if !j.from_event_handler() || !j.is_foreground() {
for p in j.external_procs() {
if p.is_completed() && !p.posted_proc_exit.load() {
p.posted_proc_exit.store(true);
out_evts.push(Event::process_exit(
p.pid().unwrap(),
p.status().status_value(),
));
}
}
}
}
fn generate_job_exit_events(j: &Job, out_evts: &mut Vec<Event>) {
if !j.from_event_handler() || !j.is_foreground() {
if j.posts_job_exit_events() {
if let Some(last_pid) = j.get_last_pid() {
out_evts.push(Event::job_exit(last_pid, j.internal_job_id));
}
}
}
out_evts.push(Event::caller_exit(j.internal_job_id, j.job_id()));
}
fn proc_wants_summary(j: &Job, p: &Process) -> bool {
if !p.is_completed() || !p.has_pid() {
return false;
}
let s = p.status();
if !s.signal_exited() || s.signal_code() == SIGPIPE {
return false;
}
if j.skip_notification() && !CRASHSIGNALS.contains(&s.signal_code()) {
return false;
}
true
}
fn job_wants_summary(j: &Job) -> bool {
if j.skip_notification() {
return false;
}
if j.processes().len() == 1 && proc_wants_summary(j, &j.processes()[0]) {
return false;
}
if j.is_foreground() {
return false;
}
true
}
fn job_or_proc_wants_summary(j: &Job) -> bool {
job_wants_summary(j) || j.processes().iter().any(|p| proc_wants_summary(j, p))
}
fn call_job_summary(parser: &Parser, cmd: &wstr) {
let event = Event::generic(L!("fish_job_summary").to_owned());
let b = parser.push_block(Block::event_block(event));
let saved_status = parser.get_last_statuses();
parser.eval(cmd, &IoChain::new());
parser.set_last_statuses(saved_status);
parser.pop_block(b);
}
fn summary_command(j: &Job, p: Option<&Process>) -> WString {
let mut buffer = L!("fish_job_summary").to_owned();
buffer += &sprintf!(" %s", j.job_id().to_wstring())[..];
buffer += &sprintf!(" %d", if j.is_foreground() { 1 } else { 0 })[..];
buffer.push(' ');
buffer += &escape(j.command())[..];
match p {
None => {
buffer += if j.is_stopped() {
L!(" STOPPED")
} else {
L!(" ENDED")
};
}
Some(p) => {
let sig = Signal::new(p.status().signal_code());
buffer.push(' ');
buffer += &escape(sig.name())[..];
buffer.push(' ');
buffer += &escape(sig.desc())[..];
if j.external_procs().count() > 1 {
let pid = p.pid().map_or("-".to_owned(), |p| p.to_string());
buffer += &sprintf!(" %s", pid)[..];
buffer.push(' ');
buffer += &escape(p.argv0().unwrap())[..];
}
}
}
buffer
}
fn summarize_jobs(parser: &Parser, jobs: &[JobRef]) -> bool {
if jobs.is_empty() {
return false;
}
for j in jobs {
if j.is_stopped() {
call_job_summary(parser, &summary_command(j, None));
} else {
for p in j.processes().iter() {
if proc_wants_summary(j, p) {
call_job_summary(parser, &summary_command(j, Some(p)));
}
}
if job_wants_summary(j) {
call_job_summary(parser, &summary_command(j, None));
}
}
}
true
}
fn remove_disowned_jobs(jobs: &mut JobList) {
jobs.retain(|j| !j.flags().disown_requested || !j.is_constructed());
}
fn save_wait_handle_for_completed_job(job: &Job, store: &mut WaitHandleStore) {
assert!(job.is_completed(), "Job not completed");
if !job.is_foreground() {
for proc in job.processes().iter() {
if let Some(wh) = proc.make_wait_handle(job.internal_job_id) {
store.add(wh);
}
}
}
for proc in job.processes().iter() {
if let Some(wh) = proc.get_wait_handle() {
wh.set_status_and_complete(proc.status().status_value());
}
}
}
fn process_clean_after_marking(parser: &Parser, interactive: bool) -> bool {
if parser.scope().is_cleaning_procs {
return false;
}
let _cleaning = parser.push_scope(|s| s.is_cleaning_procs = true);
remove_disowned_jobs(&mut parser.jobs_mut());
let mut exit_events = vec![];
let should_process_job = |j: &Job| {
j.is_constructed() && (interactive || !job_or_proc_wants_summary(j))
};
let mut jobs_to_summarize = vec![];
for j in parser.jobs().iter() {
if j.is_stopped()
&& !j.flags().notified_of_stop
&& should_process_job(j)
&& job_wants_summary(j)
{
j.mut_flags().notified_of_stop = true;
jobs_to_summarize.push(j.clone());
}
}
for j in parser.jobs().iter() {
generate_process_exit_events(j, &mut exit_events);
}
let mut completed_jobs = vec![];
parser.jobs_mut().retain(|j| {
if !should_process_job(j) || !j.is_completed() {
return true;
}
if job_or_proc_wants_summary(j) {
jobs_to_summarize.push(j.clone());
}
generate_job_exit_events(j, &mut exit_events);
completed_jobs.push(j.clone());
false
});
for j in completed_jobs {
save_wait_handle_for_completed_job(&j, &mut parser.mut_wait_handles());
}
let printed = summarize_jobs(parser, &jobs_to_summarize);
for evt in exit_events {
event::fire(parser, evt);
}
if printed {
let _ = std::io::stdout().flush();
}
printed
}
pub static HAVE_PROC_STAT: LazyLock<bool> =
LazyLock::new(|| fs::metadata("/proc/self/stat").is_ok());
const CRASHSIGNALS: [libc::c_int; 6] = [SIGABRT, SIGBUS, SIGFPE, SIGILL, SIGSEGV, SIGSYS];