running_process/broker/backend_handle.rs
1//! Public handle for a verified backend daemon.
2//!
3//! `BackendHandle` is the shared probe-and-verify abstraction for broker-managed
4//! daemons and direct-daemon consumers. A cache manifest records where a daemon
5//! is listening and which process identity it claimed when the manifest was
6//! written. Probing turns that persisted identity into an owned handle only
7//! after the endpoint tuple, active IPC response, current boot ID, process
8//! liveness, executable path, and executable digest still match.
9//!
10//! Consumers should use this module at the boundary where they would otherwise
11//! trust a manifest, PID file, socket path, or named-pipe path from disk.
12//!
13//! ```
14//! use running_process::broker::backend_handle::BackendHandle;
15//! use running_process::broker::protocol::CacheManifest;
16//!
17//! fn existing_backend(manifest: &CacheManifest) -> Option<BackendHandle> {
18//! let handle = BackendHandle::probe_manifest(manifest)?;
19//! handle.is_alive().then_some(handle)
20//! }
21//! ```
22//!
23//! Direct-daemon consumers that just spawned a backend can persist
24//! [`DaemonProcess`] and later probe it without duplicating the liveness and
25//! executable-hash checks:
26//!
27//! ```no_run
28//! use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
29//! use running_process::broker::protocol::Endpoint;
30//!
31//! # fn example() -> running_process::broker::backend_handle::Result<()> {
32//! let endpoint = Endpoint {
33//! namespace_id: "local-dev".to_owned(),
34//! path: "running-process-example.sock".to_owned(),
35//! };
36//! let daemon = DaemonProcess::current_process(endpoint.clone(), Some(300))?;
37//!
38//! let handle =
39//! BackendHandle::probe_with_service("soldr", "1.2.3", &endpoint, &daemon)?;
40//! assert_eq!(handle.service_name, "soldr");
41//! # Ok(())
42//! # }
43//! ```
44
45use std::io;
46use std::time::{Duration, Instant};
47
48use crate::broker::backend_lifecycle::identity::IdentityError;
49use crate::broker::backend_lifecycle::probe::{self, ProbeError};
50use crate::broker::backend_lifecycle::verify_pid::{self, ProcessHandle, VerifyPidError};
51use crate::broker::protocol::{CacheManifest, Endpoint};
52
53pub use crate::broker::backend_lifecycle::DaemonProcess;
54
55/// Result type returned by backend-handle operations.
56pub type Result<T> = std::result::Result<T, BackendHandleError>;
57
58/// A verified handle to a running backend daemon.
59///
60/// The handle carries the daemon identity needed to defend against stale
61/// manifests and PID recycling before consumers connect to the IPC endpoint.
62///
63/// A handle is created only through one of the `probe*` constructors. The
64/// constructor performs all identity checks first; successful callers may then
65/// use [`Self::is_alive`] for a cheap liveness check or [`Self::connect`] to
66/// open a fresh local-socket connection.
67pub struct BackendHandle {
68 /// Logical service name from the manifest or direct probe caller.
69 pub service_name: String,
70 /// Service version from the manifest or direct probe caller.
71 pub service_version: String,
72 /// Verified daemon process identity.
73 pub daemon_process: DaemonProcess,
74 #[cfg(unix)]
75 pub(crate) pid_handle: Option<ProcessHandle>,
76 #[cfg(windows)]
77 pub(crate) process_handle: Option<ProcessHandle>,
78}
79
80impl BackendHandle {
81 /// Connect to an existing backend by endpoint and verify process identity.
82 ///
83 /// This probe verifies the endpoint identity tuple, requires the endpoint
84 /// to answer the nonce-based IPC identity probe, then verifies current boot
85 /// ID, process liveness, executable path, and executable SHA-256. It
86 /// returns `None` for stale manifests, dead PIDs, mismatched daemon
87 /// binaries, or endpoints that do not answer as the expected backend.
88 ///
89 /// Use this when the caller already has service metadata elsewhere and only
90 /// needs to know whether the daemon identity is still valid.
91 ///
92 /// ```no_run
93 /// use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
94 /// use running_process::broker::protocol::Endpoint;
95 ///
96 /// # fn example(endpoint: Endpoint, expected: DaemonProcess) {
97 /// if let Some(handle) = BackendHandle::probe(&endpoint, &expected) {
98 /// assert!(handle.is_alive());
99 /// }
100 /// # }
101 /// ```
102 pub fn probe(endpoint: &Endpoint, expected: &DaemonProcess) -> Option<Self> {
103 Self::probe_with_service("", "", endpoint, expected).ok()
104 }
105
106 /// Probe an existing backend and attach service metadata to the handle.
107 ///
108 /// This is the preferred constructor for direct-daemon consumers because it
109 /// preserves the logical service tuple alongside the verified process
110 /// identity.
111 ///
112 /// ```no_run
113 /// use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
114 /// use running_process::broker::protocol::Endpoint;
115 ///
116 /// # fn example(endpoint: Endpoint, expected: DaemonProcess)
117 /// # -> running_process::broker::backend_handle::Result<BackendHandle>
118 /// # {
119 /// BackendHandle::probe_with_service("zccache", "0.8.0", &endpoint, &expected)
120 /// # }
121 /// ```
122 pub fn probe_with_service(
123 service_name: impl Into<String>,
124 service_version: impl Into<String>,
125 endpoint: &Endpoint,
126 expected: &DaemonProcess,
127 ) -> Result<Self> {
128 let process_handle = probe::probe_endpoint(endpoint, expected)?;
129 Ok(Self::from_verified(
130 service_name.into(),
131 service_version.into(),
132 expected.clone(),
133 process_handle,
134 ))
135 }
136
137 /// Probe the `current_daemon` recorded in a cache manifest.
138 ///
139 /// Returns `None` when the manifest has no daemon entry or when the daemon
140 /// entry no longer matches a live process on the current boot.
141 ///
142 /// ```
143 /// use running_process::broker::backend_handle::BackendHandle;
144 /// use running_process::broker::protocol::CacheManifest;
145 ///
146 /// # fn example(manifest: &CacheManifest) {
147 /// match BackendHandle::probe_manifest(manifest) {
148 /// Some(handle) if handle.is_alive() => {
149 /// // Reuse the verified backend.
150 /// }
151 /// _ => {
152 /// // Spawn or discover a replacement backend.
153 /// }
154 /// }
155 /// # }
156 /// ```
157 pub fn probe_manifest(manifest: &CacheManifest) -> Option<Self> {
158 Self::try_from_manifest(manifest).ok().flatten()
159 }
160
161 /// Fallible variant of [`Self::probe_manifest`] that preserves parse errors.
162 ///
163 /// Use this in maintenance tools and diagnostics where malformed manifest
164 /// identities should be reported separately from a normal cache miss.
165 pub fn try_from_manifest(manifest: &CacheManifest) -> Result<Option<Self>> {
166 let Some(daemon_process) = DaemonProcess::from_manifest_current_daemon(manifest)? else {
167 return Ok(None);
168 };
169 let handle = Self::probe_with_service(
170 manifest.service_name.clone(),
171 manifest.service_version.clone(),
172 &daemon_process.ipc_endpoint,
173 &daemon_process,
174 )?;
175 Ok(Some(handle))
176 }
177
178 /// Check liveness without opening a new IPC connection.
179 ///
180 /// On platforms with an owned process-handle primitive, this checks the
181 /// handle captured during probing. Otherwise it falls back to opening the
182 /// process ID again.
183 pub fn is_alive(&self) -> bool {
184 self.platform_handle()
185 .map(|handle| handle.is_alive())
186 .unwrap_or_else(|| verify_pid::process_is_alive(self.daemon_process.pid))
187 }
188
189 /// Open a fresh IPC connection to this backend.
190 ///
191 /// The process identity is verified when the handle is created. Callers that
192 /// cache handles for a long time should call [`Self::is_alive`] or reprobe
193 /// from the latest manifest before opening a connection.
194 ///
195 /// ```no_run
196 /// use running_process::broker::backend_handle::BackendHandle;
197 ///
198 /// async fn connect_to_verified_backend(
199 /// handle: &BackendHandle,
200 /// ) -> running_process::broker::backend_handle::Result<()> {
201 /// let connection = handle.connect().await?;
202 /// let _stream = connection.into_inner();
203 /// Ok(())
204 /// }
205 /// ```
206 pub async fn connect(&self) -> Result<Connection> {
207 Connection::connect(&self.daemon_process.ipc_endpoint).map_err(BackendHandleError::Connect)
208 }
209
210 /// Duplicate a broker-owned pipe handle into this verified backend process.
211 ///
212 /// This is the Windows bridge between `BackendHandle` identity verification
213 /// and the optional Phase 6 `DuplicateHandle` transport. The caller still
214 /// owns delivery of the paired handoff token to the backend and must wait
215 /// for backend acknowledgement before reporting handoff success.
216 #[cfg(windows)]
217 pub fn try_duplicate_windows_handoff_handle(
218 &self,
219 pipe_handle: crate::broker::server::handoff::WindowsHandleValue,
220 handoff_token: crate::broker::server::handoff::HandoffToken,
221 ) -> crate::broker::server::handoff::DuplicateHandleResult {
222 let attempt = crate::broker::server::handoff::DuplicateHandleAttempt::new(
223 pipe_handle,
224 self.daemon_process.pid,
225 handoff_token,
226 );
227 crate::broker::server::handoff::try_duplicate_handle(&attempt)
228 }
229
230 /// Send a graceful shutdown signal and wait until the process exits.
231 ///
232 /// On Windows this foundation returns `GracefulTerminateUnsupported` until
233 /// the broker shutdown request protocol lands.
234 ///
235 /// Dropping the handle without calling this method leaves the backend
236 /// running.
237 pub async fn shutdown(self, timeout: Duration) -> Result<()> {
238 verify_pid::signal_terminate(self.daemon_process.pid)?;
239 let deadline = Instant::now() + timeout;
240 while Instant::now() < deadline {
241 if !self.is_alive() {
242 return Ok(());
243 }
244 std::thread::sleep(Duration::from_millis(20));
245 }
246 Err(BackendHandleError::ShutdownTimeout {
247 pid: self.daemon_process.pid,
248 })
249 }
250
251 /// Force-kill the daemon process.
252 ///
253 /// This is the last-resort teardown path for a daemon that ignored graceful
254 /// shutdown or whose IPC protocol is unavailable.
255 pub fn force_kill(self) -> Result<()> {
256 verify_pid::force_kill_pid(self.daemon_process.pid)?;
257 Ok(())
258 }
259
260 fn from_verified(
261 service_name: String,
262 service_version: String,
263 daemon_process: DaemonProcess,
264 process_handle: ProcessHandle,
265 ) -> Self {
266 #[cfg(unix)]
267 {
268 Self {
269 service_name,
270 service_version,
271 daemon_process,
272 pid_handle: Some(process_handle),
273 }
274 }
275
276 #[cfg(windows)]
277 {
278 Self {
279 service_name,
280 service_version,
281 daemon_process,
282 process_handle: Some(process_handle),
283 }
284 }
285 }
286
287 fn platform_handle(&self) -> Option<&ProcessHandle> {
288 #[cfg(unix)]
289 {
290 self.pid_handle.as_ref()
291 }
292
293 #[cfg(windows)]
294 {
295 self.process_handle.as_ref()
296 }
297 }
298}
299
300/// A fresh IPC connection to a verified backend daemon.
301///
302/// `Connection` is intentionally thin: `BackendHandle` owns identity and
303/// liveness, while this type owns a single local-socket stream opened from the
304/// verified endpoint.
305pub struct Connection {
306 stream: interprocess::local_socket::Stream,
307}
308
309impl Connection {
310 /// Connect to a backend endpoint using the platform local-socket name type.
311 pub fn connect(endpoint: &Endpoint) -> io::Result<Self> {
312 if endpoint.path.is_empty() {
313 return Err(io::Error::new(
314 io::ErrorKind::InvalidInput,
315 "backend endpoint path is empty",
316 ));
317 }
318 let name = endpoint_name(&endpoint.path)?;
319
320 use interprocess::local_socket::traits::Stream as _;
321 let stream = interprocess::local_socket::Stream::connect(name)?;
322 Ok(Self { stream })
323 }
324
325 /// Return the underlying `interprocess` stream.
326 pub fn into_inner(self) -> interprocess::local_socket::Stream {
327 self.stream
328 }
329}
330
331/// Errors returned by `BackendHandle`.
332#[derive(Debug, thiserror::Error)]
333pub enum BackendHandleError {
334 /// Daemon identity normalization failed.
335 #[error(transparent)]
336 Identity(#[from] IdentityError),
337 /// Endpoint/process probing failed.
338 #[error(transparent)]
339 Probe(#[from] ProbeError),
340 /// Opening an IPC connection failed.
341 #[error("backend IPC connection failed: {0}")]
342 Connect(io::Error),
343 /// Process verification or signalling failed.
344 #[error(transparent)]
345 VerifyPid(#[from] VerifyPidError),
346 /// Graceful shutdown timed out.
347 #[error("backend shutdown timed out for pid {pid}")]
348 ShutdownTimeout {
349 /// Process ID that did not exit before the timeout.
350 pid: u32,
351 },
352}
353
354fn endpoint_name(path: &str) -> io::Result<interprocess::local_socket::Name<'_>> {
355 use interprocess::local_socket::prelude::*;
356
357 #[cfg(unix)]
358 {
359 use interprocess::local_socket::GenericFilePath;
360 path.to_fs_name::<GenericFilePath>()
361 }
362
363 #[cfg(windows)]
364 {
365 use interprocess::local_socket::GenericNamespaced;
366 path.to_ns_name::<GenericNamespaced>()
367 }
368}