running_process/broker/backend_handle.rs
1//! Public handle for a verified backend daemon.
2//!
3//! `BackendHandle` is the shared probe-and-verify abstraction for broker-managed
4//! daemons and direct-daemon consumers. A cache manifest records where a daemon
5//! is listening and which process identity it claimed when the manifest was
6//! written. Probing turns that persisted identity into an owned handle only
7//! after the endpoint tuple, active IPC response, current boot ID, process
8//! liveness, executable path, and executable digest still match.
9//!
10//! Consumers should use this module at the boundary where they would otherwise
11//! trust a manifest, PID file, socket path, or named-pipe path from disk.
12//!
13//! ```
14//! use running_process::broker::backend_handle::BackendHandle;
15//! use running_process::broker::protocol::CacheManifest;
16//!
17//! fn existing_backend(manifest: &CacheManifest) -> Option<BackendHandle> {
18//! let handle = BackendHandle::probe_manifest(manifest)?;
19//! handle.is_alive().then_some(handle)
20//! }
21//! ```
22//!
23//! Direct-daemon consumers that just spawned a backend can persist
24//! [`DaemonProcess`] and later probe it without duplicating the liveness and
25//! executable-hash checks:
26//!
27//! ```no_run
28//! use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
29//! use running_process::broker::protocol::Endpoint;
30//!
31//! # fn example() -> running_process::broker::backend_handle::Result<()> {
32//! let endpoint = Endpoint {
33//! namespace_id: "local-dev".to_owned(),
34//! path: "running-process-example.sock".to_owned(),
35//! };
36//! let daemon = DaemonProcess::current_process(endpoint.clone(), Some(300))?;
37//!
38//! let handle =
39//! BackendHandle::probe_with_service("soldr", "1.2.3", &endpoint, &daemon)?;
40//! assert_eq!(handle.service_name, "soldr");
41//! # Ok(())
42//! # }
43//! ```
44
45use std::io;
46use std::time::{Duration, Instant};
47
48use crate::broker::backend_lifecycle::identity::IdentityError;
49use crate::broker::backend_lifecycle::probe::{self, ProbeError};
50use crate::broker::backend_lifecycle::verify_pid::{self, ProcessHandle, VerifyPidError};
51use crate::broker::protocol::{CacheManifest, Endpoint};
52
53pub use crate::broker::backend_lifecycle::DaemonProcess;
54
/// Result type returned by backend-handle operations.
///
/// Shorthand for [`std::result::Result`] with [`BackendHandleError`] as the
/// error type.
pub type Result<T> = std::result::Result<T, BackendHandleError>;
57
/// A verified handle to a running backend daemon.
///
/// The handle carries the daemon identity needed to defend against stale
/// manifests and PID recycling before consumers connect to the IPC endpoint.
///
/// A handle is created only through one of the `probe*` constructors. The
/// constructor performs all identity checks first; successful callers may then
/// use [`Self::is_alive`] for a cheap liveness check or [`Self::connect`] to
/// open a fresh local-socket connection.
pub struct BackendHandle {
    /// Logical service name from the manifest or direct probe caller.
    ///
    /// Empty when the handle was built by [`Self::probe`], which supplies no
    /// service metadata.
    pub service_name: String,
    /// Service version from the manifest or direct probe caller.
    ///
    /// Empty when the handle was built by [`Self::probe`].
    pub service_version: String,
    /// Verified daemon process identity.
    ///
    /// Its `pid` is what the liveness fallback and the shutdown/kill paths
    /// act on, and its `ipc_endpoint` is what [`Self::connect`] dials.
    pub daemon_process: DaemonProcess,
    /// Process handle returned by the endpoint probe (Unix builds).
    ///
    /// Set to `Some` by `from_verified`; `is_alive` consults it before falling
    /// back to a PID-based check.
    #[cfg(unix)]
    pub(crate) pid_handle: Option<ProcessHandle>,
    /// Process handle returned by the endpoint probe (Windows builds).
    ///
    /// Set to `Some` by `from_verified`; `is_alive` consults it before falling
    /// back to a PID-based check.
    #[cfg(windows)]
    pub(crate) process_handle: Option<ProcessHandle>,
}
79
80impl BackendHandle {
81 /// Connect to an existing backend by endpoint and verify process identity.
82 ///
83 /// This probe verifies the endpoint identity tuple, requires the endpoint
84 /// to answer the nonce-based IPC identity probe, then verifies current boot
85 /// ID, process liveness, executable path, and executable SHA-256. It
86 /// returns `None` for stale manifests, dead PIDs, mismatched daemon
87 /// binaries, or endpoints that do not answer as the expected backend.
88 ///
89 /// Use this when the caller already has service metadata elsewhere and only
90 /// needs to know whether the daemon identity is still valid.
91 ///
92 /// **BLOCKING.** Performs synchronous IPC up to
93 /// [`probe::DEFAULT_ENDPOINT_PROBE_TIMEOUT`]
94 /// (500 ms). From a tokio task, call from `spawn_blocking` or
95 /// switch to [`Self::probe_async`] (requires the `client-async`
96 /// feature).
97 ///
98 /// ```no_run
99 /// use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
100 /// use running_process::broker::protocol::Endpoint;
101 ///
102 /// # fn example(endpoint: Endpoint, expected: DaemonProcess) {
103 /// if let Some(handle) = BackendHandle::probe(&endpoint, &expected) {
104 /// assert!(handle.is_alive());
105 /// }
106 /// # }
107 /// ```
108 pub fn probe(endpoint: &Endpoint, expected: &DaemonProcess) -> Option<Self> {
109 Self::probe_with_service("", "", endpoint, expected).ok()
110 }
111
112 /// Async counterpart of [`Self::probe`] (#414).
113 ///
114 /// Performs the same identity checks but all I/O runs on the
115 /// current tokio runtime, so tokio daemons (zccache, soldr, clud)
116 /// can call this directly instead of wrapping in `spawn_blocking`.
117 ///
118 /// Available when the `client-async` cargo feature is enabled.
119 #[cfg(feature = "client-async")]
120 pub async fn probe_async(endpoint: &Endpoint, expected: &DaemonProcess) -> Option<Self> {
121 Self::probe_with_service_async("", "", endpoint, expected)
122 .await
123 .ok()
124 }
125
126 /// Probe an existing backend and attach service metadata to the handle.
127 ///
128 /// This is the preferred constructor for direct-daemon consumers because it
129 /// preserves the logical service tuple alongside the verified process
130 /// identity.
131 ///
132 /// **BLOCKING.** Performs synchronous IPC up to
133 /// [`probe::DEFAULT_ENDPOINT_PROBE_TIMEOUT`]
134 /// (500 ms). From a tokio task, call from `spawn_blocking` or use
135 /// [`Self::probe_with_service_async`] (requires the
136 /// `client-async` feature) instead — calling this directly from
137 /// an async context will block the runtime worker thread.
138 ///
139 /// ```no_run
140 /// use running_process::broker::backend_handle::{BackendHandle, DaemonProcess};
141 /// use running_process::broker::protocol::Endpoint;
142 ///
143 /// # fn example(endpoint: Endpoint, expected: DaemonProcess)
144 /// # -> running_process::broker::backend_handle::Result<BackendHandle>
145 /// # {
146 /// BackendHandle::probe_with_service("zccache", "0.8.0", &endpoint, &expected)
147 /// # }
148 /// ```
149 pub fn probe_with_service(
150 service_name: impl Into<String>,
151 service_version: impl Into<String>,
152 endpoint: &Endpoint,
153 expected: &DaemonProcess,
154 ) -> Result<Self> {
155 let process_handle = probe::probe_endpoint(endpoint, expected)?;
156 Ok(Self::from_verified(
157 service_name.into(),
158 service_version.into(),
159 expected.clone(),
160 process_handle,
161 ))
162 }
163
164 /// Async counterpart of [`Self::probe_with_service`] (#414).
165 ///
166 /// Performs the same identity checks (endpoint tuple, PID, exe
167 /// path, exe SHA-256, boot ID, and the live nonce probe) but all
168 /// I/O runs on the current tokio runtime. This is the preferred
169 /// entry point for tokio daemons (zccache, soldr, clud) — calling
170 /// the blocking [`Self::probe_with_service`] from an async
171 /// context blocks the runtime worker thread.
172 ///
173 /// Available when the `client-async` cargo feature is enabled.
174 ///
175 /// ```no_run
176 /// # #[cfg(feature = "client-async")]
177 /// # async fn example(
178 /// # endpoint: running_process::broker::protocol::Endpoint,
179 /// # expected: running_process::broker::backend_handle::DaemonProcess,
180 /// # ) -> running_process::broker::backend_handle::Result<()> {
181 /// use running_process::broker::backend_handle::BackendHandle;
182 ///
183 /// let handle = BackendHandle::probe_with_service_async(
184 /// "zccache", "0.8.0", &endpoint, &expected,
185 /// ).await?;
186 /// assert!(handle.is_alive());
187 /// # Ok(()) }
188 /// ```
189 #[cfg(feature = "client-async")]
190 pub async fn probe_with_service_async(
191 service_name: impl Into<String>,
192 service_version: impl Into<String>,
193 endpoint: &Endpoint,
194 expected: &DaemonProcess,
195 ) -> Result<Self> {
196 let process_handle =
197 crate::broker::backend_lifecycle::probe_async::probe_endpoint_async(endpoint, expected)
198 .await?;
199 Ok(Self::from_verified(
200 service_name.into(),
201 service_version.into(),
202 expected.clone(),
203 process_handle,
204 ))
205 }
206
207 /// Probe the `current_daemon` recorded in a cache manifest.
208 ///
209 /// Returns `None` when the manifest has no daemon entry or when the daemon
210 /// entry no longer matches a live process on the current boot.
211 ///
212 /// ```
213 /// use running_process::broker::backend_handle::BackendHandle;
214 /// use running_process::broker::protocol::CacheManifest;
215 ///
216 /// # fn example(manifest: &CacheManifest) {
217 /// match BackendHandle::probe_manifest(manifest) {
218 /// Some(handle) if handle.is_alive() => {
219 /// // Reuse the verified backend.
220 /// }
221 /// _ => {
222 /// // Spawn or discover a replacement backend.
223 /// }
224 /// }
225 /// # }
226 /// ```
227 pub fn probe_manifest(manifest: &CacheManifest) -> Option<Self> {
228 Self::try_from_manifest(manifest).ok().flatten()
229 }
230
231 /// Fallible variant of [`Self::probe_manifest`] that preserves parse errors.
232 ///
233 /// Use this in maintenance tools and diagnostics where malformed manifest
234 /// identities should be reported separately from a normal cache miss.
235 pub fn try_from_manifest(manifest: &CacheManifest) -> Result<Option<Self>> {
236 let Some(daemon_process) = DaemonProcess::from_manifest_current_daemon(manifest)? else {
237 return Ok(None);
238 };
239 let handle = Self::probe_with_service(
240 manifest.service_name.clone(),
241 manifest.service_version.clone(),
242 &daemon_process.ipc_endpoint,
243 &daemon_process,
244 )?;
245 Ok(Some(handle))
246 }
247
248 /// Check liveness without opening a new IPC connection.
249 ///
250 /// On platforms with an owned process-handle primitive, this checks the
251 /// handle captured during probing. Otherwise it falls back to opening the
252 /// process ID again.
253 pub fn is_alive(&self) -> bool {
254 self.platform_handle()
255 .map(|handle| handle.is_alive())
256 .unwrap_or_else(|| verify_pid::process_is_alive(self.daemon_process.pid))
257 }
258
259 /// Open a fresh IPC connection to this backend.
260 ///
261 /// The process identity is verified when the handle is created. Callers that
262 /// cache handles for a long time should call [`Self::is_alive`] or reprobe
263 /// from the latest manifest before opening a connection.
264 ///
265 /// ```no_run
266 /// use running_process::broker::backend_handle::BackendHandle;
267 ///
268 /// async fn connect_to_verified_backend(
269 /// handle: &BackendHandle,
270 /// ) -> running_process::broker::backend_handle::Result<()> {
271 /// let connection = handle.connect().await?;
272 /// let _stream = connection.into_inner();
273 /// Ok(())
274 /// }
275 /// ```
276 pub async fn connect(&self) -> Result<Connection> {
277 Connection::connect(&self.daemon_process.ipc_endpoint).map_err(BackendHandleError::Connect)
278 }
279
280 /// Duplicate a broker-owned pipe handle into this verified backend process.
281 ///
282 /// This is the Windows bridge between `BackendHandle` identity verification
283 /// and the optional Phase 6 `DuplicateHandle` transport. The caller still
284 /// owns delivery of the paired handoff token to the backend and must wait
285 /// for backend acknowledgement before reporting handoff success.
286 #[cfg(windows)]
287 pub fn try_duplicate_windows_handoff_handle(
288 &self,
289 pipe_handle: crate::broker::server::handoff::WindowsHandleValue,
290 handoff_token: crate::broker::server::handoff::HandoffToken,
291 ) -> crate::broker::server::handoff::DuplicateHandleResult {
292 let attempt = crate::broker::server::handoff::DuplicateHandleAttempt::new(
293 pipe_handle,
294 self.daemon_process.pid,
295 handoff_token,
296 );
297 crate::broker::server::handoff::try_duplicate_handle(&attempt)
298 }
299
300 /// Send a graceful shutdown signal and wait until the process exits.
301 ///
302 /// On Windows this foundation returns `GracefulTerminateUnsupported` until
303 /// the broker shutdown request protocol lands.
304 ///
305 /// Dropping the handle without calling this method leaves the backend
306 /// running.
307 pub async fn shutdown(self, timeout: Duration) -> Result<()> {
308 verify_pid::signal_terminate(self.daemon_process.pid)?;
309 let deadline = Instant::now() + timeout;
310 while Instant::now() < deadline {
311 if !self.is_alive() {
312 return Ok(());
313 }
314 std::thread::sleep(Duration::from_millis(20));
315 }
316 Err(BackendHandleError::ShutdownTimeout {
317 pid: self.daemon_process.pid,
318 })
319 }
320
321 /// Force-kill the daemon process.
322 ///
323 /// This is the last-resort teardown path for a daemon that ignored graceful
324 /// shutdown or whose IPC protocol is unavailable.
325 pub fn force_kill(self) -> Result<()> {
326 verify_pid::force_kill_pid(self.daemon_process.pid)?;
327 Ok(())
328 }
329
330 fn from_verified(
331 service_name: String,
332 service_version: String,
333 daemon_process: DaemonProcess,
334 process_handle: ProcessHandle,
335 ) -> Self {
336 #[cfg(unix)]
337 {
338 Self {
339 service_name,
340 service_version,
341 daemon_process,
342 pid_handle: Some(process_handle),
343 }
344 }
345
346 #[cfg(windows)]
347 {
348 Self {
349 service_name,
350 service_version,
351 daemon_process,
352 process_handle: Some(process_handle),
353 }
354 }
355 }
356
357 fn platform_handle(&self) -> Option<&ProcessHandle> {
358 #[cfg(unix)]
359 {
360 self.pid_handle.as_ref()
361 }
362
363 #[cfg(windows)]
364 {
365 self.process_handle.as_ref()
366 }
367 }
368}
369
/// A fresh IPC connection to a verified backend daemon.
///
/// `Connection` is intentionally thin: `BackendHandle` owns identity and
/// liveness, while this type owns a single local-socket stream opened from the
/// verified endpoint.
///
/// Obtain one through [`BackendHandle::connect`] or [`Connection::connect`],
/// and take the stream out with [`Connection::into_inner`].
pub struct Connection {
    /// The local-socket stream opened by [`Connection::connect`].
    stream: interprocess::local_socket::Stream,
}
378
379impl Connection {
380 /// Connect to a backend endpoint using the platform local-socket name type.
381 pub fn connect(endpoint: &Endpoint) -> io::Result<Self> {
382 if endpoint.path.is_empty() {
383 return Err(io::Error::new(
384 io::ErrorKind::InvalidInput,
385 "backend endpoint path is empty",
386 ));
387 }
388 let name = endpoint_name(&endpoint.path)?;
389
390 use interprocess::local_socket::traits::Stream as _;
391 let stream = interprocess::local_socket::Stream::connect(name)?;
392 Ok(Self { stream })
393 }
394
395 /// Return the underlying `interprocess` stream.
396 pub fn into_inner(self) -> interprocess::local_socket::Stream {
397 self.stream
398 }
399}
400
/// Errors returned by `BackendHandle`.
///
/// The `Identity`, `Probe`, and `VerifyPid` variants are transparent wrappers
/// with `From` conversions, so `?` lifts the underlying error types into this
/// enum. `Connect` has no `From` conversion and is constructed explicitly.
#[derive(Debug, thiserror::Error)]
pub enum BackendHandleError {
    /// Daemon identity normalization failed.
    #[error(transparent)]
    Identity(#[from] IdentityError),
    /// Endpoint/process probing failed.
    #[error(transparent)]
    Probe(#[from] ProbeError),
    /// Opening an IPC connection failed.
    ///
    /// Wraps the `io::Error` from [`Connection::connect`]; produced by
    /// [`BackendHandle::connect`].
    #[error("backend IPC connection failed: {0}")]
    Connect(io::Error),
    /// Process verification or signalling failed.
    #[error(transparent)]
    VerifyPid(#[from] VerifyPidError),
    /// Graceful shutdown timed out.
    ///
    /// Returned by [`BackendHandle::shutdown`] when the process is still alive
    /// once the caller's timeout has elapsed.
    #[error("backend shutdown timed out for pid {pid}")]
    ShutdownTimeout {
        /// Process ID that did not exit before the timeout.
        pid: u32,
    },
}
423
424fn endpoint_name(path: &str) -> io::Result<interprocess::local_socket::Name<'_>> {
425 use interprocess::local_socket::prelude::*;
426
427 #[cfg(unix)]
428 {
429 use interprocess::local_socket::GenericFilePath;
430 path.to_fs_name::<GenericFilePath>()
431 }
432
433 #[cfg(windows)]
434 {
435 use interprocess::local_socket::GenericNamespaced;
436 path.to_ns_name::<GenericNamespaced>()
437 }
438}