Skip to main content

running_process/broker/
client.rs

1//! Client-side helpers for the v1 broker Hello path.
2
3use std::io;
4use std::sync::mpsc;
5use std::thread;
6use std::time::Duration;
7
8use interprocess::local_socket::prelude::*;
9use prost::Message;
10
11use crate::broker::capabilities::{handoff_transport_available, CAP_HANDLE_PASSING};
12use crate::broker::protocol::{
13    hello_reply::Result as HelloReplyResult, read_frame, write_frame, AdminReply, AdminRequest,
14    ErrorCode, Frame, FrameKind, FramingError, HandoffAck, Hello, HelloReply, Negotiated,
15    PayloadEncoding,
16};
17use crate::broker::server::handoff::validate_handoff_frame;
18use crate::broker::server::{local_socket_name, ADMIN_PAYLOAD_PROTOCOL};
19
/// Broker protocol version spoken by this client. It is stamped into every
/// outgoing frame's `envelope_version`, used as both Hello protocol bounds,
/// and required on every response frame.
const PROTOCOL_VERSION: u32 = 1;
/// `payload_protocol` value carried by the control-plane Hello exchange.
const CONTROL_PAYLOAD_PROTOCOL: u32 = 0;

/// How long, by default, the client waits for the broker's handoff-ready
/// relay before it quietly falls back to the `backend_pipe` reconnect path.
pub const DEFAULT_HANDOFF_READY_TIMEOUT: Duration = Duration::from_secs(2);

/// Environment variable that acts as the shared emergency escape hatch for
/// consumers participating in the broker.
pub const RUNNING_PROCESS_DISABLE_ENV: &str = "RUNNING_PROCESS_DISABLE";
/// The single accepted value of that variable: broker usage is switched off
/// and the consumer stays on its direct path.
pub const RUNNING_PROCESS_DISABLE_VALUE: &str = "1";
31
32/// Return whether the canonical broker escape hatch is enabled.
33///
34/// This helper only parses the shared environment contract. Consumers still
35/// own the direct fallback path they should use when this returns `true`.
36pub fn broker_disabled_by_env() -> Result<bool, BrokerDisableEnvError> {
37    let Some(value) = std::env::var_os(RUNNING_PROCESS_DISABLE_ENV) else {
38        return Ok(false);
39    };
40    let value = value.to_string_lossy();
41    if value == RUNNING_PROCESS_DISABLE_VALUE {
42        Ok(true)
43    } else {
44        Err(BrokerDisableEnvError {
45            value: value.into_owned(),
46        })
47    }
48}
49
/// Everything [`connect_to_backend`] needs to reach a backend.
#[derive(Debug, Clone)]
pub struct ConnectBackendRequest<'a> {
    /// Pipe/socket endpoint of the broker.
    pub broker_endpoint: &'a str,
    /// Logical name of the service, for example `zccache`.
    pub service_name: &'a str,
    /// Backend version the caller is asking for.
    pub wanted_version: &'a str,
    /// Version of the service binary the caller itself runs.
    pub self_version: &'a str,
    /// Backend endpoint from an earlier negotiation, when the caller kept one.
    pub cached_backend_endpoint: Option<&'a str>,
    /// Client version; informational only.
    pub client_version: &'a str,
    /// Name of the client library, reported for diagnostics.
    pub client_lib_name: &'a str,
    /// Version of the client library, reported for diagnostics.
    pub client_lib_version: &'a str,
    /// Keepalive interval the client proposes.
    pub client_keepalive_secs: u64,
    /// Opt-in switch for adopting a handed-off backend connection (#354,
    /// slice 7).
    ///
    /// Left at its default of `false`, the client always reconnects to
    /// `Negotiated.backend_pipe`, exactly as before. Adoption is attempted
    /// only when this is `true`, the broker negotiated
    /// [`CAP_HANDLE_PASSING`], and it issued a non-empty
    /// `Negotiated.handle_passed_token`. In that case the client waits, for
    /// at most [`Self::handoff_ready_timeout`], for the broker's
    /// handoff-ready relay on the SAME broker connection: an EVENT frame
    /// under the `0xD0FF` handoff payload protocol whose payload is the
    /// backend's `HandoffAck`. A valid, accepted relay that echoes the
    /// matching token lets the client keep that connection as the backend
    /// connection ([`BackendConnectionRoute::HandlePassed`]). A missing
    /// relay, a timeout, a refused or malformed ACK, or a token mismatch all
    /// downgrade silently to the `backend_pipe` reconnect; a failed adoption
    /// is never an error on its own.
    pub adopt_handed_off_connection: bool,
    /// Upper bound on the wait for the handoff-ready relay; only consulted
    /// when [`Self::adopt_handed_off_connection`] is set.
    pub handoff_ready_timeout: Duration,
}
91
92impl<'a> ConnectBackendRequest<'a> {
93    /// Build a request with running-process defaults.
94    pub fn new(
95        broker_endpoint: &'a str,
96        service_name: &'a str,
97        wanted_version: &'a str,
98        self_version: &'a str,
99    ) -> Self {
100        Self {
101            broker_endpoint,
102            service_name,
103            wanted_version,
104            self_version,
105            cached_backend_endpoint: None,
106            client_version: "",
107            client_lib_name: "running-process",
108            client_lib_version: env!("CARGO_PKG_VERSION"),
109            client_keepalive_secs: 0,
110            adopt_handed_off_connection: false,
111            handoff_ready_timeout: DEFAULT_HANDOFF_READY_TIMEOUT,
112        }
113    }
114
115    fn can_hello_skip(&self) -> bool {
116        self.cached_backend_endpoint.is_some() && self.wanted_version == self.self_version
117    }
118
119    fn hello(&self) -> Hello {
120        Hello {
121            client_min_protocol: PROTOCOL_VERSION,
122            client_max_protocol: PROTOCOL_VERSION,
123            service_name: self.service_name.into(),
124            wanted_version: self.wanted_version.into(),
125            client_version: self.client_version.into(),
126            client_capabilities: client_capabilities(),
127            auth_token: Vec::new(),
128            request_id: "hello".into(),
129            connection_id: 0,
130            peer_pid: std::process::id(),
131            client_lib_name: self.client_lib_name.into(),
132            client_lib_version: self.client_lib_version.into(),
133            peer_attestation_nonce: Vec::new(),
134            capability_token: Vec::new(),
135            client_keepalive_secs: self.client_keepalive_secs,
136        }
137    }
138}
139
140/// Capability bitmap this client advertises in `Hello.client_capabilities`.
141///
142/// [`CAP_HANDLE_PASSING`] is advertised only when the build carries a
143/// platform handoff transport (Windows `DuplicateHandle`, Unix
144/// `SCM_RIGHTS`) — currently both, but kept explicit so an exotic target
145/// degrades cleanly to the reconnect path.
146fn client_capabilities() -> u64 {
147    if handoff_transport_available() {
148        CAP_HANDLE_PASSING
149    } else {
150        0
151    }
152}
153
/// The path [`connect_to_backend`] took to obtain the returned connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendConnectionRoute {
    /// The cached backend endpoint was connected to directly.
    HelloSkip,
    /// A Hello was sent to the broker and the endpoint it negotiated was
    /// then connected to.
    BrokerNegotiated,
    /// The existing broker connection was kept after a confirmed handoff.
    ///
    /// The broker passed the client's connection to the backend
    /// (`DuplicateHandle`/`SCM_RIGHTS`) and relayed the backend's accepted
    /// `HandoffAck` to the client, so the socket that carried the Hello is
    /// now served by the backend. Nothing was opened against
    /// `backend_pipe`; [`BackendConnection::endpoint`] nevertheless reports
    /// the negotiated `backend_pipe` so callers can cache it for a later
    /// Hello-skip.
    HandlePassed,
}
171
172/// Open backend connection returned by [`connect_to_backend`].
173#[derive(Debug)]
174pub struct BackendConnection {
175    /// Connected local socket stream.
176    pub stream: interprocess::local_socket::Stream,
177    /// Endpoint that was connected.
178    ///
179    /// For [`BackendConnectionRoute::HandlePassed`] this is the negotiated
180    /// `backend_pipe` — useful as the Hello-skip cache key — even though the
181    /// stream is the original broker connection rather than a fresh connect
182    /// to that endpoint.
183    pub endpoint: String,
184    /// Route used to establish the connection.
185    pub route: BackendConnectionRoute,
186    /// Broker negotiation metadata when the broker path was used.
187    pub negotiated: Option<Negotiated>,
188}
189
190impl BackendConnection {
191    /// Pending one-time handoff token issued by the broker, if any.
192    ///
193    /// Non-empty only when both sides negotiated `CAP_HANDLE_PASSING`. By
194    /// default the client still connects via `Negotiated.backend_pipe` and
195    /// the route stays [`BackendConnectionRoute::BrokerNegotiated`]; when the
196    /// caller opted in via
197    /// [`ConnectBackendRequest::adopt_handed_off_connection`] and the broker
198    /// confirmed the handoff, the route is
199    /// [`BackendConnectionRoute::HandlePassed`] and this token is the one the
200    /// confirmation echoed (#354).
201    pub fn handoff_token(&self) -> Option<&[u8]> {
202        self.negotiated
203            .as_ref()
204            .map(|negotiated| negotiated.handle_passed_token.as_slice())
205            .filter(|token| !token.is_empty())
206    }
207}
208
209/// Connect to a backend with the v1 Hello-skip fast path.
210///
211/// When `cached_backend_endpoint` is present and `wanted_version ==
212/// self_version`, this tries the cached backend endpoint first. On miss,
213/// or when the versions differ, it sends a broker `Hello`, reads the
214/// broker `HelloReply`, and connects to `Negotiated.backend_pipe`.
215///
216/// With [`ConnectBackendRequest::adopt_handed_off_connection`] set and a
217/// negotiated handoff (capability bit + non-empty token), the client first
218/// waits — bounded by [`ConnectBackendRequest::handoff_ready_timeout`] — for
219/// the broker's handoff-ready relay on the same connection and, when the
220/// relay confirms the backend accepted, keeps that connection as the backend
221/// connection. Any adoption failure silently falls back to the
222/// `backend_pipe` reconnect below; reconnect remains the authoritative
223/// correctness path.
224pub fn connect_to_backend(
225    request: ConnectBackendRequest<'_>,
226) -> Result<BackendConnection, BrokerClientError> {
227    if request.can_hello_skip() {
228        if let Some(endpoint) = request.cached_backend_endpoint {
229            if let Ok(stream) = connect_local_socket(endpoint) {
230                return Ok(BackendConnection {
231                    stream,
232                    endpoint: endpoint.into(),
233                    route: BackendConnectionRoute::HelloSkip,
234                    negotiated: None,
235                });
236            }
237        }
238    }
239
240    let (broker_stream, negotiated) = broker_hello(&request)?;
241    if request.adopt_handed_off_connection && handoff_negotiated(&negotiated) {
242        if let Some(adopted) = await_handoff_ready(
243            broker_stream,
244            negotiated.handle_passed_token.clone(),
245            request.handoff_ready_timeout,
246        ) {
247            return Ok(BackendConnection {
248                endpoint: negotiated.backend_pipe.clone(),
249                stream: adopted,
250                route: BackendConnectionRoute::HandlePassed,
251                negotiated: Some(negotiated),
252            });
253        }
254    }
255
256    if negotiated.backend_pipe.is_empty() {
257        return Err(BrokerClientError::EmptyBackendPipe);
258    }
259    let stream = connect_local_socket(&negotiated.backend_pipe)
260        .map_err(BrokerClientError::BackendConnect)?;
261    Ok(BackendConnection {
262        endpoint: negotiated.backend_pipe.clone(),
263        stream,
264        route: BackendConnectionRoute::BrokerNegotiated,
265        negotiated: Some(negotiated),
266    })
267}
268
269/// True when the broker negotiated handle passing for this connection: the
270/// server capability bit is set AND a one-time token was issued.
271fn handoff_negotiated(negotiated: &Negotiated) -> bool {
272    negotiated.server_capabilities & CAP_HANDLE_PASSING == CAP_HANDLE_PASSING
273        && !negotiated.handle_passed_token.is_empty()
274}
275
276/// Wait (bounded) for the broker's handoff-ready relay on the Hello
277/// connection and return the stream when adoption is confirmed.
278///
279/// The relay is an EVENT frame under the handoff payload protocol
280/// (`0xD0FF`) whose payload is the backend's `HandoffAck`; the client
281/// requires the token echo to match its negotiated one-time token and
282/// `accepted = true`. The blocking framed read runs on a helper thread so
283/// the wait is strictly deadline-bounded even though local-socket streams
284/// have no portable read timeout; on timeout the stream stays with the
285/// helper thread (which exits as soon as the abandoned read resolves) and
286/// the caller falls back to reconnect. Every failure returns `None` —
287/// adoption is best-effort by contract.
288fn await_handoff_ready(
289    stream: interprocess::local_socket::Stream,
290    expected_token: Vec<u8>,
291    timeout: Duration,
292) -> Option<interprocess::local_socket::Stream> {
293    let (result_tx, result_rx) = mpsc::channel();
294    thread::spawn(move || {
295        let mut stream = stream;
296        let outcome = read_handoff_ready(&mut stream, &expected_token).map(|()| stream);
297        let _ = result_tx.send(outcome);
298    });
299    match result_rx.recv_timeout(timeout) {
300        Ok(Ok(stream)) => Some(stream),
301        Ok(Err(_)) | Err(_) => None,
302    }
303}
304
305/// Read and validate one handoff-ready relay frame.
306///
307/// Errors carry a static description for diagnostics, but the adoption
308/// contract maps every failure to the silent reconnect downgrade.
309fn read_handoff_ready(
310    stream: &mut interprocess::local_socket::Stream,
311    expected_token: &[u8],
312) -> Result<(), &'static str> {
313    let bytes = read_frame(stream).map_err(|_| "failed to read handoff-ready frame")?;
314    let frame =
315        Frame::decode(bytes.as_slice()).map_err(|_| "failed to decode handoff-ready Frame")?;
316    validate_handoff_frame(&frame, FrameKind::Event)?;
317    let ack = HandoffAck::decode(frame.payload.as_slice())
318        .map_err(|_| "failed to decode handoff-ready HandoffAck payload")?;
319    if ack.token != expected_token {
320        return Err("handoff-ready token echo does not match the negotiated token");
321    }
322    if !ack.accepted {
323        return Err("broker relayed a refused handoff");
324    }
325    Ok(())
326}
327
328/// Send one typed admin request to a broker endpoint and return its reply.
329pub fn send_admin_request(
330    broker_endpoint: &str,
331    request: AdminRequest,
332) -> Result<AdminReply, BrokerClientError> {
333    let mut stream =
334        connect_local_socket(broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
335    let request_frame = Frame {
336        envelope_version: PROTOCOL_VERSION,
337        kind: FrameKind::Request as i32,
338        payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
339        payload: request.encode_to_vec(),
340        request_id: 1,
341        payload_encoding: PayloadEncoding::None as i32,
342        deadline_unix_ms: 0,
343        traceparent: String::new(),
344        tracestate: String::new(),
345    };
346    write_frame(&mut stream, &request_frame.encode_to_vec())?;
347
348    let response_bytes = read_frame(&mut stream)?;
349    let response_frame =
350        Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
351    validate_response_frame(
352        &response_frame,
353        ADMIN_PAYLOAD_PROTOCOL,
354        "payload_protocol is not admin",
355    )?;
356    AdminReply::decode(response_frame.payload.as_slice())
357        .map_err(BrokerClientError::DecodeAdminReply)
358}
359
360/// Open a platform local socket by broker endpoint string.
361pub fn connect_local_socket(endpoint: &str) -> io::Result<interprocess::local_socket::Stream> {
362    let name = local_socket_name(endpoint)?;
363    LocalSocketStream::connect(name)
364}
365
366fn broker_hello(
367    request: &ConnectBackendRequest<'_>,
368) -> Result<(interprocess::local_socket::Stream, Negotiated), BrokerClientError> {
369    let mut stream =
370        connect_local_socket(request.broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
371    let hello = request.hello();
372    let request_frame = Frame {
373        envelope_version: PROTOCOL_VERSION,
374        kind: FrameKind::Request as i32,
375        payload_protocol: CONTROL_PAYLOAD_PROTOCOL,
376        payload: hello.encode_to_vec(),
377        request_id: 1,
378        payload_encoding: PayloadEncoding::None as i32,
379        deadline_unix_ms: 0,
380        traceparent: String::new(),
381        tracestate: String::new(),
382    };
383    write_frame(&mut stream, &request_frame.encode_to_vec())?;
384
385    let response_bytes = read_frame(&mut stream)?;
386    let response_frame =
387        Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
388    validate_response_frame(
389        &response_frame,
390        CONTROL_PAYLOAD_PROTOCOL,
391        "payload_protocol is not control-plane",
392    )?;
393    let reply = HelloReply::decode(response_frame.payload.as_slice())
394        .map_err(BrokerClientError::DecodeHelloReply)?;
395    match reply
396        .result
397        .ok_or(BrokerClientError::MissingHelloReplyResult)?
398    {
399        HelloReplyResult::Negotiated(negotiated) => Ok((stream, negotiated)),
400        HelloReplyResult::Refused(refused) => Err(BrokerClientError::Refused {
401            code: ErrorCode::try_from(refused.code).unwrap_or(ErrorCode::Unspecified),
402            reason: refused.reason,
403            retry_after_ms: refused.retry_after_ms,
404        }),
405    }
406}
407
408fn validate_response_frame(
409    frame: &Frame,
410    expected_payload_protocol: u32,
411    payload_protocol_error: &'static str,
412) -> Result<(), BrokerClientError> {
413    if frame.envelope_version != PROTOCOL_VERSION {
414        return Err(BrokerClientError::UnexpectedResponseFrame(
415            "envelope_version is not v1",
416        ));
417    }
418    if FrameKind::try_from(frame.kind) != Ok(FrameKind::Response) {
419        return Err(BrokerClientError::UnexpectedResponseFrame(
420            "kind is not RESPONSE",
421        ));
422    }
423    if frame.payload_protocol != expected_payload_protocol {
424        return Err(BrokerClientError::UnexpectedResponseFrame(
425            payload_protocol_error,
426        ));
427    }
428    if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
429        return Err(BrokerClientError::UnexpectedResponseFrame(
430            "payload is compressed",
431        ));
432    }
433    Ok(())
434}
435
436/// Invalid value for the canonical broker disable variable.
437#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
438#[error("RUNNING_PROCESS_DISABLE must be unset or 1, got {value:?}")]
439pub struct BrokerDisableEnvError {
440    /// Value read from `RUNNING_PROCESS_DISABLE`.
441    pub value: String,
442}
443
444/// Errors produced by broker client helpers.
445#[derive(Debug, thiserror::Error)]
446pub enum BrokerClientError {
447    /// Could not connect to the broker.
448    #[error("failed to connect to broker: {0}")]
449    BrokerConnect(io::Error),
450    /// Broker negotiation succeeded but the returned backend endpoint failed.
451    #[error("failed to connect to negotiated backend: {0}")]
452    BackendConnect(io::Error),
453    /// Frame read/write failed.
454    #[error(transparent)]
455    Framing(#[from] FramingError),
456    /// Broker response frame was malformed.
457    #[error("failed to decode broker response Frame: {0}")]
458    DecodeFrame(prost::DecodeError),
459    /// Broker response payload was not a valid `HelloReply`.
460    #[error("failed to decode broker HelloReply: {0}")]
461    DecodeHelloReply(prost::DecodeError),
462    /// Broker response payload was not a valid `AdminReply`.
463    #[error("failed to decode broker AdminReply: {0}")]
464    DecodeAdminReply(prost::DecodeError),
465    /// Broker returned an unexpected response envelope.
466    #[error("unexpected broker response frame: {0}")]
467    UnexpectedResponseFrame(&'static str),
468    /// Broker returned `HelloReply` without a result.
469    #[error("broker HelloReply did not contain a result")]
470    MissingHelloReplyResult,
471    /// Broker refused the Hello request.
472    #[error("broker refused Hello: {reason} ({code:?}, retry_after_ms={retry_after_ms})")]
473    Refused {
474        /// Stable refusal code.
475        code: ErrorCode,
476        /// Human-readable reason.
477        reason: String,
478        /// Retry hint.
479        retry_after_ms: u64,
480    },
481    /// Broker returned an empty backend endpoint.
482    #[error("broker negotiated an empty backend endpoint")]
483    EmptyBackendPipe,
484}