Skip to main content

running_process/broker/
backend_handle.rs

1//! Public handle for a verified backend daemon.
2//!
3//! `BackendHandle` is the shared probe-and-verify abstraction for broker-managed
4//! daemons and direct-daemon consumers. A cache manifest records where a daemon
5//! is listening and which process identity it claimed when the manifest was
6//! written. Probing turns that persisted identity into an owned handle only
7//! after the endpoint tuple, active IPC response, current boot ID, process
8//! liveness, executable path, and executable digest still match.
9//!
10//! Consumers should use this module at the boundary where they would otherwise
11//! trust a manifest, PID file, socket path, or named-pipe path from disk.
12//!
13//! ```
14//! use running_process::broker::backend_handle::BackendHandle;
15//! use running_process::broker::protocol::CacheManifest;
16//!
17//! fn existing_backend(manifest: &CacheManifest) -> Option<BackendHandle> {
18//!     let handle = BackendHandle::probe_manifest(manifest)?;
19//!     handle.is_alive().then_some(handle)
20//! }
21//! ```
22//!
23//! Direct-daemon consumers that just spawned a backend can persist
24//! [`DaemonProcess`] and later probe it without duplicating the liveness and
25//! executable-hash checks:
26//!
27//! ```no_run
28//! use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
29//! use running_process::broker::protocol::Endpoint;
30//!
31//! # fn example() -> running_process::broker::backend_handle::Result<()> {
32//! let endpoint = Endpoint {
33//!     namespace_id: "local-dev".to_owned(),
34//!     path: "running-process-example.sock".to_owned(),
35//! };
36//! let daemon = DaemonProcess::current_process(endpoint.clone(), Some(300))?;
37//!
38//! let handle =
39//!     BackendHandle::probe_with_service("soldr", "1.2.3", &endpoint, &daemon)?;
40//! assert_eq!(handle.service_name, "soldr");
41//! # Ok(())
42//! # }
43//! ```
44
45use std::io;
46use std::time::{Duration, Instant};
47
48use crate::broker::backend_lifecycle::identity::IdentityError;
49use crate::broker::backend_lifecycle::probe::{self, ProbeError};
50use crate::broker::backend_lifecycle::verify_pid::{self, ProcessHandle, VerifyPidError};
51use crate::broker::protocol::{CacheManifest, Endpoint};
52
53pub use crate::broker::backend_lifecycle::DaemonProcess;
54
55/// Result type returned by backend-handle operations.
56pub type Result<T> = std::result::Result<T, BackendHandleError>;
57
/// A verified handle to a running backend daemon.
///
/// The handle carries the daemon identity needed to defend against stale
/// manifests and PID recycling before consumers connect to the IPC endpoint.
///
/// A handle is created only through one of the `probe*` constructors. The
/// constructor performs all identity checks first; successful callers may then
/// use [`Self::is_alive`] for a cheap liveness check or [`Self::connect`] to
/// open a fresh local-socket connection.
///
/// Dropping a handle leaves the daemon running; use [`Self::shutdown`] or
/// [`Self::force_kill`] to stop it.
pub struct BackendHandle {
    /// Logical service name from the manifest or direct probe caller.
    ///
    /// Empty when the handle came from [`Self::probe`] or `probe_async`, which
    /// attach no service metadata.
    pub service_name: String,
    /// Service version from the manifest or direct probe caller.
    ///
    /// Empty under the same conditions as `service_name`.
    pub service_version: String,
    /// Verified daemon process identity.
    pub daemon_process: DaemonProcess,
    /// Process handle returned by the probe (Unix builds). [`Self::is_alive`]
    /// consults it before falling back to a PID-based check.
    #[cfg(unix)]
    pub(crate) pid_handle: Option<ProcessHandle>,
    /// Process handle returned by the probe (Windows builds). [`Self::is_alive`]
    /// consults it before falling back to a PID-based check.
    #[cfg(windows)]
    pub(crate) process_handle: Option<ProcessHandle>,
}
79
80impl BackendHandle {
81    /// Connect to an existing backend by endpoint and verify process identity.
82    ///
83    /// This probe verifies the endpoint identity tuple, requires the endpoint
84    /// to answer the nonce-based IPC identity probe, then verifies current boot
85    /// ID, process liveness, executable path, and executable SHA-256. It
86    /// returns `None` for stale manifests, dead PIDs, mismatched daemon
87    /// binaries, or endpoints that do not answer as the expected backend.
88    ///
89    /// Use this when the caller already has service metadata elsewhere and only
90    /// needs to know whether the daemon identity is still valid.
91    ///
92    /// **BLOCKING.** Performs synchronous IPC up to
93    /// [`probe::DEFAULT_ENDPOINT_PROBE_TIMEOUT`]
94    /// (500 ms). From a tokio task, call from `spawn_blocking` or
95    /// switch to [`Self::probe_async`] (requires the `client-async`
96    /// feature).
97    ///
98    /// ```no_run
99    /// use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
100    /// use running_process::broker::protocol::Endpoint;
101    ///
102    /// # fn example(endpoint: Endpoint, expected: DaemonProcess) {
103    /// if let Some(handle) = BackendHandle::probe(&endpoint, &expected) {
104    ///     assert!(handle.is_alive());
105    /// }
106    /// # }
107    /// ```
108    pub fn probe(endpoint: &Endpoint, expected: &DaemonProcess) -> Option<Self> {
109        Self::probe_with_service("", "", endpoint, expected).ok()
110    }
111
112    /// Async counterpart of [`Self::probe`] (#414).
113    ///
114    /// Performs the same identity checks but all I/O runs on the
115    /// current tokio runtime, so tokio daemons (zccache, soldr, clud)
116    /// can call this directly instead of wrapping in `spawn_blocking`.
117    ///
118    /// Available when the `client-async` cargo feature is enabled.
119    #[cfg(feature = "client-async")]
120    pub async fn probe_async(endpoint: &Endpoint, expected: &DaemonProcess) -> Option<Self> {
121        Self::probe_with_service_async("", "", endpoint, expected)
122            .await
123            .ok()
124    }
125
126    /// Probe an existing backend and attach service metadata to the handle.
127    ///
128    /// This is the preferred constructor for direct-daemon consumers because it
129    /// preserves the logical service tuple alongside the verified process
130    /// identity.
131    ///
132    /// **BLOCKING.** Performs synchronous IPC up to
133    /// [`probe::DEFAULT_ENDPOINT_PROBE_TIMEOUT`]
134    /// (500 ms). From a tokio task, call from `spawn_blocking` or use
135    /// [`Self::probe_with_service_async`] (requires the
136    /// `client-async` feature) instead — calling this directly from
137    /// an async context will block the runtime worker thread.
138    ///
139    /// ```no_run
140    /// use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
141    /// use running_process::broker::protocol::Endpoint;
142    ///
143    /// # fn example(endpoint: Endpoint, expected: DaemonProcess)
144    /// #     -> running_process::broker::backend_handle::Result<BackendHandle>
145    /// # {
146    /// BackendHandle::probe_with_service("zccache", "0.8.0", &endpoint, &expected)
147    /// # }
148    /// ```
149    pub fn probe_with_service(
150        service_name: impl Into<String>,
151        service_version: impl Into<String>,
152        endpoint: &Endpoint,
153        expected: &DaemonProcess,
154    ) -> Result<Self> {
155        let process_handle = probe::probe_endpoint(endpoint, expected)?;
156        Ok(Self::from_verified(
157            service_name.into(),
158            service_version.into(),
159            expected.clone(),
160            process_handle,
161        ))
162    }
163
164    /// Async counterpart of [`Self::probe_with_service`] (#414).
165    ///
166    /// Performs the same identity checks (endpoint tuple, PID, exe
167    /// path, exe SHA-256, boot ID, and the live nonce probe) but all
168    /// I/O runs on the current tokio runtime. This is the preferred
169    /// entry point for tokio daemons (zccache, soldr, clud) — calling
170    /// the blocking [`Self::probe_with_service`] from an async
171    /// context blocks the runtime worker thread.
172    ///
173    /// Available when the `client-async` cargo feature is enabled.
174    ///
175    /// ```no_run
176    /// # #[cfg(feature = "client-async")]
177    /// # async fn example(
178    /// #     endpoint: running_process::broker::protocol::Endpoint,
179    /// #     expected: running_process::broker::backend_handle::DaemonProcess,
180    /// # ) -> running_process::broker::backend_handle::Result<()> {
181    /// use running_process::broker::backend_handle::BackendHandle;
182    ///
183    /// let handle = BackendHandle::probe_with_service_async(
184    ///     "zccache", "0.8.0", &endpoint, &expected,
185    /// ).await?;
186    /// assert!(handle.is_alive());
187    /// # Ok(()) }
188    /// ```
189    #[cfg(feature = "client-async")]
190    pub async fn probe_with_service_async(
191        service_name: impl Into<String>,
192        service_version: impl Into<String>,
193        endpoint: &Endpoint,
194        expected: &DaemonProcess,
195    ) -> Result<Self> {
196        let process_handle =
197            crate::broker::backend_lifecycle::probe_async::probe_endpoint_async(endpoint, expected)
198                .await?;
199        Ok(Self::from_verified(
200            service_name.into(),
201            service_version.into(),
202            expected.clone(),
203            process_handle,
204        ))
205    }
206
207    /// Probe the `current_daemon` recorded in a cache manifest.
208    ///
209    /// Returns `None` when the manifest has no daemon entry or when the daemon
210    /// entry no longer matches a live process on the current boot.
211    ///
212    /// ```
213    /// use running_process::broker::backend_handle::BackendHandle;
214    /// use running_process::broker::protocol::CacheManifest;
215    ///
216    /// # fn example(manifest: &CacheManifest) {
217    /// match BackendHandle::probe_manifest(manifest) {
218    ///     Some(handle) if handle.is_alive() => {
219    ///         // Reuse the verified backend.
220    ///     }
221    ///     _ => {
222    ///         // Spawn or discover a replacement backend.
223    ///     }
224    /// }
225    /// # }
226    /// ```
227    pub fn probe_manifest(manifest: &CacheManifest) -> Option<Self> {
228        Self::try_from_manifest(manifest).ok().flatten()
229    }
230
231    /// Fallible variant of [`Self::probe_manifest`] that preserves parse errors.
232    ///
233    /// Use this in maintenance tools and diagnostics where malformed manifest
234    /// identities should be reported separately from a normal cache miss.
235    pub fn try_from_manifest(manifest: &CacheManifest) -> Result<Option<Self>> {
236        let Some(daemon_process) = DaemonProcess::from_manifest_current_daemon(manifest)? else {
237            return Ok(None);
238        };
239        let handle = Self::probe_with_service(
240            manifest.service_name.clone(),
241            manifest.service_version.clone(),
242            &daemon_process.ipc_endpoint,
243            &daemon_process,
244        )?;
245        Ok(Some(handle))
246    }
247
248    /// Check liveness without opening a new IPC connection.
249    ///
250    /// On platforms with an owned process-handle primitive, this checks the
251    /// handle captured during probing. Otherwise it falls back to opening the
252    /// process ID again.
253    pub fn is_alive(&self) -> bool {
254        self.platform_handle()
255            .map(|handle| handle.is_alive())
256            .unwrap_or_else(|| verify_pid::process_is_alive(self.daemon_process.pid))
257    }
258
259    /// Open a fresh IPC connection to this backend.
260    ///
261    /// The process identity is verified when the handle is created. Callers that
262    /// cache handles for a long time should call [`Self::is_alive`] or reprobe
263    /// from the latest manifest before opening a connection.
264    ///
265    /// ```no_run
266    /// use running_process::broker::backend_handle::BackendHandle;
267    ///
268    /// async fn connect_to_verified_backend(
269    ///     handle: &BackendHandle,
270    /// ) -> running_process::broker::backend_handle::Result<()> {
271    ///     let connection = handle.connect().await?;
272    ///     let _stream = connection.into_inner();
273    ///     Ok(())
274    /// }
275    /// ```
276    pub async fn connect(&self) -> Result<Connection> {
277        Connection::connect(&self.daemon_process.ipc_endpoint).map_err(BackendHandleError::Connect)
278    }
279
280    /// Duplicate a broker-owned pipe handle into this verified backend process.
281    ///
282    /// This is the Windows bridge between `BackendHandle` identity verification
283    /// and the optional Phase 6 `DuplicateHandle` transport. The caller still
284    /// owns delivery of the paired handoff token to the backend and must wait
285    /// for backend acknowledgement before reporting handoff success.
286    #[cfg(windows)]
287    pub fn try_duplicate_windows_handoff_handle(
288        &self,
289        pipe_handle: crate::broker::server::handoff::WindowsHandleValue,
290        handoff_token: crate::broker::server::handoff::HandoffToken,
291    ) -> crate::broker::server::handoff::DuplicateHandleResult {
292        let attempt = crate::broker::server::handoff::DuplicateHandleAttempt::new(
293            pipe_handle,
294            self.daemon_process.pid,
295            handoff_token,
296        );
297        crate::broker::server::handoff::try_duplicate_handle(&attempt)
298    }
299
300    /// Send a graceful shutdown signal and wait until the process exits.
301    ///
302    /// On Windows this foundation returns `GracefulTerminateUnsupported` until
303    /// the broker shutdown request protocol lands.
304    ///
305    /// Dropping the handle without calling this method leaves the backend
306    /// running.
307    pub async fn shutdown(self, timeout: Duration) -> Result<()> {
308        verify_pid::signal_terminate(self.daemon_process.pid)?;
309        let deadline = Instant::now() + timeout;
310        while Instant::now() < deadline {
311            if !self.is_alive() {
312                return Ok(());
313            }
314            std::thread::sleep(Duration::from_millis(20));
315        }
316        Err(BackendHandleError::ShutdownTimeout {
317            pid: self.daemon_process.pid,
318        })
319    }
320
321    /// Force-kill the daemon process.
322    ///
323    /// This is the last-resort teardown path for a daemon that ignored graceful
324    /// shutdown or whose IPC protocol is unavailable.
325    pub fn force_kill(self) -> Result<()> {
326        verify_pid::force_kill_pid(self.daemon_process.pid)?;
327        Ok(())
328    }
329
330    fn from_verified(
331        service_name: String,
332        service_version: String,
333        daemon_process: DaemonProcess,
334        process_handle: ProcessHandle,
335    ) -> Self {
336        #[cfg(unix)]
337        {
338            Self {
339                service_name,
340                service_version,
341                daemon_process,
342                pid_handle: Some(process_handle),
343            }
344        }
345
346        #[cfg(windows)]
347        {
348            Self {
349                service_name,
350                service_version,
351                daemon_process,
352                process_handle: Some(process_handle),
353            }
354        }
355    }
356
357    fn platform_handle(&self) -> Option<&ProcessHandle> {
358        #[cfg(unix)]
359        {
360            self.pid_handle.as_ref()
361        }
362
363        #[cfg(windows)]
364        {
365            self.process_handle.as_ref()
366        }
367    }
368}
369
/// A fresh IPC connection to a verified backend daemon.
///
/// `Connection` is intentionally thin: `BackendHandle` owns identity and
/// liveness, while this type owns a single local-socket stream opened from the
/// verified endpoint.
///
/// Use [`Connection::into_inner`] to take ownership of the stream.
pub struct Connection {
    /// The open local-socket stream; handed to the caller by `into_inner`.
    stream: interprocess::local_socket::Stream,
}
378
379impl Connection {
380    /// Connect to a backend endpoint using the platform local-socket name type.
381    pub fn connect(endpoint: &Endpoint) -> io::Result<Self> {
382        if endpoint.path.is_empty() {
383            return Err(io::Error::new(
384                io::ErrorKind::InvalidInput,
385                "backend endpoint path is empty",
386            ));
387        }
388        let name = endpoint_name(&endpoint.path)?;
389
390        use interprocess::local_socket::traits::Stream as _;
391        let stream = interprocess::local_socket::Stream::connect(name)?;
392        Ok(Self { stream })
393    }
394
395    /// Return the underlying `interprocess` stream.
396    pub fn into_inner(self) -> interprocess::local_socket::Stream {
397        self.stream
398    }
399}
400
/// Errors returned by `BackendHandle`.
///
/// The `Identity`, `Probe`, and `VerifyPid` variants wrap lower-level errors
/// transparently and are produced through `?` conversions. `Connect` and
/// `ShutdownTimeout` are constructed directly by `BackendHandle::connect` and
/// `BackendHandle::shutdown`.
#[derive(Debug, thiserror::Error)]
pub enum BackendHandleError {
    /// Daemon identity normalization failed.
    #[error(transparent)]
    Identity(#[from] IdentityError),
    /// Endpoint/process probing failed.
    #[error(transparent)]
    Probe(#[from] ProbeError),
    /// Opening an IPC connection failed.
    ///
    /// Carries the underlying I/O error. There is deliberately no `#[from]`
    /// here, so an `io::Error` is never converted into this variant implicitly.
    #[error("backend IPC connection failed: {0}")]
    Connect(io::Error),
    /// Process verification or signalling failed.
    #[error(transparent)]
    VerifyPid(#[from] VerifyPidError),
    /// Graceful shutdown timed out.
    ///
    /// The process was still alive when the caller-supplied timeout elapsed.
    #[error("backend shutdown timed out for pid {pid}")]
    ShutdownTimeout {
        /// Process ID that did not exit before the timeout.
        pid: u32,
    },
}
423
424fn endpoint_name(path: &str) -> io::Result<interprocess::local_socket::Name<'_>> {
425    use interprocess::local_socket::prelude::*;
426
427    #[cfg(unix)]
428    {
429        use interprocess::local_socket::GenericFilePath;
430        path.to_fs_name::<GenericFilePath>()
431    }
432
433    #[cfg(windows)]
434    {
435        use interprocess::local_socket::GenericNamespaced;
436        path.to_ns_name::<GenericNamespaced>()
437    }
438}