#![cfg(unix)]
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use grpc::{
DiscussionServiceServer, HookServiceServer, OperationLogQueryServiceServer,
SignalServiceServer, StateReviewServiceServer, TransactionServiceServer,
};
use objects::error::{HeddleError, Result};
use repo::{Repository, operation_dedup::OperationDedupStore};
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::Server;
use crate::grpc_local_impl::{
GrpcLocalService, LocalDiscussionService, LocalHookService, LocalOperationLogQueryService,
LocalSignalService, LocalStateReviewService, LocalTransactionService,
};
pub fn default_socket_path(heddle_dir: &Path) -> PathBuf {
heddle_dir.join("sockets").join("grpc.sock")
}
pub fn default_pid_path(heddle_dir: &Path) -> PathBuf {
heddle_dir.join("sockets").join("grpc.pid")
}
pub struct LocalDaemonConfig {
pub socket_path: PathBuf,
pub pid_path: PathBuf,
}
impl LocalDaemonConfig {
pub fn from_repo(repo: &Repository) -> Self {
let heddle_dir = repo.heddle_dir();
Self {
socket_path: default_socket_path(heddle_dir),
pid_path: default_pid_path(heddle_dir),
}
}
pub fn with_socket(mut self, path: PathBuf) -> Self {
self.socket_path = path;
self
}
}
struct PidGuard {
pid_path: PathBuf,
socket_path: PathBuf,
}
pub const PIDFILE_MARKER: &str = "heddle-agent";
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PidFileContents {
pub pid: i32,
pub started_at_secs: i64,
}
impl PidFileContents {
pub fn render(&self) -> String {
format!(
"{}\n{}\n{}\n",
self.pid, PIDFILE_MARKER, self.started_at_secs
)
}
pub fn parse(body: &str) -> Option<Self> {
let mut lines = body.lines();
let pid = lines.next()?.trim().parse::<i32>().ok()?;
let marker = lines.next()?.trim();
if marker != PIDFILE_MARKER {
return None;
}
let started_at_secs = lines.next()?.trim().parse::<i64>().ok()?;
Some(Self {
pid,
started_at_secs,
})
}
}
impl PidGuard {
fn install(pid_path: PathBuf, socket_path: PathBuf) -> Result<Self> {
if let Some(parent) = pid_path.parent() {
std::fs::create_dir_all(parent)?;
}
if pid_path.exists() {
let raw = std::fs::read_to_string(&pid_path).ok();
let parsed = raw.as_deref().and_then(PidFileContents::parse);
if let Some(existing) = parsed
&& pid_alive(existing.pid)
&& is_heddle_process(existing.pid)
{
return Err(HeddleError::Conflict(format!(
"heddle agent serve already running on this repo (pid {}); \
stop it first or remove {} if it's stale",
existing.pid,
pid_path.display()
)));
}
let _ = std::fs::remove_file(&pid_path);
if socket_path.exists() {
let _ = std::fs::remove_file(&socket_path);
}
}
let contents = PidFileContents {
pid: std::process::id() as i32,
started_at_secs: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0),
};
std::fs::write(&pid_path, contents.render())?;
Ok(Self {
pid_path,
socket_path,
})
}
}
impl Drop for PidGuard {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.pid_path);
let _ = std::fs::remove_file(&self.socket_path);
}
}
#[cfg(any(target_os = "linux", target_os = "macos"))]
pub fn pid_alive(pid: i32) -> bool {
unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
pub fn pid_alive(_pid: i32) -> bool {
true
}
pub fn is_heddle_process(pid: i32) -> bool {
#[cfg(target_os = "linux")]
{
let exe = std::path::PathBuf::from(format!("/proc/{pid}/exe"));
match std::fs::read_link(&exe) {
Ok(path) => path.to_string_lossy().contains("heddle"),
Err(_) => false,
}
}
#[cfg(target_os = "macos")]
{
let mut buf = vec![0u8; libc::PROC_PIDPATHINFO_MAXSIZE as usize];
let len = unsafe { libc::proc_pidpath(pid, buf.as_mut_ptr() as *mut _, buf.len() as u32) };
if len <= 0 {
return false;
}
let path = String::from_utf8_lossy(&buf[..len as usize]);
path.contains("heddle")
}
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
{
let _ = pid;
false
}
}
pub async fn serve(
repo: Repository,
config: LocalDaemonConfig,
shutdown: impl std::future::Future<Output = ()> + Send + 'static,
) -> Result<()> {
if let Some(parent) = config.socket_path.parent() {
std::fs::create_dir_all(parent)?;
}
let _guard = PidGuard::install(config.pid_path.clone(), config.socket_path.clone())?;
if config.socket_path.exists() {
std::fs::remove_file(&config.socket_path)?;
}
let listener = UnixListener::bind(&config.socket_path).map_err(|e| {
HeddleError::Io(std::io::Error::new(
e.kind(),
format!("UnixListener::bind({}): {e}", config.socket_path.display()),
))
})?;
set_socket_mode_0600(&config.socket_path)?;
let dedup = Arc::new(OperationDedupStore::open(repo.heddle_dir())?);
let inner = GrpcLocalService::new(Arc::new(repo), dedup);
let state_review = StateReviewServiceServer::new(LocalStateReviewService::new(inner.clone()));
let discussion = DiscussionServiceServer::new(LocalDiscussionService::new(inner.clone()));
let signal = SignalServiceServer::new(LocalSignalService::new(inner.clone()));
let query =
OperationLogQueryServiceServer::new(LocalOperationLogQueryService::new(inner.clone()));
let transaction = TransactionServiceServer::new(LocalTransactionService::new(inner.clone()));
let hook = HookServiceServer::new(LocalHookService::new(inner));
let incoming = UnixListenerStream::new(listener);
Server::builder()
.add_service(state_review)
.add_service(discussion)
.add_service(signal)
.add_service(query)
.add_service(transaction)
.add_service(hook)
.serve_with_incoming_shutdown(incoming, shutdown)
.await
.map_err(|e| HeddleError::InvalidObject(format!("local daemon transport failed: {e}")))?;
Ok(())
}
#[cfg(unix)]
fn set_socket_mode_0600(path: &Path) -> Result<()> {
use std::os::unix::fs::PermissionsExt;
let permissions = std::fs::Permissions::from_mode(0o600);
std::fs::set_permissions(path, permissions)?;
Ok(())
}
pub fn check_peer_uid_matches_self(stream: &tokio::net::UnixStream) -> Result<()> {
let creds = stream
.peer_cred()
.map_err(|e| HeddleError::InvalidObject(format!("peer_cred failed: {e}")))?;
let our_uid = unsafe { libc::geteuid() };
if creds.uid() != our_uid {
return Err(HeddleError::Conflict(format!(
"peer uid {} does not match daemon uid {our_uid}",
creds.uid()
)));
}
Ok(())
}
#[cfg(test)]
mod tests {
use tempfile::TempDir;
use super::*;
#[test]
fn default_socket_path_lives_under_heddle_dir() {
let temp = TempDir::new().unwrap();
let heddle = temp.path().join(".heddle");
std::fs::create_dir_all(&heddle).unwrap();
let path = default_socket_path(&heddle);
assert!(path.starts_with(&heddle));
assert!(path.ends_with("grpc.sock"));
}
#[test]
fn pid_guard_writes_and_removes_pidfile() {
let temp = TempDir::new().unwrap();
let pid = temp.path().join("grpc.pid");
let sock = temp.path().join("grpc.sock");
let guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
assert!(pid.exists());
drop(guard);
assert!(!pid.exists());
assert!(!sock.exists());
}
#[test]
fn pid_guard_refuses_when_live_heddle_process_owns_pidfile() {
let temp = TempDir::new().unwrap();
let pid = temp.path().join("grpc.pid");
let sock = temp.path().join("grpc.sock");
let first = PidGuard::install(pid.clone(), sock.clone()).unwrap();
if is_heddle_process(std::process::id() as i32) {
let result = PidGuard::install(pid.clone(), sock.clone());
assert!(result.is_err(), "expected refusal for live owner");
} else {
let _second = PidGuard::install(pid.clone(), sock.clone()).unwrap();
}
drop(first);
}
#[test]
fn pid_guard_sweeps_stale_pidfile_with_dead_pid() {
let temp = TempDir::new().unwrap();
let pid = temp.path().join("grpc.pid");
let sock = temp.path().join("grpc.sock");
let stale = PidFileContents {
pid: 2_147_483_646,
started_at_secs: 0,
};
std::fs::write(&pid, stale.render()).unwrap();
std::fs::write(&sock, "stale").unwrap();
let _guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
let raw = std::fs::read_to_string(&pid).unwrap();
let parsed = PidFileContents::parse(&raw).expect("guard wrote structured pidfile");
assert_eq!(parsed.pid, std::process::id() as i32);
assert!(parsed.started_at_secs > 0);
}
#[test]
fn pid_guard_sweeps_legacy_unstructured_pidfile() {
let temp = TempDir::new().unwrap();
let pid = temp.path().join("grpc.pid");
let sock = temp.path().join("grpc.sock");
std::fs::write(&pid, "12345").unwrap();
let _guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
let parsed = PidFileContents::parse(&std::fs::read_to_string(&pid).unwrap()).unwrap();
assert_eq!(parsed.pid, std::process::id() as i32);
}
#[test]
fn pidfile_contents_round_trip() {
let original = PidFileContents {
pid: 4321,
started_at_secs: 1_700_000_000,
};
let body = original.render();
let parsed = PidFileContents::parse(&body).expect("round-trip");
assert_eq!(parsed, original);
}
#[test]
fn pidfile_contents_rejects_missing_marker() {
let body = "1234\nnot-heddle-agent\n100\n";
assert!(PidFileContents::parse(body).is_none());
}
#[test]
fn pidfile_contents_rejects_bare_pid() {
assert!(PidFileContents::parse("12345").is_none());
}
}