running_process/broker/adopt.rs
1//! One-call broker adoption: negotiate → dial → ready-to-talk client (#433 R1).
2//!
3//! [`connect_to_backend`] returns a raw
4//! [`BackendConnection`] — a bare
5//! socket the consumer must still wrap in a [`FrameClient`] before it can send
6//! a single request. Every consumer (zccache, soldr, clud, fbuild) repeats the
7//! same three lines: check the disable env, call `connect_to_backend`, wrap the
8//! stream. [`BrokerSession::adopt`] is that recipe, owned once here so the
9//! contract is a single call:
10//!
11//! ```no_run
12//! use running_process::broker::adopt::BrokerSession;
13//! use running_process::broker::client::ConnectBackendRequest;
14//!
15//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
16//! let request = ConnectBackendRequest::new("broker.sock", "zccache", "1.11.20", "1.11.20");
17//! let mut session = BrokerSession::adopt(request)?;
18//! let reply = session.request(0x7A63, b"ping".to_vec())?;
19//! assert_eq!(reply.payload, b"pong");
20//! # Ok(()) }
21//! ```
22//!
23//! The blocking [`BrokerSession`] is the wire-of-record; the async
24//! [`AsyncBrokerSession`] (feature `client-async`, #433 R3) is a thin
25//! `spawn_blocking` wrapper so tokio daemons get the same one-call adoption
26//! without re-implementing the negotiation against `AsyncRead`/`AsyncWrite`.
27
28use crate::broker::backend_sdk::{FrameClient, FrameClientError};
29use crate::broker::client::{
30 broker_disabled_by_env, connect_to_backend, BackendConnection, BackendConnectionRoute,
31 BrokerClientError, BrokerDisableEnvError, ConnectBackendRequest,
32};
33use crate::broker::protocol::{Frame, Negotiated};
34
35/// A negotiated, dialed, and framed broker backend connection.
36///
37/// Produced by [`BrokerSession::adopt`]. Wraps the
38/// [`BackendConnection`] stream in a
39/// [`FrameClient`] so the caller can issue correlated request/response frames
40/// immediately, while still exposing how the connection was reached
41/// ([`route`](Self::route)), the cacheable [`endpoint`](Self::endpoint), and the
42/// broker's [`negotiated`](Self::negotiated) metadata.
43pub struct BrokerSession {
44 client: FrameClient,
45 route: BackendConnectionRoute,
46 endpoint: String,
47 negotiated: Option<Negotiated>,
48}
49
50impl BrokerSession {
51 /// Negotiate through the broker and return a ready-to-talk session.
52 ///
53 /// Honours the canonical escape hatch first: if
54 /// `RUNNING_PROCESS_DISABLE=1` is set, this returns
55 /// [`AdoptError::BrokerDisabled`] so the consumer falls back to its direct
56 /// path instead of silently dialing the broker. An invalid disable value
57 /// surfaces as [`AdoptError::DisableEnv`].
58 pub fn adopt(request: ConnectBackendRequest<'_>) -> Result<Self, AdoptError> {
59 if broker_disabled_by_env()? {
60 return Err(AdoptError::BrokerDisabled);
61 }
62 Ok(Self::from_connection(connect_to_backend(request)?))
63 }
64
65 fn from_connection(connection: BackendConnection) -> Self {
66 Self {
67 client: FrameClient::from_stream(connection.stream),
68 route: connection.route,
69 endpoint: connection.endpoint,
70 negotiated: connection.negotiated,
71 }
72 }
73
74 /// How the backend connection was reached.
75 pub fn route(&self) -> BackendConnectionRoute {
76 self.route
77 }
78
79 /// Negotiated backend endpoint, suitable as a Hello-skip cache key.
80 pub fn endpoint(&self) -> &str {
81 &self.endpoint
82 }
83
84 /// Broker negotiation metadata, present when the broker path was used.
85 pub fn negotiated(&self) -> Option<&Negotiated> {
86 self.negotiated.as_ref()
87 }
88
89 /// Send one correlated request and await its response frame.
90 pub fn request(
91 &mut self,
92 payload_protocol: u32,
93 payload: Vec<u8>,
94 ) -> Result<Frame, FrameClientError> {
95 self.client.request(payload_protocol, payload)
96 }
97
98 /// Borrow the underlying frame client for advanced use.
99 pub fn client_mut(&mut self) -> &mut FrameClient {
100 &mut self.client
101 }
102
103 /// Consume the session and return the owned frame client.
104 pub fn into_client(self) -> FrameClient {
105 self.client
106 }
107}
108
109/// Errors from [`BrokerSession::adopt`] / [`AsyncBrokerSession::adopt`].
110#[derive(Debug, thiserror::Error)]
111pub enum AdoptError {
112 /// `RUNNING_PROCESS_DISABLE=1` is set — the caller should use its direct
113 /// (non-broker) path. Not a failure of the broker itself.
114 #[error("broker disabled via RUNNING_PROCESS_DISABLE=1; use the direct path")]
115 BrokerDisabled,
116 /// The disable env var held an invalid value.
117 #[error(transparent)]
118 DisableEnv(#[from] BrokerDisableEnvError),
119 /// Broker negotiation or backend dial failed. Use
120 /// [`BrokerClientError::refusal_kind`] to branch on broker refusals.
121 #[error(transparent)]
122 Connect(#[from] BrokerClientError),
123 /// The async adoption worker thread failed to join (panicked or was
124 /// cancelled). Only reachable on the `client-async` path.
125 #[cfg(feature = "client-async")]
126 #[error("async adopt worker failed to join: {0}")]
127 AsyncJoin(String),
128}
129
130/// Owned inputs for [`AsyncBrokerSession::adopt`] (#433 R3).
131///
132/// The blocking [`ConnectBackendRequest`] borrows `&str`, which cannot cross a
133/// `spawn_blocking` boundary. This owned mirror carries the same fields by
134/// value; [`AsyncBrokerSession::adopt`] reconstructs a borrowed
135/// [`ConnectBackendRequest`] from it inside the worker thread.
136#[cfg(feature = "client-async")]
137#[derive(Clone, Debug)]
138pub struct OwnedConnectRequest {
139 /// Broker pipe/socket endpoint.
140 pub broker_endpoint: String,
141 /// Logical service name, such as `zccache`.
142 pub service_name: String,
143 /// Backend version the caller wants.
144 pub wanted_version: String,
145 /// Version of the caller's own service binary.
146 pub self_version: String,
147 /// Previously negotiated backend endpoint, if the caller has one.
148 pub cached_backend_endpoint: Option<String>,
149 /// Informational client version.
150 pub client_version: String,
151 /// Client library name for diagnostics.
152 pub client_lib_name: String,
153 /// Client library version for diagnostics.
154 pub client_lib_version: String,
155 /// Proposed keepalive interval.
156 pub client_keepalive_secs: u64,
157 /// Opt in to adopting a handed-off backend connection.
158 pub adopt_handed_off_connection: bool,
159 /// Deadline for the handoff-ready relay when adoption is enabled.
160 pub handoff_ready_timeout: std::time::Duration,
161}
162
163#[cfg(feature = "client-async")]
164impl OwnedConnectRequest {
165 /// Build an owned request with running-process defaults.
166 pub fn new(
167 broker_endpoint: impl Into<String>,
168 service_name: impl Into<String>,
169 wanted_version: impl Into<String>,
170 self_version: impl Into<String>,
171 ) -> Self {
172 Self {
173 broker_endpoint: broker_endpoint.into(),
174 service_name: service_name.into(),
175 wanted_version: wanted_version.into(),
176 self_version: self_version.into(),
177 cached_backend_endpoint: None,
178 client_version: String::new(),
179 client_lib_name: "running-process".to_string(),
180 client_lib_version: env!("CARGO_PKG_VERSION").to_string(),
181 client_keepalive_secs: 0,
182 adopt_handed_off_connection: false,
183 handoff_ready_timeout: crate::broker::client::DEFAULT_HANDOFF_READY_TIMEOUT,
184 }
185 }
186
187 fn as_request(&self) -> ConnectBackendRequest<'_> {
188 ConnectBackendRequest {
189 broker_endpoint: &self.broker_endpoint,
190 service_name: &self.service_name,
191 wanted_version: &self.wanted_version,
192 self_version: &self.self_version,
193 cached_backend_endpoint: self.cached_backend_endpoint.as_deref(),
194 client_version: &self.client_version,
195 client_lib_name: &self.client_lib_name,
196 client_lib_version: &self.client_lib_version,
197 client_keepalive_secs: self.client_keepalive_secs,
198 adopt_handed_off_connection: self.adopt_handed_off_connection,
199 handoff_ready_timeout: self.handoff_ready_timeout,
200 }
201 }
202}
203
204/// Async counterpart of [`BrokerSession`] for tokio daemons (#433 R3).
205///
206/// Runs the blocking negotiation on `tokio::task::spawn_blocking` and wraps the
207/// resulting [`FrameClient`] in an [`AsyncFrameClient`] so every later request
208/// is `.await`-able without a manual `spawn_blocking` at the call site.
209///
210/// [`AsyncFrameClient`]: crate::broker::backend_sdk::AsyncFrameClient
211#[cfg(feature = "client-async")]
212pub struct AsyncBrokerSession {
213 client: crate::broker::backend_sdk::AsyncFrameClient,
214 route: BackendConnectionRoute,
215 endpoint: String,
216 negotiated: Option<Negotiated>,
217}
218
219#[cfg(feature = "client-async")]
220impl AsyncBrokerSession {
221 /// Negotiate through the broker on a blocking worker and return a
222 /// ready-to-talk async session.
223 pub async fn adopt(request: OwnedConnectRequest) -> Result<Self, AdoptError> {
224 let joined = tokio::task::spawn_blocking(move || {
225 BrokerSession::adopt(request.as_request()).map(|session| {
226 (
227 session.route,
228 session.endpoint,
229 session.negotiated,
230 session.client,
231 )
232 })
233 })
234 .await
235 .map_err(|err| AdoptError::AsyncJoin(err.to_string()))?;
236 let (route, endpoint, negotiated, client) = joined?;
237 Ok(Self {
238 client: crate::broker::backend_sdk::AsyncFrameClient::from_blocking(client),
239 route,
240 endpoint,
241 negotiated,
242 })
243 }
244
245 /// How the backend connection was reached.
246 pub fn route(&self) -> BackendConnectionRoute {
247 self.route
248 }
249
250 /// Negotiated backend endpoint, suitable as a Hello-skip cache key.
251 pub fn endpoint(&self) -> &str {
252 &self.endpoint
253 }
254
255 /// Broker negotiation metadata, present when the broker path was used.
256 pub fn negotiated(&self) -> Option<&Negotiated> {
257 self.negotiated.as_ref()
258 }
259
260 /// Send one correlated request and await its response frame.
261 pub async fn request(
262 &mut self,
263 payload_protocol: u32,
264 payload: Vec<u8>,
265 ) -> Result<Frame, FrameClientError> {
266 self.client.request(payload_protocol, payload).await
267 }
268
269 /// Consume the session and return the owned async frame client.
270 pub fn into_client(self) -> crate::broker::backend_sdk::AsyncFrameClient {
271 self.client
272 }
273}