Skip to main content

cli/client/
local_daemon.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local-daemon auto-detection.
3//!
4//! Every gRPC-using verb in the CLI checks this first. When the per-repo
5//! `.heddle/sockets/grpc.sock` exists and the pidfile points at a live
6//! process, callers can route their RPC over the UDS instead of opening
7//! an in-process [`GrpcLocalService`]. The latency win matters for tight
8//! agent loops.
9//!
10//! Three layers:
11//!
//! 1. [`detect_local_daemon`] — pidfile probe (read the pidfile, then
//!    check liveness via `kill(pid, 0)`). Syscall-only and cheap; this is
//!    what settles the common negative case ("no daemon, fall through to
//!    in-process").
15//! 2. [`detect_local_daemon_with_connect_probe`] — same as (1) but
16//!    actually opens a `UnixStream` to confirm the listener accepts.
17//!    Catches the "stale socket file with a live unrelated PID" race.
18//! 3. [`connect_local_daemon_channel`] — full path: build a tonic
19//!    [`tonic::transport::Channel`] over the UDS, run the gRPC
20//!    `Health.Check` handshake, and cache the working channel for the
21//!    rest of the process. This is what the read-shaped CLI verbs
22//!    route through.
23//!
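//! The two process-wide caches (the layer-1 probe result and the layer-3
//! handshaken channel) are keyed by heddle-dir path, so a CLI invocation
//! that touches one repo reads the pidfile once and, once a channel has
//! handshaken, reuses it. Layer 2's UDS connect, and a channel build that
//! failed, are retried on every call.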
26
27use std::{
28    path::{Path, PathBuf},
29    time::Duration,
30};
31
32use crate::util::OnceMap;
33
/// A local daemon that looked live at probe time: the UDS socket path a
/// client can dial plus the pid recorded in the pidfile.
///
/// Returned by [`detect_local_daemon`] (and its connect-probing variant)
/// when the pidfile probe reports `Running`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UdsTarget {
    /// Path of the per-repo gRPC socket (`<heddle_dir>/sockets/grpc.sock`).
    /// The pidfile probe computes this path but does not check that the
    /// file exists.
    pub socket_path: PathBuf,
    /// Pid read from `<heddle_dir>/sockets/grpc.pid`; `kill(pid, 0)`
    /// reported it alive when the probe ran.
    pub pid: u32,
}
42
43/// Cache key — canonical heddle-dir path so two probes from different
44/// CWDs against the same repo share a result.
45type ProbeCacheKey = PathBuf;
46
47/// Process-wide probe cache. Each heddle dir is probed at most once
48/// per process lifetime, after which subsequent calls return the
49/// cached `Option<UdsTarget>` without touching the filesystem.
50///
51/// Keyed by canonical heddle-dir path so a process that touches more
52/// than one repo (test binaries, agent dispatch loops) caches each
53/// repo independently.
54static DETECT_CACHE: OnceMap<ProbeCacheKey, Option<UdsTarget>> = OnceMap::new();
55
56/// Run the probe and, when the daemon is `Running`, return the UDS
57/// target a tonic client can dial. Cached for the process lifetime so
58/// hot agent loops don't pay two stat-syscalls per RPC.
59///
60/// Probing failure (`Absent`, `Stale`) returns `None` — the caller
61/// should fall through to its in-process or remote fallback. The
62/// full `Health.Check` + version handshake over a tonic UDS Channel
63/// layers on top of [`detect_local_daemon_with_connect_probe`] and
64/// [`connect_local_daemon_channel`].
65pub fn detect_local_daemon(heddle_dir: &Path) -> Option<UdsTarget> {
66    let key: ProbeCacheKey = heddle_dir.to_path_buf();
67    DETECT_CACHE.get_or_init_with(&key, || {
68        let probe = probe(heddle_dir);
69        match probe.status {
70            LocalDaemonStatus::Running { pid } => Some(UdsTarget {
71                socket_path: probe.socket_path,
72                pid,
73            }),
74            LocalDaemonStatus::Stale { .. } | LocalDaemonStatus::Absent => None,
75        }
76    })
77}
78
79/// Stronger variant of [`detect_local_daemon`] — runs the file-stat
80/// probe, then attempts a UDS connect to confirm the daemon is
81/// actually accepting connections (not just a stale pidfile that
82/// happens to point at a live unrelated process).
83///
84/// The connect step is intentionally bounded by `timeout`. Default
85/// callers should pass something tight (50ms is plenty for a local
86/// socket) so a hung daemon doesn't stall every CLI invocation.
87///
88/// Returns `None` if either the file-stat probe says `Absent`/`Stale`
89/// or the UDS connect fails / times out. Caches the *first* outcome
90/// for the given heddle dir; subsequent calls are O(1).
91#[cfg(unix)]
92pub async fn detect_local_daemon_with_connect_probe(
93    heddle_dir: &Path,
94    timeout: Duration,
95) -> Option<UdsTarget> {
96    // The file-stat probe handles the cache and the obvious negative
97    // cases; only the positive path needs the live connect.
98    let target = detect_local_daemon(heddle_dir)?;
99    match tokio::time::timeout(
100        timeout,
101        tokio::net::UnixStream::connect(&target.socket_path),
102    )
103    .await
104    {
105        Ok(Ok(_stream)) => Some(target),
106        // Either the connect errored (socket present but listener
107        // dead — rare but possible during a graceful shutdown) or
108        // the connect timed out (daemon hung). Either way it's not
109        // safe to route RPCs through it. The next probe will retry.
110        Ok(Err(_)) | Err(_) => None,
111    }
112}
113
114/// Process-wide cache of working tonic [`tonic::transport::Channel`]s
115/// keyed by canonical heddle-dir path. Once we've successfully passed
116/// the Health.Check handshake, every subsequent caller in the same
117/// process gets the same channel (which is itself internally pooled
118/// by tonic / hyper).
119///
120/// `Channel` is cheap to clone (it's a handle to the underlying
121/// connection pool), so handing out clones is the fastest way to
122/// amortize the connect cost across a hot agent loop. Keyed by
123/// canonical heddle-dir path so a process touching multiple repos
124/// (test binaries, multi-repo agents) keeps a per-repo channel.
125#[cfg(unix)]
126static CHANNEL_CACHE: OnceMap<ProbeCacheKey, tonic::transport::Channel> = OnceMap::new();
127
128/// Connect-and-handshake outcome for [`connect_local_daemon_channel`].
129///
130/// `target` is repeated for the convenience of callers that only need
131/// to know whether a daemon is reachable; the live `channel` is what
132/// actually issues RPCs.
133#[cfg(unix)]
134#[derive(Debug, Clone)]
135pub struct LocalDaemonChannel {
136    pub target: UdsTarget,
137    pub channel: tonic::transport::Channel,
138}
139
140/// Build a tonic [`tonic::transport::Channel`] over the per-repo UDS,
141/// perform the gRPC `Health.Check` handshake, and return both
142/// alongside the [`UdsTarget`].
143///
144/// On the happy path the channel is cached in [`CHANNEL_CACHE`] for
145/// the lifetime of this process and subsequent calls return clones of
146/// it in O(1). This is the path agent loops should use.
147///
148/// `connect_timeout` bounds the UDS connect *and* the Health.Check —
149/// 50–250ms is appropriate for a same-host socket; anything longer
150/// implies a hung daemon. Returns `None` on any failure mode (no
151/// daemon, connect failed, health refused) — the caller falls through
152/// to its in-process or remote fallback.
153///
154/// # First consumer (TODO)
155///
156/// `cmd_status` is the natural first consumer — its read-shaped output
157/// is built from a handful of `Repository` lookups that the
158/// `OperationLogQueryService` already covers. A future patch should
159/// branch in `crates/cli/src/cli/commands/status.rs::cmd_status` like:
160///
161/// ```ignore
162/// if let Some(LocalDaemonChannel { channel, .. }) =
163///     connect_local_daemon_channel(repo.heddle_dir(), Duration::from_millis(150)).await
164/// {
165///     let mut client = OperationLogQueryServiceClient::new(channel);
166///     // Build StatusOutput from RPCs instead of direct Repository reads.
167/// } else {
168///     // Existing in-process path.
169/// }
170/// ```
171///
172/// Held back from this patch because (a) the query surface doesn't
173/// yet cover every field in `StatusOutput`, and (b) the brief calls
174/// out the channel-construction primitive as the deliverable.
175#[cfg(unix)]
176pub async fn connect_local_daemon_channel(
177    heddle_dir: &Path,
178    connect_timeout: Duration,
179) -> Option<LocalDaemonChannel> {
180    let key: ProbeCacheKey = heddle_dir.to_path_buf();
181    if let Some(channel) = CHANNEL_CACHE.get(&key) {
182        // The detect cache holds the matching target — pull it back
183        // out so the returned struct stays self-contained.
184        let target = detect_local_daemon(heddle_dir)?;
185        return Some(LocalDaemonChannel { target, channel });
186    }
187
188    match build_channel(heddle_dir, connect_timeout).await {
189        Ok(LocalDaemonChannel { target, channel }) => {
190            CHANNEL_CACHE.insert(key, channel.clone());
191            Some(LocalDaemonChannel { target, channel })
192        }
193        Err(_) => None,
194    }
195}
196
197#[cfg(unix)]
198async fn build_channel(
199    heddle_dir: &Path,
200    connect_timeout: Duration,
201) -> std::result::Result<LocalDaemonChannel, ChannelError> {
202    let target = detect_local_daemon(heddle_dir).ok_or(ChannelError::NoDaemon)?;
203    // `unix:` URIs aren't usable as the *origin* on a HTTP/2 channel
204    // (the authority pseudo-header has to be a plausible host). The
205    // standard tonic UDS recipe is to give the endpoint an opaque
206    // `http://heddle-uds` URI for routing and override the connector
207    // with a service that returns a `UnixStream` regardless of what
208    // URI it's asked for.
209    let endpoint = tonic::transport::Endpoint::try_from("http://heddle-uds")
210        .map_err(ChannelError::EndpointBuild)?
211        .connect_timeout(connect_timeout);
212
213    let socket_path = target.socket_path.clone();
214    let connector = tower::service_fn(move |_uri: tonic::transport::Uri| {
215        let socket_path = socket_path.clone();
216        async move {
217            let stream = tokio::net::UnixStream::connect(&socket_path).await?;
218            // tonic 0.14 requires the connector's response type to
219            // implement `hyper::rt::{Read, Write}`. `TokioIo` is the
220            // standard adapter and it's what tonic's own UDS connector
221            // uses internally — see
222            // `tonic/src/transport/channel/uds_connector.rs`.
223            std::io::Result::Ok(hyper_util::rt::TokioIo::new(stream))
224        }
225    });
226
227    let channel = endpoint
228        .connect_with_connector(connector)
229        .await
230        .map_err(ChannelError::Connect)?;
231
232    // Health.Check is the version handshake. Today the local daemon
233    // doesn't install a `tonic_health` reporter, so we expect either
234    // `Ok(Serving)` or `Err(Unimplemented)` — the latter is treated
235    // as "channel works, daemon predates the handshake" and accepted.
236    // Any other error means the channel is wedged and we should fall
237    // back to in-process.
238    let mut health = tonic_health::pb::health_client::HealthClient::new(channel.clone());
239    let request = tonic::Request::new(tonic_health::pb::HealthCheckRequest {
240        // Empty service name → "is the whole server serving?" per the
241        // gRPC health protocol spec.
242        service: String::new(),
243    });
244    match tokio::time::timeout(connect_timeout, health.check(request)).await {
245        Ok(Ok(response)) => {
246            let status = response.into_inner().status;
247            if status == tonic_health::pb::health_check_response::ServingStatus::Serving as i32 {
248                Ok(LocalDaemonChannel { target, channel })
249            } else {
250                Err(ChannelError::HealthNotServing)
251            }
252        }
253        // Unimplemented: daemon doesn't ship Health (today's case).
254        // We still trust the connection — the underlying HTTP/2
255        // handshake succeeded above, which is itself a strong signal.
256        Ok(Err(status)) if status.code() == tonic::Code::Unimplemented => {
257            Ok(LocalDaemonChannel { target, channel })
258        }
259        Ok(Err(status)) => Err(ChannelError::HealthRpc(status)),
260        Err(_elapsed) => Err(ChannelError::HealthRpc(tonic::Status::deadline_exceeded(
261            "Health.Check timed out",
262        ))),
263    }
264}
265
266/// Errors from the channel-build path. Kept private to the module —
267/// callers see `Option<LocalDaemonChannel>` from
268/// [`connect_local_daemon_channel`] and treat `None` as "no daemon,
269/// fall through to in-process".
270#[cfg(unix)]
271#[derive(Debug)]
272#[allow(dead_code)]
273enum ChannelError {
274    /// Detect probe said no daemon (cheap negative case).
275    NoDaemon,
276    /// Tonic refused to build the endpoint URI. Programmer error in
277    /// practice, but we surface it for the test path.
278    EndpointBuild(tonic::transport::Error),
279    /// `connect_with_connector` failed — daemon not accepting.
280    Connect(tonic::transport::Error),
281    /// Health.Check round-trip failed (transport, codec, etc.).
282    HealthRpc(tonic::Status),
283    /// Health.Check came back with `NOT_SERVING`. We don't trust it.
284    HealthNotServing,
285}
286
/// Result of [`probe`]: where the per-repo daemon's socket and pidfile
/// live, and what the pidfile says about the daemon.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LocalDaemonProbe {
    /// `<heddle_dir>/sockets/grpc.sock` — computed, not checked for existence.
    pub socket_path: PathBuf,
    /// `<heddle_dir>/sockets/grpc.pid`.
    pub pid_path: PathBuf,
    /// Liveness verdict derived from the pidfile.
    pub status: LocalDaemonStatus,
}

/// Liveness verdict from the pidfile probe.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalDaemonStatus {
    /// The pidfile parsed and `kill(pid, 0)` succeeded. The socket file is
    /// *not* checked, and after pid reuse the pid may belong to an
    /// unrelated process. Confirm a listener with
    /// `detect_local_daemon_with_connect_probe` when that matters.
    Running { pid: u32 },
    /// The pidfile parsed but the pid is not alive per `kill(pid, 0)`.
    /// The socket may be a leftover.
    Stale { pid: u32 },
    /// No readable pidfile, or its contents didn't parse as a pid.
    Absent,
}
303
304/// Probe the per-repo daemon directory. Cheap (two file stats + one
305/// `kill(pid, 0)`).
306pub fn probe(heddle_dir: &Path) -> LocalDaemonProbe {
307    let socket_path = heddle_dir.join("sockets").join("grpc.sock");
308    let pid_path = heddle_dir.join("sockets").join("grpc.pid");
309    let status = match read_pid(&pid_path) {
310        Some(pid) if pid_alive(pid) => LocalDaemonStatus::Running { pid },
311        Some(pid) => LocalDaemonStatus::Stale { pid },
312        None => LocalDaemonStatus::Absent,
313    };
314    LocalDaemonProbe {
315        socket_path,
316        pid_path,
317        status,
318    }
319}
320
/// Read the daemon pid from the pidfile at `path`.
///
/// Returns `None` if the file can't be read or holds no parsable pid.
fn read_pid(path: &Path) -> Option<u32> {
    // Hardened pidfiles from `daemon::local_daemon` have three lines,
    // `<pid>\nheddle-agent\n<unix_secs>\n`, and only the pid matters for
    // liveness. Legacy pidfiles hold just the pid, so if the leading line
    // doesn't parse, retry against the whole (trimmed) file.
    let contents = std::fs::read_to_string(path).ok()?;
    let leading_line = contents.lines().next().map_or("", str::trim);
    if let Ok(pid) = leading_line.parse::<u32>() {
        return Some(pid);
    }
    contents.trim().parse().ok()
}
334
335#[cfg(unix)]
336fn pid_alive(pid: u32) -> bool {
337    // SAFETY: kill(pid, 0) only validates existence; signal 0 sends nothing.
338    unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
339}
340
341#[cfg(not(unix))]
342fn pid_alive(_pid: u32) -> bool {
343    false
344}
345
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Create `<temp>/sockets/` and return its path.
    fn sockets_dir(temp: &TempDir) -> PathBuf {
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        sockets
    }

    /// Point `<sockets>/grpc.pid` at this test process, which is alive by
    /// construction, so the pidfile probe reports `Running`.
    fn write_self_pidfile(sockets: &Path) {
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
    }

    #[test]
    fn absent_when_no_files() {
        let temp = TempDir::new().unwrap();
        let probe = probe(temp.path());
        assert_eq!(probe.status, LocalDaemonStatus::Absent);
    }

    #[test]
    fn probe_reports_socket_and_pid_paths_under_sockets_dir() {
        let temp = TempDir::new().unwrap();
        let probe = probe(temp.path());
        assert_eq!(
            probe.socket_path,
            temp.path().join("sockets").join("grpc.sock")
        );
        assert_eq!(probe.pid_path, temp.path().join("sockets").join("grpc.pid"));
    }

    #[test]
    fn stale_when_pidfile_holds_dead_pid() {
        let temp = TempDir::new().unwrap();
        let sockets = sockets_dir(&temp);
        // PID 2_147_483_646 is well beyond pid_max and not in use.
        std::fs::write(sockets.join("grpc.pid"), "2147483646").unwrap();
        let probe = probe(temp.path());
        assert!(matches!(probe.status, LocalDaemonStatus::Stale { .. }));
    }

    #[test]
    fn running_when_pidfile_holds_self_pid() {
        let temp = TempDir::new().unwrap();
        let sockets = sockets_dir(&temp);
        write_self_pidfile(&sockets);
        let probe = probe(temp.path());
        match probe.status {
            LocalDaemonStatus::Running { pid } => assert_eq!(pid, std::process::id()),
            other => panic!("expected Running, got {other:?}"),
        }
    }

    #[test]
    fn read_pid_accepts_hardened_three_line_format() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("grpc.pid");
        std::fs::write(&path, "4242\nheddle-agent\n1700000000\n").unwrap();
        assert_eq!(read_pid(&path), Some(4242));
    }

    #[test]
    fn read_pid_accepts_legacy_single_line_format() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("grpc.pid");
        std::fs::write(&path, "4242").unwrap();
        assert_eq!(read_pid(&path), Some(4242));
        std::fs::write(&path, " 4242 \n").unwrap();
        assert_eq!(read_pid(&path), Some(4242));
    }

    #[test]
    fn read_pid_rejects_missing_or_garbage_pidfile() {
        let temp = TempDir::new().unwrap();
        let path = temp.path().join("grpc.pid");
        assert_eq!(read_pid(&path), None);
        std::fs::write(&path, "not-a-pid\n").unwrap();
        assert_eq!(read_pid(&path), None);
        std::fs::write(&path, "").unwrap();
        assert_eq!(read_pid(&path), None);
    }

    #[test]
    fn detect_returns_target_when_running() {
        let temp = TempDir::new().unwrap();
        let sockets = sockets_dir(&temp);
        write_self_pidfile(&sockets);
        let target = detect_local_daemon(temp.path()).expect("daemon detected");
        assert_eq!(target.pid, std::process::id());
        assert!(
            target.socket_path.ends_with("sockets/grpc.sock"),
            "socket path was {:?}",
            target.socket_path
        );
    }

    #[test]
    fn detect_returns_none_when_absent() {
        let temp = TempDir::new().unwrap();
        // A fresh temp dir with no `sockets/` subtree — probe returns
        // Absent, detect collapses that to None.
        assert!(detect_local_daemon(temp.path()).is_none());
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_probe_rejects_socketless_pidfile() {
        // Pidfile points at our own pid (so the pidfile probe says
        // `Running`), but no listener is bound to the socket path.
        // The connect probe must catch this and return None — that's
        // its whole job.
        let temp = TempDir::new().unwrap();
        let sockets = sockets_dir(&temp);
        write_self_pidfile(&sockets);
        let result = detect_local_daemon_with_connect_probe(
            temp.path(),
            std::time::Duration::from_millis(50),
        )
        .await;
        assert!(
            result.is_none(),
            "connect probe should reject when no listener is bound"
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_probe_accepts_live_listener() {
        use tokio::net::UnixListener;
        let temp = TempDir::new().unwrap();
        let sockets = sockets_dir(&temp);
        let socket_path = sockets.join("grpc.sock");
        let _listener = UnixListener::bind(&socket_path).unwrap();
        write_self_pidfile(&sockets);
        let result = detect_local_daemon_with_connect_probe(
            temp.path(),
            std::time::Duration::from_millis(200),
        )
        .await;
        assert!(
            result.is_some(),
            "connect probe should succeed when a listener is bound"
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_channel_is_none_when_daemon_absent() {
        // No pidfile, no socket — `connect_local_daemon_channel`
        // should short-circuit on the detect probe and return None
        // without attempting a connect.
        let temp = TempDir::new().unwrap();
        let result =
            connect_local_daemon_channel(temp.path(), std::time::Duration::from_millis(50)).await;
        assert!(result.is_none());
    }
}