#![cfg(unix)]
use std::{
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
use grpc::{
DiscussionServiceServer, HookServiceServer, OperationLogQueryServiceServer,
SignalServiceServer, StateReviewServiceServer, TimelineServiceServer, TransactionServiceServer,
};
use objects::error::{HeddleError, Result};
use repo::{Repository, operation_dedup::OperationDedupStore};
use tokio::net::UnixListener;
use tokio_stream::{StreamExt, wrappers::UnixListenerStream};
use tonic::transport::Server;
use crate::grpc_local_impl::{
GrpcLocalService, LocalDiscussionService, LocalHookService, LocalOperationLogQueryService,
LocalSignalService, LocalStateReviewService, LocalTimelineService, LocalTransactionService,
};
/// Umask installed while binding the daemon socket: it clears every group/other bit
/// and the owner execute bit, so the socket inode is born with mode 0600.
const PRIVATE_SOCKET_UMASK: libc::mode_t = 0o177;
/// Serializes the process-global umask swap done in `bind_private_unix_listener`,
/// so two binds in this process cannot interleave their set/restore of the umask.
static SOCKET_BIND_UMASK_LOCK: Mutex<()> = Mutex::new(());
/// Default location of the daemon's Unix socket: `<heddle_dir>/sockets/grpc.sock`.
pub fn default_socket_path(heddle_dir: &Path) -> PathBuf {
    let mut socket = heddle_dir.to_path_buf();
    socket.push("sockets");
    socket.push("grpc.sock");
    socket
}
/// Default location of the daemon's pidfile: `<heddle_dir>/sockets/grpc.pid`.
pub fn default_pid_path(heddle_dir: &Path) -> PathBuf {
    let mut pidfile = heddle_dir.to_path_buf();
    pidfile.push("sockets");
    pidfile.push("grpc.pid");
    pidfile
}
/// Filesystem locations used by the local gRPC daemon.
pub struct LocalDaemonConfig {
    /// Unix domain socket the gRPC server listens on.
    pub socket_path: PathBuf,
    /// Pidfile recording which process currently serves this repository.
    pub pid_path: PathBuf,
}

impl LocalDaemonConfig {
    /// Builds the default configuration, placing both files under the repository's
    /// heddle directory (see `default_socket_path` / `default_pid_path`).
    pub fn from_repo(repo: &Repository) -> Self {
        let dir = repo.heddle_dir();
        let socket_path = default_socket_path(dir);
        let pid_path = default_pid_path(dir);
        Self {
            socket_path,
            pid_path,
        }
    }

    /// Overrides the socket path; the pidfile location is left unchanged.
    pub fn with_socket(self, path: PathBuf) -> Self {
        Self {
            socket_path: path,
            ..self
        }
    }
}
/// Guard owning the daemon's pidfile and socket path for this process.
///
/// Dropping it removes both files, ignoring errors.
struct PidGuard {
    /// Pidfile written when the guard was installed.
    pid_path: PathBuf,
    /// Socket path cleaned up alongside the pidfile.
    socket_path: PathBuf,
}
/// Marker line identifying a pidfile written by this daemon (line 2 of the file).
pub const PIDFILE_MARKER: &str = "heddle-agent";

/// Structured pidfile body: `<pid>\n<PIDFILE_MARKER>\n<started_at_secs>\n`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PidFileContents {
    /// Process id of the daemon that wrote the file.
    pub pid: i32,
    /// Wall-clock start time, in seconds since the Unix epoch.
    pub started_at_secs: i64,
}

impl PidFileContents {
    /// Serializes to the three-line on-disk format, with a trailing newline.
    pub fn render(&self) -> String {
        let Self {
            pid,
            started_at_secs,
        } = self;
        format!("{pid}\n{PIDFILE_MARKER}\n{started_at_secs}\n")
    }

    /// Parses the on-disk format. Each of the first three lines is trimmed; anything
    /// after them is ignored.
    ///
    /// Returns `None` for a missing line, a non-numeric pid or timestamp, or a wrong
    /// marker. That includes legacy pidfiles that hold only a bare pid.
    pub fn parse(body: &str) -> Option<Self> {
        let mut fields = body.lines().map(str::trim);
        let pid = fields.next()?.parse::<i32>().ok()?;
        if fields.next()? != PIDFILE_MARKER {
            return None;
        }
        let started_at_secs = fields.next()?.parse::<i64>().ok()?;
        Some(Self {
            pid,
            started_at_secs,
        })
    }
}
impl PidGuard {
    /// Claims the pidfile at `pid_path` for this process. Returns a guard that
    /// removes the pidfile and `socket_path` when dropped.
    ///
    /// An existing pidfile blocks startup only if it parses as a [`PidFileContents`]
    /// whose pid is alive ([`pid_alive`]) and belongs to this executable
    /// ([`is_heddle_process`]). Otherwise it is treated as stale (or legacy) and
    /// swept, together with any leftover socket.
    ///
    /// The new pidfile is created with `create_new` (O_CREAT | O_EXCL). Two daemons
    /// racing past the stale check therefore cannot both claim the repository, and a
    /// symlink planted at `pid_path` is never followed.
    ///
    /// # Errors
    ///
    /// * `HeddleError::Conflict` if a live heddle process owns the pidfile, or if
    ///   something else created `pid_path` between the sweep and our create.
    /// * I/O errors from creating the parent directory or writing the pidfile.
    fn install(pid_path: PathBuf, socket_path: PathBuf) -> Result<Self> {
        use std::io::Write;

        if let Some(parent) = pid_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        if pid_path.exists() {
            let raw = std::fs::read_to_string(&pid_path).ok();
            let parsed = raw.as_deref().and_then(PidFileContents::parse);
            if let Some(existing) = parsed
                && pid_alive(existing.pid)
                && is_heddle_process(existing.pid)
            {
                return Err(HeddleError::Conflict(format!(
                    "heddle agent serve already running on this repo (pid {}); \
                     stop it first or remove {} if it's stale",
                    existing.pid,
                    pid_path.display()
                )));
            }
            // Stale, unreadable, or legacy (bare pid) pidfile: sweep it and any
            // socket its dead owner left behind.
            let _ = std::fs::remove_file(&pid_path);
            if socket_path.exists() {
                let _ = std::fs::remove_file(&socket_path);
            }
        }
        let contents = PidFileContents {
            pid: std::process::id() as i32,
            started_at_secs: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_secs() as i64)
                .unwrap_or(0),
        };
        // Atomic claim: fails with AlreadyExists if another starter won the race,
        // or if anything (including a dangling symlink) now occupies the path.
        let mut file = match std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&pid_path)
        {
            Ok(file) => file,
            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
                return Err(HeddleError::Conflict(format!(
                    "another heddle agent serve claimed {} concurrently; \
                     stop it first or remove the file if it's stale",
                    pid_path.display()
                )));
            }
            Err(e) => return Err(e.into()),
        };
        if let Err(e) = file.write_all(contents.render().as_bytes()) {
            // We own the freshly created file; don't leave a truncated pidfile behind.
            drop(file);
            let _ = std::fs::remove_file(&pid_path);
            return Err(e.into());
        }
        Ok(Self {
            pid_path,
            socket_path,
        })
    }
}
impl Drop for PidGuard {
    /// Best-effort cleanup. Removes the pidfile first, then the socket. Failures
    /// (for example a file that is already gone) are ignored.
    fn drop(&mut self) {
        for path in [&self.pid_path, &self.socket_path] {
            let _ = std::fs::remove_file(path);
        }
    }
}
/// Reports whether `pid` names a live process that this process may signal.
///
/// Probes with `kill(pid, 0)`, which performs only the existence and permission
/// checks and delivers no signal. A process this user may not signal fails the
/// permission check and is therefore reported as not alive.
///
/// Non-positive pids are rejected up front. `kill(0, 0)` addresses the caller's own
/// process group, and negative pids address process groups (or, for `-1`, every
/// signalable process). Those calls "succeed" without referring to any single
/// process, so a corrupt pidfile containing `0` or `-1` would otherwise look alive.
#[cfg(any(target_os = "linux", target_os = "macos"))]
pub fn pid_alive(pid: i32) -> bool {
    if pid <= 0 {
        return false;
    }
    // SAFETY: signal 0 performs error checking only; no signal is delivered.
    unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
}

/// Fallback for other Unix targets: conservatively treats any positive pid as alive.
/// Non-positive pids never name a single process, so they are reported as not alive.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
pub fn pid_alive(pid: i32) -> bool {
    pid > 0
}
/// Returns `true` when `pid` passes the same-user check and runs this very executable,
/// i.e. it is a plausible live owner of our pidfile.
///
/// The uid check is only enforced on Linux (elsewhere it always passes). The
/// executable check is skipped entirely when the uid check fails.
pub fn is_heddle_process(pid: i32) -> bool {
    if !process_uid_matches_self(pid) {
        return false;
    }
    process_exe_matches_current(pid)
}
#[cfg(target_os = "linux")]
fn process_uid_matches_self(pid: i32) -> bool {
use std::os::unix::fs::MetadataExt;
let path = PathBuf::from(format!("/proc/{pid}"));
let Ok(metadata) = std::fs::metadata(path) else {
return false;
};
metadata.uid() == unsafe { libc::geteuid() }
}
#[cfg(not(target_os = "linux"))]
fn process_uid_matches_self(_pid: i32) -> bool {
true
}
/// Returns `true` when `pid` is running the same executable as this process.
///
/// The comparison is delegated to `executable_identity_matches`, which compares
/// canonicalized paths. Failure to resolve either executable yields `false`.
fn process_exe_matches_current(pid: i32) -> bool {
    let Some(process_exe) = process_exe_path(pid) else {
        return false;
    };
    let Ok(current_exe) = std::env::current_exe() else {
        return false;
    };
    // Was a mis-encoded `¤t_exe` token (compile error); the intent is `&current_exe`.
    executable_identity_matches(&process_exe, &current_exe)
}
/// Compares two executable paths after canonicalization (symlinks and `..` resolved).
///
/// Returns `false` if either path cannot be canonicalized. `current_exe` is only
/// resolved once `process_exe` has been resolved successfully.
fn executable_identity_matches(process_exe: &Path, current_exe: &Path) -> bool {
    process_exe
        .canonicalize()
        .ok()
        .and_then(|resolved_process| {
            current_exe
                .canonicalize()
                .ok()
                .map(|resolved_current| resolved_process == resolved_current)
        })
        .unwrap_or(false)
}
/// Linux: resolves the executable of `pid` by reading the `/proc/<pid>/exe` link.
/// Returns `None` if the link cannot be read.
#[cfg(target_os = "linux")]
fn process_exe_path(pid: i32) -> Option<PathBuf> {
    let link = format!("/proc/{pid}/exe");
    match std::fs::read_link(link) {
        Ok(target) => Some(target),
        Err(_) => None,
    }
}

/// macOS: resolves the executable of `pid` via `proc_pidpath`.
/// Returns `None` when the call reports no bytes written or an error.
#[cfg(target_os = "macos")]
fn process_exe_path(pid: i32) -> Option<PathBuf> {
    use std::{ffi::OsString, os::unix::ffi::OsStringExt};
    let mut path_bytes = vec![0u8; libc::PROC_PIDPATHINFO_MAXSIZE as usize];
    let capacity = path_bytes.len() as u32;
    // SAFETY: `path_bytes` is a live, writable allocation and `capacity` is its
    // exact length.
    let written =
        unsafe { libc::proc_pidpath(pid, path_bytes.as_mut_ptr() as *mut _, capacity) };
    if written <= 0 {
        return None;
    }
    let path = path_bytes[..written as usize].to_vec();
    Some(PathBuf::from(OsString::from_vec(path)))
}

/// No executable probe on this platform.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn process_exe_path(_pid: i32) -> Option<PathBuf> {
    None
}
/// Runs the local gRPC daemon for `repo` on a private Unix socket until `shutdown`
/// resolves.
///
/// Startup sequence (the order matters):
/// 1. create the socket's parent directory (mode 0700 if newly created);
/// 2. claim the pidfile via `PidGuard`, which refuses if a live heddle daemon owns it;
/// 3. remove any leftover socket file, bind a fresh socket under a restrictive umask,
///    chmod it to 0600, then hand it to tokio as a non-blocking listener;
/// 4. replay transactions left in flight by a previous run and log the outcome;
/// 5. open the operation dedup store and serve all local gRPC services. Incoming
///    connections are filtered by `guard_peer_connection` (peer uid must match ours).
///
/// Once step 2 succeeds, `_guard` lives until this function returns. The pidfile and
/// socket are therefore removed on both the success and error return paths.
///
/// # Errors
/// Propagates pidfile conflicts, socket/filesystem I/O errors, dedup-store open
/// failures, and transport failures (the latter mapped to `HeddleError::InvalidObject`).
pub async fn serve(
    repo: Repository,
    config: LocalDaemonConfig,
    shutdown: impl std::future::Future<Output = ()> + Send + 'static,
) -> Result<()> {
    create_private_socket_parent(&config.socket_path)?;
    // Held for the rest of the function; dropping it removes the pidfile and socket.
    let _guard = PidGuard::install(config.pid_path.clone(), config.socket_path.clone())?;
    // An existing file at the socket path would make the bind below fail.
    if config.socket_path.exists() {
        std::fs::remove_file(&config.socket_path)?;
    }
    let listener = bind_private_unix_listener(&config.socket_path)?;
    // Explicitly enforce 0600 in addition to the umask used during bind.
    set_socket_mode_0600(&config.socket_path)?;
    // tokio's `from_std` expects the std listener to already be non-blocking.
    listener.set_nonblocking(true).map_err(|e| {
        HeddleError::Io(std::io::Error::new(
            e.kind(),
            format!(
                "UnixListener::set_nonblocking({}): {e}",
                config.socket_path.display()
            ),
        ))
    })?;
    let listener = UnixListener::from_std(listener).map_err(|e| {
        HeddleError::Io(std::io::Error::new(
            e.kind(),
            format!(
                "UnixListener::from_std({}): {e}",
                config.socket_path.display()
            ),
        ))
    })?;
    // Recover state from a previous run before the server starts handling requests.
    // Replay problems are logged (at a level matching the worst failure class) but do
    // not abort startup.
    let report = crate::transaction_replay::replay_active_transactions(&repo);
    if report.has_hard_failures() {
        tracing::error!(
            recovered_txns = report.recovered_transaction_ids.len(),
            orphan_tmps = report.orphan_temp_files_removed,
            unparseable = report.unparseable_sentinels.len(),
            failed_sentinel_writes = report.failed_sentinel_writes.len(),
            failed_orphan_deletes = report.failed_orphan_deletes.len(),
            failed_oplog_appends = report.failed_oplog_appends.len(),
            unreadable_entries = report.unreadable_entries,
            scan_error = report.scan_error.as_deref().unwrap_or(""),
            "local-daemon: transaction replay hit hard failures; \
             scan may not have run or audit-trail entries were lost"
        );
    } else if report.has_recoverable_failures() {
        tracing::warn!(
            recovered_txns = report.recovered_transaction_ids.len(),
            orphan_tmps = report.orphan_temp_files_removed,
            unparseable = report.unparseable_sentinels.len(),
            failed_sentinel_writes = report.failed_sentinel_writes.len(),
            failed_orphan_deletes = report.failed_orphan_deletes.len(),
            unreadable_entries = report.unreadable_entries,
            "local-daemon: transaction replay left recoverable failures on disk; \
             next startup will retry, but operator inspection is recommended"
        );
    } else if !report.is_clean() {
        tracing::info!(
            recovered_txns = report.recovered_transaction_ids.len(),
            orphan_tmps = report.orphan_temp_files_removed,
            "local-daemon: transaction replay recovered prior in-flight state"
        );
    }
    let dedup = Arc::new(OperationDedupStore::open(repo.heddle_dir())?);
    // Every service wraps a clone of the same shared `GrpcLocalService`.
    let inner = GrpcLocalService::new(Arc::new(repo), dedup);
    let state_review = StateReviewServiceServer::new(LocalStateReviewService::new(inner.clone()));
    let discussion = DiscussionServiceServer::new(LocalDiscussionService::new(inner.clone()));
    let signal = SignalServiceServer::new(LocalSignalService::new(inner.clone()));
    let query =
        OperationLogQueryServiceServer::new(LocalOperationLogQueryService::new(inner.clone()));
    let timeline = TimelineServiceServer::new(LocalTimelineService::new(inner.clone()));
    let transaction = TransactionServiceServer::new(LocalTransactionService::new(inner.clone()));
    let hook = HookServiceServer::new(LocalHookService::new(inner));
    // Connections from peers with a different uid are dropped before reaching tonic;
    // accept errors are passed through.
    let incoming = UnixListenerStream::new(listener).filter_map(guard_peer_connection);
    Server::builder()
        .add_service(state_review)
        .add_service(discussion)
        .add_service(signal)
        .add_service(query)
        .add_service(timeline)
        .add_service(transaction)
        .add_service(hook)
        .serve_with_incoming_shutdown(incoming, shutdown)
        .await
        .map_err(|e| HeddleError::InvalidObject(format!("local daemon transport failed: {e}")))?;
    Ok(())
}
/// Ensures the socket's parent directory exists. Any directories this call creates
/// (including missing ancestors) get mode 0700. An already-existing directory keeps
/// its current permissions.
///
/// A path without a parent is a no-op.
fn create_private_socket_parent(socket_path: &Path) -> Result<()> {
    use std::os::unix::fs::DirBuilderExt;
    let Some(parent) = socket_path.parent() else {
        return Ok(());
    };
    std::fs::DirBuilder::new()
        .recursive(true)
        .mode(0o700)
        .create(parent)?;
    Ok(())
}
/// Binds a std `UnixListener` at `socket_path` with `PRIVATE_SOCKET_UMASK` in effect,
/// so the socket inode is created owner-read/write only rather than being tightened
/// after the fact.
///
/// The umask is process-global, so the swap is serialized through
/// `SOCKET_BIND_UMASK_LOCK`. That lock only coordinates callers of this function; it
/// does not protect unrelated threads that create files during the swap.
///
/// # Errors
/// `HeddleError::InvalidObject` if the lock is poisoned; `HeddleError::Io` (with the
/// socket path in the message) if `bind` fails.
fn bind_private_unix_listener(socket_path: &Path) -> Result<std::os::unix::net::UnixListener> {
    let _lock = SOCKET_BIND_UMASK_LOCK
        .lock()
        .map_err(|_| HeddleError::InvalidObject("daemon socket umask lock poisoned".to_string()))?;
    // Declared after `_lock`, so it is dropped first: the previous umask is restored
    // while the lock is still held, on both the success and the error path.
    let _umask = UmaskGuard::set(PRIVATE_SOCKET_UMASK);
    std::os::unix::net::UnixListener::bind(socket_path).map_err(|e| {
        HeddleError::Io(std::io::Error::new(
            e.kind(),
            format!("UnixListener::bind({}): {e}", socket_path.display()),
        ))
    })
}
/// Scoped umask override. Restores the previous process umask when dropped.
///
/// The umask is process-wide, so while a guard is alive it affects file creation on
/// every thread. Bind the guard to a named variable (not `_`) for as long as the
/// override is needed.
#[must_use = "the previous umask is restored as soon as the guard is dropped"]
struct UmaskGuard {
    /// Umask that was in effect before `set` installed the new one.
    previous: libc::mode_t,
}

impl UmaskGuard {
    /// Installs `mask` as the process umask and returns a guard holding the old mask.
    fn set(mask: libc::mode_t) -> Self {
        // SAFETY: umask(2) has no preconditions and cannot fail; it swaps the process
        // file-mode creation mask and returns the previous one.
        let previous = unsafe { libc::umask(mask) };
        Self { previous }
    }
}

impl Drop for UmaskGuard {
    fn drop(&mut self) {
        // SAFETY: as in `set`; this reinstates a mask value previously returned by umask.
        unsafe {
            libc::umask(self.previous);
        }
    }
}
/// Forces `path` to mode 0600 (owner read/write only).
#[cfg(unix)]
fn set_socket_mode_0600(path: &Path) -> Result<()> {
    use std::os::unix::fs::PermissionsExt;
    const OWNER_READ_WRITE: u32 = 0o600;
    std::fs::set_permissions(path, std::fs::Permissions::from_mode(OWNER_READ_WRITE))?;
    Ok(())
}
/// Verifies that the process on the other end of `stream` runs under this daemon's
/// effective uid.
///
/// # Errors
/// `HeddleError::InvalidObject` if peer credentials cannot be read;
/// `HeddleError::Conflict` if the uids differ.
pub fn check_peer_uid_matches_self(stream: &tokio::net::UnixStream) -> Result<()> {
    let peer_uid = match stream.peer_cred() {
        Ok(creds) => creds.uid(),
        Err(e) => return Err(HeddleError::InvalidObject(format!("peer_cred failed: {e}"))),
    };
    // SAFETY: geteuid has no preconditions and cannot fail.
    let daemon_uid = unsafe { libc::geteuid() };
    enforce_peer_uid(peer_uid, daemon_uid)
}
/// Pure uid comparison behind `check_peer_uid_matches_self`. It is split out so it can
/// be tested without real sockets.
///
/// # Errors
/// `HeddleError::Conflict` naming both uids when they differ.
fn enforce_peer_uid(peer_uid: u32, our_uid: u32) -> Result<()> {
    if peer_uid == our_uid {
        Ok(())
    } else {
        Err(HeddleError::Conflict(format!(
            "peer uid {peer_uid} does not match daemon uid {our_uid}"
        )))
    }
}
/// `filter_map` adapter for the accept stream.
///
/// * Accept errors are forwarded unchanged (`Some(Err(_))`).
/// * Streams whose peer passes the uid check are admitted (`Some(Ok(_))`).
/// * Streams that fail it are logged and dropped (`None`), which closes the connection.
fn guard_peer_connection(
    conn: std::io::Result<tokio::net::UnixStream>,
) -> Option<std::io::Result<tokio::net::UnixStream>> {
    let stream = match conn {
        Ok(stream) => stream,
        Err(e) => return Some(Err(e)),
    };
    if let Err(e) = check_peer_uid_matches_self(&stream) {
        tracing::warn!(
            error = %e,
            "local-daemon: rejecting connection from peer with mismatched uid"
        );
        return None;
    }
    Some(Ok(stream))
}
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    #[test]
    #[serial_test::serial(process_global)]
    fn default_socket_path_lives_under_heddle_dir() {
        let temp = TempDir::new().unwrap();
        let heddle = temp.path().join(".heddle");
        std::fs::create_dir_all(&heddle).unwrap();
        let path = default_socket_path(&heddle);
        assert!(path.starts_with(&heddle));
        assert!(path.ends_with("grpc.sock"));
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn create_private_socket_parent_creates_new_parent_0700() {
        use std::os::unix::fs::PermissionsExt;
        let temp = TempDir::new().unwrap();
        let socket = temp
            .path()
            .join(".heddle")
            .join("sockets")
            .join("grpc.sock");
        create_private_socket_parent(&socket).unwrap();
        let mode = std::fs::metadata(socket.parent().unwrap())
            .unwrap()
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(mode, 0o700, "new socket parent must be private");
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn bind_private_unix_listener_creates_socket_0600_before_chmod() {
        use std::os::unix::fs::PermissionsExt;
        let temp = TempDir::new().unwrap();
        let socket = temp.path().join("grpc.sock");
        let _listener = match bind_private_unix_listener(&socket) {
            Ok(listener) => listener,
            Err(HeddleError::Io(err)) if err.kind() == std::io::ErrorKind::PermissionDenied => {
                eprintln!(
                    "skipping daemon socket mode test: local Unix listener bind denied: {err}"
                );
                return;
            }
            Err(err) => panic!("bind private Unix listener: {err}"),
        };
        let mode = std::fs::metadata(&socket).unwrap().permissions().mode() & 0o777;
        assert_eq!(
            mode, 0o600,
            "socket must be born private before set_socket_mode_0600 runs"
        );
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn bind_private_unix_listener_restores_umask_after_bind_error() {
        let temp = TempDir::new().unwrap();
        let socket = temp.path().join("missing").join("grpc.sock");
        let before = current_umask();
        let result = bind_private_unix_listener(&socket);
        let after = current_umask();
        assert!(result.is_err(), "bind should fail for a missing parent");
        assert_eq!(after, before, "bind errors must restore the prior umask");
    }

    /// Reads the process umask. umask(2) has no read-only query, so this briefly
    /// sets it to 0 and immediately restores it; callers must be serialized.
    fn current_umask() -> libc::mode_t {
        unsafe {
            let current = libc::umask(0);
            libc::umask(current);
            current
        }
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn pid_guard_writes_and_removes_pidfile() {
        let temp = TempDir::new().unwrap();
        let pid = temp.path().join("grpc.pid");
        let sock = temp.path().join("grpc.sock");
        let guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
        assert!(pid.exists());
        drop(guard);
        assert!(!pid.exists());
        assert!(!sock.exists());
    }

    // Needs `is_heddle_process` to recognise this process, which requires an
    // executable-path probe (only implemented for Linux and macOS).
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    #[test]
    #[serial_test::serial(process_global)]
    fn pid_guard_refuses_when_live_heddle_process_owns_pidfile() {
        let temp = TempDir::new().unwrap();
        let pid = temp.path().join("grpc.pid");
        let sock = temp.path().join("grpc.sock");
        let first = PidGuard::install(pid.clone(), sock.clone()).unwrap();
        let result = PidGuard::install(pid.clone(), sock.clone());
        assert!(result.is_err(), "expected refusal for live owner");
        drop(first);
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn pid_guard_sweeps_stale_pidfile_with_dead_pid() {
        let temp = TempDir::new().unwrap();
        let pid = temp.path().join("grpc.pid");
        let sock = temp.path().join("grpc.sock");
        let stale = PidFileContents {
            pid: 2_147_483_646,
            started_at_secs: 0,
        };
        std::fs::write(&pid, stale.render()).unwrap();
        std::fs::write(&sock, "stale").unwrap();
        let _guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
        let raw = std::fs::read_to_string(&pid).unwrap();
        let parsed = PidFileContents::parse(&raw).expect("guard wrote structured pidfile");
        assert_eq!(parsed.pid, std::process::id() as i32);
        assert!(parsed.started_at_secs > 0);
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn pid_guard_sweeps_legacy_unstructured_pidfile() {
        let temp = TempDir::new().unwrap();
        let pid = temp.path().join("grpc.pid");
        let sock = temp.path().join("grpc.sock");
        std::fs::write(&pid, "12345").unwrap();
        let _guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
        let parsed = PidFileContents::parse(&std::fs::read_to_string(&pid).unwrap()).unwrap();
        assert_eq!(parsed.pid, std::process::id() as i32);
    }

    #[test]
    fn pidfile_contents_round_trip() {
        let original = PidFileContents {
            pid: 4321,
            started_at_secs: 1_700_000_000,
        };
        let body = original.render();
        let parsed = PidFileContents::parse(&body).expect("round-trip");
        assert_eq!(parsed, original);
    }

    #[test]
    fn pidfile_contents_rejects_missing_marker() {
        let body = "1234\nnot-heddle-agent\n100\n";
        assert!(PidFileContents::parse(body).is_none());
    }

    #[test]
    fn pidfile_contents_rejects_bare_pid() {
        assert!(PidFileContents::parse("12345").is_none());
    }

    #[test]
    fn executable_identity_accepts_same_canonical_path() {
        let current = std::env::current_exe().unwrap();
        assert!(executable_identity_matches(&current, &current));
    }

    #[test]
    fn executable_identity_rejects_spoofed_heddle_path() {
        let temp = TempDir::new().unwrap();
        let spoofed = temp.path().join("contains-heddle").join("heddle-spoof");
        std::fs::create_dir_all(spoofed.parent().unwrap()).unwrap();
        std::fs::write(&spoofed, "not the current executable").unwrap();
        let current = std::env::current_exe().unwrap();
        assert!(
            !executable_identity_matches(&spoofed, &current),
            "a pathname containing heddle must not satisfy executable identity"
        );
    }

    // `process_exe_path` only resolves executables on Linux and macOS.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    #[test]
    fn is_heddle_process_accepts_self_pid() {
        assert!(
            is_heddle_process(std::process::id() as i32),
            "the current process should resolve to the current executable"
        );
    }

    #[test]
    fn enforce_peer_uid_admits_matching_uid() {
        assert!(enforce_peer_uid(1000, 1000).is_ok());
    }

    #[test]
    fn enforce_peer_uid_rejects_mismatched_uid() {
        let err = enforce_peer_uid(1001, 1000).unwrap_err();
        assert!(
            matches!(err, HeddleError::Conflict(_)),
            "mismatched peer uid must be a Conflict, got {err:?}"
        );
    }

    #[test]
    fn guard_propagates_listener_io_errors() {
        let io_err = std::io::Error::other("accept failed");
        let out = guard_peer_connection(Err(io_err));
        assert!(matches!(out, Some(Err(_))), "io errors must propagate");
    }

    #[tokio::test]
    async fn guard_admits_same_process_peer() {
        let (peer, _local) = tokio::net::UnixStream::pair().expect("socketpair");
        let out = guard_peer_connection(Ok(peer));
        assert!(
            matches!(out, Some(Ok(_))),
            "a same-uid peer must be admitted by the gate"
        );
    }

    #[tokio::test]
    async fn check_peer_uid_matches_self_admits_socketpair() {
        let (peer, _local) = tokio::net::UnixStream::pair().expect("socketpair");
        assert!(check_peer_uid_matches_self(&peer).is_ok());
    }
}