use std::collections::HashSet;
use std::path::{Path, PathBuf};
use super::Repository;
#[cfg(doc)]
use super::WorkingTree;
/// File name of the fsmonitor daemon's IPC socket. Names reported by lsof are
/// matched against it to find the socket a daemon is holding.
const IPC_SOCKET_NAME: &str = "fsmonitor--daemon.ipc";
/// How long [`reap_orphan_fsmonitor_daemons`] gives daemons to exit after the
/// polite termination signal before escalating to a forced kill (1.5 s).
#[cfg(unix)]
pub(crate) const REAP_KILL_DEADLINE: std::time::Duration = std::time::Duration::from_millis(1500);
/// A running process that was identified as a git fsmonitor daemon.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DaemonProcess {
/// Process id of the daemon.
pub pid: u32,
/// Path of the daemon's IPC socket, or `None` when no resolvable path could
/// be extracted for it (for example when only the bare socket file name was
/// reported).
pub socket: Option<PathBuf>,
}
/// Abstraction over signalling processes and probing whether they still
/// exist, so the termination/escalation logic can be exercised with a fake
/// implementation in tests.
pub trait ProcessSignaller {
/// Asks `pid` to terminate (the unix implementation in this file sends `SIGTERM`).
fn term(&self, pid: u32);
/// Forcibly terminates `pid` (the unix implementation in this file sends `SIGKILL`).
fn kill(&self, pid: u32);
/// Returns whether `pid` still appears to exist.
fn is_alive(&self, pid: u32) -> bool;
}
/// Extracts the fsmonitor IPC socket path from lsof field output.
///
/// Only lines starting with `n` (name records) are considered, and names
/// beginning with `->` are ignored. The first remaining name whose final path
/// component is the fsmonitor socket file name decides the result:
///
/// * a name with a directory part yields `Some(path)`;
/// * the bare socket file name (no directory part) yields `None`, because the
///   owning git directory cannot be derived from it.
///
/// Returns `None` as well when no name refers to the fsmonitor socket.
pub fn parse_lsof_socket_path(lsof_stdout: &str) -> Option<PathBuf> {
    lsof_stdout
        .lines()
        .filter_map(|record| record.strip_prefix('n'))
        .filter(|name| !name.starts_with("->"))
        // The first name that points at the fsmonitor socket is decisive,
        // whether it is resolvable or not.
        .find(|name| {
            Path::new(name).file_name().and_then(|file| file.to_str()) == Some(IPC_SOCKET_NAME)
        })
        // A bare file name carries no directory, hence no git dir.
        .filter(|name| *name != IPC_SOCKET_NAME)
        .map(PathBuf::from)
}
/// Rewrites `socket` so that its parent directory is canonicalized while the
/// file name is kept as-is.
///
/// Only the parent is canonicalized, not the socket itself. When the parent
/// cannot be canonicalized, or the path has no parent or file name, the input
/// is returned unchanged.
#[cfg(unix)]
fn canonicalize_socket(socket: &Path) -> PathBuf {
    let real_parent = socket
        .parent()
        .and_then(|dir| dunce::canonicalize(dir).ok());

    if let (Some(dir), Some(file_name)) = (real_parent, socket.file_name()) {
        return dir.join(file_name);
    }

    socket.to_path_buf()
}
/// Maps a daemon socket path to the git directory that contains it, i.e. the
/// socket's parent directory.
///
/// Returns `None` when the path has no parent (for example `/` or an empty
/// path).
pub fn socket_path_to_git_dir(socket: &Path) -> Option<PathBuf> {
    let git_dir = socket.parent()?;
    Some(git_dir.to_path_buf())
}
/// Selects the pids of daemons that should be reaped, preserving input order.
///
/// A daemon is considered an orphan when either:
///
/// * it has no resolved socket path, or
/// * its socket's git directory lies under `repo_common_dir` and is not a
///   member of `live_git_dirs`.
///
/// When `live_git_dirs` is `None` (the live set is unknown), every daemon
/// with a resolved socket is spared. Daemons whose git directory is outside
/// `repo_common_dir` are never selected.
pub fn classify_orphans(
    daemons: &[DaemonProcess],
    live_git_dirs: Option<&HashSet<PathBuf>>,
    repo_common_dir: &Path,
) -> Vec<u32> {
    let is_orphan = |daemon: &DaemonProcess| -> bool {
        // No resolvable socket: nothing ties the daemon to a git dir.
        let Some(socket) = daemon.socket.as_deref() else {
            return true;
        };
        // Without a known live set (or a derivable git dir) err on the side
        // of leaving the daemon alone.
        let (Some(live), Some(git_dir)) = (live_git_dirs, socket_path_to_git_dir(socket)) else {
            return false;
        };
        git_dir.starts_with(repo_common_dir) && !live.contains(&git_dir)
    };

    let mut orphan_pids = Vec::new();
    for daemon in daemons {
        if is_orphan(daemon) {
            orphan_pids.push(daemon.pid);
        }
    }
    orphan_pids
}
/// Terminates `pids`, politely first and forcibly if they do not exit in time.
///
/// Every pid first receives [`ProcessSignaller::term`]. The function then
/// polls every 50 ms until all pids are reported dead, or until `deadline`
/// has elapsed since the signals were sent. On timeout, each pid still alive
/// receives [`ProcessSignaller::kill`].
///
/// Returns the number of pids that are no longer alive. After a forced kill
/// liveness is re-checked immediately, so the count reflects what
/// `is_alive` reports at that instant. An empty `pids` slice returns `0`
/// without signalling anything.
#[cfg(unix)]
pub(crate) fn escalate_terminate<S: ProcessSignaller>(
    signaller: &S,
    pids: &[u32],
    deadline: std::time::Duration,
) -> usize {
    use std::time::{Duration, Instant};

    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    if pids.is_empty() {
        return 0;
    }

    pids.iter().for_each(|&pid| signaller.term(pid));

    // Force-kills `pid` if it is still around and reports whether it is gone.
    let force_kill = |pid: u32| -> bool {
        if signaller.is_alive(pid) {
            signaller.kill(pid);
        }
        !signaller.is_alive(pid)
    };

    let started = Instant::now();
    while pids.iter().any(|&pid| signaller.is_alive(pid)) {
        if started.elapsed() >= deadline {
            // Grace period is over: escalate for whatever is still running.
            return pids.iter().filter(|&&pid| force_kill(pid)).count();
        }
        std::thread::sleep(POLL_INTERVAL);
    }

    // Everything exited on the polite signal alone.
    pids.len()
}
/// [`ProcessSignaller`] that sends real signals to processes via the `nix` crate.
#[cfg(unix)]
pub(crate) struct NixSignaller;
#[cfg(unix)]
impl NixSignaller {
    /// Converts `pid` into a target that addresses exactly one process.
    ///
    /// `kill(2)` gives special meaning to non-positive pids: `0` addresses
    /// the caller's own process group, `-1` every process the caller may
    /// signal, and other negative values a whole process group. A plain
    /// `pid as i32` cast would turn `0` or any value above `i32::MAX` into
    /// one of those, so such pids are rejected with `None` instead.
    fn signal_target(pid: u32) -> Option<nix::unistd::Pid> {
        i32::try_from(pid)
            .ok()
            .filter(|&raw| raw > 0)
            .map(nix::unistd::Pid::from_raw)
    }
}

#[cfg(unix)]
impl ProcessSignaller for NixSignaller {
    /// Sends `SIGTERM` to `pid`. Delivery errors are ignored (best effort);
    /// pids that do not name a single process are skipped.
    fn term(&self, pid: u32) {
        if let Some(target) = Self::signal_target(pid) {
            let _ = nix::sys::signal::kill(target, nix::sys::signal::Signal::SIGTERM);
        }
    }

    /// Sends `SIGKILL` to `pid`. Delivery errors are ignored (best effort);
    /// pids that do not name a single process are skipped.
    fn kill(&self, pid: u32) {
        if let Some(target) = Self::signal_target(pid) {
            let _ = nix::sys::signal::kill(target, nix::sys::signal::Signal::SIGKILL);
        }
    }

    /// Probes `pid` with the null signal.
    ///
    /// `EPERM` counts as alive: the process exists but may not be signalled
    /// by us. Any other error, and any pid that does not name a single
    /// process, counts as not alive.
    fn is_alive(&self, pid: u32) -> bool {
        let Some(target) = Self::signal_target(pid) else {
            return false;
        };
        matches!(
            nix::sys::signal::kill(target, None),
            Ok(()) | Err(nix::errno::Errno::EPERM)
        )
    }
}
/// Lists running fsmonitor daemons together with their IPC socket, if any.
///
/// Candidate pids come from `pgrep -f "git fsmonitor--daemon"`. Each
/// candidate is then inspected with `lsof -a -p <pid> -U -F n`, and only
/// those whose output mentions the fsmonitor socket name are kept. Every
/// external command runs with a 5 second timeout.
///
/// Returns an empty list when `pgrep` cannot be run or exits unsuccessfully.
/// A candidate is silently dropped when its `lsof` run fails or exits
/// unsuccessfully.
#[cfg(unix)]
fn enumerate_daemons() -> Vec<DaemonProcess> {
    use crate::shell_exec::Cmd;

    let timeout = std::time::Duration::from_secs(5);

    let pgrep = match Cmd::new("pgrep")
        .args(["-f", "git fsmonitor--daemon"])
        .timeout(timeout)
        .run()
    {
        Ok(output) if output.status.success() => output,
        _ => return Vec::new(),
    };

    let mut daemons = Vec::new();
    for line in String::from_utf8_lossy(&pgrep.stdout).lines() {
        // Ignore anything that is not a plain pid.
        let Ok(pid) = line.trim().parse::<u32>() else {
            continue;
        };

        let pid_arg = pid.to_string();
        let Ok(lsof) = Cmd::new("lsof")
            .args(["-a", "-p", pid_arg.as_str(), "-U", "-F", "n"])
            .timeout(timeout)
            .run()
        else {
            continue;
        };
        if !lsof.status.success() {
            continue;
        }

        let listing = String::from_utf8_lossy(&lsof.stdout);
        if let Some(daemon) = daemon_from_lsof_stdout(pid, &listing) {
            daemons.push(daemon);
        }
    }
    daemons
}
/// Builds a [`DaemonProcess`] for `pid` from its lsof output.
///
/// Returns `None` when the output never mentions the fsmonitor socket name,
/// which filters out processes that merely matched the process search.
/// Otherwise the daemon is returned with its socket path (parent directory
/// canonicalized), or with `socket: None` when no resolvable path was found.
#[cfg(unix)]
fn daemon_from_lsof_stdout(pid: u32, stdout: &str) -> Option<DaemonProcess> {
    stdout.contains(IPC_SOCKET_NAME).then(|| DaemonProcess {
        pid,
        socket: parse_lsof_socket_path(stdout).map(|path| canonicalize_socket(&path)),
    })
}
/// Collects the git directories of the repository's live worktrees.
///
/// A worktree counts as live when it is not prunable and its path exists.
/// Worktrees whose git directory cannot be determined are left out.
///
/// Returns `None` when the worktree list itself cannot be obtained, so
/// callers can tell "unknown" apart from "no live worktrees".
#[cfg(unix)]
fn live_git_dirs(repo: &Repository) -> Option<HashSet<PathBuf>> {
    let worktrees = repo.list_worktrees().ok()?;

    let mut git_dirs = HashSet::new();
    for worktree in worktrees.iter() {
        if worktree.is_prunable() || !worktree.path.exists() {
            continue;
        }
        if let Ok(git_dir) = repo.worktree_at(&worktree.path).git_dir() {
            git_dirs.insert(git_dir);
        }
    }
    Some(git_dirs)
}
/// Finds fsmonitor daemons that no longer belong to a live worktree of
/// `repo` and terminates them.
///
/// On unix this enumerates running daemons, classifies them against the
/// repository's live git directories and its canonicalized common directory
/// (falling back to the raw path when canonicalization fails), and then
/// terminates the orphans with a polite-then-forced escalation bounded by
/// [`REAP_KILL_DEADLINE`]. Progress is reported through `log::debug!` only;
/// the function is best effort and returns nothing.
///
/// On other platforms this is a no-op.
pub fn reap_orphan_fsmonitor_daemons(repo: &Repository) {
    #[cfg(unix)]
    {
        let running = enumerate_daemons();
        if running.is_empty() {
            return;
        }

        let live_dirs = live_git_dirs(repo);
        let common_dir = match dunce::canonicalize(repo.git_common_dir()) {
            Ok(resolved) => resolved,
            Err(_) => repo.git_common_dir().to_path_buf(),
        };

        let orphan_pids = classify_orphans(&running, live_dirs.as_ref(), &common_dir);
        let orphan_count = orphan_pids.len();
        if orphan_count == 0 {
            return;
        }

        log::debug!(
            "Reaping {} orphaned fsmonitor daemon(s): {:?}",
            orphan_count,
            orphan_pids
        );
        let gone = escalate_terminate(&NixSignaller, &orphan_pids, REAP_KILL_DEADLINE);
        log::debug!(
            "Orphaned fsmonitor reap: {gone}/{} terminated",
            orphan_count
        );
    }
    #[cfg(not(unix))]
    {
        // Nothing to reap on this platform; just mark the parameter as used.
        let _ = repo;
    }
}
/// Unit tests for lsof parsing, orphan classification and the
/// terminate-then-kill escalation (with both a fake and the real signaller).
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(unix)]
    use std::cell::RefCell;
    #[cfg(unix)]
    use std::collections::HashMap;

    // ---- lsof parsing -------------------------------------------------

    /// A full socket path is returned even when `->` entries precede it.
    #[test]
    fn parses_resolved_socket_path() {
        let lsof = "p10033\nf21\nn->0x62fc003fda86ee70\nf24\nn/Users/me/repo/.git/worktrees/repo.feat/fsmonitor--daemon.ipc\n";
        let expected =
            PathBuf::from("/Users/me/repo/.git/worktrees/repo.feat/fsmonitor--daemon.ipc");
        assert_eq!(parse_lsof_socket_path(lsof), Some(expected));
    }

    /// The bare socket file name carries no directory and cannot be resolved.
    #[test]
    fn bare_socket_name_is_unresolvable() {
        let parsed = parse_lsof_socket_path("p10311\nf24\nnfsmonitor--daemon.ipc\n");
        assert_eq!(parsed, None);
    }

    /// Output without any fsmonitor socket entry yields nothing.
    #[test]
    fn no_fsmonitor_socket_yields_none() {
        let parsed = parse_lsof_socket_path("p999\nf3\nn->0xdead\nf4\nn->0xbeef\n");
        assert_eq!(parsed, None);
    }

    // ---- path helpers -------------------------------------------------

    /// The parent directory is resolved through symlinks; a missing parent
    /// leaves the path untouched.
    #[cfg(unix)]
    #[test]
    fn canonicalize_socket_resolves_symlinked_git_dir_and_falls_back_when_gone() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();

        let real_dir = root.join("real-gitdir");
        std::fs::create_dir(&real_dir).unwrap();
        let link_dir = root.join("link-gitdir");
        std::os::unix::fs::symlink(&real_dir, &link_dir).unwrap();

        let socket_via_link = link_dir.join(IPC_SOCKET_NAME);
        let expected = dunce::canonicalize(&real_dir)
            .unwrap()
            .join(IPC_SOCKET_NAME);
        assert_eq!(canonicalize_socket(&socket_via_link), expected);

        let missing = root.join("deleted-gitdir").join(IPC_SOCKET_NAME);
        assert_eq!(canonicalize_socket(&missing), missing);
    }

    /// The git dir of a socket is simply its parent directory.
    #[test]
    fn git_dir_is_socket_parent() {
        let git_dir =
            socket_path_to_git_dir(Path::new("/r/.git/worktrees/r.x/fsmonitor--daemon.ipc"));
        assert_eq!(git_dir, Some(PathBuf::from("/r/.git/worktrees/r.x")));
    }

    // ---- daemon detection from lsof output ----------------------------

    /// A process whose lsof output never mentions the socket is not a daemon.
    #[cfg(unix)]
    #[test]
    fn daemon_from_lsof_skips_pgrep_false_positive() {
        let stdout = "p1234\nn/usr/lib/libc.so\nn/tmp/some-other.sock\nn/dev/null\n";
        assert_eq!(daemon_from_lsof_stdout(1234, stdout), None);
    }

    /// A resolved socket path is kept on the resulting daemon.
    #[cfg(unix)]
    #[test]
    fn daemon_from_lsof_keeps_resolved_socket() {
        let stdout = "p1234\nn/r/.git/worktrees/r.x/fsmonitor--daemon.ipc\n";
        let found = daemon_from_lsof_stdout(1234, stdout).expect("real daemon");
        assert_eq!(found.pid, 1234);
        assert!(found.socket.is_some());
    }

    /// A bare socket name still identifies a daemon, but without a socket path.
    #[cfg(unix)]
    #[test]
    fn daemon_from_lsof_keeps_bare_socket_name_as_class_one() {
        let stdout = "p1234\nnfsmonitor--daemon.ipc\n";
        let found = daemon_from_lsof_stdout(1234, stdout).expect("bare-name daemon");
        assert_eq!(found.pid, 1234);
        assert!(
            found.socket.is_none(),
            "bare name must remain unresolved (class 1)"
        );
    }

    // ---- orphan classification ----------------------------------------

    /// Shorthand for building a [`DaemonProcess`] from string literals.
    fn daemon(pid: u32, socket: Option<&str>) -> DaemonProcess {
        let socket = socket.map(PathBuf::from);
        DaemonProcess { pid, socket }
    }

    /// A daemon whose git dir is in the live set is never selected.
    #[test]
    fn live_worktree_daemon_is_never_reaped() {
        let common = PathBuf::from("/r/.git");
        let live = HashSet::from([PathBuf::from("/r/.git/worktrees/r.live")]);
        let daemons = [daemon(
            1,
            Some("/r/.git/worktrees/r.live/fsmonitor--daemon.ipc"),
        )];

        let orphans = classify_orphans(&daemons, Some(&live), &common);
        assert!(
            orphans.is_empty(),
            "a daemon mapping to a live worktree must never be selected"
        );
    }

    /// With an unknown live set only socket-less daemons are selected.
    #[test]
    fn unknowable_live_set_spares_resolved_socket_daemons() {
        let common = PathBuf::from("/r/.git");
        let daemons = [
            daemon(10, None),
            daemon(
                20,
                Some("/r/.git/worktrees/r.maybe-live/fsmonitor--daemon.ipc"),
            ),
        ];

        let orphans = classify_orphans(&daemons, None, &common);
        assert_eq!(
            orphans,
            vec![10],
            "an unknowable live set must spare every resolved-socket daemon"
        );
    }

    /// Socket-less and dead-worktree daemons are selected; live and foreign
    /// ones are spared.
    #[test]
    fn classifies_each_orphan_class_and_spares_others() {
        let common = PathBuf::from("/r/.git");
        let live = HashSet::from([PathBuf::from("/r/.git/worktrees/r.live")]);
        let daemons = [
            daemon(10, None),
            daemon(20, Some("/r/.git/worktrees/r.gone/fsmonitor--daemon.ipc")),
            daemon(30, Some("/r/.git/worktrees/r.live/fsmonitor--daemon.ipc")),
            daemon(40, Some("/other/.git/worktrees/o.x/fsmonitor--daemon.ipc")),
        ];

        let mut orphans = classify_orphans(&daemons, Some(&live), &common);
        orphans.sort_unstable();
        assert_eq!(orphans, vec![10, 20]);
    }

    // ---- escalation with a fake signaller -----------------------------

    /// In-memory [`ProcessSignaller`] that records every signal it is asked
    /// to send and lets chosen pids survive the polite signal.
    #[cfg(unix)]
    struct FakeSignaller {
        alive: RefCell<HashMap<u32, bool>>,
        survives_term: HashSet<u32>,
        term_calls: RefCell<Vec<u32>>,
        kill_calls: RefCell<Vec<u32>>,
    }

    #[cfg(unix)]
    impl FakeSignaller {
        /// All `pids` start alive; those in `survives_term` ignore `term`.
        fn new(pids: &[u32], survives_term: &[u32]) -> Self {
            let alive = pids.iter().map(|&pid| (pid, true)).collect();
            Self {
                alive: RefCell::new(alive),
                survives_term: survives_term.iter().copied().collect(),
                term_calls: RefCell::new(Vec::new()),
                kill_calls: RefCell::new(Vec::new()),
            }
        }
    }

    #[cfg(unix)]
    impl ProcessSignaller for FakeSignaller {
        fn term(&self, pid: u32) {
            self.term_calls.borrow_mut().push(pid);
            if !self.survives_term.contains(&pid) {
                self.alive.borrow_mut().insert(pid, false);
            }
        }

        fn kill(&self, pid: u32) {
            self.kill_calls.borrow_mut().push(pid);
            self.alive.borrow_mut().insert(pid, false);
        }

        fn is_alive(&self, pid: u32) -> bool {
            self.alive.borrow().get(&pid).copied().unwrap_or(false)
        }
    }

    /// Daemons that exit on the polite signal never see a forced kill.
    #[cfg(unix)]
    #[test]
    fn sigterm_alone_terminates_responsive_daemons() {
        let fake = FakeSignaller::new(&[1, 2], &[]);
        let gone = escalate_terminate(&fake, &[1, 2], std::time::Duration::from_millis(200));

        assert_eq!(gone, 2);
        assert_eq!(*fake.term_calls.borrow(), vec![1, 2]);
        assert!(
            fake.kill_calls.borrow().is_empty(),
            "responsive daemons must not be SIGKILL'd"
        );
    }

    /// A daemon that ignores the polite signal is force-killed once the
    /// deadline passes, and the wait stays bounded.
    #[cfg(unix)]
    #[test]
    fn escalates_to_sigkill_after_bounded_wait() {
        let fake = FakeSignaller::new(&[1, 2], &[2]);

        let started = std::time::Instant::now();
        let gone = escalate_terminate(&fake, &[1, 2], std::time::Duration::from_millis(150));
        let elapsed = started.elapsed();

        assert_eq!(gone, 2);
        assert_eq!(*fake.kill_calls.borrow(), vec![2]);
        assert!(
            elapsed < std::time::Duration::from_secs(2),
            "escalation must stay bounded, took {elapsed:?}"
        );
    }

    /// No pids means no signals and a zero count.
    #[cfg(unix)]
    #[test]
    fn empty_pid_list_is_a_noop() {
        let fake = FakeSignaller::new(&[], &[]);
        let gone = escalate_terminate(&fake, &[], std::time::Duration::from_millis(10));

        assert_eq!(gone, 0);
        assert!(fake.term_calls.borrow().is_empty());
    }

    // ---- escalation against real processes ----------------------------

    /// A real child process is brought down by the real signaller.
    #[cfg(unix)]
    #[test]
    fn nix_signaller_terminates_responsive_child_with_sigterm() {
        let mut child = std::process::Command::new("sleep")
            .arg("30")
            .spawn()
            .unwrap();
        let pid = child.id();

        escalate_terminate(&NixSignaller, &[pid], std::time::Duration::from_millis(500));

        let status = child.wait().unwrap();
        assert!(!status.success());
    }

    /// A real child that ignores SIGTERM ends up dying from SIGKILL.
    #[cfg(unix)]
    #[test]
    fn nix_signaller_escalates_to_sigkill_when_sigterm_ignored() {
        use std::os::unix::process::CommandExt;
        use std::os::unix::process::ExitStatusExt;
        use std::process::Command;
        use std::time::{Duration, Instant};

        let tmp = tempfile::tempdir().unwrap();
        let ready = tmp.path().join("ready");

        // The shell ignores TERM, then touches the marker file so the test
        // knows the trap is installed before any signal is sent. It runs in
        // its own process group so the whole group can be cleaned up below.
        let script = format!(
            "trap '' TERM; : > {}; sleep 30",
            ready.to_str().unwrap()
        );
        let mut child = Command::new("sh")
            .arg("-c")
            .arg(script)
            .process_group(0)
            .spawn()
            .unwrap();
        let pid = child.id();

        let wait_deadline = Instant::now() + Duration::from_secs(5);
        while !ready.exists() {
            assert!(
                Instant::now() < wait_deadline,
                "child never installed SIGTERM trap"
            );
            std::thread::sleep(Duration::from_millis(10));
        }

        escalate_terminate(&NixSignaller, &[pid], Duration::from_millis(200));

        // Kill the rest of the child's process group (negative pid) so
        // nothing spawned by the shell is left behind.
        let _ = nix::sys::signal::kill(
            nix::unistd::Pid::from_raw(-(pid as i32)),
            nix::sys::signal::Signal::SIGKILL,
        );

        let status = child.wait().unwrap();
        assert_eq!(
            status.signal(),
            Some(nix::sys::signal::Signal::SIGKILL as i32)
        );
    }
}