heddle-cli 0.4.0

An AI-native version control system
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
// SPDX-License-Identifier: Apache-2.0
//! Local-daemon auto-detection.
//!
//! gRPC-using CLI verbs are meant to consult this module first. When the
//! per-repo pidfile (`.heddle/sockets/grpc.pid`) names a live heddle
//! process and `.heddle/sockets/grpc.sock` accepts connections, callers
//! can route their RPCs over the UDS instead of opening an in-process
//! [`GrpcLocalService`]. The latency win matters for tight agent loops.
//!
//! Three layers:
//!
//! 1. [`detect_local_daemon`] — file-stat probe (pidfile + liveness via
//!    `kill(pid, 0)` + same-executable identity). Syscall-only and cheap,
//!    it handles the common negative case ("no daemon, fall through to
//!    in-process"). It does not check that the socket file exists.
//! 2. [`detect_local_daemon_with_connect_probe`] — same as (1) but
//!    actually opens a `UnixStream` and checks kernel-reported peer
//!    credentials to confirm the listener is owned by our uid. Catches a
//!    pidfile that names a live heddle process while nothing (or a
//!    listener owned by another user) is bound to the socket path.
//! 3. [`connect_local_daemon_channel`] — full path: build a tonic
//!    [`tonic::transport::Channel`] over the UDS, run the gRPC
//!    `Health.Check` handshake, and cache the working channel for the
//!    rest of the process. This is the entry point read-shaped CLI verbs
//!    are meant to route through (see the TODO on that function).
//!
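//! Results of (1) and (3) are cached per heddle-dir path, so a CLI
//! invocation that touches one repo pays the file-stat probe once and,
//! after the first successful handshake, reuses the same channel. (2)
//! reuses the cached file-stat result but re-dials the socket on every
//! call. Cache keys are the heddle-dir path exactly as passed; nothing
//! here canonicalizes it.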

use std::{
    io,
    path::{Path, PathBuf},
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::Duration,
};

#[cfg(unix)]
use daemon::local_daemon::is_heddle_process;

use crate::util::OnceMap;

/// Address of a local daemon that passed the file-stat probe.
///
/// Produced by [`detect_local_daemon`] when [`probe`] reports
/// [`LocalDaemonStatus::Running`]: `socket_path` is the per-repo UDS a
/// client dials and `pid` is the process id read from the pidfile.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UdsTarget {
    /// Path of the gRPC Unix-domain socket (`<heddle_dir>/sockets/grpc.sock`).
    pub socket_path: PathBuf,
    /// Pid read from `<heddle_dir>/sockets/grpc.pid`; alive when probed.
    pub pid: u32,
}

/// Cache key for the per-repo maps in this module: the heddle-dir path
/// exactly as the caller passed it.
///
/// No canonicalization happens here. `a/.heddle`, `./a/.heddle` and an
/// absolute spelling of the same directory are three distinct keys. A
/// *relative* path is resolved against whatever the process CWD is when
/// the probe runs. Callers that want exactly one cache entry per repo
/// should pass a canonical (or at least absolute) heddle dir.
type ProbeCacheKey = PathBuf;

/// Process-wide cache of [`detect_local_daemon`] results.
///
/// Each heddle dir is probed at most once per process lifetime. After
/// that, calls return the cached `Option<UdsTarget>` without touching
/// the filesystem. Negative results (`None`) are cached just like
/// positive ones, and nothing in this module evicts entries.
///
/// Keyed per heddle-dir path (see [`ProbeCacheKey`]), so a process that
/// touches more than one repo (test binaries, agent dispatch loops)
/// caches each repo independently.
static DETECT_CACHE: OnceMap<ProbeCacheKey, Option<UdsTarget>> = OnceMap::new();

/// Run the probe and, when the daemon is `Running`, return the UDS
/// target a tonic client can dial. Cached for the process lifetime so
/// hot agent loops don't pay two stat-syscalls per RPC.
///
/// Probing failure (`Absent`, `Stale`) returns `None` — the caller
/// should fall through to its in-process or remote fallback. The
/// full `Health.Check` + version handshake over a tonic UDS Channel
/// layers on top of [`detect_local_daemon_with_connect_probe`] and
/// [`connect_local_daemon_channel`].
pub fn detect_local_daemon(heddle_dir: &Path) -> Option<UdsTarget> {
    let key: ProbeCacheKey = heddle_dir.to_path_buf();
    DETECT_CACHE.get_or_init_with(&key, || {
        let probe = probe(heddle_dir);
        match probe.status {
            LocalDaemonStatus::Running { pid } => Some(UdsTarget {
                socket_path: probe.socket_path,
                pid,
            }),
            LocalDaemonStatus::Stale { .. } | LocalDaemonStatus::Absent => None,
        }
    })
}

/// Stronger variant of [`detect_local_daemon`]. After the file-stat probe
/// reports `Running`, it dials the socket to confirm that a listener is
/// actually accepting connections and that the listener runs as this
/// process's effective uid.
///
/// This catches two cases the file-stat probe misses:
/// - a pidfile that names a live heddle process while nothing is bound
///   to `sockets/grpc.sock` (e.g. mid-shutdown);
/// - a listener that belongs to a different user.
///
/// The connect is bounded by `timeout`. Callers should pass something
/// tight (50ms is plenty for a local socket) so a hung daemon doesn't
/// stall every CLI invocation.
///
/// Returns `None` if any of these hold:
/// - the file-stat probe says `Absent`/`Stale`;
/// - the connect fails or times out;
/// - the peer-uid check fails.
///
/// # Caching
///
/// Only the file-stat step is cached (via [`detect_local_daemon`]). The
/// connect and peer-uid check run on *every* call, so each call costs one
/// UDS connect; the probe connection is dropped once the check completes.
#[cfg(unix)]
pub async fn detect_local_daemon_with_connect_probe(
    heddle_dir: &Path,
    timeout: Duration,
) -> Option<UdsTarget> {
    // The cached file-stat probe handles the obvious negative cases; only
    // a `Running` result is worth a live connect.
    let target = detect_local_daemon(heddle_dir)?;
    match tokio::time::timeout(
        timeout,
        tokio::net::UnixStream::connect(&target.socket_path),
    )
    .await
    {
        // Connected: trust the listener only if it runs as our uid. The
        // probe stream is dropped (closed) at the end of this arm.
        Ok(Ok(stream)) => check_peer_uid_matches_self(&stream).ok().map(|()| target),
        // Either the connect errored (socket present but listener
        // dead — rare but possible during a graceful shutdown) or
        // the connect timed out (daemon hung). Either way it's not
        // safe to route RPCs through it. The next call will retry the
        // connect (the file-stat result stays cached).
        Ok(Err(_)) | Err(_) => None,
    }
}

/// Process-wide cache of working tonic [`tonic::transport::Channel`]s,
/// one per heddle dir.
///
/// A channel is inserted only after `build_channel` has connected and
/// passed the `Health.Check` handshake, i.e. a `SERVING` reply or an
/// `UNIMPLEMENTED` status. Every later [`connect_local_daemon_channel`]
/// call in the same process for the same path gets a clone of it.
///
/// `Channel` is cheap to clone (it's a handle to the underlying
/// connection, which tonic / hyper pool internally). Handing out clones
/// is therefore the fastest way to amortize the connect cost across a
/// hot agent loop.
///
/// Keyed by the heddle-dir path exactly as passed (no canonicalization),
/// so a process touching multiple repos (test binaries, multi-repo
/// agents) keeps a per-repo channel. Nothing in this module evicts
/// entries, so a channel to a daemon that later exits stays cached.
#[cfg(unix)]
static CHANNEL_CACHE: OnceMap<ProbeCacheKey, tonic::transport::Channel> = OnceMap::new();

/// Per-heddle-dir count of how many times [`build_channel`] has been
/// entered, i.e. how many channel builds were *attempted*.
///
/// The counter is bumped before the detect check, so attempts that bail
/// out early are counted too: no daemon detected, connect refused, or a
/// failed `Health.Check`. A cache hit in [`connect_local_daemon_channel`]
/// returns a clone of the cached channel without entering
/// `build_channel`, so it does not bump this counter.
///
/// Keyed per heddle dir (not process-global) so the count is
/// deterministic regardless of which other tests run concurrently.
///
/// Test-support only — lets the serve_local test assert that the second
/// `connect_local_daemon_channel` is a true O(1) cache hit (no rebuild)
/// instead of comparing wall-clock time, which flakes on shared CI
/// runners (issue #722).
#[cfg(unix)]
#[doc(hidden)]
static CHANNEL_BUILD_COUNT: OnceMap<ProbeCacheKey, Arc<AtomicU64>> = OnceMap::new();

/// Read the [`build_channel`] run count for `heddle_dir`. See
/// [`CHANNEL_BUILD_COUNT`]. A dir whose channel was never built reads 0.
#[cfg(unix)]
#[doc(hidden)]
pub fn channel_build_count(heddle_dir: &Path) -> u64 {
    CHANNEL_BUILD_COUNT
        .get(&heddle_dir.to_path_buf())
        .map(|c| c.load(Ordering::Relaxed))
        .unwrap_or(0)
}

/// A handshaken channel to the local daemon, as returned by
/// [`connect_local_daemon_channel`].
///
/// `channel` is what issues RPCs. `target` duplicates the [`UdsTarget`]
/// the channel was built from, so callers that only care *whether* (and
/// which pid) a daemon is reachable don't need a separate
/// [`detect_local_daemon`] call.
#[cfg(unix)]
#[derive(Debug, Clone)]
pub struct LocalDaemonChannel {
    /// Socket path and pid of the daemon behind `channel`.
    pub target: UdsTarget,
    /// Live tonic channel over the daemon's UDS; cheap to clone.
    pub channel: tonic::transport::Channel,
}

/// Return a tonic [`tonic::transport::Channel`] to this repo's local
/// daemon, together with its [`UdsTarget`], after a gRPC `Health.Check`
/// handshake over the per-repo UDS.
///
/// The first successful build is stored in `CHANNEL_CACHE`. Later calls
/// for the same `heddle_dir` skip the connect and handshake entirely and
/// hand back a clone in O(1). This is the path agent loops should use.
/// Failed builds are not cached, so the next call tries again.
///
/// `connect_timeout` bounds the UDS connect *and* the Health.Check. Use
/// 50–250ms for a same-host socket; anything longer implies a hung
/// daemon. Returns `None` on every failure mode (no daemon, connect
/// failed, health refused), and the caller falls through to its
/// in-process or remote fallback.
///
/// # First consumer (TODO)
///
/// `cmd_status` is the natural first consumer. Its read-shaped output
/// is built from a handful of `Repository` lookups that the
/// `OperationLogQueryService` already covers. A future patch should
/// branch in `crates/cli/src/cli/commands/status.rs::cmd_status` like:
///
/// ```ignore
/// if let Some(LocalDaemonChannel { channel, .. }) =
///     connect_local_daemon_channel(repo.heddle_dir(), Duration::from_millis(150)).await
/// {
///     let mut client = OperationLogQueryServiceClient::new(channel);
///     // Build StatusOutput from RPCs instead of direct Repository reads.
/// } else {
///     // Existing in-process path.
/// }
/// ```
///
/// Not wired up yet for two reasons: (a) the query surface doesn't yet
/// cover every field in `StatusOutput`, and (b) this module deliberately
/// ships only the channel-construction primitive.
#[cfg(unix)]
pub async fn connect_local_daemon_channel(
    heddle_dir: &Path,
    connect_timeout: Duration,
) -> Option<LocalDaemonChannel> {
    let key: ProbeCacheKey = heddle_dir.to_path_buf();

    // Warm path: the detect cache still holds the target this channel was
    // built from, so pair it back up to keep the result self-contained.
    if let Some(channel) = CHANNEL_CACHE.get(&key) {
        return detect_local_daemon(heddle_dir).map(|target| LocalDaemonChannel { target, channel });
    }

    // Cold path: any build error collapses to `None` and nothing is cached.
    let built = build_channel(heddle_dir, connect_timeout).await.ok()?;
    CHANNEL_CACHE.insert(key, built.channel.clone());
    Some(built)
}

/// Cold path behind [`connect_local_daemon_channel`]: dial the per-repo
/// UDS, build a tonic channel over it, and run the `Health.Check`
/// handshake.
///
/// Bumps this dir's [`CHANNEL_BUILD_COUNT`] on *every* entry, before the
/// detect check, so early bail-outs are counted as attempts too.
///
/// # Errors
///
/// - [`ChannelError::NoDaemon`] when [`detect_local_daemon`] returns `None`.
/// - [`ChannelError::EndpointBuild`] if the placeholder endpoint URI is
///   rejected.
/// - [`ChannelError::Connect`] when `connect_with_connector` fails. That
///   covers a UDS connect error, a peer-uid mismatch raised by the
///   connector, or the endpoint's `connect_timeout` firing.
/// - [`ChannelError::HealthRpc`] for any `Health.Check` failure other than
///   `UNIMPLEMENTED`, including the check exceeding `connect_timeout`.
/// - [`ChannelError::HealthNotServing`] when the daemon replies with a
///   status other than `SERVING`.
#[cfg(unix)]
async fn build_channel(
    heddle_dir: &Path,
    connect_timeout: Duration,
) -> std::result::Result<LocalDaemonChannel, ChannelError> {
    // Count the attempt up front, before any early return below.
    CHANNEL_BUILD_COUNT
        .get_or_init_with(&heddle_dir.to_path_buf(), || Arc::new(AtomicU64::new(0)))
        .fetch_add(1, Ordering::Relaxed);
    let target = detect_local_daemon(heddle_dir).ok_or(ChannelError::NoDaemon)?;
    // `unix:` URIs aren't usable as the *origin* on a HTTP/2 channel
    // (the authority pseudo-header has to be a plausible host). The
    // standard tonic UDS recipe is to give the endpoint an opaque
    // `http://heddle-uds` URI for routing and override the connector
    // with a service that returns a `UnixStream` regardless of what
    // URI it's asked for.
    let endpoint = tonic::transport::Endpoint::try_from("http://heddle-uds")
        .map_err(ChannelError::EndpointBuild)?
        .connect_timeout(connect_timeout);

    let socket_path = target.socket_path.clone();
    let connector = tower::service_fn(move |_uri: tonic::transport::Uri| {
        let socket_path = socket_path.clone();
        async move {
            let stream = tokio::net::UnixStream::connect(&socket_path).await?;
            // Refuse listeners owned by another uid before handing the
            // stream to hyper.
            check_peer_uid_matches_self(&stream)?;
            // tonic 0.14 requires the connector's response type to
            // implement `hyper::rt::{Read, Write}`. `TokioIo` is the
            // standard adapter and it's what tonic's own UDS connector
            // uses internally — see
            // `tonic/src/transport/channel/uds_connector.rs`.
            std::io::Result::Ok(hyper_util::rt::TokioIo::new(stream))
        }
    });

    let channel = endpoint
        .connect_with_connector(connector)
        .await
        .map_err(ChannelError::Connect)?;

    // Health.Check is the version handshake. Today the local daemon
    // doesn't install a `tonic_health` reporter, so we expect either
    // `Ok(Serving)` or `Err(Unimplemented)` — the latter is treated
    // as "channel works, daemon predates the handshake" and accepted.
    // Any other error means the channel is wedged and we should fall
    // back to in-process.
    let mut health = tonic_health::pb::health_client::HealthClient::new(channel.clone());
    let request = tonic::Request::new(tonic_health::pb::HealthCheckRequest {
        // Empty service name → "is the whole server serving?" per the
        // gRPC health protocol spec.
        service: String::new(),
    });
    // The same `connect_timeout` budget also bounds the health RPC.
    match tokio::time::timeout(connect_timeout, health.check(request)).await {
        Ok(Ok(response)) => {
            let status = response.into_inner().status;
            if status == tonic_health::pb::health_check_response::ServingStatus::Serving as i32 {
                Ok(LocalDaemonChannel { target, channel })
            } else {
                Err(ChannelError::HealthNotServing)
            }
        }
        // Unimplemented: daemon doesn't ship Health (today's case).
        // We still trust the connection — the underlying HTTP/2
        // handshake succeeded above, which is itself a strong signal.
        Ok(Err(status)) if status.code() == tonic::Code::Unimplemented => {
            Ok(LocalDaemonChannel { target, channel })
        }
        Ok(Err(status)) => Err(ChannelError::HealthRpc(status)),
        Err(_elapsed) => Err(ChannelError::HealthRpc(tonic::Status::deadline_exceeded(
            "Health.Check timed out",
        ))),
    }
}

/// Errors from the channel-build path. Kept private to the module —
/// callers see `Option<LocalDaemonChannel>` from
/// [`connect_local_daemon_channel`] and treat `None` as "no daemon,
/// fall through to in-process".
// Why `allow(dead_code)`: `connect_local_daemon_channel` discards the
// error (`Err(_) => None`), so the variant payloads are only read by the
// derived `Debug` impl, which the `dead_code` lint ignores. The payloads
// are kept on purpose for debugging and tests, so the lint is silenced
// rather than the data dropped.
#[cfg(unix)]
#[derive(Debug)]
#[allow(dead_code)]
enum ChannelError {
    /// Detect probe said no daemon (cheap negative case).
    NoDaemon,
    /// Tonic refused to build the endpoint URI. Programmer error in
    /// practice, but we surface it for the test path.
    EndpointBuild(tonic::transport::Error),
    /// `connect_with_connector` failed — daemon not accepting.
    Connect(tonic::transport::Error),
    /// Health.Check round-trip failed (transport, codec, timeout, etc.).
    HealthRpc(tonic::Status),
    /// Health.Check came back with a status other than `SERVING`. We
    /// don't trust it.
    HealthNotServing,
}

/// Raw result of [`probe`]: the paths it derived plus its verdict.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LocalDaemonProbe {
    /// `<heddle_dir>/sockets/grpc.sock`. Computed, not stat'ed, by [`probe`].
    pub socket_path: PathBuf,
    /// `<heddle_dir>/sockets/grpc.pid`, the file the pid is read from.
    pub pid_path: PathBuf,
    /// What the pidfile plus liveness / identity checks concluded.
    pub status: LocalDaemonStatus,
}

/// Verdict of the file-stat [`probe`].
///
/// Only the pidfile is consulted; whether the socket file exists is
/// *not* checked. Use [`detect_local_daemon_with_connect_probe`] to
/// confirm a listener.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LocalDaemonStatus {
    /// The pidfile parsed to a pid that is alive and resolves to the same
    /// executable as this client. Never reported on non-Unix targets.
    Running { pid: u32 },
    /// The pidfile parsed to a pid, but that pid is either dead or alive
    /// yet not a heddle process (e.g. a recycled pid). Any socket file
    /// left behind should not be trusted.
    Stale { pid: u32 },
    /// No usable pidfile: it is missing, unreadable, or does not contain
    /// a parseable pid.
    Absent,
}

/// Per-heddle-dir tally of cold [`probe`] runs. Each run reads the
/// pidfile and performs the liveness / identity syscalls. A warm
/// [`detect_local_daemon`] hit is served from `DETECT_CACHE` and never
/// enters `probe`, so it leaves this tally unchanged.
///
/// The tally is per heddle dir rather than process-global. A test that
/// reads the count for *its own* temp dir is therefore unaffected by
/// probes that concurrently running tests issue against other dirs.
///
/// Test-support only. The serve_local test uses it to assert the
/// *mechanism* (the warm path skips the probe) instead of timing the two
/// calls, which flakes on shared CI runners (issue #722).
#[doc(hidden)]
static PROBE_RUN_COUNT: OnceMap<ProbeCacheKey, Arc<AtomicU64>> = OnceMap::new();

/// Read the [`probe`] run count for `heddle_dir`. See
/// [`PROBE_RUN_COUNT`]. A never-probed dir reads 0.
#[doc(hidden)]
pub fn probe_run_count(heddle_dir: &Path) -> u64 {
    PROBE_RUN_COUNT
        .get(&heddle_dir.to_path_buf())
        .map(|c| c.load(Ordering::Relaxed))
        .unwrap_or(0)
}

/// Probe the per-repo daemon directory without touching the socket.
///
/// Reads `<heddle_dir>/sockets/grpc.pid`. If it parses to a pid, the pid
/// is checked for liveness (`kill(pid, 0)` on Unix) and same-executable
/// identity. The socket path `<heddle_dir>/sockets/grpc.sock` is only
/// computed and returned, never stat'ed. Every call bumps this dir's
/// `PROBE_RUN_COUNT` tally.
pub fn probe(heddle_dir: &Path) -> LocalDaemonProbe {
    PROBE_RUN_COUNT
        .get_or_init_with(&heddle_dir.to_path_buf(), || Arc::new(AtomicU64::new(0)))
        .fetch_add(1, Ordering::Relaxed);

    let sockets_dir = heddle_dir.join("sockets");
    let socket_path = sockets_dir.join("grpc.sock");
    let pid_path = sockets_dir.join("grpc.pid");

    let status = match read_pid(&pid_path) {
        None => LocalDaemonStatus::Absent,
        Some(pid) => {
            // Identity is only checked for pids that are alive.
            let trusted = pid_alive(pid) && pid_identity_verified(pid);
            if trusted {
                LocalDaemonStatus::Running { pid }
            } else {
                LocalDaemonStatus::Stale { pid }
            }
        }
    };

    LocalDaemonProbe { socket_path, pid_path, status }
}

/// Parse the daemon pid out of the pidfile at `path`.
///
/// Returns `None` if the file can't be read or holds no parseable pid.
fn read_pid(path: &Path) -> Option<u32> {
    // Current pidfiles written by `daemon::local_daemon` are three lines,
    // `<pid>\nheddle-agent\n<unix_secs>\n`, and only the pid matters for
    // liveness. Try the first line first. If that fails, retry on the
    // whole trimmed file, so legacy single-line pidfiles (and ones with
    // leading blank lines) still resolve.
    let contents = std::fs::read_to_string(path).ok()?;
    let head = contents.lines().next().map_or("", str::trim);
    match head.parse::<u32>() {
        Ok(pid) => Some(pid),
        Err(_) => contents.trim().parse::<u32>().ok(),
    }
}

/// Confirm that `pid` belongs to a heddle process (same-executable
/// identity, as decided by `daemon::local_daemon::is_heddle_process`).
///
/// A pid that does not fit in an `i32` cannot be a valid process id and
/// is rejected outright.
#[cfg(unix)]
fn pid_identity_verified(pid: u32) -> bool {
    match i32::try_from(pid) {
        Ok(signed_pid) => is_heddle_process(signed_pid),
        Err(_) => false,
    }
}

/// Non-Unix builds have no identity check wired up, so no pid is ever
/// considered verified (and [`probe`] never reports `Running`).
#[cfg(not(unix))]
fn pid_identity_verified(_pid: u32) -> bool {
    false
}

/// Fail with `PermissionDenied` unless the process on the other end of
/// `stream` runs as this process's effective uid.
///
/// The peer uid is the kernel-reported credential from tokio's
/// `peer_cred`; an error fetching it is propagated unchanged.
#[cfg(unix)]
fn check_peer_uid_matches_self(stream: &tokio::net::UnixStream) -> io::Result<()> {
    let peer_uid = stream.peer_cred()?.uid();
    // SAFETY: geteuid() has no preconditions and never fails.
    let own_uid = unsafe { libc::geteuid() };
    enforce_peer_uid(peer_uid, own_uid)
}

/// Pure uid comparison behind [`check_peer_uid_matches_self`]. Returns
/// `Ok(())` when the daemon's uid equals ours, otherwise a
/// `PermissionDenied` error naming both uids.
#[cfg(unix)]
fn enforce_peer_uid(peer_uid: u32, our_uid: u32) -> io::Result<()> {
    if peer_uid == our_uid {
        return Ok(());
    }
    let message = format!("daemon peer uid {peer_uid} does not match client uid {our_uid}");
    Err(io::Error::new(io::ErrorKind::PermissionDenied, message))
}

/// Return whether `pid` names an existing process we are allowed to
/// signal. Uses `kill(pid, 0)`: signal 0 performs the existence and
/// permission checks without delivering anything.
///
/// Only strictly positive pids that fit in `pid_t` are probed. `kill`
/// reads other values as group targets:
/// - `0` means "the caller's own process group";
/// - `-1` means "every process we may signal";
/// - any other negative value is a process-group id.
///
/// A pidfile holding `0`, or a value above `i32::MAX` (which an `as` cast
/// would wrap negative), must therefore never reach `kill`. Otherwise a
/// group-wide success would be reported as this one pid being alive.
///
/// A live process owned by another user makes `kill` fail (`EPERM`) and
/// is reported as not alive. That is the conservative answer, since the
/// connect-time peer-uid check rejects other users' daemons anyway.
#[cfg(unix)]
fn pid_alive(pid: u32) -> bool {
    let Some(pid) = libc::pid_t::try_from(pid).ok().filter(|&p| p > 0) else {
        return false;
    };
    // SAFETY: kill(pid, 0) only validates existence; signal 0 sends nothing,
    // and `pid` is a single positive process id (never a group target).
    unsafe { libc::kill(pid, 0) == 0 }
}

/// Non-Unix builds have no `kill(pid, 0)` equivalent wired up, so every
/// pid is treated as not alive.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    false
}

#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    #[test]
    fn absent_when_no_files() {
        let temp = TempDir::new().unwrap();
        let probe = probe(temp.path());
        assert_eq!(probe.status, LocalDaemonStatus::Absent);
    }

    #[test]
    fn stale_when_pidfile_holds_dead_pid() {
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        // PID 2_147_483_646 is well beyond pid_max and not in use.
        std::fs::write(sockets.join("grpc.pid"), "2147483646").unwrap();
        let probe = probe(temp.path());
        assert!(matches!(probe.status, LocalDaemonStatus::Stale { .. }));
    }

    // The non-Unix `pid_alive` / `pid_identity_verified` stubs always
    // return false, so `Running` is unreachable off Unix. Every test that
    // expects a positive file-stat probe is therefore gated on `unix`.
    #[cfg(unix)]
    #[test]
    fn running_when_pidfile_holds_self_pid() {
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
        let probe = probe(temp.path());
        match probe.status {
            LocalDaemonStatus::Running { pid } => assert_eq!(pid, std::process::id()),
            other => panic!("expected Running, got {other:?}"),
        }
    }

    #[cfg(unix)]
    #[test]
    fn stale_when_pidfile_holds_live_non_heddle_pid() {
        let mut child = std::process::Command::new("/bin/sleep")
            .arg("30")
            .env_clear()
            .spawn()
            .expect("spawn sleep");
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        std::fs::write(sockets.join("grpc.pid"), child.id().to_string()).unwrap();

        let probe = probe(temp.path());

        let _ = child.kill();
        let _ = child.wait();
        match probe.status {
            LocalDaemonStatus::Stale { pid } => assert_eq!(pid, child.id()),
            other => panic!("expected Stale for live non-Heddle pid, got {other:?}"),
        }
    }

    #[cfg(unix)]
    #[test]
    fn detect_returns_target_when_running() {
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
        let target = detect_local_daemon(temp.path()).expect("daemon detected");
        assert_eq!(target.pid, std::process::id());
        assert!(
            target.socket_path.ends_with("sockets/grpc.sock"),
            "socket path was {:?}",
            target.socket_path
        );
    }

    #[test]
    fn detect_returns_none_when_absent() {
        let temp = TempDir::new().unwrap();
        // A fresh temp dir with no `sockets/` subtree — probe returns
        // Absent, detect collapses that to None.
        assert!(detect_local_daemon(temp.path()).is_none());
    }

    #[cfg(unix)]
    #[test]
    fn enforce_peer_uid_accepts_matching_uid() {
        assert!(enforce_peer_uid(1000, 1000).is_ok());
    }

    #[cfg(unix)]
    #[test]
    fn enforce_peer_uid_rejects_mismatched_uid() {
        let err = enforce_peer_uid(1001, 1000).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::PermissionDenied);
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_probe_rejects_socketless_pidfile() {
        // Pidfile points at our own pid (so the file-stat probe says
        // `Running`), but no listener is bound to the socket path.
        // The connect probe must catch this and return None — that's
        // its whole job.
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
        let result = detect_local_daemon_with_connect_probe(
            temp.path(),
            std::time::Duration::from_millis(50),
        )
        .await;
        assert!(
            result.is_none(),
            "connect probe should reject when no listener is bound"
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_probe_accepts_live_listener() {
        use tokio::net::UnixListener;
        let temp = TempDir::new().unwrap();
        let sockets = temp.path().join("sockets");
        std::fs::create_dir_all(&sockets).unwrap();
        let socket_path = sockets.join("grpc.sock");
        let _listener = match UnixListener::bind(&socket_path) {
            Ok(listener) => listener,
            Err(err) if err.kind() == std::io::ErrorKind::PermissionDenied => {
                eprintln!("skipping live-listener connect probe test: UDS bind denied: {err}");
                return;
            }
            Err(err) => panic!("bind local daemon socket: {err}"),
        };
        std::fs::write(sockets.join("grpc.pid"), std::process::id().to_string()).unwrap();
        let result = detect_local_daemon_with_connect_probe(
            temp.path(),
            std::time::Duration::from_millis(200),
        )
        .await;
        assert!(
            result.is_some(),
            "connect probe should succeed when a listener is bound"
        );
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn check_peer_uid_matches_self_accepts_socketpair() {
        let (peer, _local) = tokio::net::UnixStream::pair().expect("socketpair");
        assert!(check_peer_uid_matches_self(&peer).is_ok());
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn connect_channel_is_none_when_daemon_absent() {
        // No pidfile, no socket — `connect_local_daemon_channel`
        // should short-circuit on the detect probe and return None
        // without attempting a connect.
        let temp = TempDir::new().unwrap();
        let result =
            connect_local_daemon_channel(temp.path(), std::time::Duration::from_millis(50)).await;
        assert!(result.is_none());
    }
}