#![cfg(all(target_os = "linux", feature = "fuse"))]
use std::{
io,
os::{
fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd},
unix::{net::UnixStream, process::CommandExt},
},
path::{Path, PathBuf},
process::{Child, Command, ExitStatus, Stdio},
sync::{
Arc, Mutex,
atomic::{AtomicU8, Ordering},
},
thread::{self, JoinHandle},
time::{Duration, Instant},
};
use anyhow::{Context, Result, anyhow, bail};
use objects::sync::LockExt;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
/// Fault-injection hook: when this variable is set (to any UTF-8 value), [`run_worker`]
/// panics after building its `FuseShell` and before attempting the mount.
pub const PANIC_ON_INIT_ENV: &str = "HEDDLE_FUSE_WORKER_PANIC_ON_INIT";
/// Overrides the supervisor's stop grace period, in milliseconds (parsed as `u64`).
/// Missing or unparseable values fall back to [`DEFAULT_STOP_GRACE`].
pub const STOP_GRACE_ENV: &str = "HEDDLE_FUSE_WORKER_STOP_GRACE_MS";
/// If set, [`default_worker_binary`] uses this path instead of searching next to the
/// current executable.
pub const WORKER_BINARY_ENV: &str = "HEDDLE_FUSE_WORKER_BIN";
/// How long `Supervisor::unmount` waits after each escalation step (Stop, SIGTERM,
/// SIGKILL) when [`STOP_GRACE_ENV`] does not supply a value.
pub const DEFAULT_STOP_GRACE: Duration = Duration::from_secs(2);
/// Commands the supervisor sends to the worker over the IPC socket.
///
/// Serialized as internally tagged JSON keyed by `kind`, e.g. `{"kind":"Stop"}`, and carried
/// in [`framing`] frames.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind")]
pub enum SupervisorCommand {
    /// Ask the worker to acknowledge with [`WorkerEvent::Stopping`], unmount and exit.
    Stop,
    /// Ask the worker to reply with [`WorkerEvent::StatusOk`].
    Status,
}
/// Frames the worker sends back to the supervisor.
///
/// Serialized as internally tagged JSON keyed by `kind`. The first frame the worker sends
/// is the handshake: either `MountReady` or `MountError`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "kind")]
pub enum WorkerEvent {
    /// Handshake success: the filesystem is mounted at `mount_path` by process `pid`.
    MountReady { pid: u32, mount_path: PathBuf },
    /// Handshake failure: opening the repository/mount or mounting it failed.
    MountError { message: String },
    /// Reply to [`SupervisorCommand::Status`].
    StatusOk { pid: u32, mount_path: PathBuf },
    /// Acknowledgement of [`SupervisorCommand::Stop`], sent before the worker unmounts.
    Stopping,
}
/// Length-prefixed JSON framing for the supervisor/worker socket.
///
/// Each frame is a 4-byte little-endian `u32` body length followed by that many bytes of
/// `serde_json` output. Bodies larger than [`MAX_FRAME_BYTES`] are refused in both
/// directions, so a corrupt length prefix cannot trigger a huge allocation.
pub mod framing {
    use std::io::{self, Read, Write};

    use serde::{Serialize, de::DeserializeOwned};

    /// Upper bound on a frame body, enforced by both [`write_frame`] and [`read_frame`].
    pub const MAX_FRAME_BYTES: usize = 64 * 1024;

    /// Build the `InvalidData` error used for every framing/serialization failure.
    fn invalid_data(message: String) -> io::Error {
        io::Error::new(io::ErrorKind::InvalidData, message)
    }

    /// Serialize `msg` as JSON and write it as one frame, then flush `w`.
    ///
    /// # Errors
    /// `InvalidData` if serialization fails or the body exceeds [`MAX_FRAME_BYTES`];
    /// otherwise any error from the underlying writer.
    pub fn write_frame<W: Write, T: Serialize>(w: &mut W, msg: &T) -> io::Result<()> {
        let body = serde_json::to_vec(msg).map_err(|e| invalid_data(e.to_string()))?;
        let body_len = body.len();
        if body_len > MAX_FRAME_BYTES {
            return Err(invalid_data(format!(
                "frame body {body_len} exceeds MAX_FRAME_BYTES {MAX_FRAME_BYTES}"
            )));
        }
        // Cannot truncate: body_len <= MAX_FRAME_BYTES, which fits in a u32.
        let header = (body_len as u32).to_le_bytes();
        w.write_all(&header)?;
        w.write_all(&body)?;
        w.flush()
    }

    /// Read one frame from `r` and deserialize its JSON body as `T`.
    ///
    /// # Errors
    /// `InvalidData` if the advertised length exceeds [`MAX_FRAME_BYTES`] (checked before
    /// allocating) or the body is not valid JSON for `T`; `UnexpectedEof` if the stream ends
    /// mid-frame or before a frame starts; otherwise any error from the underlying reader.
    pub fn read_frame<R: Read, T: DeserializeOwned>(r: &mut R) -> io::Result<T> {
        let mut header = [0u8; 4];
        r.read_exact(&mut header)?;
        let len = u32::from_le_bytes(header) as usize;
        if len > MAX_FRAME_BYTES {
            return Err(invalid_data(format!(
                "frame body claims {len} bytes, exceeds MAX_FRAME_BYTES {MAX_FRAME_BYTES}"
            )));
        }
        let mut body = vec![0u8; len];
        r.read_exact(&mut body)?;
        serde_json::from_slice(&body).map_err(|e| invalid_data(e.to_string()))
    }
}
/// Command-line configuration of the `heddle-fuse-worker` process, as produced by
/// [`WorkerArgs::parse`] and consumed by [`run_worker`].
#[derive(Debug, Clone)]
pub struct WorkerArgs {
    /// Repository to open (`--repo-root`).
    pub repo_root: PathBuf,
    /// Thread whose content-addressed view is mounted (`--thread-id`).
    pub thread_id: String,
    /// Directory to mount the filesystem on (`--mountpoint`).
    pub mountpoint: PathBuf,
    /// Inherited descriptor of the worker's end of the supervisor socketpair (`--ipc-fd`).
    pub ipc_fd: RawFd,
}
impl WorkerArgs {
    /// Parse the worker's arguments (without the program name).
    ///
    /// Every flag takes exactly one following value: `--repo-root <path>`,
    /// `--thread-id <id>`, `--mountpoint <path>` and `--ipc-fd <fd>`. All four are required.
    /// A flag given more than once keeps its last value, and the value is taken verbatim
    /// (even if it looks like another flag).
    ///
    /// # Errors
    /// Fails on an unrecognised argument, a flag with no following value, a missing
    /// required flag, or an `--ipc-fd` value that is not a non-negative integer.
    pub fn parse<S: AsRef<str>>(args: &[S]) -> Result<Self> {
        let mut repo_root: Option<PathBuf> = None;
        let mut thread_id: Option<String> = None;
        let mut mountpoint: Option<PathBuf> = None;
        let mut ipc_fd: Option<RawFd> = None;
        let mut i = 0;
        while i < args.len() {
            let a = args[i].as_ref();
            // The value belonging to flag `a`, i.e. the next argument.
            let value = || -> Result<&str> {
                args.get(i + 1)
                    .map(|s| s.as_ref())
                    .ok_or_else(|| anyhow!("expected value after {a}"))
            };
            match a {
                "--repo-root" => {
                    repo_root = Some(PathBuf::from(value()?));
                    i += 2;
                }
                "--thread-id" => {
                    thread_id = Some(value()?.to_string());
                    i += 2;
                }
                "--mountpoint" => {
                    mountpoint = Some(PathBuf::from(value()?));
                    i += 2;
                }
                "--ipc-fd" => {
                    let raw = value()?.to_string();
                    let fd = raw
                        .parse::<RawFd>()
                        .with_context(|| format!("parse --ipc-fd value '{raw}'"))?;
                    // A negative fd can never name an inherited descriptor, and -1 in
                    // particular is forbidden for `OwnedFd::from_raw_fd` in `run_worker`.
                    if fd < 0 {
                        bail!("--ipc-fd must be a non-negative file descriptor, got {fd}");
                    }
                    ipc_fd = Some(fd);
                    i += 2;
                }
                other => bail!("unrecognised argument: {other}"),
            }
        }
        Ok(WorkerArgs {
            repo_root: repo_root.ok_or_else(|| anyhow!("--repo-root is required"))?,
            thread_id: thread_id.ok_or_else(|| anyhow!("--thread-id is required"))?,
            mountpoint: mountpoint.ok_or_else(|| anyhow!("--mountpoint is required"))?,
            ipc_fd: ipc_fd.ok_or_else(|| anyhow!("--ipc-fd is required"))?,
        })
    }
}
pub fn run_worker(args: WorkerArgs) -> Result<()> {
use repo::Repository;
use crate::FuseShell;
let owned: OwnedFd = unsafe { OwnedFd::from_raw_fd(args.ipc_fd) };
let mut ipc: UnixStream = owned.into();
set_cloexec(ipc.as_raw_fd())?;
debug!(
repo_root = %args.repo_root.display(),
thread_id = %args.thread_id,
mountpoint = %args.mountpoint.display(),
"heddle-fuse-worker starting"
);
let mount_result = (|| -> Result<crate::FuseShell> {
let repo = Repository::open(&args.repo_root)
.with_context(|| format!("open repo at {}", args.repo_root.display()))?;
let mount = crate::ContentAddressedMount::new(repo, &args.thread_id)
.map_err(|e| anyhow!("open content-addressed mount for {}: {e}", args.thread_id))?;
Ok(FuseShell::new(mount))
})();
let shell = match mount_result {
Ok(s) => s,
Err(err) => {
let msg = format!("{err:#}");
let _ = framing::write_frame(
&mut ipc,
&WorkerEvent::MountError {
message: msg.clone(),
},
);
bail!("{msg}");
}
};
if std::env::var(PANIC_ON_INIT_ENV).is_ok() {
panic!("heddle-fuse-worker: panic injected by {PANIC_ON_INIT_ENV}");
}
let session = match shell.mount_background(&args.mountpoint) {
Ok(s) => s,
Err(err) => {
let msg = format!("mount_background failed: {err}");
let _ = framing::write_frame(
&mut ipc,
&WorkerEvent::MountError {
message: msg.clone(),
},
);
bail!("{msg}");
}
};
framing::write_frame(
&mut ipc,
&WorkerEvent::MountReady {
pid: std::process::id(),
mount_path: args.mountpoint.clone(),
},
)
.context("signal MountReady to supervisor")?;
debug!(mountpoint = %args.mountpoint.display(), "FUSE worker ready");
loop {
match framing::read_frame::<_, SupervisorCommand>(&mut ipc) {
Ok(SupervisorCommand::Stop) => {
debug!("received Stop; unmounting");
let _ = framing::write_frame(&mut ipc, &WorkerEvent::Stopping);
break;
}
Ok(SupervisorCommand::Status) => {
framing::write_frame(
&mut ipc,
&WorkerEvent::StatusOk {
pid: std::process::id(),
mount_path: args.mountpoint.clone(),
},
)
.context("reply to Status")?;
}
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
warn!("IPC socket EOF; supervisor gone, unmounting");
break;
}
Err(err) => {
warn!(%err, "IPC read error; unmounting");
break;
}
}
}
drop(session);
Ok(())
}
fn set_cloexec(fd: RawFd) -> Result<()> {
let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) };
if flags < 0 {
return Err(io::Error::last_os_error()).context("fcntl(F_GETFD) on ipc fd");
}
let rc = unsafe { libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC) };
if rc < 0 {
return Err(io::Error::last_os_error()).context("fcntl(F_SETFD, FD_CLOEXEC) on ipc fd");
}
Ok(())
}
fn clear_cloexec(fd: RawFd) -> Result<()> {
let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) };
if flags < 0 {
return Err(io::Error::last_os_error()).context("fcntl(F_GETFD)");
}
let rc = unsafe { libc::fcntl(fd, libc::F_SETFD, flags & !libc::FD_CLOEXEC) };
if rc < 0 {
return Err(io::Error::last_os_error()).context("fcntl(F_SETFD, !FD_CLOEXEC)");
}
Ok(())
}
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Liveness {
Running = 0,
Exited = 1,
}
pub struct Supervisor {
ipc: Mutex<Option<UnixStream>>,
pid: u32,
mountpoint: PathBuf,
stop_grace: Duration,
liveness: Arc<AtomicU8>,
watcher: Mutex<Option<JoinHandle<()>>>,
}
impl Supervisor {
pub fn spawn(
worker_binary: &Path,
repo_root: &Path,
thread_id: &str,
mountpoint: &Path,
) -> Result<Self> {
let stop_grace = stop_grace_from_env().unwrap_or(DEFAULT_STOP_GRACE);
let (parent_end, child_end) = UnixStream::pair().context("create supervisor socketpair")?;
clear_cloexec(child_end.as_raw_fd())?;
let child_owned: OwnedFd = unsafe { OwnedFd::from_raw_fd(child_end.into_raw_fd()) };
let child_raw: RawFd = child_owned.as_raw_fd();
let mut command = Command::new(worker_binary);
command
.arg("--repo-root")
.arg(repo_root)
.arg("--thread-id")
.arg(thread_id)
.arg("--mountpoint")
.arg(mountpoint)
.arg("--ipc-fd")
.arg(child_raw.to_string())
.stdin(Stdio::null());
unsafe {
command.pre_exec(move || {
let cur = libc::fcntl(child_raw, libc::F_GETFD);
if cur < 0 {
return Err(io::Error::last_os_error());
}
if libc::fcntl(child_raw, libc::F_SETFD, cur & !libc::FD_CLOEXEC) < 0 {
return Err(io::Error::last_os_error());
}
Ok(())
});
}
let child = command
.spawn()
.with_context(|| format!("spawn {}", worker_binary.display()))?;
drop(child_owned);
let pid = child.id();
let mut ipc = parent_end;
let event: WorkerEvent = match framing::read_frame(&mut ipc) {
Ok(ev) => ev,
Err(err) => {
let mut child = child;
let _ = child.wait();
return Err(anyhow!(
"read worker handshake (worker may have crashed before MountReady): {err}"
));
}
};
match event {
WorkerEvent::MountReady { .. } => {}
WorkerEvent::MountError { message } => {
let mut child = child;
let _ = child.wait();
bail!("worker reported mount error: {message}");
}
other => {
let mut child = child;
let _ = child.wait();
bail!("worker sent unexpected handshake frame: {other:?}");
}
}
let liveness = Arc::new(AtomicU8::new(Liveness::Running as u8));
let watcher_liveness = Arc::clone(&liveness);
let watcher_mount = mountpoint.to_path_buf();
let child_slot: Arc<Mutex<Option<Child>>> = Arc::new(Mutex::new(Some(child)));
let watcher_child_slot = Arc::clone(&child_slot);
let spawn_result = thread::Builder::new()
.name(format!("fuse-worker-watcher:{thread_id}"))
.spawn(move || {
let c = watcher_child_slot
.lock()
.expect("watcher child slot lock")
.take()
.expect("watcher closure ran without a child in the slot");
watch_child(c, watcher_liveness, watcher_mount);
});
let watcher =
supervise_watcher_spawn(child_slot, spawn_result).context("spawn watcher thread")?;
Ok(Supervisor {
ipc: Mutex::new(Some(ipc)),
pid,
mountpoint: mountpoint.to_path_buf(),
stop_grace,
liveness,
watcher: Mutex::new(Some(watcher)),
})
}
pub fn unmount(&self) -> Result<()> {
let ipc_opt = self.ipc.lock_or_poisoned().take();
let Some(mut ipc) = ipc_opt else {
self.join_watcher();
return Ok(());
};
let _ = framing::write_frame(&mut ipc, &SupervisorCommand::Stop);
if !self.wait_for_exit(self.stop_grace) {
warn!(
pid = self.pid,
grace_ms = self.stop_grace.as_millis() as u64,
"FUSE worker did not exit on Stop; escalating to SIGTERM"
);
send_signal(self.pid as i32, libc::SIGTERM);
if !self.wait_for_exit(self.stop_grace) {
warn!(
pid = self.pid,
"FUSE worker did not exit on SIGTERM; escalating to SIGKILL"
);
send_signal(self.pid as i32, libc::SIGKILL);
self.wait_for_exit(self.stop_grace);
}
}
drop(ipc);
self.join_watcher();
Ok(())
}
fn join_watcher(&self) {
if let Some(handle) = self.watcher.lock_or_poisoned().take() {
let _ = handle.join();
}
}
pub fn mountpoint(&self) -> &Path {
&self.mountpoint
}
pub fn pid(&self) -> u32 {
self.pid
}
pub fn status(&self) -> Result<(u32, PathBuf)> {
let mut guard = self.ipc.lock_or_poisoned();
let Some(ipc) = guard.as_mut() else {
bail!("supervisor already shut down");
};
framing::write_frame(ipc, &SupervisorCommand::Status).context("send Status command")?;
let event: WorkerEvent = framing::read_frame(ipc).context("read Status reply")?;
match event {
WorkerEvent::StatusOk { pid, mount_path } => Ok((pid, mount_path)),
other => bail!("expected StatusOk, got {other:?}"),
}
}
pub fn is_alive(&self) -> bool {
self.liveness.load(Ordering::SeqCst) == Liveness::Running as u8
}
pub fn wait_for_exit(&self, dur: Duration) -> bool {
let deadline = Instant::now() + dur;
while self.is_alive() {
if Instant::now() >= deadline {
return !self.is_alive();
}
thread::sleep(Duration::from_millis(5));
}
true
}
}
impl Drop for Supervisor {
fn drop(&mut self) {
let _ = self.unmount();
}
}
/// Body of the watcher thread: block until the worker exits, then publish and log it.
///
/// `liveness` is set to [`Liveness::Exited`] before logging, and also when `wait()`
/// itself fails, so `Supervisor::is_alive` never stays `true` for a child that can no
/// longer be waited on.
fn watch_child(mut child: Child, liveness: Arc<AtomicU8>, mountpoint: PathBuf) {
    let outcome = child.wait();
    liveness.store(Liveness::Exited as u8, Ordering::SeqCst);
    let mount = mountpoint.display();
    match outcome {
        Err(e) => {
            warn!(
                mountpoint = %mount,
                error = %e,
                "FUSE worker wait() failed"
            );
        }
        Ok(status) if !status.success() => {
            warn!(
                mountpoint = %mount,
                exit = format_exit(&status),
                "FUSE worker exited unexpectedly"
            );
        }
        Ok(_) => {
            debug!(mountpoint = %mount, "FUSE worker exited cleanly");
        }
    }
}
/// Render an exit status for logs.
///
/// Prefers the exit code, then the terminating signal, and falls back to the
/// `Debug` form (e.g. for a stopped process, which has neither).
fn format_exit(status: &ExitStatus) -> String {
    use std::os::unix::process::ExitStatusExt;
    match (status.code(), status.signal()) {
        (Some(code), _) => format!("exit code {code}"),
        (None, Some(sig)) => format!("signal {sig}"),
        (None, None) => format!("{status:?}"),
    }
}
/// Send signal `sig` to process `pid`, ignoring failure (best-effort escalation).
fn send_signal(pid: i32, sig: i32) {
    let target = pid as libc::pid_t;
    // SAFETY: kill(2) takes only integer arguments and touches no memory of ours. The
    // return value is deliberately discarded; an ESRCH here just means the process is
    // already gone.
    let _ = unsafe { libc::kill(target, sig) };
}
/// Pass a successful watcher-thread spawn through unchanged; on failure, reap the child.
///
/// If the thread never started, nobody else will ever wait on the worker. The child is
/// therefore taken back out of `child_slot`, killed and waited on so it does not linger
/// as an orphan or zombie. The original spawn error is then returned.
fn supervise_watcher_spawn(
    child_slot: Arc<Mutex<Option<Child>>>,
    spawn_result: io::Result<JoinHandle<()>>,
) -> io::Result<JoinHandle<()>> {
    spawn_result.map_err(|spawn_err| {
        if let Some(mut orphan) = child_slot.lock_or_poisoned().take() {
            let _ = orphan.kill();
            let _ = orphan.wait();
        }
        spawn_err
    })
}
/// Read the stop grace period (milliseconds) from [`STOP_GRACE_ENV`].
///
/// Returns `None` when the variable is unset, not valid UTF-8, or not a `u64`.
fn stop_grace_from_env() -> Option<Duration> {
    let raw = std::env::var(STOP_GRACE_ENV).ok()?;
    let millis: u64 = raw.parse().ok()?;
    Some(Duration::from_millis(millis))
}
/// Locate the `heddle-fuse-worker` executable.
///
/// If [`WORKER_BINARY_ENV`] is set to a non-empty value, that path is returned as-is
/// (it is not checked for existence). Otherwise the worker is expected to sit in the same
/// directory as the currently running executable.
///
/// # Errors
/// Fails if the current executable cannot be located or has no parent directory, or if
/// no `heddle-fuse-worker` exists next to it.
pub fn default_worker_binary() -> Result<PathBuf> {
    // `var_os` makes an override path that is not valid UTF-8 work instead of being
    // silently ignored. An empty value is treated as unset rather than returning `""`,
    // which would only fail later with an obscure spawn error.
    if let Some(override_path) = std::env::var_os(WORKER_BINARY_ENV).filter(|p| !p.is_empty()) {
        return Ok(PathBuf::from(override_path));
    }
    let exe = std::env::current_exe().context("locate current heddle executable")?;
    let dir = exe
        .parent()
        .ok_or_else(|| anyhow!("current exe has no parent dir: {}", exe.display()))?;
    let candidate = dir.join("heddle-fuse-worker");
    if !candidate.exists() {
        bail!(
            "heddle-fuse-worker not found next to {} (looked at {}); \
             reinstall heddle or set {WORKER_BINARY_ENV}",
            exe.display(),
            candidate.display(),
        );
    }
    Ok(candidate)
}
#[cfg(test)]
mod tests {
    use super::*;

    // Happy path: every required flag present, each value lands in its field.
    #[test]
    fn worker_args_parses_a_minimal_set() {
        let argv = vec![
            "--repo-root",
            "/tmp/repo",
            "--thread-id",
            "main",
            "--mountpoint",
            "/tmp/mnt",
            "--ipc-fd",
            "3",
        ];
        let parsed = WorkerArgs::parse(&argv).expect("parse");
        assert_eq!(parsed.repo_root, PathBuf::from("/tmp/repo"));
        assert_eq!(parsed.thread_id, "main");
        assert_eq!(parsed.mountpoint, PathBuf::from("/tmp/mnt"));
        assert_eq!(parsed.ipc_fd, 3);
    }

    // An unknown trailing flag must fail the whole parse, not be ignored.
    #[test]
    fn worker_args_rejects_unknown_flags() {
        let argv = vec![
            "--repo-root",
            "/tmp/repo",
            "--thread-id",
            "main",
            "--mountpoint",
            "/tmp/mnt",
            "--ipc-fd",
            "3",
            "--mystery",
        ];
        assert!(WorkerArgs::parse(&argv).is_err());
    }

    // Omitting --ipc-fd fails, and the error names the missing flag.
    #[test]
    fn worker_args_requires_all_fields() {
        let argv = vec![
            "--repo-root",
            "/tmp/repo",
            "--thread-id",
            "main",
            "--mountpoint",
            "/tmp/mnt",
        ];
        let err = WorkerArgs::parse(&argv).unwrap_err();
        assert!(err.to_string().contains("--ipc-fd"));
    }

    // write_frame followed by read_frame over an in-memory buffer reproduces the message.
    #[test]
    fn framing_round_trips_typed_messages() {
        use std::io::Cursor;
        let mut buf = Vec::new();
        framing::write_frame(
            &mut buf,
            &WorkerEvent::MountReady {
                pid: 12345,
                mount_path: PathBuf::from("/tmp/x"),
            },
        )
        .unwrap();
        let mut r = Cursor::new(buf);
        let parsed: WorkerEvent = framing::read_frame(&mut r).unwrap();
        match parsed {
            WorkerEvent::MountReady { pid, mount_path } => {
                assert_eq!(pid, 12345);
                assert_eq!(mount_path, PathBuf::from("/tmp/x"));
            }
            _ => panic!("wrong variant"),
        }
    }

    // When the watcher thread cannot be spawned, the helper must kill *and* reap the child.
    // Reaping is checked with kill(pid, 0): only a fully reaped process yields ESRCH (a
    // zombie would still answer 0).
    #[test]
    fn supervise_watcher_spawn_reaps_child_on_failure() {
        let child = std::process::Command::new("sleep")
            .arg("60")
            .spawn()
            .expect("spawn sleep");
        let pid = child.id();
        let slot: Arc<Mutex<Option<Child>>> = Arc::new(Mutex::new(Some(child)));
        let forged: io::Result<JoinHandle<()>> =
            Err(io::Error::other("forced watcher-spawn failure"));
        let result = supervise_watcher_spawn(Arc::clone(&slot), forged);
        assert!(result.is_err(), "helper must propagate the spawn error");
        assert!(
            slot.lock_or_poisoned().is_none(),
            "child must be taken out of the slot so it can be reaped"
        );
        let raw = unsafe { libc::kill(pid as i32, 0) };
        // errno must be captured immediately after kill(), before anything else can clobber it.
        let errno = io::Error::last_os_error().raw_os_error();
        if raw == 0 {
            // Still alive: clean up the `sleep` before the assertions below fail the test.
            unsafe {
                libc::kill(pid as i32, libc::SIGKILL);
            }
        }
        assert_eq!(
            raw, -1,
            "child pid {pid} should have been reaped (kill 0 returned {raw})"
        );
        assert_eq!(
            errno,
            Some(libc::ESRCH),
            "expected ESRCH after reap, got errno={errno:?}"
        );
    }

    // A header advertising 1 GiB must be rejected as InvalidData before any body is read.
    #[test]
    fn framing_rejects_oversize_frame() {
        use std::io::Cursor;
        let mut buf = Vec::new();
        buf.extend_from_slice(&(1u32 << 30).to_le_bytes());
        let mut r = Cursor::new(buf);
        let err = framing::read_frame::<_, WorkerEvent>(&mut r).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }
}