cli/client/local_daemon.rs
1// SPDX-License-Identifier: Apache-2.0
2//! Local-daemon auto-detection.
3//!
4//! Every gRPC-using verb in the CLI checks this first. When the per-repo
5//! `.heddle/sockets/grpc.sock` exists and the pidfile points at a live
6//! process, callers can route their RPC over the UDS instead of opening
7//! an in-process [`GrpcLocalService`]. The latency win matters for tight
8//! agent loops.
9//!
10//! Three layers:
11//!
//! 1. [`detect_local_daemon`] — pidfile probe (read the pid, then check
//!    liveness via `kill(pid, 0)`). Syscall-only and cheap; it handles the
//!    common negative case ("no daemon, fall through to in-process").
15//! 2. [`detect_local_daemon_with_connect_probe`] — same as (1) but
16//! actually opens a `UnixStream` to confirm the listener accepts.
17//! Catches the "stale socket file with a live unrelated PID" race.
18//! 3. [`connect_local_daemon_channel`] — full path: build a tonic
19//! [`tonic::transport::Channel`] over the UDS, run the gRPC
20//! `Health.Check` handshake, and cache the working channel for the
21//! rest of the process. This is what the read-shaped CLI verbs
22//! route through.
23//!
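//! Results are cached per heddle dir for the life of the process: layer (1)
//! caches its probe result and layer (3) caches its working channel, while
//! layer (2) reuses the cache of (1) and re-runs only the socket connect.
//! A CLI invocation that touches one repo therefore pays the pidfile probe
//! cost at most once.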
26
27use std::{
28 path::{Path, PathBuf},
29 time::Duration,
30};
31
32use crate::util::OnceMap;
33
/// Address of a local daemon that passed the liveness probe.
///
/// Produced by [`detect_local_daemon`] only when the probe reports
/// [`LocalDaemonStatus::Running`]; callers dial `socket_path` directly.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct UdsTarget {
    /// The per-repo gRPC socket, `<heddle_dir>/sockets/grpc.sock`.
    pub socket_path: PathBuf,
    /// Pid read from `grpc.pid` that the liveness check reported alive.
    pub pid: u32,
}
42
43/// Cache key — canonical heddle-dir path so two probes from different
44/// CWDs against the same repo share a result.
45type ProbeCacheKey = PathBuf;
46
47/// Process-wide probe cache. Each heddle dir is probed at most once
48/// per process lifetime, after which subsequent calls return the
49/// cached `Option<UdsTarget>` without touching the filesystem.
50///
51/// Keyed by canonical heddle-dir path so a process that touches more
52/// than one repo (test binaries, agent dispatch loops) caches each
53/// repo independently.
54static DETECT_CACHE: OnceMap<ProbeCacheKey, Option<UdsTarget>> = OnceMap::new();
55
56/// Run the probe and, when the daemon is `Running`, return the UDS
57/// target a tonic client can dial. Cached for the process lifetime so
58/// hot agent loops don't pay two stat-syscalls per RPC.
59///
60/// Probing failure (`Absent`, `Stale`) returns `None` — the caller
61/// should fall through to its in-process or remote fallback. The
62/// full `Health.Check` + version handshake over a tonic UDS Channel
63/// layers on top of [`detect_local_daemon_with_connect_probe`] and
64/// [`connect_local_daemon_channel`].
65pub fn detect_local_daemon(heddle_dir: &Path) -> Option<UdsTarget> {
66 let key: ProbeCacheKey = heddle_dir.to_path_buf();
67 DETECT_CACHE.get_or_init_with(&key, || {
68 let probe = probe(heddle_dir);
69 match probe.status {
70 LocalDaemonStatus::Running { pid } => Some(UdsTarget {
71 socket_path: probe.socket_path,
72 pid,
73 }),
74 LocalDaemonStatus::Stale { .. } | LocalDaemonStatus::Absent => None,
75 }
76 })
77}
78
79/// Stronger variant of [`detect_local_daemon`] — runs the file-stat
80/// probe, then attempts a UDS connect to confirm the daemon is
81/// actually accepting connections (not just a stale pidfile that
82/// happens to point at a live unrelated process).
83///
84/// The connect step is intentionally bounded by `timeout`. Default
85/// callers should pass something tight (50ms is plenty for a local
86/// socket) so a hung daemon doesn't stall every CLI invocation.
87///
88/// Returns `None` if either the file-stat probe says `Absent`/`Stale`
89/// or the UDS connect fails / times out. Caches the *first* outcome
90/// for the given heddle dir; subsequent calls are O(1).
91#[cfg(unix)]
92pub async fn detect_local_daemon_with_connect_probe(
93 heddle_dir: &Path,
94 timeout: Duration,
95) -> Option<UdsTarget> {
96 // The file-stat probe handles the cache and the obvious negative
97 // cases; only the positive path needs the live connect.
98 let target = detect_local_daemon(heddle_dir)?;
99 match tokio::time::timeout(
100 timeout,
101 tokio::net::UnixStream::connect(&target.socket_path),
102 )
103 .await
104 {
105 Ok(Ok(_stream)) => Some(target),
106 // Either the connect errored (socket present but listener
107 // dead — rare but possible during a graceful shutdown) or
108 // the connect timed out (daemon hung). Either way it's not
109 // safe to route RPCs through it. The next probe will retry.
110 Ok(Err(_)) | Err(_) => None,
111 }
112}
113
114/// Process-wide cache of working tonic [`tonic::transport::Channel`]s
115/// keyed by canonical heddle-dir path. Once we've successfully passed
116/// the Health.Check handshake, every subsequent caller in the same
117/// process gets the same channel (which is itself internally pooled
118/// by tonic / hyper).
119///
120/// `Channel` is cheap to clone (it's a handle to the underlying
121/// connection pool), so handing out clones is the fastest way to
122/// amortize the connect cost across a hot agent loop. Keyed by
123/// canonical heddle-dir path so a process touching multiple repos
124/// (test binaries, multi-repo agents) keeps a per-repo channel.
125#[cfg(unix)]
126static CHANNEL_CACHE: OnceMap<ProbeCacheKey, tonic::transport::Channel> = OnceMap::new();
127
128/// Connect-and-handshake outcome for [`connect_local_daemon_channel`].
129///
130/// `target` is repeated for the convenience of callers that only need
131/// to know whether a daemon is reachable; the live `channel` is what
132/// actually issues RPCs.
133#[cfg(unix)]
134#[derive(Debug, Clone)]
135pub struct LocalDaemonChannel {
136 pub target: UdsTarget,
137 pub channel: tonic::transport::Channel,
138}
139
140/// Build a tonic [`tonic::transport::Channel`] over the per-repo UDS,
141/// perform the gRPC `Health.Check` handshake, and return both
142/// alongside the [`UdsTarget`].
143///
144/// On the happy path the channel is cached in [`CHANNEL_CACHE`] for
145/// the lifetime of this process and subsequent calls return clones of
146/// it in O(1). This is the path agent loops should use.
147///
148/// `connect_timeout` bounds the UDS connect *and* the Health.Check —
149/// 50–250ms is appropriate for a same-host socket; anything longer
150/// implies a hung daemon. Returns `None` on any failure mode (no
151/// daemon, connect failed, health refused) — the caller falls through
152/// to its in-process or remote fallback.
153///
154/// # First consumer (TODO)
155///
156/// `cmd_status` is the natural first consumer — its read-shaped output
157/// is built from a handful of `Repository` lookups that the
158/// `OperationLogQueryService` already covers. A future patch should
159/// branch in `crates/cli/src/cli/commands/status.rs::cmd_status` like:
160///
161/// ```ignore
162/// if let Some(LocalDaemonChannel { channel, .. }) =
163/// connect_local_daemon_channel(repo.heddle_dir(), Duration::from_millis(150)).await
164/// {
165/// let mut client = OperationLogQueryServiceClient::new(channel);
166/// // Build StatusOutput from RPCs instead of direct Repository reads.
167/// } else {
168/// // Existing in-process path.
169/// }
170/// ```
171///
172/// Held back from this patch because (a) the query surface doesn't
173/// yet cover every field in `StatusOutput`, and (b) the brief calls
174/// out the channel-construction primitive as the deliverable.
175#[cfg(unix)]
176pub async fn connect_local_daemon_channel(
177 heddle_dir: &Path,
178 connect_timeout: Duration,
179) -> Option<LocalDaemonChannel> {
180 let key: ProbeCacheKey = heddle_dir.to_path_buf();
181 if let Some(channel) = CHANNEL_CACHE.get(&key) {
182 // The detect cache holds the matching target — pull it back
183 // out so the returned struct stays self-contained.
184 let target = detect_local_daemon(heddle_dir)?;
185 return Some(LocalDaemonChannel { target, channel });
186 }
187
188 match build_channel(heddle_dir, connect_timeout).await {
189 Ok(LocalDaemonChannel { target, channel }) => {
190 CHANNEL_CACHE.insert(key, channel.clone());
191 Some(LocalDaemonChannel { target, channel })
192 }
193 Err(_) => None,
194 }
195}
196
197#[cfg(unix)]
198async fn build_channel(
199 heddle_dir: &Path,
200 connect_timeout: Duration,
201) -> std::result::Result<LocalDaemonChannel, ChannelError> {
202 let target = detect_local_daemon(heddle_dir).ok_or(ChannelError::NoDaemon)?;
203 // `unix:` URIs aren't usable as the *origin* on a HTTP/2 channel
204 // (the authority pseudo-header has to be a plausible host). The
205 // standard tonic UDS recipe is to give the endpoint an opaque
206 // `http://heddle-uds` URI for routing and override the connector
207 // with a service that returns a `UnixStream` regardless of what
208 // URI it's asked for.
209 let endpoint = tonic::transport::Endpoint::try_from("http://heddle-uds")
210 .map_err(ChannelError::EndpointBuild)?
211 .connect_timeout(connect_timeout);
212
213 let socket_path = target.socket_path.clone();
214 let connector = tower::service_fn(move |_uri: tonic::transport::Uri| {
215 let socket_path = socket_path.clone();
216 async move {
217 let stream = tokio::net::UnixStream::connect(&socket_path).await?;
218 // tonic 0.14 requires the connector's response type to
219 // implement `hyper::rt::{Read, Write}`. `TokioIo` is the
220 // standard adapter and it's what tonic's own UDS connector
221 // uses internally — see
222 // `tonic/src/transport/channel/uds_connector.rs`.
223 std::io::Result::Ok(hyper_util::rt::TokioIo::new(stream))
224 }
225 });
226
227 let channel = endpoint
228 .connect_with_connector(connector)
229 .await
230 .map_err(ChannelError::Connect)?;
231
232 // Health.Check is the version handshake. Today the local daemon
233 // doesn't install a `tonic_health` reporter, so we expect either
234 // `Ok(Serving)` or `Err(Unimplemented)` — the latter is treated
235 // as "channel works, daemon predates the handshake" and accepted.
236 // Any other error means the channel is wedged and we should fall
237 // back to in-process.
238 let mut health = tonic_health::pb::health_client::HealthClient::new(channel.clone());
239 let request = tonic::Request::new(tonic_health::pb::HealthCheckRequest {
240 // Empty service name → "is the whole server serving?" per the
241 // gRPC health protocol spec.
242 service: String::new(),
243 });
244 match tokio::time::timeout(connect_timeout, health.check(request)).await {
245 Ok(Ok(response)) => {
246 let status = response.into_inner().status;
247 if status == tonic_health::pb::health_check_response::ServingStatus::Serving as i32 {
248 Ok(LocalDaemonChannel { target, channel })
249 } else {
250 Err(ChannelError::HealthNotServing)
251 }
252 }
253 // Unimplemented: daemon doesn't ship Health (today's case).
254 // We still trust the connection — the underlying HTTP/2
255 // handshake succeeded above, which is itself a strong signal.
256 Ok(Err(status)) if status.code() == tonic::Code::Unimplemented => {
257 Ok(LocalDaemonChannel { target, channel })
258 }
259 Ok(Err(status)) => Err(ChannelError::HealthRpc(status)),
260 Err(_elapsed) => Err(ChannelError::HealthRpc(tonic::Status::deadline_exceeded(
261 "Health.Check timed out",
262 ))),
263 }
264}
265
266/// Errors from the channel-build path. Kept private to the module —
267/// callers see `Option<LocalDaemonChannel>` from
268/// [`connect_local_daemon_channel`] and treat `None` as "no daemon,
269/// fall through to in-process".
270#[cfg(unix)]
271#[derive(Debug)]
272#[allow(dead_code)]
273enum ChannelError {
274 /// Detect probe said no daemon (cheap negative case).
275 NoDaemon,
276 /// Tonic refused to build the endpoint URI. Programmer error in
277 /// practice, but we surface it for the test path.
278 EndpointBuild(tonic::transport::Error),
279 /// `connect_with_connector` failed — daemon not accepting.
280 Connect(tonic::transport::Error),
281 /// Health.Check round-trip failed (transport, codec, etc.).
282 HealthRpc(tonic::Status),
283 /// Health.Check came back with `NOT_SERVING`. We don't trust it.
284 HealthNotServing,
285}
286
/// Result of [`probe`]: where the daemon's files would live, plus what
/// the pidfile says about the daemon.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LocalDaemonProbe {
    /// `<heddle_dir>/sockets/grpc.sock`. It is reported whether or not it
    /// exists, because [`probe`] never looks at it.
    pub socket_path: PathBuf,
    /// `<heddle_dir>/sockets/grpc.pid`, the file `status` is derived from.
    pub pid_path: PathBuf,
    /// Liveness verdict derived from the pidfile.
    pub status: LocalDaemonStatus,
}

/// Liveness verdict produced by [`probe`] from the pidfile alone.
///
/// The socket file is *not* consulted, so `Running` does not guarantee
/// that a listener is bound. `detect_local_daemon_with_connect_probe`
/// (Unix) adds that check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalDaemonStatus {
    /// The pidfile names a process that the liveness check reports alive.
    Running { pid: u32 },
    /// The pidfile names a pid that the liveness check does not report
    /// alive. Any socket file left behind is likely a leftover.
    Stale { pid: u32 },
    /// The pidfile is missing or unreadable, or holds no parseable pid.
    Absent,
}
303
304/// Probe the per-repo daemon directory. Cheap (two file stats + one
305/// `kill(pid, 0)`).
306pub fn probe(heddle_dir: &Path) -> LocalDaemonProbe {
307 let socket_path = heddle_dir.join("sockets").join("grpc.sock");
308 let pid_path = heddle_dir.join("sockets").join("grpc.pid");
309 let status = match read_pid(&pid_path) {
310 Some(pid) if pid_alive(pid) => LocalDaemonStatus::Running { pid },
311 Some(pid) => LocalDaemonStatus::Stale { pid },
312 None => LocalDaemonStatus::Absent,
313 };
314 LocalDaemonProbe {
315 socket_path,
316 pid_path,
317 status,
318 }
319}
320
/// Parse the daemon pid out of the pidfile at `path`.
///
/// The hardened pidfile written by `daemon::local_daemon` has three lines,
/// `<pid>\nheddle-agent\n<unix_secs>\n`, and only the first matters for
/// liveness. Older pidfiles held just the pid on a single line. So if the
/// first line doesn't parse, the whole trimmed file is tried as well.
///
/// Returns `None` when the file can't be read or neither form parses.
fn read_pid(path: &Path) -> Option<u32> {
    let contents = std::fs::read_to_string(path).ok()?;
    let leading = contents.lines().next().map_or("", str::trim);
    match leading.parse::<u32>() {
        Ok(pid) => Some(pid),
        Err(_) => contents.trim().parse::<u32>().ok(),
    }
}
334
335#[cfg(unix)]
336fn pid_alive(pid: u32) -> bool {
337 // SAFETY: kill(pid, 0) only validates existence; signal 0 sends nothing.
338 unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
339}
340
341#[cfg(not(unix))]
342fn pid_alive(_pid: u32) -> bool {
343 false
344}
345
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Creates a temp heddle dir containing an empty `sockets/`
    /// subdirectory. Keep the returned `TempDir` guard alive for the test.
    fn heddle_dir_with_sockets() -> (TempDir, std::path::PathBuf) {
        let heddle = TempDir::new().unwrap();
        let sockets = heddle.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        (heddle, sockets)
    }

    /// Writes `contents` to `<sockets>/grpc.pid`.
    fn write_pidfile(sockets: &std::path::Path, contents: &str) {
        std::fs::write(sockets.join("grpc.pid"), contents).unwrap();
    }

    /// Pidfile contents naming the test process itself, which is alive.
    fn own_pid() -> String {
        std::process::id().to_string()
    }

    #[test]
    fn absent_when_no_files() {
        let heddle = TempDir::new().unwrap();
        assert_eq!(probe(heddle.path()).status, LocalDaemonStatus::Absent);
    }

    #[test]
    fn stale_when_pidfile_holds_dead_pid() {
        let (heddle, sockets) = heddle_dir_with_sockets();
        // 2_147_483_646 is far above any real pid_max, so nothing owns it.
        write_pidfile(&sockets, "2147483646");
        let status = probe(heddle.path()).status;
        assert!(matches!(status, LocalDaemonStatus::Stale { .. }));
    }

    #[test]
    fn running_when_pidfile_holds_self_pid() {
        let (heddle, sockets) = heddle_dir_with_sockets();
        write_pidfile(&sockets, &own_pid());
        let status = probe(heddle.path()).status;
        if let LocalDaemonStatus::Running { pid } = status {
            assert_eq!(pid, std::process::id());
        } else {
            panic!("expected Running, got {status:?}");
        }
    }

    #[test]
    fn detect_returns_target_when_running() {
        let (heddle, sockets) = heddle_dir_with_sockets();
        write_pidfile(&sockets, &own_pid());
        let target = detect_local_daemon(heddle.path()).expect("daemon detected");
        assert_eq!(target.pid, std::process::id());
        assert!(
            target.socket_path.ends_with("sockets/grpc.sock"),
            "socket path was {:?}",
            target.socket_path
        );
    }

    #[test]
    fn detect_returns_none_when_absent() {
        // No `sockets/` subtree at all: the probe says Absent and detect
        // collapses that to None.
        let heddle = TempDir::new().unwrap();
        assert!(detect_local_daemon(heddle.path()).is_none());
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_probe_rejects_socketless_pidfile() {
        // The pidfile names this live process, so the file probe says
        // `Running`, but nothing is bound at the socket path. Catching
        // exactly this case is the connect probe's job.
        let (heddle, sockets) = heddle_dir_with_sockets();
        write_pidfile(&sockets, &own_pid());
        let result = detect_local_daemon_with_connect_probe(
            heddle.path(),
            std::time::Duration::from_millis(50),
        )
        .await;
        assert!(
            result.is_none(),
            "connect probe should reject when no listener is bound"
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_probe_accepts_live_listener() {
        let (heddle, sockets) = heddle_dir_with_sockets();
        let _listener = tokio::net::UnixListener::bind(sockets.join("grpc.sock")).unwrap();
        write_pidfile(&sockets, &own_pid());
        let result = detect_local_daemon_with_connect_probe(
            heddle.path(),
            std::time::Duration::from_millis(200),
        )
        .await;
        assert!(
            result.is_some(),
            "connect probe should succeed when a listener is bound"
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_channel_is_none_when_daemon_absent() {
        // No pidfile and no socket: `connect_local_daemon_channel` should
        // short-circuit on the detect probe and return None without
        // attempting a connect.
        let heddle = TempDir::new().unwrap();
        let result =
            connect_local_daemon_channel(heddle.path(), std::time::Duration::from_millis(50)).await;
        assert!(result.is_none());
    }
}