Skip to main content

running_process/broker/
adopt.rs

1//! One-call broker adoption: negotiate → dial → ready-to-talk client (#433 R1).
2//!
3//! [`connect_to_backend`] returns a raw
4//! [`BackendConnection`] — a bare
5//! socket the consumer must still wrap in a [`FrameClient`] before it can send
6//! a single request. Every consumer (zccache, soldr, clud, fbuild) repeats the
7//! same three lines: check the disable env, call `connect_to_backend`, wrap the
8//! stream. [`BrokerSession::adopt`] is that recipe, owned once here so the
9//! contract is a single call:
10//!
11//! ```no_run
12//! use running_process::broker::adopt::BrokerSession;
13//! use running_process::broker::client::ConnectBackendRequest;
14//!
15//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
16//! let request = ConnectBackendRequest::new("broker.sock", "zccache", "1.11.20", "1.11.20");
17//! let mut session = BrokerSession::adopt(request)?;
18//! let reply = session.request(0x7A63, b"ping".to_vec())?;
19//! assert_eq!(reply.payload, b"pong");
20//! # Ok(()) }
21//! ```
22//!
23//! The blocking [`BrokerSession`] is the wire-of-record; the async
24//! [`AsyncBrokerSession`] (feature `client-async`, #433 R3) is a thin
25//! `spawn_blocking` wrapper so tokio daemons get the same one-call adoption
26//! without re-implementing the negotiation against `AsyncRead`/`AsyncWrite`.
27
28use crate::broker::backend_sdk::{FrameClient, FrameClientError};
29use crate::broker::client::{
30    broker_disabled_by_env, connect_to_backend, BackendConnection, BackendConnectionRoute,
31    BrokerClientError, BrokerDisableEnvError, ConnectBackendRequest,
32};
33use crate::broker::protocol::{Frame, Negotiated};
34
35/// A negotiated, dialed, and framed broker backend connection.
36///
37/// Produced by [`BrokerSession::adopt`]. Wraps the
38/// [`BackendConnection`] stream in a
39/// [`FrameClient`] so the caller can issue correlated request/response frames
40/// immediately, while still exposing how the connection was reached
41/// ([`route`](Self::route)), the cacheable [`endpoint`](Self::endpoint), and the
42/// broker's [`negotiated`](Self::negotiated) metadata.
43pub struct BrokerSession {
44    client: FrameClient,
45    route: BackendConnectionRoute,
46    endpoint: String,
47    negotiated: Option<Negotiated>,
48}
49
50impl BrokerSession {
51    /// Negotiate through the broker and return a ready-to-talk session.
52    ///
53    /// Honours the canonical escape hatch first: if
54    /// `RUNNING_PROCESS_DISABLE=1` is set, this returns
55    /// [`AdoptError::BrokerDisabled`] so the consumer falls back to its direct
56    /// path instead of silently dialing the broker. An invalid disable value
57    /// surfaces as [`AdoptError::DisableEnv`].
58    pub fn adopt(request: ConnectBackendRequest<'_>) -> Result<Self, AdoptError> {
59        if broker_disabled_by_env()? {
60            return Err(AdoptError::BrokerDisabled);
61        }
62        Ok(Self::from_connection(connect_to_backend(request)?))
63    }
64
65    fn from_connection(connection: BackendConnection) -> Self {
66        Self {
67            client: FrameClient::from_stream(connection.stream),
68            route: connection.route,
69            endpoint: connection.endpoint,
70            negotiated: connection.negotiated,
71        }
72    }
73
74    /// How the backend connection was reached.
75    pub fn route(&self) -> BackendConnectionRoute {
76        self.route
77    }
78
79    /// Negotiated backend endpoint, suitable as a Hello-skip cache key.
80    pub fn endpoint(&self) -> &str {
81        &self.endpoint
82    }
83
84    /// Broker negotiation metadata, present when the broker path was used.
85    pub fn negotiated(&self) -> Option<&Negotiated> {
86        self.negotiated.as_ref()
87    }
88
89    /// Send one correlated request and await its response frame.
90    pub fn request(
91        &mut self,
92        payload_protocol: u32,
93        payload: Vec<u8>,
94    ) -> Result<Frame, FrameClientError> {
95        self.client.request(payload_protocol, payload)
96    }
97
98    /// Borrow the underlying frame client for advanced use.
99    pub fn client_mut(&mut self) -> &mut FrameClient {
100        &mut self.client
101    }
102
103    /// Consume the session and return the owned frame client.
104    pub fn into_client(self) -> FrameClient {
105        self.client
106    }
107}
108
109/// Errors from [`BrokerSession::adopt`] / [`AsyncBrokerSession::adopt`].
110#[derive(Debug, thiserror::Error)]
111pub enum AdoptError {
112    /// `RUNNING_PROCESS_DISABLE=1` is set — the caller should use its direct
113    /// (non-broker) path. Not a failure of the broker itself.
114    #[error("broker disabled via RUNNING_PROCESS_DISABLE=1; use the direct path")]
115    BrokerDisabled,
116    /// The disable env var held an invalid value.
117    #[error(transparent)]
118    DisableEnv(#[from] BrokerDisableEnvError),
119    /// Broker negotiation or backend dial failed. Use
120    /// [`BrokerClientError::refusal_kind`] to branch on broker refusals.
121    #[error(transparent)]
122    Connect(#[from] BrokerClientError),
123    /// The async adoption worker thread failed to join (panicked or was
124    /// cancelled). Only reachable on the `client-async` path.
125    #[cfg(feature = "client-async")]
126    #[error("async adopt worker failed to join: {0}")]
127    AsyncJoin(String),
128}
129
130/// Owned inputs for [`AsyncBrokerSession::adopt`] (#433 R3).
131///
132/// The blocking [`ConnectBackendRequest`] borrows `&str`, which cannot cross a
133/// `spawn_blocking` boundary. This owned mirror carries the same fields by
134/// value; [`AsyncBrokerSession::adopt`] reconstructs a borrowed
135/// [`ConnectBackendRequest`] from it inside the worker thread.
136#[cfg(feature = "client-async")]
137#[derive(Clone, Debug)]
138pub struct OwnedConnectRequest {
139    /// Broker pipe/socket endpoint.
140    pub broker_endpoint: String,
141    /// Logical service name, such as `zccache`.
142    pub service_name: String,
143    /// Backend version the caller wants.
144    pub wanted_version: String,
145    /// Version of the caller's own service binary.
146    pub self_version: String,
147    /// Previously negotiated backend endpoint, if the caller has one.
148    pub cached_backend_endpoint: Option<String>,
149    /// Informational client version.
150    pub client_version: String,
151    /// Client library name for diagnostics.
152    pub client_lib_name: String,
153    /// Client library version for diagnostics.
154    pub client_lib_version: String,
155    /// Proposed keepalive interval.
156    pub client_keepalive_secs: u64,
157    /// Opt in to adopting a handed-off backend connection.
158    pub adopt_handed_off_connection: bool,
159    /// Deadline for the handoff-ready relay when adoption is enabled.
160    pub handoff_ready_timeout: std::time::Duration,
161}
162
163#[cfg(feature = "client-async")]
164impl OwnedConnectRequest {
165    /// Build an owned request with running-process defaults.
166    pub fn new(
167        broker_endpoint: impl Into<String>,
168        service_name: impl Into<String>,
169        wanted_version: impl Into<String>,
170        self_version: impl Into<String>,
171    ) -> Self {
172        Self {
173            broker_endpoint: broker_endpoint.into(),
174            service_name: service_name.into(),
175            wanted_version: wanted_version.into(),
176            self_version: self_version.into(),
177            cached_backend_endpoint: None,
178            client_version: String::new(),
179            client_lib_name: "running-process".to_string(),
180            client_lib_version: env!("CARGO_PKG_VERSION").to_string(),
181            client_keepalive_secs: 0,
182            adopt_handed_off_connection: false,
183            handoff_ready_timeout: crate::broker::client::DEFAULT_HANDOFF_READY_TIMEOUT,
184        }
185    }
186
187    fn as_request(&self) -> ConnectBackendRequest<'_> {
188        ConnectBackendRequest {
189            broker_endpoint: &self.broker_endpoint,
190            service_name: &self.service_name,
191            wanted_version: &self.wanted_version,
192            self_version: &self.self_version,
193            cached_backend_endpoint: self.cached_backend_endpoint.as_deref(),
194            client_version: &self.client_version,
195            client_lib_name: &self.client_lib_name,
196            client_lib_version: &self.client_lib_version,
197            client_keepalive_secs: self.client_keepalive_secs,
198            adopt_handed_off_connection: self.adopt_handed_off_connection,
199            handoff_ready_timeout: self.handoff_ready_timeout,
200        }
201    }
202}
203
204/// Async counterpart of [`BrokerSession`] for tokio daemons (#433 R3).
205///
206/// Runs the blocking negotiation on `tokio::task::spawn_blocking` and wraps the
207/// resulting [`FrameClient`] in an [`AsyncFrameClient`] so every later request
208/// is `.await`-able without a manual `spawn_blocking` at the call site.
209///
210/// [`AsyncFrameClient`]: crate::broker::backend_sdk::AsyncFrameClient
211#[cfg(feature = "client-async")]
212pub struct AsyncBrokerSession {
213    client: crate::broker::backend_sdk::AsyncFrameClient,
214    route: BackendConnectionRoute,
215    endpoint: String,
216    negotiated: Option<Negotiated>,
217}
218
219#[cfg(feature = "client-async")]
220impl AsyncBrokerSession {
221    /// Negotiate through the broker on a blocking worker and return a
222    /// ready-to-talk async session.
223    pub async fn adopt(request: OwnedConnectRequest) -> Result<Self, AdoptError> {
224        let joined = tokio::task::spawn_blocking(move || {
225            BrokerSession::adopt(request.as_request()).map(|session| {
226                (
227                    session.route,
228                    session.endpoint,
229                    session.negotiated,
230                    session.client,
231                )
232            })
233        })
234        .await
235        .map_err(|err| AdoptError::AsyncJoin(err.to_string()))?;
236        let (route, endpoint, negotiated, client) = joined?;
237        Ok(Self {
238            client: crate::broker::backend_sdk::AsyncFrameClient::from_blocking(client),
239            route,
240            endpoint,
241            negotiated,
242        })
243    }
244
245    /// How the backend connection was reached.
246    pub fn route(&self) -> BackendConnectionRoute {
247        self.route
248    }
249
250    /// Negotiated backend endpoint, suitable as a Hello-skip cache key.
251    pub fn endpoint(&self) -> &str {
252        &self.endpoint
253    }
254
255    /// Broker negotiation metadata, present when the broker path was used.
256    pub fn negotiated(&self) -> Option<&Negotiated> {
257        self.negotiated.as_ref()
258    }
259
260    /// Send one correlated request and await its response frame.
261    pub async fn request(
262        &mut self,
263        payload_protocol: u32,
264        payload: Vec<u8>,
265    ) -> Result<Frame, FrameClientError> {
266        self.client.request(payload_protocol, payload).await
267    }
268
269    /// Consume the session and return the owned async frame client.
270    pub fn into_client(self) -> crate::broker::backend_sdk::AsyncFrameClient {
271        self.client
272    }
273}