1#![cfg(unix)]
31
32use std::{
33 path::{Path, PathBuf},
34 sync::Arc,
35};
36
37use grpc::{
38 DiscussionServiceServer, HookServiceServer, OperationLogQueryServiceServer,
39 SignalServiceServer, StateReviewServiceServer, TransactionServiceServer,
40};
41use objects::error::{HeddleError, Result};
42use repo::{Repository, operation_dedup::OperationDedupStore};
43use tokio::net::UnixListener;
44use tokio_stream::wrappers::UnixListenerStream;
45use tonic::transport::Server;
46
47use crate::grpc_local_impl::{
48 GrpcLocalService, LocalDiscussionService, LocalHookService, LocalOperationLogQueryService,
49 LocalSignalService, LocalStateReviewService, LocalTransactionService,
50};
51
/// Returns the default location of the daemon's Unix socket:
/// `<heddle_dir>/sockets/grpc.sock`.
pub fn default_socket_path(heddle_dir: &Path) -> PathBuf {
    let mut socket = heddle_dir.to_path_buf();
    socket.extend(["sockets", "grpc.sock"]);
    socket
}
56
/// Returns the default location of the daemon's pidfile:
/// `<heddle_dir>/sockets/grpc.pid`.
pub fn default_pid_path(heddle_dir: &Path) -> PathBuf {
    let mut pidfile = heddle_dir.to_path_buf();
    pidfile.push("sockets");
    pidfile.push("grpc.pid");
    pidfile
}
61
/// Filesystem locations used by the local daemon.
///
/// Derives `Debug` and `Clone` so the configuration can be logged and
/// duplicated by callers; both fields are plain `PathBuf`s.
#[derive(Debug, Clone)]
pub struct LocalDaemonConfig {
    /// Path of the Unix socket the daemon binds and listens on.
    pub socket_path: PathBuf,
    /// Path of the pidfile that records which process owns the socket.
    pub pid_path: PathBuf,
}
68
69impl LocalDaemonConfig {
70 pub fn from_repo(repo: &Repository) -> Self {
71 let heddle_dir = repo.heddle_dir();
72 Self {
73 socket_path: default_socket_path(heddle_dir),
74 pid_path: default_pid_path(heddle_dir),
75 }
76 }
77
78 pub fn with_socket(mut self, path: PathBuf) -> Self {
79 self.socket_path = path;
80 self
81 }
82}
83
/// Ownership token for the daemon's pidfile and socket.
///
/// Holds the two paths claimed by this process so they can be cleaned up
/// once the daemon no longer needs them.
struct PidGuard {
    /// Pidfile claimed by this process.
    pid_path: PathBuf,
    /// Socket path associated with the pidfile.
    socket_path: PathBuf,
}
90
/// Marker written as the second line of a structured pidfile; a pidfile
/// whose second line differs is rejected by `PidFileContents::parse`.
pub const PIDFILE_MARKER: &str = "heddle-agent";
95
/// Structured contents of the daemon pidfile.
///
/// The on-disk form is three newline-terminated lines: the pid, the
/// `PIDFILE_MARKER`, and the start time.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PidFileContents {
    /// Process id of the daemon that wrote the file.
    pub pid: i32,
    /// Start time of that daemon, in seconds since the Unix epoch.
    pub started_at_secs: i64,
}
115
116impl PidFileContents {
117 pub fn render(&self) -> String {
119 format!(
120 "{}\n{}\n{}\n",
121 self.pid, PIDFILE_MARKER, self.started_at_secs
122 )
123 }
124
125 pub fn parse(body: &str) -> Option<Self> {
129 let mut lines = body.lines();
130 let pid = lines.next()?.trim().parse::<i32>().ok()?;
131 let marker = lines.next()?.trim();
132 if marker != PIDFILE_MARKER {
133 return None;
134 }
135 let started_at_secs = lines.next()?.trim().parse::<i64>().ok()?;
136 Some(Self {
137 pid,
138 started_at_secs,
139 })
140 }
141}
142
143impl PidGuard {
144 fn install(pid_path: PathBuf, socket_path: PathBuf) -> Result<Self> {
145 if let Some(parent) = pid_path.parent() {
146 std::fs::create_dir_all(parent)?;
147 }
148 if pid_path.exists() {
155 let raw = std::fs::read_to_string(&pid_path).ok();
156 let parsed = raw.as_deref().and_then(PidFileContents::parse);
157 if let Some(existing) = parsed
158 && pid_alive(existing.pid)
159 && is_heddle_process(existing.pid)
160 {
161 return Err(HeddleError::Conflict(format!(
162 "heddle agent serve already running on this repo (pid {}); \
163 stop it first or remove {} if it's stale",
164 existing.pid,
165 pid_path.display()
166 )));
167 }
168 let _ = std::fs::remove_file(&pid_path);
170 if socket_path.exists() {
171 let _ = std::fs::remove_file(&socket_path);
172 }
173 }
174 let contents = PidFileContents {
176 pid: std::process::id() as i32,
177 started_at_secs: std::time::SystemTime::now()
178 .duration_since(std::time::UNIX_EPOCH)
179 .map(|d| d.as_secs() as i64)
180 .unwrap_or(0),
181 };
182 std::fs::write(&pid_path, contents.render())?;
183 Ok(Self {
184 pid_path,
185 socket_path,
186 })
187 }
188}
189
190impl Drop for PidGuard {
191 fn drop(&mut self) {
192 let _ = std::fs::remove_file(&self.pid_path);
193 let _ = std::fs::remove_file(&self.socket_path);
194 }
195}
196
197#[cfg(any(target_os = "linux", target_os = "macos"))]
198pub fn pid_alive(pid: i32) -> bool {
199 unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
203}
204
/// Fallback for targets other than Linux and macOS: no liveness probe is
/// implemented here, so every pid is reported as alive.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
pub fn pid_alive(_pid: i32) -> bool {
    // Always answer "alive"; this variant never inspects the pid.
    true
}
211
/// Reports whether the executable path of process `pid` contains the
/// substring `"heddle"`.
///
/// * Linux: resolves the `/proc/<pid>/exe` symlink.
/// * macOS: asks `proc_pidpath` for the executable path.
/// * Other targets: always `false`.
///
/// Any failure to obtain the path yields `false`.
pub fn is_heddle_process(pid: i32) -> bool {
    #[cfg(target_os = "linux")]
    {
        std::fs::read_link(format!("/proc/{pid}/exe"))
            .is_ok_and(|target| target.to_string_lossy().contains("heddle"))
    }
    #[cfg(target_os = "macos")]
    {
        let mut path_buf = vec![0u8; libc::PROC_PIDPATHINFO_MAXSIZE as usize];
        // SAFETY: the pointer and length describe `path_buf`, which stays
        // alive and exclusively borrowed for the duration of the call.
        let written = unsafe {
            libc::proc_pidpath(pid, path_buf.as_mut_ptr() as *mut _, path_buf.len() as u32)
        };
        // A non-positive return means no path was produced.
        written > 0 && String::from_utf8_lossy(&path_buf[..written as usize]).contains("heddle")
    }
    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
    {
        let _ = pid;
        false
    }
}
253
254pub async fn serve(
257 repo: Repository,
258 config: LocalDaemonConfig,
259 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
260) -> Result<()> {
261 if let Some(parent) = config.socket_path.parent() {
262 std::fs::create_dir_all(parent)?;
263 }
264 let _guard = PidGuard::install(config.pid_path.clone(), config.socket_path.clone())?;
266
267 if config.socket_path.exists() {
270 std::fs::remove_file(&config.socket_path)?;
271 }
272 let listener = UnixListener::bind(&config.socket_path).map_err(|e| {
273 HeddleError::Io(std::io::Error::new(
274 e.kind(),
275 format!("UnixListener::bind({}): {e}", config.socket_path.display()),
276 ))
277 })?;
278 set_socket_mode_0600(&config.socket_path)?;
280
281 let dedup = Arc::new(OperationDedupStore::open(repo.heddle_dir())?);
282 let inner = GrpcLocalService::new(Arc::new(repo), dedup);
283
284 let state_review = StateReviewServiceServer::new(LocalStateReviewService::new(inner.clone()));
285 let discussion = DiscussionServiceServer::new(LocalDiscussionService::new(inner.clone()));
286 let signal = SignalServiceServer::new(LocalSignalService::new(inner.clone()));
287 let query =
288 OperationLogQueryServiceServer::new(LocalOperationLogQueryService::new(inner.clone()));
289 let transaction = TransactionServiceServer::new(LocalTransactionService::new(inner.clone()));
290 let hook = HookServiceServer::new(LocalHookService::new(inner));
291
292 let incoming = UnixListenerStream::new(listener);
293
294 Server::builder()
295 .add_service(state_review)
296 .add_service(discussion)
297 .add_service(signal)
298 .add_service(query)
299 .add_service(transaction)
300 .add_service(hook)
301 .serve_with_incoming_shutdown(incoming, shutdown)
302 .await
303 .map_err(|e| HeddleError::InvalidObject(format!("local daemon transport failed: {e}")))?;
304 Ok(())
305}
306
307#[cfg(unix)]
308fn set_socket_mode_0600(path: &Path) -> Result<()> {
309 use std::os::unix::fs::PermissionsExt;
310 let permissions = std::fs::Permissions::from_mode(0o600);
311 std::fs::set_permissions(path, permissions)?;
312 Ok(())
313}
314
315pub fn check_peer_uid_matches_self(stream: &tokio::net::UnixStream) -> Result<()> {
322 let creds = stream
323 .peer_cred()
324 .map_err(|e| HeddleError::InvalidObject(format!("peer_cred failed: {e}")))?;
325 let our_uid = unsafe { libc::geteuid() };
327 if creds.uid() != our_uid {
328 return Err(HeddleError::Conflict(format!(
329 "peer uid {} does not match daemon uid {our_uid}",
330 creds.uid()
331 )));
332 }
333 Ok(())
334}
335
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Pid of the test process, in the `i32` form the pidfile stores.
    fn own_pid() -> i32 {
        std::process::id() as i32
    }

    /// Pidfile and socket paths inside `dir`, in that order.
    fn guard_paths(dir: &TempDir) -> (PathBuf, PathBuf) {
        let root = dir.path();
        (root.join("grpc.pid"), root.join("grpc.sock"))
    }

    /// Reads the pidfile back and parses it, panicking if it is not in the
    /// structured format.
    fn read_structured(pidfile: &Path) -> PidFileContents {
        let raw = std::fs::read_to_string(pidfile).unwrap();
        PidFileContents::parse(&raw).expect("guard wrote structured pidfile")
    }

    #[test]
    fn default_socket_path_lives_under_heddle_dir() {
        let temp = TempDir::new().unwrap();
        let heddle = temp.path().join(".heddle");
        std::fs::create_dir_all(&heddle).unwrap();

        let socket = default_socket_path(&heddle);

        assert!(socket.starts_with(&heddle));
        assert!(socket.ends_with("grpc.sock"));
    }

    #[test]
    fn pid_guard_writes_and_removes_pidfile() {
        let temp = TempDir::new().unwrap();
        let (pidfile, socket) = guard_paths(&temp);

        let guard = PidGuard::install(pidfile.clone(), socket.clone()).unwrap();
        assert!(pidfile.exists());

        drop(guard);
        assert!(!pidfile.exists());
        assert!(!socket.exists());
    }

    #[test]
    fn pid_guard_refuses_when_live_heddle_process_owns_pidfile() {
        let temp = TempDir::new().unwrap();
        let (pidfile, socket) = guard_paths(&temp);
        let first = PidGuard::install(pidfile.clone(), socket.clone()).unwrap();

        // The first guard recorded this very process. Whether a second
        // install is refused depends on whether the test binary's path
        // passes the "looks like heddle" check.
        let second = PidGuard::install(pidfile.clone(), socket.clone());
        if is_heddle_process(own_pid()) {
            assert!(second.is_err(), "expected refusal for live owner");
        } else {
            let _second = second.unwrap();
        }
        drop(first);
    }

    #[test]
    fn pid_guard_sweeps_stale_pidfile_with_dead_pid() {
        let temp = TempDir::new().unwrap();
        let (pidfile, socket) = guard_paths(&temp);
        // A pid this large is not expected to belong to any live process.
        let stale = PidFileContents {
            pid: 2_147_483_646,
            started_at_secs: 0,
        };
        std::fs::write(&pidfile, stale.render()).unwrap();
        std::fs::write(&socket, "stale").unwrap();

        let _guard = PidGuard::install(pidfile.clone(), socket.clone()).unwrap();

        let current = read_structured(&pidfile);
        assert_eq!(current.pid, own_pid());
        assert!(current.started_at_secs > 0);
    }

    #[test]
    fn pid_guard_sweeps_legacy_unstructured_pidfile() {
        let temp = TempDir::new().unwrap();
        let (pidfile, socket) = guard_paths(&temp);
        // A bare pid with no marker line is not a structured pidfile.
        std::fs::write(&pidfile, "12345").unwrap();

        let _guard = PidGuard::install(pidfile.clone(), socket.clone()).unwrap();

        let current = PidFileContents::parse(&std::fs::read_to_string(&pidfile).unwrap()).unwrap();
        assert_eq!(current.pid, own_pid());
    }

    #[test]
    fn pidfile_contents_round_trip() {
        let original = PidFileContents {
            pid: 4321,
            started_at_secs: 1_700_000_000,
        };

        let reparsed = PidFileContents::parse(&original.render()).expect("round-trip");

        assert_eq!(reparsed, original);
    }

    #[test]
    fn pidfile_contents_rejects_missing_marker() {
        let wrong_marker = "1234\nnot-heddle-agent\n100\n";
        assert!(PidFileContents::parse(wrong_marker).is_none());
    }

    #[test]
    fn pidfile_contents_rejects_bare_pid() {
        assert!(PidFileContents::parse("12345").is_none());
    }
}