Skip to main content

running_process/broker/
backend_handle.rs

1//! Public handle for a verified backend daemon.
2//!
3//! `BackendHandle` is the shared probe-and-verify abstraction for broker-managed
4//! daemons and direct-daemon consumers. A cache manifest records where a daemon
5//! is listening and which process identity it claimed when the manifest was
6//! written. Probing turns that persisted identity into an owned handle only
7//! after the endpoint tuple, active IPC response, current boot ID, process
8//! liveness, executable path, and executable digest still match.
9//!
10//! Consumers should use this module at the boundary where they would otherwise
11//! trust a manifest, PID file, socket path, or named-pipe path from disk.
12//!
13//! ```
14//! use running_process::broker::backend_handle::BackendHandle;
15//! use running_process::broker::protocol::CacheManifest;
16//!
17//! fn existing_backend(manifest: &CacheManifest) -> Option<BackendHandle> {
18//!     let handle = BackendHandle::probe_manifest(manifest)?;
19//!     handle.is_alive().then_some(handle)
20//! }
21//! ```
22//!
23//! Direct-daemon consumers that just spawned a backend can persist
24//! [`DaemonProcess`] and later probe it without duplicating the liveness and
25//! executable-hash checks:
26//!
27//! ```no_run
28//! use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
29//! use running_process::broker::protocol::Endpoint;
30//!
31//! # fn example() -> running_process::broker::backend_handle::Result<()> {
32//! let endpoint = Endpoint {
33//!     namespace_id: "local-dev".to_owned(),
34//!     path: "running-process-example.sock".to_owned(),
35//! };
36//! let daemon = DaemonProcess::current_process(endpoint.clone(), Some(300))?;
37//!
38//! let handle =
39//!     BackendHandle::probe_with_service("soldr", "1.2.3", &endpoint, &daemon)?;
40//! assert_eq!(handle.service_name, "soldr");
41//! # Ok(())
42//! # }
43//! ```
44
45use std::io;
46use std::time::{Duration, Instant};
47
48use crate::broker::backend_lifecycle::identity::IdentityError;
49use crate::broker::backend_lifecycle::probe::{self, ProbeError};
50use crate::broker::backend_lifecycle::verify_pid::{self, ProcessHandle, VerifyPidError};
51use crate::broker::protocol::{CacheManifest, Endpoint};
52
53pub use crate::broker::backend_lifecycle::DaemonProcess;
54
55/// Result type returned by backend-handle operations.
56pub type Result<T> = std::result::Result<T, BackendHandleError>;
57
58/// A verified handle to a running backend daemon.
59///
60/// The handle carries the daemon identity needed to defend against stale
61/// manifests and PID recycling before consumers connect to the IPC endpoint.
62///
63/// A handle is created only through one of the `probe*` constructors. The
64/// constructor performs all identity checks first; successful callers may then
65/// use [`Self::is_alive`] for a cheap liveness check or [`Self::connect`] to
66/// open a fresh local-socket connection.
67pub struct BackendHandle {
68    /// Logical service name from the manifest or direct probe caller.
69    pub service_name: String,
70    /// Service version from the manifest or direct probe caller.
71    pub service_version: String,
72    /// Verified daemon process identity.
73    pub daemon_process: DaemonProcess,
74    #[cfg(unix)]
75    pub(crate) pid_handle: Option<ProcessHandle>,
76    #[cfg(windows)]
77    pub(crate) process_handle: Option<ProcessHandle>,
78}
79
80impl BackendHandle {
81    /// Connect to an existing backend by endpoint and verify process identity.
82    ///
83    /// This probe verifies the endpoint identity tuple, requires the endpoint
84    /// to answer the nonce-based IPC identity probe, then verifies current boot
85    /// ID, process liveness, executable path, and executable SHA-256. It
86    /// returns `None` for stale manifests, dead PIDs, mismatched daemon
87    /// binaries, or endpoints that do not answer as the expected backend.
88    ///
89    /// Use this when the caller already has service metadata elsewhere and only
90    /// needs to know whether the daemon identity is still valid.
91    ///
92    /// ```no_run
93    /// use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
94    /// use running_process::broker::protocol::Endpoint;
95    ///
96    /// # fn example(endpoint: Endpoint, expected: DaemonProcess) {
97    /// if let Some(handle) = BackendHandle::probe(&endpoint, &expected) {
98    ///     assert!(handle.is_alive());
99    /// }
100    /// # }
101    /// ```
102    pub fn probe(endpoint: &Endpoint, expected: &DaemonProcess) -> Option<Self> {
103        Self::probe_with_service("", "", endpoint, expected).ok()
104    }
105
106    /// Probe an existing backend and attach service metadata to the handle.
107    ///
108    /// This is the preferred constructor for direct-daemon consumers because it
109    /// preserves the logical service tuple alongside the verified process
110    /// identity.
111    ///
112    /// ```no_run
113    /// use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
114    /// use running_process::broker::protocol::Endpoint;
115    ///
116    /// # fn example(endpoint: Endpoint, expected: DaemonProcess)
117    /// #     -> running_process::broker::backend_handle::Result<BackendHandle>
118    /// # {
119    /// BackendHandle::probe_with_service("zccache", "0.8.0", &endpoint, &expected)
120    /// # }
121    /// ```
122    pub fn probe_with_service(
123        service_name: impl Into<String>,
124        service_version: impl Into<String>,
125        endpoint: &Endpoint,
126        expected: &DaemonProcess,
127    ) -> Result<Self> {
128        let process_handle = probe::probe_endpoint(endpoint, expected)?;
129        Ok(Self::from_verified(
130            service_name.into(),
131            service_version.into(),
132            expected.clone(),
133            process_handle,
134        ))
135    }
136
137    /// Probe the `current_daemon` recorded in a cache manifest.
138    ///
139    /// Returns `None` when the manifest has no daemon entry or when the daemon
140    /// entry no longer matches a live process on the current boot.
141    ///
142    /// ```
143    /// use running_process::broker::backend_handle::BackendHandle;
144    /// use running_process::broker::protocol::CacheManifest;
145    ///
146    /// # fn example(manifest: &CacheManifest) {
147    /// match BackendHandle::probe_manifest(manifest) {
148    ///     Some(handle) if handle.is_alive() => {
149    ///         // Reuse the verified backend.
150    ///     }
151    ///     _ => {
152    ///         // Spawn or discover a replacement backend.
153    ///     }
154    /// }
155    /// # }
156    /// ```
157    pub fn probe_manifest(manifest: &CacheManifest) -> Option<Self> {
158        Self::try_from_manifest(manifest).ok().flatten()
159    }
160
161    /// Fallible variant of [`Self::probe_manifest`] that preserves parse errors.
162    ///
163    /// Use this in maintenance tools and diagnostics where malformed manifest
164    /// identities should be reported separately from a normal cache miss.
165    pub fn try_from_manifest(manifest: &CacheManifest) -> Result<Option<Self>> {
166        let Some(daemon_process) = DaemonProcess::from_manifest_current_daemon(manifest)? else {
167            return Ok(None);
168        };
169        let handle = Self::probe_with_service(
170            manifest.service_name.clone(),
171            manifest.service_version.clone(),
172            &daemon_process.ipc_endpoint,
173            &daemon_process,
174        )?;
175        Ok(Some(handle))
176    }
177
178    /// Check liveness without opening a new IPC connection.
179    ///
180    /// On platforms with an owned process-handle primitive, this checks the
181    /// handle captured during probing. Otherwise it falls back to opening the
182    /// process ID again.
183    pub fn is_alive(&self) -> bool {
184        self.platform_handle()
185            .map(|handle| handle.is_alive())
186            .unwrap_or_else(|| verify_pid::process_is_alive(self.daemon_process.pid))
187    }
188
189    /// Open a fresh IPC connection to this backend.
190    ///
191    /// The process identity is verified when the handle is created. Callers that
192    /// cache handles for a long time should call [`Self::is_alive`] or reprobe
193    /// from the latest manifest before opening a connection.
194    ///
195    /// ```no_run
196    /// use running_process::broker::backend_handle::BackendHandle;
197    ///
198    /// async fn connect_to_verified_backend(
199    ///     handle: &BackendHandle,
200    /// ) -> running_process::broker::backend_handle::Result<()> {
201    ///     let connection = handle.connect().await?;
202    ///     let _stream = connection.into_inner();
203    ///     Ok(())
204    /// }
205    /// ```
206    pub async fn connect(&self) -> Result<Connection> {
207        Connection::connect(&self.daemon_process.ipc_endpoint).map_err(BackendHandleError::Connect)
208    }
209
210    /// Duplicate a broker-owned pipe handle into this verified backend process.
211    ///
212    /// This is the Windows bridge between `BackendHandle` identity verification
213    /// and the optional Phase 6 `DuplicateHandle` transport. The caller still
214    /// owns delivery of the paired handoff token to the backend and must wait
215    /// for backend acknowledgement before reporting handoff success.
216    #[cfg(windows)]
217    pub fn try_duplicate_windows_handoff_handle(
218        &self,
219        pipe_handle: crate::broker::server::handoff::WindowsHandleValue,
220        handoff_token: crate::broker::server::handoff::HandoffToken,
221    ) -> crate::broker::server::handoff::DuplicateHandleResult {
222        let attempt = crate::broker::server::handoff::DuplicateHandleAttempt::new(
223            pipe_handle,
224            self.daemon_process.pid,
225            handoff_token,
226        );
227        crate::broker::server::handoff::try_duplicate_handle(&attempt)
228    }
229
230    /// Send a graceful shutdown signal and wait until the process exits.
231    ///
232    /// On Windows this foundation returns `GracefulTerminateUnsupported` until
233    /// the broker shutdown request protocol lands.
234    ///
235    /// Dropping the handle without calling this method leaves the backend
236    /// running.
237    pub async fn shutdown(self, timeout: Duration) -> Result<()> {
238        verify_pid::signal_terminate(self.daemon_process.pid)?;
239        let deadline = Instant::now() + timeout;
240        while Instant::now() < deadline {
241            if !self.is_alive() {
242                return Ok(());
243            }
244            std::thread::sleep(Duration::from_millis(20));
245        }
246        Err(BackendHandleError::ShutdownTimeout {
247            pid: self.daemon_process.pid,
248        })
249    }
250
251    /// Force-kill the daemon process.
252    ///
253    /// This is the last-resort teardown path for a daemon that ignored graceful
254    /// shutdown or whose IPC protocol is unavailable.
255    pub fn force_kill(self) -> Result<()> {
256        verify_pid::force_kill_pid(self.daemon_process.pid)?;
257        Ok(())
258    }
259
260    fn from_verified(
261        service_name: String,
262        service_version: String,
263        daemon_process: DaemonProcess,
264        process_handle: ProcessHandle,
265    ) -> Self {
266        #[cfg(unix)]
267        {
268            Self {
269                service_name,
270                service_version,
271                daemon_process,
272                pid_handle: Some(process_handle),
273            }
274        }
275
276        #[cfg(windows)]
277        {
278            Self {
279                service_name,
280                service_version,
281                daemon_process,
282                process_handle: Some(process_handle),
283            }
284        }
285    }
286
287    fn platform_handle(&self) -> Option<&ProcessHandle> {
288        #[cfg(unix)]
289        {
290            self.pid_handle.as_ref()
291        }
292
293        #[cfg(windows)]
294        {
295            self.process_handle.as_ref()
296        }
297    }
298}
299
300/// A fresh IPC connection to a verified backend daemon.
301///
302/// `Connection` is intentionally thin: `BackendHandle` owns identity and
303/// liveness, while this type owns a single local-socket stream opened from the
304/// verified endpoint.
305pub struct Connection {
306    stream: interprocess::local_socket::Stream,
307}
308
309impl Connection {
310    /// Connect to a backend endpoint using the platform local-socket name type.
311    pub fn connect(endpoint: &Endpoint) -> io::Result<Self> {
312        if endpoint.path.is_empty() {
313            return Err(io::Error::new(
314                io::ErrorKind::InvalidInput,
315                "backend endpoint path is empty",
316            ));
317        }
318        let name = endpoint_name(&endpoint.path)?;
319
320        use interprocess::local_socket::traits::Stream as _;
321        let stream = interprocess::local_socket::Stream::connect(name)?;
322        Ok(Self { stream })
323    }
324
325    /// Return the underlying `interprocess` stream.
326    pub fn into_inner(self) -> interprocess::local_socket::Stream {
327        self.stream
328    }
329}
330
331/// Errors returned by `BackendHandle`.
332#[derive(Debug, thiserror::Error)]
333pub enum BackendHandleError {
334    /// Daemon identity normalization failed.
335    #[error(transparent)]
336    Identity(#[from] IdentityError),
337    /// Endpoint/process probing failed.
338    #[error(transparent)]
339    Probe(#[from] ProbeError),
340    /// Opening an IPC connection failed.
341    #[error("backend IPC connection failed: {0}")]
342    Connect(io::Error),
343    /// Process verification or signalling failed.
344    #[error(transparent)]
345    VerifyPid(#[from] VerifyPidError),
346    /// Graceful shutdown timed out.
347    #[error("backend shutdown timed out for pid {pid}")]
348    ShutdownTimeout {
349        /// Process ID that did not exit before the timeout.
350        pid: u32,
351    },
352}
353
354fn endpoint_name(path: &str) -> io::Result<interprocess::local_socket::Name<'_>> {
355    use interprocess::local_socket::prelude::*;
356
357    #[cfg(unix)]
358    {
359        use interprocess::local_socket::GenericFilePath;
360        path.to_fs_name::<GenericFilePath>()
361    }
362
363    #[cfg(windows)]
364    {
365        use interprocess::local_socket::GenericNamespaced;
366        path.to_ns_name::<GenericNamespaced>()
367    }
368}