Skip to main content

cli/client/
local_daemon.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Local-daemon auto-detection.
3//!
4//! Every gRPC-using verb in the CLI checks this first. When the per-repo
5//! `.heddle/sockets/grpc.sock` exists and the pidfile points at a live
6//! process, callers can route their RPC over the UDS instead of opening
7//! an in-process [`GrpcLocalService`]. The latency win matters for tight
8//! agent loops.
9//!
10//! Three layers:
11//!
12//! 1. [`detect_local_daemon`] — file-stat probe (pidfile + liveness via
13//!    `kill(pid, 0)` + same-executable identity). Cheap, syscall-only,
14//!    used as the cheap negative case ("no daemon, fall through to
15//!    in-process").
16//! 2. [`detect_local_daemon_with_connect_probe`] — same as (1) but
17//!    actually opens a `UnixStream` and checks kernel-reported peer
18//!    credentials to confirm the listener is owned by our uid. Catches
19//!    the "stale socket file with a live unrelated PID" race.
20//! 3. [`connect_local_daemon_channel`] — full path: build a tonic
21//!    [`tonic::transport::Channel`] over the UDS, run the gRPC
22//!    `Health.Check` handshake, and cache the working channel for the
23//!    rest of the process. This is what the read-shaped CLI verbs
24//!    route through.
25//!
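//! The process-wide caches are keyed by the heddle-dir path exactly as
//! the caller passes it. Nothing in this module canonicalizes the
//! path, so callers should pass the canonical heddle dir if they want
//! probes issued from different CWDs to share an entry. With a stable
//! key, a CLI invocation that touches one repo runs the file-stat
//! probe once and reuses the channel after the first successful
//! handshake.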
28
29use std::{
30    io,
31    path::{Path, PathBuf},
32    sync::{
33        Arc,
34        atomic::{AtomicU64, Ordering},
35    },
36    time::Duration,
37};
38
39#[cfg(unix)]
40use daemon::local_daemon::is_heddle_process;
41
42use crate::util::OnceMap;
43
/// A local daemon that the file-stat probe considers reachable.
///
/// Produced by [`detect_local_daemon`] when [`probe`] reports
/// `LocalDaemonStatus::Running`. It carries the two facts a caller
/// needs to dial the daemon and to report which process it found.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UdsTarget {
    /// Path of the Unix domain socket to connect to
    /// (`<heddle_dir>/sockets/grpc.sock`).
    pub socket_path: PathBuf,
    /// Pid read from the pidfile; it passed the liveness and identity
    /// checks at probe time.
    pub pid: u32,
}
52
/// Key type shared by every per-repo cache and counter in this module.
///
/// The key is the heddle-dir path as supplied by the caller
/// (`heddle_dir.to_path_buf()`); this module does not canonicalize it.
/// Two probes against the same repo only share an entry when they pass
/// the same path, so callers should pass the canonical heddle dir.
type ProbeCacheKey = PathBuf;
56
/// Process-wide cache of file-stat probe outcomes, one entry per
/// heddle dir.
///
/// [`detect_local_daemon`] fills an entry the first time a key is
/// seen and afterwards hands back the stored `Option<UdsTarget>`
/// rather than re-running [`probe`]. Both positive and negative
/// outcomes are stored.
///
/// Entries are keyed by the heddle-dir path as passed by the caller
/// (see [`ProbeCacheKey`]), so a process that touches more than one
/// repo (test binaries, agent dispatch loops) caches each repo
/// independently.
///
/// NOTE(review): "probed at most once per key" relies on
/// `OnceMap::get_or_init_with` running its initializer only for a
/// missing key — confirm against `crate::util::OnceMap`.
static DETECT_CACHE: OnceMap<ProbeCacheKey, Option<UdsTarget>> = OnceMap::new();
65
66/// Run the probe and, when the daemon is `Running`, return the UDS
67/// target a tonic client can dial. Cached for the process lifetime so
68/// hot agent loops don't pay two stat-syscalls per RPC.
69///
70/// Probing failure (`Absent`, `Stale`) returns `None` — the caller
71/// should fall through to its in-process or remote fallback. The
72/// full `Health.Check` + version handshake over a tonic UDS Channel
73/// layers on top of [`detect_local_daemon_with_connect_probe`] and
74/// [`connect_local_daemon_channel`].
75pub fn detect_local_daemon(heddle_dir: &Path) -> Option<UdsTarget> {
76    let key: ProbeCacheKey = heddle_dir.to_path_buf();
77    DETECT_CACHE.get_or_init_with(&key, || {
78        let probe = probe(heddle_dir);
79        match probe.status {
80            LocalDaemonStatus::Running { pid } => Some(UdsTarget {
81                socket_path: probe.socket_path,
82                pid,
83            }),
84            LocalDaemonStatus::Stale { .. } | LocalDaemonStatus::Absent => None,
85        }
86    })
87}
88
89/// Stronger variant of [`detect_local_daemon`] — runs the file-stat
90/// probe, then attempts a UDS connect to confirm the daemon is
91/// actually accepting connections (not just a stale pidfile that
92/// happens to point at a live unrelated process).
93///
94/// The connect step is intentionally bounded by `timeout`. Default
95/// callers should pass something tight (50ms is plenty for a local
96/// socket) so a hung daemon doesn't stall every CLI invocation.
97///
98/// Returns `None` if either the file-stat probe says `Absent`/`Stale`
99/// or the UDS connect fails / times out. Caches the *first* outcome
100/// for the given heddle dir; subsequent calls are O(1).
101#[cfg(unix)]
102pub async fn detect_local_daemon_with_connect_probe(
103    heddle_dir: &Path,
104    timeout: Duration,
105) -> Option<UdsTarget> {
106    // The file-stat probe handles the cache and the obvious negative
107    // cases; only the positive path needs the live connect.
108    let target = detect_local_daemon(heddle_dir)?;
109    match tokio::time::timeout(
110        timeout,
111        tokio::net::UnixStream::connect(&target.socket_path),
112    )
113    .await
114    {
115        Ok(Ok(stream)) => match check_peer_uid_matches_self(&stream) {
116            Ok(()) => Some(target),
117            Err(_) => None,
118        },
119        // Either the connect errored (socket present but listener
120        // dead — rare but possible during a graceful shutdown) or
121        // the connect timed out (daemon hung). Either way it's not
122        // safe to route RPCs through it. The next probe will retry.
123        Ok(Err(_)) | Err(_) => None,
124    }
125}
126
/// Process-wide cache of working tonic [`tonic::transport::Channel`]s,
/// one per heddle dir.
///
/// [`connect_local_daemon_channel`] inserts a channel here only after
/// [`build_channel`] returned `Ok`, i.e. after the connect and the
/// `Health.Check` step were accepted. Failed builds are not recorded,
/// so they are retried on the next call.
///
/// Later callers in the same process receive a clone of the stored
/// channel instead of reconnecting, which amortizes the connect cost
/// across a hot agent loop.
///
/// Keyed by the heddle-dir path as passed by the caller (see
/// [`ProbeCacheKey`]), so a process touching multiple repos (test
/// binaries, multi-repo agents) keeps a per-repo channel.
#[cfg(unix)]
static CHANNEL_CACHE: OnceMap<ProbeCacheKey, tonic::transport::Channel> = OnceMap::new();
140
/// Per-heddle-dir count of how many times [`build_channel`] has been
/// entered.
///
/// The counter is bumped at the very top of `build_channel`, before
/// the detect probe, the connect and the `Health.Check` handshake, so
/// it counts build *attempts*, including ones that fail. A cache hit
/// in [`connect_local_daemon_channel`] returns a clone of the cached
/// channel without entering `build_channel`, so it does not bump this
/// counter.
///
/// Keyed per heddle dir (not process-global) so the count is
/// deterministic regardless of which other tests run concurrently.
///
/// Test-support only — lets the serve_local test assert that the second
/// `connect_local_daemon_channel` is a true cache hit (no rebuild)
/// instead of comparing wall-clock time, which flakes on shared CI
/// runners (issue #722).
#[cfg(unix)]
#[doc(hidden)]
static CHANNEL_BUILD_COUNT: OnceMap<ProbeCacheKey, Arc<AtomicU64>> = OnceMap::new();
158
159/// Read the [`build_channel`] run count for `heddle_dir`. See
160/// [`CHANNEL_BUILD_COUNT`]. A dir whose channel was never built reads 0.
161#[cfg(unix)]
162#[doc(hidden)]
163pub fn channel_build_count(heddle_dir: &Path) -> u64 {
164    CHANNEL_BUILD_COUNT
165        .get(&heddle_dir.to_path_buf())
166        .map(|c| c.load(Ordering::Relaxed))
167        .unwrap_or(0)
168}
169
/// Connect-and-handshake outcome for [`connect_local_daemon_channel`].
///
/// Bundles the daemon that was found with a channel that is ready to
/// issue RPCs against it.
#[cfg(unix)]
#[derive(Debug, Clone)]
pub struct LocalDaemonChannel {
    /// Socket path and pid of the daemon, as reported by
    /// [`detect_local_daemon`]. Repeated here for callers that only
    /// need to know whether a daemon is reachable.
    pub target: UdsTarget,
    /// Live tonic channel over the UDS; this is what actually issues
    /// RPCs.
    pub channel: tonic::transport::Channel,
}
181
182/// Build a tonic [`tonic::transport::Channel`] over the per-repo UDS,
183/// perform the gRPC `Health.Check` handshake, and return both
184/// alongside the [`UdsTarget`].
185///
186/// On the happy path the channel is cached in [`CHANNEL_CACHE`] for
187/// the lifetime of this process and subsequent calls return clones of
188/// it in O(1). This is the path agent loops should use.
189///
190/// `connect_timeout` bounds the UDS connect *and* the Health.Check —
191/// 50–250ms is appropriate for a same-host socket; anything longer
192/// implies a hung daemon. Returns `None` on any failure mode (no
193/// daemon, connect failed, health refused) — the caller falls through
194/// to its in-process or remote fallback.
195///
196/// # First consumer (TODO)
197///
198/// `cmd_status` is the natural first consumer — its read-shaped output
199/// is built from a handful of `Repository` lookups that the
200/// `OperationLogQueryService` already covers. A future patch should
201/// branch in `crates/cli/src/cli/commands/status.rs::cmd_status` like:
202///
203/// ```ignore
204/// if let Some(LocalDaemonChannel { channel, .. }) =
205///     connect_local_daemon_channel(repo.heddle_dir(), Duration::from_millis(150)).await
206/// {
207///     let mut client = OperationLogQueryServiceClient::new(channel);
208///     // Build StatusOutput from RPCs instead of direct Repository reads.
209/// } else {
210///     // Existing in-process path.
211/// }
212/// ```
213///
214/// Held back from this patch because (a) the query surface doesn't
215/// yet cover every field in `StatusOutput`, and (b) the brief calls
216/// out the channel-construction primitive as the deliverable.
217#[cfg(unix)]
218pub async fn connect_local_daemon_channel(
219    heddle_dir: &Path,
220    connect_timeout: Duration,
221) -> Option<LocalDaemonChannel> {
222    let key: ProbeCacheKey = heddle_dir.to_path_buf();
223    if let Some(channel) = CHANNEL_CACHE.get(&key) {
224        // The detect cache holds the matching target — pull it back
225        // out so the returned struct stays self-contained.
226        let target = detect_local_daemon(heddle_dir)?;
227        return Some(LocalDaemonChannel { target, channel });
228    }
229
230    match build_channel(heddle_dir, connect_timeout).await {
231        Ok(LocalDaemonChannel { target, channel }) => {
232            CHANNEL_CACHE.insert(key, channel.clone());
233            Some(LocalDaemonChannel { target, channel })
234        }
235        Err(_) => None,
236    }
237}
238
239#[cfg(unix)]
240async fn build_channel(
241    heddle_dir: &Path,
242    connect_timeout: Duration,
243) -> std::result::Result<LocalDaemonChannel, ChannelError> {
244    CHANNEL_BUILD_COUNT
245        .get_or_init_with(&heddle_dir.to_path_buf(), || Arc::new(AtomicU64::new(0)))
246        .fetch_add(1, Ordering::Relaxed);
247    let target = detect_local_daemon(heddle_dir).ok_or(ChannelError::NoDaemon)?;
248    // `unix:` URIs aren't usable as the *origin* on a HTTP/2 channel
249    // (the authority pseudo-header has to be a plausible host). The
250    // standard tonic UDS recipe is to give the endpoint an opaque
251    // `http://heddle-uds` URI for routing and override the connector
252    // with a service that returns a `UnixStream` regardless of what
253    // URI it's asked for.
254    let endpoint = tonic::transport::Endpoint::try_from("http://heddle-uds")
255        .map_err(ChannelError::EndpointBuild)?
256        .connect_timeout(connect_timeout);
257
258    let socket_path = target.socket_path.clone();
259    let connector = tower::service_fn(move |_uri: tonic::transport::Uri| {
260        let socket_path = socket_path.clone();
261        async move {
262            let stream = tokio::net::UnixStream::connect(&socket_path).await?;
263            check_peer_uid_matches_self(&stream)?;
264            // tonic 0.14 requires the connector's response type to
265            // implement `hyper::rt::{Read, Write}`. `TokioIo` is the
266            // standard adapter and it's what tonic's own UDS connector
267            // uses internally — see
268            // `tonic/src/transport/channel/uds_connector.rs`.
269            std::io::Result::Ok(hyper_util::rt::TokioIo::new(stream))
270        }
271    });
272
273    let channel = endpoint
274        .connect_with_connector(connector)
275        .await
276        .map_err(ChannelError::Connect)?;
277
278    // Health.Check is the version handshake. Today the local daemon
279    // doesn't install a `tonic_health` reporter, so we expect either
280    // `Ok(Serving)` or `Err(Unimplemented)` — the latter is treated
281    // as "channel works, daemon predates the handshake" and accepted.
282    // Any other error means the channel is wedged and we should fall
283    // back to in-process.
284    let mut health = tonic_health::pb::health_client::HealthClient::new(channel.clone());
285    let request = tonic::Request::new(tonic_health::pb::HealthCheckRequest {
286        // Empty service name → "is the whole server serving?" per the
287        // gRPC health protocol spec.
288        service: String::new(),
289    });
290    match tokio::time::timeout(connect_timeout, health.check(request)).await {
291        Ok(Ok(response)) => {
292            let status = response.into_inner().status;
293            if status == tonic_health::pb::health_check_response::ServingStatus::Serving as i32 {
294                Ok(LocalDaemonChannel { target, channel })
295            } else {
296                Err(ChannelError::HealthNotServing)
297            }
298        }
299        // Unimplemented: daemon doesn't ship Health (today's case).
300        // We still trust the connection — the underlying HTTP/2
301        // handshake succeeded above, which is itself a strong signal.
302        Ok(Err(status)) if status.code() == tonic::Code::Unimplemented => {
303            Ok(LocalDaemonChannel { target, channel })
304        }
305        Ok(Err(status)) => Err(ChannelError::HealthRpc(status)),
306        Err(_elapsed) => Err(ChannelError::HealthRpc(tonic::Status::deadline_exceeded(
307            "Health.Check timed out",
308        ))),
309    }
310}
311
/// Errors from the channel-build path. Kept private to the module —
/// callers see `Option<LocalDaemonChannel>` from
/// [`connect_local_daemon_channel`] and treat `None` as "no daemon,
/// fall through to in-process".
///
/// NOTE: the payloads are never read in this file, because the only
/// caller maps every error to `None`. They are reachable only through
/// the derived `Debug`, which is presumably why `dead_code` is allowed
/// here.
#[cfg(unix)]
#[derive(Debug)]
#[allow(dead_code)]
enum ChannelError {
    /// Detect probe said no daemon (cheap negative case).
    NoDaemon,
    /// Tonic refused to build the endpoint URI. Programmer error in
    /// practice, but we surface it for the test path.
    EndpointBuild(tonic::transport::Error),
    /// `connect_with_connector` failed — daemon not accepting, or the
    /// connector rejected the listener's peer uid.
    Connect(tonic::transport::Error),
    /// Health.Check round-trip failed (transport, codec, etc.) or did
    /// not complete within the timeout.
    HealthRpc(tonic::Status),
    /// Health.Check answered with a status other than `SERVING`. We
    /// don't trust it.
    HealthNotServing,
}
332
/// Result of one [`probe`] run against a heddle dir: the two paths the
/// probe derived plus the status it concluded.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LocalDaemonProbe {
    /// `<heddle_dir>/sockets/grpc.sock`. Always populated, whether or
    /// not anything exists at that path.
    pub socket_path: PathBuf,
    /// `<heddle_dir>/sockets/grpc.pid`, the pidfile the status was
    /// derived from.
    pub pid_path: PathBuf,
    /// What the pidfile and liveness checks concluded.
    pub status: LocalDaemonStatus,
}
339
/// Outcome of the pidfile-based probe.
///
/// [`probe`] derives this from the pidfile alone: it never checks that
/// the socket file exists. Use
/// [`detect_local_daemon_with_connect_probe`] when the listener itself
/// must be confirmed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalDaemonStatus {
    /// The pidfile yielded a pid, that pid is alive, and it passed the
    /// same-executable identity check.
    Running { pid: u32 },
    /// The pidfile yielded a pid, but it is either dead or alive
    /// without passing the identity check (e.g. the pid was reused by
    /// an unrelated process). The socket may be a leftover.
    Stale { pid: u32 },
    /// The pidfile is missing, unreadable, or does not contain a
    /// parseable pid.
    Absent,
}
350
/// Per-heddle-dir count of how many times [`probe`] has actually run
/// (the "cold setup step"). The counter is bumped at the top of
/// `probe`, before the pidfile read and the liveness checks.
///
/// `DETECT_CACHE` lets the warm path skip this: a warm
/// [`detect_local_daemon`] hit returns the cached `Option<UdsTarget>`
/// without entering `probe` at all, so it does not bump this counter.
///
/// Keyed per heddle dir (not process-global) so a test reading the
/// count for *its* daemon is unaffected by probes that a concurrently
/// running test issues against a *different* temp dir — the counter is
/// deterministic regardless of test parallelism.
///
/// Test-support only — lets the serve_local test assert the *mechanism*
/// (warm path skips the probe) instead of comparing wall-clock time,
/// which flakes on shared CI runners (issue #722).
#[doc(hidden)]
static PROBE_RUN_COUNT: OnceMap<ProbeCacheKey, Arc<AtomicU64>> = OnceMap::new();
367
368/// Read the [`probe`] run count for `heddle_dir`. See
369/// [`PROBE_RUN_COUNT`]. A never-probed dir reads 0.
370#[doc(hidden)]
371pub fn probe_run_count(heddle_dir: &Path) -> u64 {
372    PROBE_RUN_COUNT
373        .get(&heddle_dir.to_path_buf())
374        .map(|c| c.load(Ordering::Relaxed))
375        .unwrap_or(0)
376}
377
378/// Probe the per-repo daemon directory. Cheap (two file stats, `kill(pid, 0)`,
379/// and same-executable identity for live pids).
380pub fn probe(heddle_dir: &Path) -> LocalDaemonProbe {
381    PROBE_RUN_COUNT
382        .get_or_init_with(&heddle_dir.to_path_buf(), || Arc::new(AtomicU64::new(0)))
383        .fetch_add(1, Ordering::Relaxed);
384    let socket_path = heddle_dir.join("sockets").join("grpc.sock");
385    let pid_path = heddle_dir.join("sockets").join("grpc.pid");
386    let status = match read_pid(&pid_path) {
387        Some(pid) if pid_alive(pid) && pid_identity_verified(pid) => {
388            LocalDaemonStatus::Running { pid }
389        }
390        Some(pid) => LocalDaemonStatus::Stale { pid },
391        None => LocalDaemonStatus::Absent,
392    };
393    LocalDaemonProbe {
394        socket_path,
395        pid_path,
396        status,
397    }
398}
399
/// Extract the daemon pid from the pidfile at `path`.
///
/// The hardened pidfile written by `daemon::local_daemon` has three
/// lines (`<pid>\nheddle-agent\n<unix_secs>\n`); only the first is
/// needed for liveness checks. If the first line does not parse as a
/// `u32`, the whole trimmed file is tried instead, so older
/// single-line pidfiles still resolve.
///
/// Returns `None` when the file cannot be read or neither attempt
/// yields a `u32`.
fn read_pid(path: &Path) -> Option<u32> {
    let contents = std::fs::read_to_string(path).ok()?;
    let leading = contents.lines().next().map(str::trim).unwrap_or("");
    match leading.parse::<u32>() {
        Ok(pid) => Some(pid),
        // Legacy fallback: treat the entire file as the pid.
        Err(_) => contents.trim().parse::<u32>().ok(),
    }
}
413
414#[cfg(unix)]
415fn pid_identity_verified(pid: u32) -> bool {
416    let Ok(pid) = i32::try_from(pid) else {
417        return false;
418    };
419    is_heddle_process(pid)
420}
421
/// Non-unix stub: there is no identity check on these targets, so no
/// pid is ever verified and [`probe`] can never report `Running`.
#[cfg(not(unix))]
fn pid_identity_verified(_pid: u32) -> bool {
    false
}
426
427#[cfg(unix)]
428fn check_peer_uid_matches_self(stream: &tokio::net::UnixStream) -> io::Result<()> {
429    let creds = stream.peer_cred()?;
430    // SAFETY: geteuid() never fails.
431    enforce_peer_uid(creds.uid(), unsafe { libc::geteuid() })
432}
433
/// Pure comparison behind [`check_peer_uid_matches_self`], split out
/// so it can be tested without a real socket.
///
/// # Errors
///
/// Returns `io::ErrorKind::PermissionDenied` naming both uids when
/// `peer_uid` differs from `our_uid`.
#[cfg(unix)]
fn enforce_peer_uid(peer_uid: u32, our_uid: u32) -> io::Result<()> {
    if peer_uid == our_uid {
        Ok(())
    } else {
        let message = format!("daemon peer uid {peer_uid} does not match client uid {our_uid}");
        Err(io::Error::new(io::ErrorKind::PermissionDenied, message))
    }
}
444
445#[cfg(unix)]
446fn pid_alive(pid: u32) -> bool {
447    // SAFETY: kill(pid, 0) only validates existence; signal 0 sends nothing.
448    unsafe { libc::kill(pid as libc::pid_t, 0) == 0 }
449}
450
/// Non-unix stub: there is no liveness check on these targets, so
/// every pid is reported as not alive.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    false
}
455
/// Unit tests for the probe, detect and connect layers.
///
/// Each test uses its own `TempDir` as the heddle dir, so the
/// process-wide caches (keyed by path) never leak state between tests.
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// An empty heddle dir has no pidfile, so the probe reports `Absent`.
    #[test]
    fn absent_when_no_files() {
        let temp = TempDir::new().unwrap();
        let probe = probe(temp.path());
        assert_eq!(probe.status, LocalDaemonStatus::Absent);
    }

    /// A pidfile naming a pid that cannot be alive yields `Stale`.
    #[test]
    fn stale_when_pidfile_holds_dead_pid() {
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        // PID 2_147_483_646 is well beyond pid_max and not in use.
        std::fs::write(sockets.join("grpc.pid"), "2147483646").unwrap();
        let probe = probe(temp.path());
        assert!(matches!(probe.status, LocalDaemonStatus::Stale { .. }));
    }

    /// A pidfile naming this very process is alive and passes the
    /// identity check, so the probe reports `Running`.
    ///
    /// Unix-only: on other targets `pid_alive` and
    /// `pid_identity_verified` are stubs that always return `false`,
    /// so `Running` is unreachable there.
    #[cfg(unix)]
    #[test]
    fn running_when_pidfile_holds_self_pid() {
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
        let probe = probe(temp.path());
        match probe.status {
            LocalDaemonStatus::Running { pid } => assert_eq!(pid, std::process::id()),
            other => panic!("expected Running, got {other:?}"),
        }
    }

    /// A live pid that is not a heddle process must be classified as
    /// `Stale`, not `Running`.
    #[cfg(unix)]
    #[test]
    fn stale_when_pidfile_holds_live_non_heddle_pid() {
        let mut child = std::process::Command::new("/bin/sleep")
            .arg("30")
            .env_clear()
            .spawn()
            .expect("spawn sleep");
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        std::fs::write(sockets.join("grpc.pid"), child.id().to_string()).unwrap();

        let probe = probe(temp.path());

        // Reap the child before asserting so a failing assertion does
        // not leave it running.
        let _ = child.kill();
        let _ = child.wait();
        match probe.status {
            LocalDaemonStatus::Stale { pid } => assert_eq!(pid, child.id()),
            other => panic!("expected Stale for live non-Heddle pid, got {other:?}"),
        }
    }

    /// `detect_local_daemon` turns a `Running` probe into a target
    /// with the expected pid and socket path.
    ///
    /// Unix-only for the same reason as
    /// `running_when_pidfile_holds_self_pid`.
    #[cfg(unix)]
    #[test]
    fn detect_returns_target_when_running() {
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
        let target = detect_local_daemon(temp.path()).expect("daemon detected");
        assert_eq!(target.pid, std::process::id());
        assert!(
            target.socket_path.ends_with("sockets/grpc.sock"),
            "socket path was {:?}",
            target.socket_path
        );
    }

    /// `detect_local_daemon` collapses an `Absent` probe to `None`.
    #[test]
    fn detect_returns_none_when_absent() {
        let temp = TempDir::new().unwrap();
        // A fresh temp dir with no `sockets/` subtree — probe returns
        // Absent, detect collapses that to None.
        assert!(detect_local_daemon(temp.path()).is_none());
    }

    /// Equal uids are accepted.
    #[cfg(unix)]
    #[test]
    fn enforce_peer_uid_accepts_matching_uid() {
        assert!(enforce_peer_uid(1000, 1000).is_ok());
    }

    /// Differing uids are rejected with `PermissionDenied`.
    #[cfg(unix)]
    #[test]
    fn enforce_peer_uid_rejects_mismatched_uid() {
        let err = enforce_peer_uid(1001, 1000).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::PermissionDenied);
    }

    /// The connect probe must reject a healthy-looking pidfile when
    /// nothing is listening on the socket path.
    #[cfg(unix)]
    #[tokio::test]
    async fn connect_probe_rejects_socketless_pidfile() {
        // Pidfile points at our own pid (so the file-stat probe says
        // `Running`), but no listener is bound to the socket path.
        // The connect probe must catch this and return None — that's
        // its whole job.
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
        let result = detect_local_daemon_with_connect_probe(
            temp.path(),
            std::time::Duration::from_millis(50),
        )
        .await;
        assert!(
            result.is_none(),
            "connect probe should reject when no listener is bound"
        );
    }

    /// The connect probe accepts a pidfile backed by a bound listener
    /// owned by this process.
    #[cfg(unix)]
    #[tokio::test]
    async fn connect_probe_accepts_live_listener() {
        use tokio::net::UnixListener;
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        let socket_path = sockets.join("grpc.sock");
        // Some sandboxes forbid binding a UDS; skip rather than fail
        // there.
        let _listener = match UnixListener::bind(&socket_path) {
            Ok(listener) => listener,
            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
                eprintln!("skipping live-listener connect probe test: UDS bind denied: {err}");
                return;
            }
            Err(err) => panic!("bind local daemon socket: {err}"),
        };
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
        let result = detect_local_daemon_with_connect_probe(
            temp.path(),
            std::time::Duration::from_millis(200),
        )
        .await;
        assert!(
            result.is_some(),
            "connect probe should succeed when a listener is bound"
        );
    }

    /// Both ends of a socketpair belong to this process, so the peer
    /// uid check passes.
    #[cfg(unix)]
    #[tokio::test]
    async fn check_peer_uid_matches_self_accepts_socketpair() {
        let (peer, _local) = tokio::net::UnixStream::pair().expect("socketpair");
        assert!(check_peer_uid_matches_self(&peer).is_ok());
    }

    /// With no daemon present the channel path returns `None`.
    #[cfg(unix)]
    #[tokio::test]
    async fn connect_channel_is_none_when_daemon_absent() {
        // No pidfile, no socket — `connect_local_daemon_channel`
        // should short-circuit on the detect probe and return None
        // without attempting a connect.
        let temp = TempDir::new().unwrap();
        let result =
            connect_local_daemon_channel(temp.path(), std::time::Duration::from_millis(50)).await;
        assert!(result.is_none());
    }
}