running_process/broker/adopt.rs
1//! One-call broker adoption: negotiate → dial → ready-to-talk client (#433 R1).
2//!
3//! [`connect_to_backend`] returns a raw
4//! [`BackendConnection`] — a bare
5//! socket the consumer must still wrap in a [`FrameClient`] before it can send
6//! a single request. Every consumer (zccache, soldr, clud, fbuild) repeats the
7//! same three lines: check the disable env, call `connect_to_backend`, wrap the
8//! stream. [`BrokerSession::adopt`] is that recipe, owned once here so the
9//! contract is a single call:
10//!
11//! ```no_run
12//! use running_process::broker::adopt::BrokerSession;
13//! use running_process::broker::client::ConnectBackendRequest;
14//!
15//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
16//! let request = ConnectBackendRequest::new("broker.sock", "zccache", "1.11.20", "1.11.20");
17//! let mut session = BrokerSession::adopt(request)?;
18//! let reply = session.request(0x7A63, b"ping".to_vec())?;
19//! assert_eq!(reply.payload, b"pong");
20//! # Ok(()) }
21//! ```
22//!
//! The blocking [`BrokerSession`] is the wire-of-record; the async
//! `AsyncBrokerSession` (feature `client-async`, #433 R3) is a thin
//! `spawn_blocking` wrapper so tokio daemons get the same one-call adoption
//! without re-implementing the negotiation against `AsyncRead`/`AsyncWrite`.
27
28use crate::broker::backend_sdk::{FrameClient, FrameClientError};
29use crate::broker::client::{
30 broker_disabled_by_env, connect_to_backend, BackendConnection, BackendConnectionRoute,
31 BrokerClientError, BrokerDisableEnvError, ConnectBackendRequest,
32};
33use crate::broker::protocol::{Frame, Negotiated};
34
35/// A negotiated, dialed, and framed broker backend connection.
36///
37/// Produced by [`BrokerSession::adopt`]. Wraps the
38/// [`BackendConnection`] stream in a
39/// [`FrameClient`] so the caller can issue correlated request/response frames
40/// immediately, while still exposing how the connection was reached
41/// ([`route`](Self::route)), the cacheable [`endpoint`](Self::endpoint), and the
42/// broker's [`negotiated`](Self::negotiated) metadata.
43pub struct BrokerSession {
44 client: FrameClient,
45 route: BackendConnectionRoute,
46 endpoint: String,
47 negotiated: Option<Negotiated>,
48}
49
50impl BrokerSession {
51 /// Negotiate through the broker and return a ready-to-talk session.
52 ///
53 /// Honours the canonical escape hatch first: if
54 /// `RUNNING_PROCESS_DISABLE=1` is set, this returns
55 /// [`AdoptError::BrokerDisabled`] so the consumer falls back to its direct
56 /// path instead of silently dialing the broker. An invalid disable value
57 /// surfaces as [`AdoptError::DisableEnv`].
58 pub fn adopt(request: ConnectBackendRequest<'_>) -> Result<Self, AdoptError> {
59 if broker_disabled_by_env()? {
60 return Err(AdoptError::BrokerDisabled);
61 }
62 Ok(Self::from_connection(connect_to_backend(request)?))
63 }
64
65 fn from_connection(connection: BackendConnection) -> Self {
66 Self {
67 client: FrameClient::from_stream(connection.stream),
68 route: connection.route,
69 endpoint: connection.endpoint,
70 negotiated: connection.negotiated,
71 }
72 }
73
74 /// How the backend connection was reached.
75 pub fn route(&self) -> BackendConnectionRoute {
76 self.route
77 }
78
79 /// Negotiated backend endpoint, suitable as a Hello-skip cache key.
80 pub fn endpoint(&self) -> &str {
81 &self.endpoint
82 }
83
84 /// Broker negotiation metadata, present when the broker path was used.
85 pub fn negotiated(&self) -> Option<&Negotiated> {
86 self.negotiated.as_ref()
87 }
88
89 /// Send one correlated request and await its response frame.
90 pub fn request(
91 &mut self,
92 payload_protocol: u32,
93 payload: Vec<u8>,
94 ) -> Result<Frame, FrameClientError> {
95 self.client.request(payload_protocol, payload)
96 }
97
98 /// Borrow the underlying frame client for advanced use.
99 pub fn client_mut(&mut self) -> &mut FrameClient {
100 &mut self.client
101 }
102
103 /// Consume the session and return the owned frame client.
104 pub fn into_client(self) -> FrameClient {
105 self.client
106 }
107
108 /// Consume the session and hand back the live negotiated socket as an
109 /// owned OS handle (#720).
110 ///
111 /// After adoption has driven the broker handshake to completion, a
112 /// consumer that wants to stop speaking the FrameV1 request/response wire
113 /// and run its own protocol over the same connection calls this to take
114 /// ownership of the raw socket. On Unix the result wraps an
115 /// `OwnedFd`; the Windows `OwnedHandle` path is deferred, so this returns
116 /// `IntoBackendIoError::WindowsUnsupported` there for now.
117 ///
118 /// Fails with [`IntoBackendIoError::BufferedResidual`] if the frame
119 /// reader has buffered response bytes the bare socket would not carry —
120 /// which never happens on a freshly adopted session that has issued no
121 /// [`request`](Self::request).
122 pub fn into_backend_io(self) -> Result<OwnedBackendIo, IntoBackendIoError> {
123 let buffered = self.client.buffered_len();
124 if buffered != 0 {
125 return Err(IntoBackendIoError::BufferedResidual { buffered });
126 }
127 OwnedBackendIo::from_local_socket_stream(self.client.into_stream())
128 }
129}
130
131/// A live negotiated backend socket handed back as an owned OS handle (#720).
132///
133/// Produced by [`BrokerSession::into_backend_io`] /
134/// `AsyncBrokerSession::into_backend_io`. On Unix it owns an `OwnedFd` the
135/// consumer can wrap in its own transport (e.g.
136/// `std::os::unix::net::UnixStream::from`); the Windows `OwnedHandle` path is
137/// deferred (#720), so the type is never constructed on Windows.
138#[derive(Debug)]
139pub struct OwnedBackendIo {
140 // The Windows handle path is deferred (#720). The type still exists so the
141 // `into_backend_io` signature is platform-stable, but it carries no handle
142 // on Windows and is only ever returned as `Err(WindowsUnsupported)`.
143 #[cfg(unix)]
144 fd: std::os::fd::OwnedFd,
145}
146
147impl OwnedBackendIo {
148 #[cfg(unix)]
149 fn from_local_socket_stream(
150 stream: interprocess::local_socket::Stream,
151 ) -> Result<Self, IntoBackendIoError> {
152 match stream {
153 interprocess::local_socket::Stream::UdSocket(uds) => Ok(Self {
154 fd: std::os::fd::OwnedFd::from(uds),
155 }),
156 }
157 }
158
159 #[cfg(windows)]
160 fn from_local_socket_stream(
161 _stream: interprocess::local_socket::Stream,
162 ) -> Result<Self, IntoBackendIoError> {
163 Err(IntoBackendIoError::WindowsUnsupported)
164 }
165
166 /// Consume and return the raw owned file descriptor.
167 #[cfg(unix)]
168 pub fn into_owned_fd(self) -> std::os::fd::OwnedFd {
169 self.fd
170 }
171}
172
173#[cfg(unix)]
174impl std::os::fd::AsFd for OwnedBackendIo {
175 fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
176 self.fd.as_fd()
177 }
178}
179
180/// Errors from [`BrokerSession::into_backend_io`] /
181/// [`AsyncBrokerSession::into_backend_io`].
182#[derive(Debug, thiserror::Error)]
183pub enum IntoBackendIoError {
184 /// The frame reader still holds buffered response bytes that the bare
185 /// socket would not carry, so the raw handle cannot be taken without
186 /// losing them.
187 #[error(
188 "frame client has {buffered} buffered response byte(s); cannot hand off the raw socket without losing them"
189 )]
190 BufferedResidual {
191 /// Number of bytes buffered by the frame reader.
192 buffered: usize,
193 },
194 /// The async frame client was poisoned by a prior request panic, so its
195 /// inner blocking client is gone.
196 #[cfg(feature = "client-async")]
197 #[error("async frame client was poisoned by a prior request panic")]
198 Poisoned,
199 /// `into_backend_io()` is not yet supported on Windows; the `OwnedHandle`
200 /// path is deferred (#720).
201 #[cfg(windows)]
202 #[error("into_backend_io() is not yet supported on Windows; the OwnedHandle path is deferred (#720)")]
203 WindowsUnsupported,
204}
205
206/// Errors from [`BrokerSession::adopt`] / `AsyncBrokerSession::adopt`.
207#[derive(Debug, thiserror::Error)]
208pub enum AdoptError {
209 /// `RUNNING_PROCESS_DISABLE=1` is set — the caller should use its direct
210 /// (non-broker) path. Not a failure of the broker itself.
211 #[error("broker disabled via RUNNING_PROCESS_DISABLE=1; use the direct path")]
212 BrokerDisabled,
213 /// The disable env var held an invalid value.
214 #[error(transparent)]
215 DisableEnv(#[from] BrokerDisableEnvError),
216 /// Broker negotiation or backend dial failed. Use
217 /// [`BrokerClientError::refusal_kind`] to branch on broker refusals.
218 #[error(transparent)]
219 Connect(#[from] BrokerClientError),
220 /// The async adoption worker thread failed to join (panicked or was
221 /// cancelled). Only reachable on the `client-async` path.
222 #[cfg(feature = "client-async")]
223 #[error("async adopt worker failed to join: {0}")]
224 AsyncJoin(String),
225}
226
/// By-value inputs for [`AsyncBrokerSession::adopt`] (#433 R3).
///
/// [`ConnectBackendRequest`] borrows its strings as `&str`, and such borrows
/// cannot be moved across a `spawn_blocking` boundary. This type mirrors the
/// same fields with owned storage; inside the worker thread
/// [`AsyncBrokerSession::adopt`] rebuilds a borrowed
/// [`ConnectBackendRequest`] from it.
#[cfg(feature = "client-async")]
#[derive(Debug, Clone)]
pub struct OwnedConnectRequest {
    /// Endpoint (pipe or socket) of the broker.
    pub broker_endpoint: String,
    /// Logical name of the service, e.g. `zccache`.
    pub service_name: String,
    /// Backend version requested by the caller.
    pub wanted_version: String,
    /// Version of the caller's own service binary.
    pub self_version: String,
    /// A backend endpoint negotiated earlier, when the caller has one.
    pub cached_backend_endpoint: Option<String>,
    /// Client version, informational only.
    pub client_version: String,
    /// Name of the client library, used for diagnostics.
    pub client_lib_name: String,
    /// Version of the client library, used for diagnostics.
    pub client_lib_version: String,
    /// Keepalive interval proposed by the client.
    pub client_keepalive_secs: u64,
    /// Whether to opt in to adopting a handed-off backend connection.
    pub adopt_handed_off_connection: bool,
    /// Deadline for the handoff-ready relay when adoption is enabled.
    pub handoff_ready_timeout: std::time::Duration,
}

#[cfg(feature = "client-async")]
impl OwnedConnectRequest {
    /// Create an owned request from the four required values, filling every
    /// other field with the running-process defaults.
    ///
    /// The defaults are: no cached backend endpoint, an empty client version,
    /// `"running-process"` as the client library name, this crate's
    /// `CARGO_PKG_VERSION` as the client library version, a keepalive of `0`,
    /// handed-off connection adoption turned off, and
    /// `DEFAULT_HANDOFF_READY_TIMEOUT` as the handoff-ready deadline.
    pub fn new(
        broker_endpoint: impl Into<String>,
        service_name: impl Into<String>,
        wanted_version: impl Into<String>,
        self_version: impl Into<String>,
    ) -> Self {
        let broker_endpoint: String = broker_endpoint.into();
        let service_name: String = service_name.into();
        let wanted_version: String = wanted_version.into();
        let self_version: String = self_version.into();

        Self {
            broker_endpoint,
            service_name,
            wanted_version,
            self_version,
            cached_backend_endpoint: None,
            client_version: String::new(),
            client_lib_name: String::from("running-process"),
            client_lib_version: env!("CARGO_PKG_VERSION").to_owned(),
            client_keepalive_secs: 0,
            adopt_handed_off_connection: false,
            handoff_ready_timeout: crate::broker::client::DEFAULT_HANDOFF_READY_TIMEOUT,
        }
    }

    /// View this owned request as a borrowed [`ConnectBackendRequest`] whose
    /// string fields point into `self`.
    fn as_request(&self) -> ConnectBackendRequest<'_> {
        let Self {
            broker_endpoint,
            service_name,
            wanted_version,
            self_version,
            cached_backend_endpoint,
            client_version,
            client_lib_name,
            client_lib_version,
            client_keepalive_secs,
            adopt_handed_off_connection,
            handoff_ready_timeout,
        } = self;

        ConnectBackendRequest {
            broker_endpoint: broker_endpoint.as_str(),
            service_name: service_name.as_str(),
            wanted_version: wanted_version.as_str(),
            self_version: self_version.as_str(),
            cached_backend_endpoint: cached_backend_endpoint.as_deref(),
            client_version: client_version.as_str(),
            client_lib_name: client_lib_name.as_str(),
            client_lib_version: client_lib_version.as_str(),
            client_keepalive_secs: *client_keepalive_secs,
            adopt_handed_off_connection: *adopt_handed_off_connection,
            handoff_ready_timeout: *handoff_ready_timeout,
        }
    }
}
300
/// The async sibling of [`BrokerSession`], aimed at tokio daemons (#433 R3).
///
/// The blocking negotiation is executed on `tokio::task::spawn_blocking`, and
/// the [`FrameClient`] it yields is wrapped in an [`AsyncFrameClient`], so
/// later requests can simply be `.await`ed with no manual `spawn_blocking` at
/// the call site.
///
/// [`AsyncFrameClient`]: crate::broker::backend_sdk::AsyncFrameClient
#[cfg(feature = "client-async")]
pub struct AsyncBrokerSession {
    client: crate::broker::backend_sdk::AsyncFrameClient,
    route: BackendConnectionRoute,
    endpoint: String,
    negotiated: Option<Negotiated>,
}

#[cfg(feature = "client-async")]
impl AsyncBrokerSession {
    /// Run the broker negotiation on a blocking worker and return an async
    /// session that is ready to issue requests.
    ///
    /// # Errors
    ///
    /// Any error of [`BrokerSession::adopt`] is passed through unchanged. If
    /// the worker itself cannot be joined, the join error is rendered to text
    /// and returned as [`AdoptError::AsyncJoin`].
    pub async fn adopt(request: OwnedConnectRequest) -> Result<Self, AdoptError> {
        // The owned request moves into the worker; the borrowed view is built
        // there so no borrow has to cross the thread boundary.
        let worker =
            tokio::task::spawn_blocking(move || BrokerSession::adopt(request.as_request()));

        let session = match worker.await {
            Ok(outcome) => outcome?,
            Err(join_err) => return Err(AdoptError::AsyncJoin(join_err.to_string())),
        };

        let BrokerSession {
            client,
            route,
            endpoint,
            negotiated,
        } = session;

        Ok(Self {
            client: crate::broker::backend_sdk::AsyncFrameClient::from_blocking(client),
            route,
            endpoint,
            negotiated,
        })
    }

    /// The route by which the backend connection was reached.
    pub fn route(&self) -> BackendConnectionRoute {
        self.route
    }

    /// The negotiated backend endpoint; usable as a Hello-skip cache key.
    pub fn endpoint(&self) -> &str {
        self.endpoint.as_str()
    }

    /// Metadata from the broker negotiation, if the broker path was used.
    pub fn negotiated(&self) -> Option<&Negotiated> {
        self.negotiated.as_ref()
    }

    /// Issue a single correlated request and await the response frame.
    pub async fn request(
        &mut self,
        payload_protocol: u32,
        payload: Vec<u8>,
    ) -> Result<Frame, FrameClientError> {
        let Self { client, .. } = self;
        client.request(payload_protocol, payload).await
    }

    /// Give up the session and keep only the owned async frame client.
    pub fn into_client(self) -> crate::broker::backend_sdk::AsyncFrameClient {
        let Self { client, .. } = self;
        client
    }

    /// Give up the session and take the live negotiated socket as an owned
    /// OS handle (#720).
    ///
    /// This mirrors [`BrokerSession::into_backend_io`]. It is not an `async`
    /// function: the inner blocking client already owns the connected socket,
    /// so extracting the raw handle is a synchronous unwrap.
    ///
    /// # Errors
    ///
    /// * [`IntoBackendIoError::Poisoned`] when the async client has no inner
    ///   blocking client left, which is the state a prior
    ///   [`request`](Self::request) leaves behind if it panicked inside
    ///   `spawn_blocking`.
    /// * [`IntoBackendIoError::BufferedResidual`] when the blocking client
    ///   reports a non-zero buffered length.
    pub fn into_backend_io(self) -> Result<OwnedBackendIo, IntoBackendIoError> {
        let Some(client) = self.client.into_blocking() else {
            return Err(IntoBackendIoError::Poisoned);
        };

        match client.buffered_len() {
            // Nothing is held back by the reader, so the raw stream is complete.
            0 => OwnedBackendIo::from_local_socket_stream(client.into_stream()),
            buffered => Err(IntoBackendIoError::BufferedResidual { buffered }),
        }
    }
}