1#![cfg(unix)]
31
32use std::{
33 path::{Path, PathBuf},
34 sync::{Arc, Mutex},
35};
36
37use grpc::{
38 DiscussionServiceServer, HookServiceServer, OperationLogQueryServiceServer,
39 SignalServiceServer, StateReviewServiceServer, TimelineServiceServer, TransactionServiceServer,
40};
41use objects::error::{HeddleError, Result};
42use repo::{Repository, operation_dedup::OperationDedupStore};
43use tokio::net::UnixListener;
44use tokio_stream::{StreamExt, wrappers::UnixListenerStream};
45use tonic::transport::Server;
46
47use crate::grpc_local_impl::{
48 GrpcLocalService, LocalDiscussionService, LocalHookService, LocalOperationLogQueryService,
49 LocalSignalService, LocalStateReviewService, LocalTimelineService, LocalTransactionService,
50};
51
52const PRIVATE_SOCKET_UMASK: libc::mode_t = 0o177;
53
54static SOCKET_BIND_UMASK_LOCK: Mutex<()> = Mutex::new(());
55
/// Returns the default gRPC socket location, `<heddle_dir>/sockets/grpc.sock`.
pub fn default_socket_path(heddle_dir: &Path) -> PathBuf {
    let mut socket_path = heddle_dir.join("sockets");
    socket_path.push("grpc.sock");
    socket_path
}
60
/// Returns the default pidfile location, `<heddle_dir>/sockets/grpc.pid`.
pub fn default_pid_path(heddle_dir: &Path) -> PathBuf {
    let relative = Path::new("sockets").join("grpc.pid");
    heddle_dir.join(relative)
}
65
66pub struct LocalDaemonConfig {
69 pub socket_path: PathBuf,
70 pub pid_path: PathBuf,
71}
72
73impl LocalDaemonConfig {
74 pub fn from_repo(repo: &Repository) -> Self {
75 let heddle_dir = repo.heddle_dir();
76 Self {
77 socket_path: default_socket_path(heddle_dir),
78 pid_path: default_pid_path(heddle_dir),
79 }
80 }
81
82 pub fn with_socket(mut self, path: PathBuf) -> Self {
83 self.socket_path = path;
84 self
85 }
86}
87
/// Ownership token for the daemon's pidfile and socket file.
///
/// Created by `PidGuard::install`; when the guard is dropped both files are
/// removed on a best-effort basis.
struct PidGuard {
    /// Pidfile written at install time.
    pid_path: PathBuf,
    /// Socket file cleaned up together with the pidfile.
    socket_path: PathBuf,
}
94
/// Marker written on the second line of the pidfile.
///
/// [`PidFileContents::parse`] rejects any body whose second line is not
/// exactly this string.
pub const PIDFILE_MARKER: &str = "heddle-agent";

/// Structured contents of the daemon pidfile.
///
/// The on-disk form is three newline-terminated lines: the pid, the
/// [`PIDFILE_MARKER`], and the start time in seconds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PidFileContents {
    /// Process id of the daemon that wrote the file.
    pub pid: i32,
    /// Start time recorded by the writer, in seconds.
    pub started_at_secs: i64,
}

impl PidFileContents {
    /// Serializes the contents into the three-line pidfile format.
    pub fn render(&self) -> String {
        format!(
            "{}\n{}\n{}\n",
            self.pid, PIDFILE_MARKER, self.started_at_secs
        )
    }

    /// Parses a pidfile body produced by [`PidFileContents::render`].
    ///
    /// Returns `None` when a line is missing, a number does not parse, the
    /// marker line differs from [`PIDFILE_MARKER`], or the pid is not strictly
    /// positive. Surrounding whitespace on each line is ignored.
    pub fn parse(body: &str) -> Option<Self> {
        let mut lines = body.lines();
        let pid = lines.next()?.trim().parse::<i32>().ok()?;
        // A real process id is always positive. Zero and negative values are
        // interpreted by `kill(2)` as process-group / broadcast targets, so a
        // corrupted pidfile must never hand them to a caller.
        if pid <= 0 {
            return None;
        }
        let marker = lines.next()?.trim();
        if marker != PIDFILE_MARKER {
            return None;
        }
        let started_at_secs = lines.next()?.trim().parse::<i64>().ok()?;
        Some(Self {
            pid,
            started_at_secs,
        })
    }
}
146
147impl PidGuard {
148 fn install(pid_path: PathBuf, socket_path: PathBuf) -> Result<Self> {
149 if let Some(parent) = pid_path.parent() {
150 std::fs::create_dir_all(parent)?;
151 }
152 if pid_path.exists() {
159 let raw = std::fs::read_to_string(&pid_path).ok();
160 let parsed = raw.as_deref().and_then(PidFileContents::parse);
161 if let Some(existing) = parsed
162 && pid_alive(existing.pid)
163 && is_heddle_process(existing.pid)
164 {
165 return Err(HeddleError::Conflict(format!(
166 "heddle agent serve already running on this repo (pid {}); \
167 stop it first or remove {} if it's stale",
168 existing.pid,
169 pid_path.display()
170 )));
171 }
172 let _ = std::fs::remove_file(&pid_path);
174 if socket_path.exists() {
175 let _ = std::fs::remove_file(&socket_path);
176 }
177 }
178 let contents = PidFileContents {
180 pid: std::process::id() as i32,
181 started_at_secs: std::time::SystemTime::now()
182 .duration_since(std::time::UNIX_EPOCH)
183 .map(|d| d.as_secs() as i64)
184 .unwrap_or(0),
185 };
186 std::fs::write(&pid_path, contents.render())?;
187 Ok(Self {
188 pid_path,
189 socket_path,
190 })
191 }
192}
193
194impl Drop for PidGuard {
195 fn drop(&mut self) {
196 let _ = std::fs::remove_file(&self.pid_path);
197 let _ = std::fs::remove_file(&self.socket_path);
198 }
199}
200
201#[cfg(any(target_os = "linux", target_os = "macos"))]
202pub fn pid_alive(pid: i32) -> bool {
203 unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
207}
208
209#[cfg(not(any(target_os = "linux", target_os = "macos")))]
210pub fn pid_alive(_pid: i32) -> bool {
211 true
214}
215
216pub fn is_heddle_process(pid: i32) -> bool {
230 process_uid_matches_self(pid) && process_exe_matches_current(pid)
231}
232
233#[cfg(target_os = "linux")]
234fn process_uid_matches_self(pid: i32) -> bool {
235 use std::os::unix::fs::MetadataExt;
236
237 let path = PathBuf::from(format!("/proc/{pid}"));
238 let Ok(metadata) = std::fs::metadata(path) else {
239 return false;
240 };
241 metadata.uid() == unsafe { libc::geteuid() }
243}
244
245#[cfg(not(target_os = "linux"))]
246fn process_uid_matches_self(_pid: i32) -> bool {
247 true
248}
249
250fn process_exe_matches_current(pid: i32) -> bool {
251 let Some(process_exe) = process_exe_path(pid) else {
252 return false;
253 };
254 let Ok(current_exe) = std::env::current_exe() else {
255 return false;
256 };
257 executable_identity_matches(&process_exe, ¤t_exe)
258}
259
/// Compares two executable paths by their canonical form.
///
/// Returns `false` if either path fails to canonicalize; `current_exe` is
/// only resolved once `process_exe` has resolved successfully.
fn executable_identity_matches(process_exe: &Path, current_exe: &Path) -> bool {
    process_exe
        .canonicalize()
        .ok()
        .and_then(|resolved_process| {
            current_exe
                .canonicalize()
                .ok()
                .map(|resolved_current| resolved_process == resolved_current)
        })
        .unwrap_or(false)
}
269
/// Resolves the executable of process `pid` by reading the `/proc/<pid>/exe`
/// symlink. Returns `None` when the link cannot be read.
#[cfg(target_os = "linux")]
fn process_exe_path(pid: i32) -> Option<PathBuf> {
    match std::fs::read_link(format!("/proc/{pid}/exe")) {
        Ok(target) => Some(target),
        Err(_) => None,
    }
}

/// Resolves the executable of process `pid` through `proc_pidpath`.
/// Returns `None` when the call reports a non-positive length.
#[cfg(target_os = "macos")]
fn process_exe_path(pid: i32) -> Option<PathBuf> {
    use std::{ffi::OsString, os::unix::ffi::OsStringExt};

    let mut raw = vec![0u8; libc::PROC_PIDPATHINFO_MAXSIZE as usize];
    let capacity = raw.len() as u32;
    // SAFETY: `raw` is a live, writable buffer of exactly `capacity` bytes, so
    // the call cannot write past its end.
    let written = unsafe { libc::proc_pidpath(pid, raw.as_mut_ptr().cast(), capacity) };
    if written <= 0 {
        return None;
    }
    // Keep only the bytes the call filled in.
    raw.truncate(written as usize);
    Some(PathBuf::from(OsString::from_vec(raw)))
}

/// No executable lookup is implemented for other targets; always `None`.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn process_exe_path(_pid: i32) -> Option<PathBuf> {
    None
}
294
295pub async fn serve(
298 repo: Repository,
299 config: LocalDaemonConfig,
300 shutdown: impl std::future::Future<Output = ()> + Send + 'static,
301) -> Result<()> {
302 create_private_socket_parent(&config.socket_path)?;
303 let _guard = PidGuard::install(config.pid_path.clone(), config.socket_path.clone())?;
305
306 if config.socket_path.exists() {
309 std::fs::remove_file(&config.socket_path)?;
310 }
311 let listener = bind_private_unix_listener(&config.socket_path)?;
312 set_socket_mode_0600(&config.socket_path)?;
317 listener.set_nonblocking(true).map_err(|e| {
318 HeddleError::Io(std::io::Error::new(
319 e.kind(),
320 format!(
321 "UnixListener::set_nonblocking({}): {e}",
322 config.socket_path.display()
323 ),
324 ))
325 })?;
326 let listener = UnixListener::from_std(listener).map_err(|e| {
327 HeddleError::Io(std::io::Error::new(
328 e.kind(),
329 format!(
330 "UnixListener::from_std({}): {e}",
331 config.socket_path.display()
332 ),
333 ))
334 })?;
335
336 let report = crate::transaction_replay::replay_active_transactions(&repo);
352 if report.has_hard_failures() {
353 tracing::error!(
354 recovered_txns = report.recovered_transaction_ids.len(),
355 orphan_tmps = report.orphan_temp_files_removed,
356 unparseable = report.unparseable_sentinels.len(),
357 failed_sentinel_writes = report.failed_sentinel_writes.len(),
358 failed_orphan_deletes = report.failed_orphan_deletes.len(),
359 failed_oplog_appends = report.failed_oplog_appends.len(),
360 unreadable_entries = report.unreadable_entries,
361 scan_error = report.scan_error.as_deref().unwrap_or(""),
362 "local-daemon: transaction replay hit hard failures; \
363 scan may not have run or audit-trail entries were lost"
364 );
365 } else if report.has_recoverable_failures() {
366 tracing::warn!(
367 recovered_txns = report.recovered_transaction_ids.len(),
368 orphan_tmps = report.orphan_temp_files_removed,
369 unparseable = report.unparseable_sentinels.len(),
370 failed_sentinel_writes = report.failed_sentinel_writes.len(),
371 failed_orphan_deletes = report.failed_orphan_deletes.len(),
372 unreadable_entries = report.unreadable_entries,
373 "local-daemon: transaction replay left recoverable failures on disk; \
374 next startup will retry, but operator inspection is recommended"
375 );
376 } else if !report.is_clean() {
377 tracing::info!(
378 recovered_txns = report.recovered_transaction_ids.len(),
379 orphan_tmps = report.orphan_temp_files_removed,
380 "local-daemon: transaction replay recovered prior in-flight state"
381 );
382 }
383
384 let dedup = Arc::new(OperationDedupStore::open(repo.heddle_dir())?);
385 let inner = GrpcLocalService::new(Arc::new(repo), dedup);
386
387 let state_review = StateReviewServiceServer::new(LocalStateReviewService::new(inner.clone()));
388 let discussion = DiscussionServiceServer::new(LocalDiscussionService::new(inner.clone()));
389 let signal = SignalServiceServer::new(LocalSignalService::new(inner.clone()));
390 let query =
391 OperationLogQueryServiceServer::new(LocalOperationLogQueryService::new(inner.clone()));
392 let timeline = TimelineServiceServer::new(LocalTimelineService::new(inner.clone()));
393 let transaction = TransactionServiceServer::new(LocalTransactionService::new(inner.clone()));
394 let hook = HookServiceServer::new(LocalHookService::new(inner));
395
396 let incoming = UnixListenerStream::new(listener).filter_map(guard_peer_connection);
402
403 Server::builder()
404 .add_service(state_review)
405 .add_service(discussion)
406 .add_service(signal)
407 .add_service(query)
408 .add_service(timeline)
409 .add_service(transaction)
410 .add_service(hook)
411 .serve_with_incoming_shutdown(incoming, shutdown)
412 .await
413 .map_err(|e| HeddleError::InvalidObject(format!("local daemon transport failed: {e}")))?;
414 Ok(())
415}
416
417fn create_private_socket_parent(socket_path: &Path) -> Result<()> {
418 if let Some(parent) = socket_path.parent() {
419 use std::os::unix::fs::DirBuilderExt;
420 let mut builder = std::fs::DirBuilder::new();
421 builder.recursive(true).mode(0o700);
422 builder.create(parent)?;
423 }
424 Ok(())
425}
426
427fn bind_private_unix_listener(socket_path: &Path) -> Result<std::os::unix::net::UnixListener> {
428 let _lock = SOCKET_BIND_UMASK_LOCK
429 .lock()
430 .map_err(|_| HeddleError::InvalidObject("daemon socket umask lock poisoned".to_string()))?;
431 let _umask = UmaskGuard::set(PRIVATE_SOCKET_UMASK);
439 std::os::unix::net::UnixListener::bind(socket_path).map_err(|e| {
440 HeddleError::Io(std::io::Error::new(
441 e.kind(),
442 format!("UnixListener::bind({}): {e}", socket_path.display()),
443 ))
444 })
445}
446
447struct UmaskGuard {
448 previous: libc::mode_t,
449}
450
451impl UmaskGuard {
452 fn set(mask: libc::mode_t) -> Self {
453 let previous = unsafe { libc::umask(mask) };
457 Self { previous }
458 }
459}
460
461impl Drop for UmaskGuard {
462 fn drop(&mut self) {
463 unsafe {
465 libc::umask(self.previous);
466 }
467 }
468}
469
470#[cfg(unix)]
471fn set_socket_mode_0600(path: &Path) -> Result<()> {
472 use std::os::unix::fs::PermissionsExt;
473 let permissions = std::fs::Permissions::from_mode(0o600);
474 std::fs::set_permissions(path, permissions)?;
475 Ok(())
476}
477
478pub fn check_peer_uid_matches_self(stream: &tokio::net::UnixStream) -> Result<()> {
486 let creds = stream
487 .peer_cred()
488 .map_err(|e| HeddleError::InvalidObject(format!("peer_cred failed: {e}")))?;
489 let our_uid = unsafe { libc::geteuid() };
491 enforce_peer_uid(creds.uid(), our_uid)
492}
493
494fn enforce_peer_uid(peer_uid: u32, our_uid: u32) -> Result<()> {
499 if peer_uid != our_uid {
500 return Err(HeddleError::Conflict(format!(
501 "peer uid {peer_uid} does not match daemon uid {our_uid}"
502 )));
503 }
504 Ok(())
505}
506
507fn guard_peer_connection(
513 conn: std::io::Result<tokio::net::UnixStream>,
514) -> Option<std::io::Result<tokio::net::UnixStream>> {
515 match conn {
516 Ok(stream) => match check_peer_uid_matches_self(&stream) {
517 Ok(()) => Some(Ok(stream)),
518 Err(e) => {
519 tracing::warn!(
520 error = %e,
521 "local-daemon: rejecting connection from peer with mismatched uid"
522 );
523 None
524 }
525 },
526 Err(e) => Some(Err(e)),
527 }
528}
529
/// Unit tests for socket/pidfile setup, process identity, and the peer-uid
/// gate. Tests that touch process-global state (umask) or bind sockets are
/// serialized through `serial_test`.
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    #[test]
    #[serial_test::serial(process_global)]
    fn default_socket_path_lives_under_heddle_dir() {
        let temp = TempDir::new().unwrap();
        let heddle = temp.path().join(".heddle");
        std::fs::create_dir_all(&heddle).unwrap();
        let path = default_socket_path(&heddle);
        assert!(path.starts_with(&heddle));
        assert!(path.ends_with("grpc.sock"));
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn create_private_socket_parent_creates_new_parent_0700() {
        use std::os::unix::fs::PermissionsExt;

        let temp = TempDir::new().unwrap();
        let socket = temp
            .path()
            .join(".heddle")
            .join("sockets")
            .join("grpc.sock");
        create_private_socket_parent(&socket).unwrap();

        let mode = std::fs::metadata(socket.parent().unwrap())
            .unwrap()
            .permissions()
            .mode()
            & 0o777;
        assert_eq!(mode, 0o700, "new socket parent must be private");
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn bind_private_unix_listener_creates_socket_0600_before_chmod() {
        use std::os::unix::fs::PermissionsExt;

        let temp = TempDir::new().unwrap();
        let socket = temp.path().join("grpc.sock");

        // Environments that forbid binding Unix sockets skip the test instead
        // of failing it.
        let _listener = match bind_private_unix_listener(&socket) {
            Ok(listener) => listener,
            Err(HeddleError::Io(err)) if err.kind() == std::io::ErrorKind::PermissionDenied => {
                eprintln!(
                    "skipping daemon socket mode test: local Unix listener bind denied: {err}"
                );
                return;
            }
            Err(err) => panic!("bind private Unix listener: {err}"),
        };

        let mode = std::fs::metadata(&socket).unwrap().permissions().mode() & 0o777;
        assert_eq!(
            mode, 0o600,
            "socket must be born private before set_socket_mode_0600 runs"
        );
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn bind_private_unix_listener_restores_umask_after_bind_error() {
        let temp = TempDir::new().unwrap();
        let socket = temp.path().join("missing").join("grpc.sock");
        let before = current_umask();

        let result = bind_private_unix_listener(&socket);

        let after = current_umask();
        assert!(result.is_err(), "bind should fail for a missing parent");
        assert_eq!(after, before, "bind errors must restore the prior umask");
    }

    /// Reads the process umask by setting it to 0 and immediately putting the
    /// returned value back.
    fn current_umask() -> libc::mode_t {
        // SAFETY: `umask` only swaps the process file-mode creation mask; the
        // original value is restored before returning.
        unsafe {
            let current = libc::umask(0);
            libc::umask(current);
            current
        }
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn pid_guard_writes_and_removes_pidfile() {
        let temp = TempDir::new().unwrap();
        let pid = temp.path().join("grpc.pid");
        let sock = temp.path().join("grpc.sock");
        let guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
        assert!(pid.exists());
        drop(guard);
        assert!(!pid.exists());
        assert!(!sock.exists());
    }

    // Relies on `is_heddle_process` recognising the test process, which needs
    // the executable lookup that only exists for linux and macos.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    #[test]
    #[serial_test::serial(process_global)]
    fn pid_guard_refuses_when_live_heddle_process_owns_pidfile() {
        let temp = TempDir::new().unwrap();
        let pid = temp.path().join("grpc.pid");
        let sock = temp.path().join("grpc.sock");
        let first = PidGuard::install(pid.clone(), sock.clone()).unwrap();
        // The first guard recorded this very process, so a second install must
        // see a live owner.
        let result = PidGuard::install(pid.clone(), sock.clone());
        assert!(result.is_err(), "expected refusal for live owner");
        drop(first);
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn pid_guard_sweeps_stale_pidfile_with_dead_pid() {
        let temp = TempDir::new().unwrap();
        let pid = temp.path().join("grpc.pid");
        let sock = temp.path().join("grpc.sock");
        let stale = PidFileContents {
            // Chosen to be a pid that is not expected to exist.
            pid: 2_147_483_646,
            started_at_secs: 0,
        };
        std::fs::write(&pid, stale.render()).unwrap();
        std::fs::write(&sock, "stale").unwrap();
        let _guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
        let raw = std::fs::read_to_string(&pid).unwrap();
        let parsed = PidFileContents::parse(&raw).expect("guard wrote structured pidfile");
        assert_eq!(parsed.pid, std::process::id() as i32);
        assert!(parsed.started_at_secs > 0);
    }

    #[test]
    #[serial_test::serial(process_global)]
    fn pid_guard_sweeps_legacy_unstructured_pidfile() {
        let temp = TempDir::new().unwrap();
        let pid = temp.path().join("grpc.pid");
        let sock = temp.path().join("grpc.sock");
        // A bare pid without the marker line does not parse, so it is swept.
        std::fs::write(&pid, "12345").unwrap();
        let _guard = PidGuard::install(pid.clone(), sock.clone()).unwrap();
        let parsed = PidFileContents::parse(&std::fs::read_to_string(&pid).unwrap()).unwrap();
        assert_eq!(parsed.pid, std::process::id() as i32);
    }

    #[test]
    fn pidfile_contents_round_trip() {
        let original = PidFileContents {
            pid: 4321,
            started_at_secs: 1_700_000_000,
        };
        let body = original.render();
        let parsed = PidFileContents::parse(&body).expect("round-trip");
        assert_eq!(parsed, original);
    }

    #[test]
    fn pidfile_contents_rejects_missing_marker() {
        let body = "1234\nnot-heddle-agent\n100\n";
        assert!(PidFileContents::parse(body).is_none());
    }

    #[test]
    fn pidfile_contents_rejects_bare_pid() {
        assert!(PidFileContents::parse("12345").is_none());
    }

    #[test]
    fn executable_identity_accepts_same_canonical_path() {
        let current = std::env::current_exe().unwrap();
        assert!(executable_identity_matches(&current, &current));
    }

    #[test]
    fn executable_identity_rejects_spoofed_heddle_path() {
        let temp = TempDir::new().unwrap();
        let spoofed = temp.path().join("contains-heddle").join("heddle-spoof");
        std::fs::create_dir_all(spoofed.parent().unwrap()).unwrap();
        std::fs::write(&spoofed, "not the current executable").unwrap();

        let current = std::env::current_exe().unwrap();

        assert!(
            !executable_identity_matches(&spoofed, &current),
            "a pathname containing heddle must not satisfy executable identity"
        );
    }

    // `process_exe_path` returns `None` on other targets, so the identity
    // check can only succeed on linux and macos.
    #[cfg(any(target_os = "linux", target_os = "macos"))]
    #[test]
    fn is_heddle_process_accepts_self_pid() {
        assert!(
            is_heddle_process(std::process::id() as i32),
            "the current process should resolve to the current executable"
        );
    }

    #[test]
    fn enforce_peer_uid_admits_matching_uid() {
        assert!(enforce_peer_uid(1000, 1000).is_ok());
    }

    #[test]
    fn enforce_peer_uid_rejects_mismatched_uid() {
        let err = enforce_peer_uid(1001, 1000).unwrap_err();
        assert!(
            matches!(err, HeddleError::Conflict(_)),
            "mismatched peer uid must be a Conflict, got {err:?}"
        );
    }

    #[test]
    fn guard_propagates_listener_io_errors() {
        let io_err = std::io::Error::other("accept failed");
        let out = guard_peer_connection(Err(io_err));
        assert!(matches!(out, Some(Err(_))), "io errors must propagate");
    }

    #[tokio::test]
    async fn guard_admits_same_process_peer() {
        // Both ends of a socketpair belong to this process, so the uids match.
        let (peer, _local) = tokio::net::UnixStream::pair().expect("socketpair");
        let out = guard_peer_connection(Ok(peer));
        assert!(
            matches!(out, Some(Ok(_))),
            "a same-uid peer must be admitted by the gate"
        );
    }

    #[tokio::test]
    async fn check_peer_uid_matches_self_admits_socketpair() {
        let (peer, _local) = tokio::net::UnixStream::pair().expect("socketpair");
        assert!(check_peer_uid_matches_self(&peer).is_ok());
    }
}