Skip to main content

running_process/broker/
client.rs

1//! Client-side helpers for the v1 broker Hello path.
2
3use std::io;
4use std::sync::mpsc;
5use std::thread;
6use std::time::Duration;
7
8use interprocess::local_socket::prelude::*;
9use prost::Message;
10
11use crate::broker::capabilities::{handoff_transport_available, CAP_HANDLE_PASSING};
12use crate::broker::protocol::{
13    hello_reply::Result as HelloReplyResult, read_frame, validate_frame_envelope, write_frame,
14    AdminReply, AdminRequest, ErrorCode, Frame, FrameKind, FrameValidationError, FramingError,
15    HandoffAck, Hello, HelloReply, Negotiated, PayloadEncoding, ADMIN_PAYLOAD_PROTOCOL,
16    CONTROL_PAYLOAD_PROTOCOL, PROTOCOL_VERSION,
17};
18use crate::broker::server::handoff::validate_handoff_frame;
19use crate::broker::server::local_socket_name;
20
/// Upper bound on how long the client waits for the broker's handoff-ready
/// relay before quietly falling back to reconnecting via `backend_pipe`.
pub const DEFAULT_HANDOFF_READY_TIMEOUT: Duration = Duration::from_millis(2_000);

/// Environment variable acting as the canonical emergency escape hatch for
/// broker-aware consumers.
pub const RUNNING_PROCESS_DISABLE_ENV: &str = "RUNNING_PROCESS_DISABLE";
/// The one accepted value of [`RUNNING_PROCESS_DISABLE_ENV`]: it turns broker
/// usage off so the consumer stays on its direct path.
pub const RUNNING_PROCESS_DISABLE_VALUE: &str = "1";
/// TEST-ONLY seam that redirects the client to a fake backend endpoint (#354).
///
/// When this variable holds a non-empty value, [`connect_to_backend`] opens
/// that endpoint directly. It uses the same local-socket transport as the
/// Hello-skip cache path and bypasses broker discovery, Hello negotiation and
/// version checks altogether. If the connect fails, the error is returned
/// unchanged. Nothing falls back to the real broker, so tests that rely on
/// the seam remain deterministic.
///
/// **Never set this in production.** It skips every broker safety check.
/// The canonical escape hatch [`RUNNING_PROCESS_DISABLE_ENV`]`=1` wins: when
/// the broker is disabled, this seam is ignored as well.
pub const RUNNING_PROCESS_FAKE_BACKEND_ENV: &str = "RUNNING_PROCESS_FAKE_BACKEND";
42
43/// Return whether the canonical broker escape hatch is enabled.
44///
45/// This helper only parses the shared environment contract. Consumers still
46/// own the direct fallback path they should use when this returns `true`.
47pub fn broker_disabled_by_env() -> Result<bool, BrokerDisableEnvError> {
48    let Some(value) = std::env::var_os(RUNNING_PROCESS_DISABLE_ENV) else {
49        return Ok(false);
50    };
51    let value = value.to_string_lossy();
52    if value == RUNNING_PROCESS_DISABLE_VALUE {
53        Ok(true)
54    } else {
55        Err(BrokerDisableEnvError {
56            value: value.into_owned(),
57        })
58    }
59}
60
/// Everything [`connect_to_backend`] needs to locate and open a backend.
#[derive(Debug, Clone)]
pub struct ConnectBackendRequest<'a> {
    /// Endpoint (pipe or socket) the broker listens on.
    pub broker_endpoint: &'a str,
    /// Logical service name, e.g. `zccache`.
    pub service_name: &'a str,
    /// Backend version the caller is asking for.
    pub wanted_version: &'a str,
    /// Version of the caller's own service binary.
    pub self_version: &'a str,
    /// Backend endpoint negotiated on an earlier run, when the caller kept one.
    pub cached_backend_endpoint: Option<&'a str>,
    /// Client version; informational only.
    pub client_version: &'a str,
    /// Client library name, reported for diagnostics.
    pub client_lib_name: &'a str,
    /// Client library version, reported for diagnostics.
    pub client_lib_version: &'a str,
    /// Keepalive interval the client proposes to the broker, in seconds.
    pub client_keepalive_secs: u64,
    /// Opt in to adopting a handed-off backend connection (#354, slice 7).
    ///
    /// Defaults to `false`, in which case the client always reconnects to
    /// `Negotiated.backend_pipe` as it always has. Adoption is attempted only
    /// when this flag is `true`, the broker negotiated
    /// [`CAP_HANDLE_PASSING`], and the broker issued a non-empty
    /// `Negotiated.handle_passed_token`.
    ///
    /// In that case the client waits up to [`Self::handoff_ready_timeout`]
    /// on the SAME broker connection for the broker's handoff-ready relay:
    /// an EVENT frame under the `0xD0FF` handoff payload protocol that
    /// carries the backend's `HandoffAck`. If the relay is valid, accepted,
    /// and echoes the matching token, that connection is kept as the backend
    /// connection ([`BackendConnectionRoute::HandlePassed`]).
    ///
    /// Every failure falls back silently to the `backend_pipe` reconnect:
    /// no relay, timeout, a refused or malformed ACK, or a token mismatch.
    /// An adoption failure is never an error on its own.
    pub adopt_handed_off_connection: bool,
    /// How long to wait for the handoff-ready relay when
    /// [`Self::adopt_handed_off_connection`] is enabled.
    pub handoff_ready_timeout: Duration,
}
102
103impl<'a> ConnectBackendRequest<'a> {
104    /// Build a request with running-process defaults.
105    pub fn new(
106        broker_endpoint: &'a str,
107        service_name: &'a str,
108        wanted_version: &'a str,
109        self_version: &'a str,
110    ) -> Self {
111        Self {
112            broker_endpoint,
113            service_name,
114            wanted_version,
115            self_version,
116            cached_backend_endpoint: None,
117            client_version: "",
118            client_lib_name: "running-process",
119            client_lib_version: env!("CARGO_PKG_VERSION"),
120            client_keepalive_secs: 0,
121            adopt_handed_off_connection: false,
122            handoff_ready_timeout: DEFAULT_HANDOFF_READY_TIMEOUT,
123        }
124    }
125
126    fn can_hello_skip(&self) -> bool {
127        self.cached_backend_endpoint.is_some() && self.wanted_version == self.self_version
128    }
129
130    fn hello(&self) -> Hello {
131        Hello {
132            client_min_protocol: PROTOCOL_VERSION,
133            client_max_protocol: PROTOCOL_VERSION,
134            service_name: self.service_name.into(),
135            wanted_version: self.wanted_version.into(),
136            client_version: self.client_version.into(),
137            client_capabilities: client_capabilities(),
138            auth_token: Vec::new(),
139            request_id: "hello".into(),
140            connection_id: 0,
141            peer_pid: std::process::id(),
142            client_lib_name: self.client_lib_name.into(),
143            client_lib_version: self.client_lib_version.into(),
144            peer_attestation_nonce: Vec::new(),
145            capability_token: Vec::new(),
146            client_keepalive_secs: self.client_keepalive_secs,
147        }
148    }
149}
150
151/// Capability bitmap this client advertises in `Hello.client_capabilities`.
152///
153/// [`CAP_HANDLE_PASSING`] is advertised only when the build carries a
154/// platform handoff transport (Windows `DuplicateHandle`, Unix
155/// `SCM_RIGHTS`) — currently both, but kept explicit so an exotic target
156/// degrades cleanly to the reconnect path.
157fn client_capabilities() -> u64 {
158    if handoff_transport_available() {
159        CAP_HANDLE_PASSING
160    } else {
161        0
162    }
163}
164
/// Which path [`connect_to_backend`] took to reach the returned endpoint.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum BackendConnectionRoute {
    /// Went straight to a known backend endpoint without sending Hello.
    ///
    /// Covers the cached-endpoint fast path. It is also reused on purpose for
    /// the [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] test seam, because adding a
    /// variant would break exhaustive matches (a semver hazard). A
    /// fake-backend connection can be recognised because the caller set the
    /// env var and [`BackendConnection::endpoint`] equals its value.
    HelloSkip,
    /// Negotiated through a broker Hello, then connected to the returned endpoint.
    BrokerNegotiated,
    /// Kept the existing broker connection after a confirmed handoff.
    ///
    /// The broker passed the client's connection to the backend
    /// (`DuplicateHandle`/`SCM_RIGHTS`) and relayed the backend's accepted
    /// `HandoffAck` back. The socket that carried Hello is therefore now
    /// served by the backend, and `backend_pipe` was never dialled.
    /// [`BackendConnection::endpoint`] still reports the negotiated
    /// `backend_pipe`, so callers can cache it for a later Hello-skip.
    HandlePassed,
}
188
189/// Open backend connection returned by [`connect_to_backend`].
190#[derive(Debug)]
191pub struct BackendConnection {
192    /// Connected local socket stream.
193    pub stream: interprocess::local_socket::Stream,
194    /// Endpoint that was connected.
195    ///
196    /// For [`BackendConnectionRoute::HandlePassed`] this is the negotiated
197    /// `backend_pipe` — useful as the Hello-skip cache key — even though the
198    /// stream is the original broker connection rather than a fresh connect
199    /// to that endpoint.
200    pub endpoint: String,
201    /// Route used to establish the connection.
202    pub route: BackendConnectionRoute,
203    /// Broker negotiation metadata when the broker path was used.
204    pub negotiated: Option<Negotiated>,
205}
206
207impl BackendConnection {
208    /// Pending one-time handoff token issued by the broker, if any.
209    ///
210    /// Non-empty only when both sides negotiated `CAP_HANDLE_PASSING`. By
211    /// default the client still connects via `Negotiated.backend_pipe` and
212    /// the route stays [`BackendConnectionRoute::BrokerNegotiated`]; when the
213    /// caller opted in via
214    /// [`ConnectBackendRequest::adopt_handed_off_connection`] and the broker
215    /// confirmed the handoff, the route is
216    /// [`BackendConnectionRoute::HandlePassed`] and this token is the one the
217    /// confirmation echoed (#354).
218    pub fn handoff_token(&self) -> Option<&[u8]> {
219        self.negotiated
220            .as_ref()
221            .map(|negotiated| negotiated.handle_passed_token.as_slice())
222            .filter(|token| !token.is_empty())
223    }
224}
225
226/// Connect to a backend with the v1 Hello-skip fast path.
227///
228/// TEST seam: when [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] is set to a
229/// non-empty endpoint (and `RUNNING_PROCESS_DISABLE=1` is not engaged), the
230/// client connects directly to that endpoint and returns
231/// [`BackendConnectionRoute::HelloSkip`] with no negotiation. A connect
232/// failure is returned ([`BrokerClientError::BackendConnect`]) without
233/// falling back to the broker path. Never set the seam in production.
234///
235/// When `cached_backend_endpoint` is present and `wanted_version ==
236/// self_version`, this tries the cached backend endpoint first. On miss,
237/// or when the versions differ, it sends a broker `Hello`, reads the
238/// broker `HelloReply`, and connects to `Negotiated.backend_pipe`.
239///
240/// With [`ConnectBackendRequest::adopt_handed_off_connection`] set and a
241/// negotiated handoff (capability bit + non-empty token), the client first
242/// waits — bounded by [`ConnectBackendRequest::handoff_ready_timeout`] — for
243/// the broker's handoff-ready relay on the same connection and, when the
244/// relay confirms the backend accepted, keeps that connection as the backend
245/// connection. Any adoption failure silently falls back to the
246/// `backend_pipe` reconnect below; reconnect remains the authoritative
247/// correctness path.
248pub fn connect_to_backend(
249    request: ConnectBackendRequest<'_>,
250) -> Result<BackendConnection, BrokerClientError> {
251    if let Some(endpoint) = fake_backend_endpoint_from_env() {
252        let stream = connect_local_socket(&endpoint).map_err(BrokerClientError::BackendConnect)?;
253        return Ok(BackendConnection {
254            stream,
255            endpoint,
256            route: BackendConnectionRoute::HelloSkip,
257            negotiated: None,
258        });
259    }
260
261    if request.can_hello_skip() {
262        if let Some(endpoint) = request.cached_backend_endpoint {
263            if let Ok(stream) = connect_local_socket(endpoint) {
264                return Ok(BackendConnection {
265                    stream,
266                    endpoint: endpoint.into(),
267                    route: BackendConnectionRoute::HelloSkip,
268                    negotiated: None,
269                });
270            }
271        }
272    }
273
274    let (broker_stream, negotiated) = broker_hello(&request)?;
275    if request.adopt_handed_off_connection && handoff_negotiated(&negotiated) {
276        if let Some(adopted) = await_handoff_ready(
277            broker_stream,
278            negotiated.handle_passed_token.clone(),
279            request.handoff_ready_timeout,
280        ) {
281            return Ok(BackendConnection {
282                endpoint: negotiated.backend_pipe.clone(),
283                stream: adopted,
284                route: BackendConnectionRoute::HandlePassed,
285                negotiated: Some(negotiated),
286            });
287        }
288    }
289
290    if negotiated.backend_pipe.is_empty() {
291        return Err(BrokerClientError::EmptyBackendPipe);
292    }
293    let stream = connect_local_socket(&negotiated.backend_pipe)
294        .map_err(BrokerClientError::BackendConnect)?;
295    Ok(BackendConnection {
296        endpoint: negotiated.backend_pipe.clone(),
297        stream,
298        route: BackendConnectionRoute::BrokerNegotiated,
299        negotiated: Some(negotiated),
300    })
301}
302
303/// Read the [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] test seam, if active.
304///
305/// Returns `Some(endpoint)` only when the variable is set to a non-empty
306/// value AND the canonical disable hatch is not engaged
307/// (`RUNNING_PROCESS_DISABLE=1` takes precedence — a disabled broker ignores
308/// the fake seam too, mirroring the consumer-side disable contract). An
309/// invalid `RUNNING_PROCESS_DISABLE` value is a configuration error that
310/// [`broker_disabled_by_env`] surfaces to consumers before they reach
311/// `connect_to_backend`; it does not suppress the seam here.
312fn fake_backend_endpoint_from_env() -> Option<String> {
313    let value = std::env::var_os(RUNNING_PROCESS_FAKE_BACKEND_ENV)?;
314    let value = value.to_string_lossy();
315    if value.is_empty() {
316        return None;
317    }
318    if matches!(broker_disabled_by_env(), Ok(true)) {
319        return None;
320    }
321    Some(value.into_owned())
322}
323
324/// True when the broker negotiated handle passing for this connection: the
325/// server capability bit is set AND a one-time token was issued.
326fn handoff_negotiated(negotiated: &Negotiated) -> bool {
327    negotiated.server_capabilities & CAP_HANDLE_PASSING == CAP_HANDLE_PASSING
328        && !negotiated.handle_passed_token.is_empty()
329}
330
331/// Wait (bounded) for the broker's handoff-ready relay on the Hello
332/// connection and return the stream when adoption is confirmed.
333///
334/// The relay is an EVENT frame under the handoff payload protocol
335/// (`0xD0FF`) whose payload is the backend's `HandoffAck`; the client
336/// requires the token echo to match its negotiated one-time token and
337/// `accepted = true`. The blocking framed read runs on a helper thread so
338/// the wait is strictly deadline-bounded even though local-socket streams
339/// have no portable read timeout; on timeout the stream stays with the
340/// helper thread (which exits as soon as the abandoned read resolves) and
341/// the caller falls back to reconnect. Every failure returns `None` —
342/// adoption is best-effort by contract.
343fn await_handoff_ready(
344    stream: interprocess::local_socket::Stream,
345    expected_token: Vec<u8>,
346    timeout: Duration,
347) -> Option<interprocess::local_socket::Stream> {
348    let (result_tx, result_rx) = mpsc::channel();
349    thread::spawn(move || {
350        let mut stream = stream;
351        let outcome = read_handoff_ready(&mut stream, &expected_token).map(|()| stream);
352        let _ = result_tx.send(outcome);
353    });
354    match result_rx.recv_timeout(timeout) {
355        Ok(Ok(stream)) => Some(stream),
356        Ok(Err(_)) | Err(_) => None,
357    }
358}
359
360/// Read and validate one handoff-ready relay frame.
361///
362/// Errors carry a static description for diagnostics, but the adoption
363/// contract maps every failure to the silent reconnect downgrade.
364fn read_handoff_ready(
365    stream: &mut interprocess::local_socket::Stream,
366    expected_token: &[u8],
367) -> Result<(), &'static str> {
368    let bytes = read_frame(stream).map_err(|_| "failed to read handoff-ready frame")?;
369    let frame =
370        Frame::decode(bytes.as_slice()).map_err(|_| "failed to decode handoff-ready Frame")?;
371    validate_handoff_frame(&frame, FrameKind::Event)?;
372    let ack = HandoffAck::decode(frame.payload.as_slice())
373        .map_err(|_| "failed to decode handoff-ready HandoffAck payload")?;
374    if ack.token != expected_token {
375        return Err("handoff-ready token echo does not match the negotiated token");
376    }
377    if !ack.accepted {
378        return Err("broker relayed a refused handoff");
379    }
380    Ok(())
381}
382
383/// Send one typed admin request to a broker endpoint and return its reply.
384pub fn send_admin_request(
385    broker_endpoint: &str,
386    request: AdminRequest,
387) -> Result<AdminReply, BrokerClientError> {
388    let mut stream =
389        connect_local_socket(broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
390    let request_frame = Frame {
391        envelope_version: PROTOCOL_VERSION,
392        kind: FrameKind::Request as i32,
393        payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
394        payload: request.encode_to_vec(),
395        request_id: 1,
396        payload_encoding: PayloadEncoding::None as i32,
397        deadline_unix_ms: 0,
398        traceparent: String::new(),
399        tracestate: String::new(),
400    };
401    write_frame(&mut stream, &request_frame.encode_to_vec())?;
402
403    let response_bytes = read_frame(&mut stream)?;
404    let response_frame =
405        Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
406    validate_response_frame(
407        &response_frame,
408        ADMIN_PAYLOAD_PROTOCOL,
409        "payload_protocol is not admin",
410    )?;
411    AdminReply::decode(response_frame.payload.as_slice())
412        .map_err(BrokerClientError::DecodeAdminReply)
413}
414
415/// Open a platform local socket by broker endpoint string.
416pub fn connect_local_socket(endpoint: &str) -> io::Result<interprocess::local_socket::Stream> {
417    let name = local_socket_name(endpoint)?;
418    LocalSocketStream::connect(name)
419}
420
421fn broker_hello(
422    request: &ConnectBackendRequest<'_>,
423) -> Result<(interprocess::local_socket::Stream, Negotiated), BrokerClientError> {
424    let mut stream =
425        connect_local_socket(request.broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
426    let hello = request.hello();
427    let request_frame = Frame {
428        envelope_version: PROTOCOL_VERSION,
429        kind: FrameKind::Request as i32,
430        payload_protocol: CONTROL_PAYLOAD_PROTOCOL,
431        payload: hello.encode_to_vec(),
432        request_id: 1,
433        payload_encoding: PayloadEncoding::None as i32,
434        deadline_unix_ms: 0,
435        traceparent: String::new(),
436        tracestate: String::new(),
437    };
438    write_frame(&mut stream, &request_frame.encode_to_vec())?;
439
440    let response_bytes = read_frame(&mut stream)?;
441    let response_frame =
442        Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
443    validate_response_frame(
444        &response_frame,
445        CONTROL_PAYLOAD_PROTOCOL,
446        "payload_protocol is not control-plane",
447    )?;
448    let reply = HelloReply::decode(response_frame.payload.as_slice())
449        .map_err(BrokerClientError::DecodeHelloReply)?;
450    match reply
451        .result
452        .ok_or(BrokerClientError::MissingHelloReplyResult)?
453    {
454        HelloReplyResult::Negotiated(negotiated) => Ok((stream, negotiated)),
455        HelloReplyResult::Refused(refused) => Err(BrokerClientError::Refused {
456            code: ErrorCode::try_from(refused.code).unwrap_or(ErrorCode::Unspecified),
457            reason: refused.reason,
458            retry_after_ms: refused.retry_after_ms,
459        }),
460    }
461}
462
463fn validate_response_frame(
464    frame: &Frame,
465    expected_payload_protocol: u32,
466    payload_protocol_error: &'static str,
467) -> Result<(), BrokerClientError> {
468    validate_frame_envelope(frame, FrameKind::Response, expected_payload_protocol).map_err(
469        |error| {
470            BrokerClientError::UnexpectedResponseFrame(match error {
471                FrameValidationError::EnvelopeVersion { .. } => "envelope_version is not v1",
472                FrameValidationError::Kind { .. } => "kind is not RESPONSE",
473                FrameValidationError::PayloadProtocol { .. } => payload_protocol_error,
474                FrameValidationError::PayloadEncoding { .. } => "payload is compressed",
475            })
476        },
477    )
478}
479
480/// Invalid value for the canonical broker disable variable.
481#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
482#[error("RUNNING_PROCESS_DISABLE must be unset or 1, got {value:?}")]
483pub struct BrokerDisableEnvError {
484    /// Value read from `RUNNING_PROCESS_DISABLE`.
485    pub value: String,
486}
487
488/// Errors produced by broker client helpers.
489#[derive(Debug, thiserror::Error)]
490pub enum BrokerClientError {
491    /// Could not connect to the broker.
492    #[error("failed to connect to broker: {0}")]
493    BrokerConnect(io::Error),
494    /// Broker negotiation succeeded but the returned backend endpoint failed.
495    #[error("failed to connect to negotiated backend: {0}")]
496    BackendConnect(io::Error),
497    /// Frame read/write failed.
498    #[error(transparent)]
499    Framing(#[from] FramingError),
500    /// Broker response frame was malformed.
501    #[error("failed to decode broker response Frame: {0}")]
502    DecodeFrame(prost::DecodeError),
503    /// Broker response payload was not a valid `HelloReply`.
504    #[error("failed to decode broker HelloReply: {0}")]
505    DecodeHelloReply(prost::DecodeError),
506    /// Broker response payload was not a valid `AdminReply`.
507    #[error("failed to decode broker AdminReply: {0}")]
508    DecodeAdminReply(prost::DecodeError),
509    /// Broker returned an unexpected response envelope.
510    #[error("unexpected broker response frame: {0}")]
511    UnexpectedResponseFrame(&'static str),
512    /// Broker returned `HelloReply` without a result.
513    #[error("broker HelloReply did not contain a result")]
514    MissingHelloReplyResult,
515    /// Broker refused the Hello request.
516    #[error("broker refused Hello: {reason} ({code:?}, retry_after_ms={retry_after_ms})")]
517    Refused {
518        /// Stable refusal code.
519        code: ErrorCode,
520        /// Human-readable reason.
521        reason: String,
522        /// Retry hint.
523        retry_after_ms: u64,
524    },
525    /// Broker returned an empty backend endpoint.
526    #[error("broker negotiated an empty backend endpoint")]
527    EmptyBackendPipe,
528}