Skip to main content

running_process/broker/
client.rs

1//! Client-side helpers for the v1 broker Hello path.
2
3use std::io;
4use std::sync::mpsc;
5use std::thread;
6use std::time::Duration;
7
8use interprocess::local_socket::prelude::*;
9use prost::Message;
10
11use crate::broker::capabilities::{handoff_transport_available, CAP_HANDLE_PASSING};
12use crate::broker::protocol::{
13    hello_reply::Result as HelloReplyResult, read_frame, validate_frame_envelope, write_frame,
14    AdminReply, AdminRequest, ErrorCode, Frame, FrameKind, FrameValidationError, FramingError,
15    HandoffAck, Hello, HelloReply, Negotiated, PayloadEncoding, ADMIN_PAYLOAD_PROTOCOL,
16    CONTROL_PAYLOAD_PROTOCOL, PROTOCOL_VERSION,
17};
18use crate::broker::server::handoff::validate_handoff_frame;
19use crate::broker::server::local_socket_name;
20
/// Default wall-clock bound on waiting for the broker's handoff-ready relay
/// before silently downgrading to the `backend_pipe` reconnect path.
///
/// [`ConnectBackendRequest::new`] uses this value as the initial
/// [`ConnectBackendRequest::handoff_ready_timeout`].
pub const DEFAULT_HANDOFF_READY_TIMEOUT: Duration = Duration::from_secs(2);
24
/// Canonical emergency escape hatch for participating broker consumers.
///
/// Parsed by [`broker_disabled_by_env`]; an unset variable means the broker
/// is not disabled.
pub const RUNNING_PROCESS_DISABLE_ENV: &str = "RUNNING_PROCESS_DISABLE";
/// Value that disables broker usage and keeps the consumer on its direct path.
///
/// Any other value of [`RUNNING_PROCESS_DISABLE_ENV`] is rejected by
/// [`broker_disabled_by_env`] with a [`BrokerDisableEnvError`].
pub const RUNNING_PROCESS_DISABLE_VALUE: &str = "1";
/// TEST-ONLY seam that points the client at a fake backend endpoint (#354).
///
/// When set and non-empty, [`connect_to_backend`] connects directly to the
/// given endpoint (same local-socket transport as the Hello-skip cache path)
/// and skips broker discovery, Hello negotiation, and version checks
/// entirely. A connect failure is returned as-is — there is no fallback to
/// the real broker path, so tests that set this seam stay deterministic.
///
/// **Never set this in production.** It bypasses every broker safety check.
/// The canonical escape hatch [`RUNNING_PROCESS_DISABLE_ENV`]`=1` takes
/// precedence: when the broker is disabled, the fake-backend seam is ignored
/// too.
///
/// In this module the variable is only read when the crate is built with the
/// `test-seams` Cargo feature; without that feature [`connect_to_backend`]
/// contains no code that consults it.
pub const RUNNING_PROCESS_FAKE_BACKEND_ENV: &str = "RUNNING_PROCESS_FAKE_BACKEND";
42
43/// Return whether the canonical broker escape hatch is enabled.
44///
45/// This helper only parses the shared environment contract. Consumers still
46/// own the direct fallback path they should use when this returns `true`.
47pub fn broker_disabled_by_env() -> Result<bool, BrokerDisableEnvError> {
48    let Some(value) = std::env::var_os(RUNNING_PROCESS_DISABLE_ENV) else {
49        return Ok(false);
50    };
51    let value = value.to_string_lossy();
52    if value == RUNNING_PROCESS_DISABLE_VALUE {
53        Ok(true)
54    } else {
55        Err(BrokerDisableEnvError {
56            value: value.into_owned(),
57        })
58    }
59}
60
/// Inputs for [`connect_to_backend`].
///
/// Every string field borrows from the caller for `'a`. Use
/// [`ConnectBackendRequest::new`] to obtain a value with the optional fields
/// pre-filled with their defaults, then overwrite the public fields as needed.
#[derive(Clone, Debug)]
pub struct ConnectBackendRequest<'a> {
    /// Broker pipe/socket endpoint.
    pub broker_endpoint: &'a str,
    /// Logical service name, such as `zccache`.
    pub service_name: &'a str,
    /// Backend version the caller wants.
    pub wanted_version: &'a str,
    /// Version of the caller's own service binary.
    pub self_version: &'a str,
    /// Previously negotiated backend endpoint, if the caller has one.
    ///
    /// Defaults to `None`. The cached endpoint is only tried when
    /// `wanted_version == self_version`.
    pub cached_backend_endpoint: Option<&'a str>,
    /// Informational client version. Defaults to the empty string.
    pub client_version: &'a str,
    /// Client library name for diagnostics. Defaults to `"running-process"`.
    pub client_lib_name: &'a str,
    /// Client library version for diagnostics.
    ///
    /// Defaults to this crate's `CARGO_PKG_VERSION` at compile time.
    pub client_lib_version: &'a str,
    /// Proposed keepalive interval. Defaults to `0`.
    pub client_keepalive_secs: u64,
    /// Opt in to adopting a handed-off backend connection (#354, slice 7).
    ///
    /// Default `false`: the client always reconnects to
    /// `Negotiated.backend_pipe`, exactly as before. When `true` AND the
    /// broker negotiated [`CAP_HANDLE_PASSING`] AND issued a non-empty
    /// `Negotiated.handle_passed_token`, the client waits up to
    /// [`Self::handoff_ready_timeout`] for the broker's handoff-ready relay
    /// (an EVENT frame under the `0xD0FF` handoff payload protocol carrying
    /// the backend's `HandoffAck`) on the SAME broker connection. On a valid
    /// accepted relay with a matching token echo, the client keeps that
    /// connection as the backend connection
    /// ([`BackendConnectionRoute::HandlePassed`]). Any failure — missing
    /// relay, timeout, refused or malformed ACK, token mismatch — silently
    /// downgrades to the `backend_pipe` reconnect; adoption failure is never
    /// an error by itself.
    pub adopt_handed_off_connection: bool,
    /// Deadline for the handoff-ready relay when
    /// [`Self::adopt_handed_off_connection`] is set.
    ///
    /// Defaults to [`DEFAULT_HANDOFF_READY_TIMEOUT`].
    pub handoff_ready_timeout: Duration,
}
102
103impl<'a> ConnectBackendRequest<'a> {
104    /// Build a request with running-process defaults.
105    pub fn new(
106        broker_endpoint: &'a str,
107        service_name: &'a str,
108        wanted_version: &'a str,
109        self_version: &'a str,
110    ) -> Self {
111        Self {
112            broker_endpoint,
113            service_name,
114            wanted_version,
115            self_version,
116            cached_backend_endpoint: None,
117            client_version: "",
118            client_lib_name: "running-process",
119            client_lib_version: env!("CARGO_PKG_VERSION"),
120            client_keepalive_secs: 0,
121            adopt_handed_off_connection: false,
122            handoff_ready_timeout: DEFAULT_HANDOFF_READY_TIMEOUT,
123        }
124    }
125
126    fn can_hello_skip(&self) -> bool {
127        self.cached_backend_endpoint.is_some() && self.wanted_version == self.self_version
128    }
129
130    fn hello(&self) -> Hello {
131        Hello {
132            client_min_protocol: PROTOCOL_VERSION,
133            client_max_protocol: PROTOCOL_VERSION,
134            service_name: self.service_name.into(),
135            wanted_version: self.wanted_version.into(),
136            client_version: self.client_version.into(),
137            client_capabilities: client_capabilities(),
138            auth_token: Vec::new(),
139            request_id: "hello".into(),
140            connection_id: 0,
141            peer_pid: std::process::id(),
142            client_lib_name: self.client_lib_name.into(),
143            client_lib_version: self.client_lib_version.into(),
144            peer_attestation_nonce: Vec::new(),
145            capability_token: Vec::new(),
146            client_keepalive_secs: self.client_keepalive_secs,
147        }
148    }
149}
150
151/// Capability bitmap this client advertises in `Hello.client_capabilities`.
152///
153/// [`CAP_HANDLE_PASSING`] is advertised only when the build carries a
154/// platform handoff transport (Windows `DuplicateHandle`, Unix
155/// `SCM_RIGHTS`) — currently both, but kept explicit so an exotic target
156/// degrades cleanly to the reconnect path.
157fn client_capabilities() -> u64 {
158    if handoff_transport_available() {
159        CAP_HANDLE_PASSING
160    } else {
161        0
162    }
163}
164
/// How [`connect_to_backend`] reached the returned backend endpoint.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendConnectionRoute {
    /// Connected directly to a known backend endpoint, skipping Hello.
    ///
    /// Used for the cached-endpoint fast path and, deliberately reused to
    /// avoid a new enum variant (a semver hazard for exhaustive matches),
    /// for the [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] test seam. A
    /// fake-backend connection is distinguishable because the caller set the
    /// env var and [`BackendConnection::endpoint`] equals its value.
    ///
    /// Connections on this route carry no negotiation metadata
    /// ([`BackendConnection::negotiated`] is `None`).
    HelloSkip,
    /// Asked the broker via Hello, then connected to the negotiated endpoint.
    ///
    /// This is also the route reported when handoff adoption was requested
    /// but did not complete and the client fell back to reconnecting.
    BrokerNegotiated,
    /// Adopted the existing broker connection after a confirmed handoff.
    ///
    /// The broker handed the client's connection to the backend
    /// (`DuplicateHandle`/`SCM_RIGHTS`) and relayed the backend's accepted
    /// `HandoffAck` back to the client, so the socket that carried Hello is
    /// now served by the backend. No connection to `backend_pipe` was
    /// opened; [`BackendConnection::endpoint`] still reports the negotiated
    /// `backend_pipe` so callers can cache it for future Hello-skip.
    HandlePassed,
}
188
/// Open backend connection returned by [`connect_to_backend`].
#[derive(Debug)]
pub struct BackendConnection {
    /// Connected local socket stream.
    pub stream: interprocess::local_socket::Stream,
    /// Endpoint that was connected.
    ///
    /// For [`BackendConnectionRoute::HandlePassed`] this is the negotiated
    /// `backend_pipe` — useful as the Hello-skip cache key — even though the
    /// stream is the original broker connection rather than a fresh connect
    /// to that endpoint.
    pub endpoint: String,
    /// Route used to establish the connection.
    pub route: BackendConnectionRoute,
    /// Broker negotiation metadata when the broker path was used.
    ///
    /// `None` for [`BackendConnectionRoute::HelloSkip`]; `Some` for
    /// [`BackendConnectionRoute::BrokerNegotiated`] and
    /// [`BackendConnectionRoute::HandlePassed`].
    pub negotiated: Option<Negotiated>,
}
206
207impl BackendConnection {
208    /// Pending one-time handoff token issued by the broker, if any.
209    ///
210    /// Non-empty only when both sides negotiated `CAP_HANDLE_PASSING`. By
211    /// default the client still connects via `Negotiated.backend_pipe` and
212    /// the route stays [`BackendConnectionRoute::BrokerNegotiated`]; when the
213    /// caller opted in via
214    /// [`ConnectBackendRequest::adopt_handed_off_connection`] and the broker
215    /// confirmed the handoff, the route is
216    /// [`BackendConnectionRoute::HandlePassed`] and this token is the one the
217    /// confirmation echoed (#354).
218    pub fn handoff_token(&self) -> Option<&[u8]> {
219        self.negotiated
220            .as_ref()
221            .map(|negotiated| negotiated.handle_passed_token.as_slice())
222            .filter(|token| !token.is_empty())
223    }
224}
225
226/// Connect to a backend with the v1 Hello-skip fast path.
227///
228/// TEST seam: when [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] is set to a
229/// non-empty endpoint (and `RUNNING_PROCESS_DISABLE=1` is not engaged), the
230/// client connects directly to that endpoint and returns
231/// [`BackendConnectionRoute::HelloSkip`] with no negotiation. A connect
232/// failure is returned ([`BrokerClientError::BackendConnect`]) without
233/// falling back to the broker path. Never set the seam in production.
234///
235/// When `cached_backend_endpoint` is present and `wanted_version ==
236/// self_version`, this tries the cached backend endpoint first. On miss,
237/// or when the versions differ, it sends a broker `Hello`, reads the
238/// broker `HelloReply`, and connects to `Negotiated.backend_pipe`.
239///
240/// With [`ConnectBackendRequest::adopt_handed_off_connection`] set and a
241/// negotiated handoff (capability bit + non-empty token), the client first
242/// waits — bounded by [`ConnectBackendRequest::handoff_ready_timeout`] — for
243/// the broker's handoff-ready relay on the same connection and, when the
244/// relay confirms the backend accepted, keeps that connection as the backend
245/// connection. Any adoption failure silently falls back to the
246/// `backend_pipe` reconnect below; reconnect remains the authoritative
247/// correctness path.
248pub fn connect_to_backend(
249    request: ConnectBackendRequest<'_>,
250) -> Result<BackendConnection, BrokerClientError> {
251    #[cfg(feature = "test-seams")]
252    if let Some(endpoint) = fake_backend_endpoint_from_env() {
253        let stream = connect_local_socket(&endpoint).map_err(BrokerClientError::BackendConnect)?;
254        return Ok(BackendConnection {
255            stream,
256            endpoint,
257            route: BackendConnectionRoute::HelloSkip,
258            negotiated: None,
259        });
260    }
261
262    if request.can_hello_skip() {
263        if let Some(endpoint) = request.cached_backend_endpoint {
264            if let Ok(stream) = connect_local_socket(endpoint) {
265                return Ok(BackendConnection {
266                    stream,
267                    endpoint: endpoint.into(),
268                    route: BackendConnectionRoute::HelloSkip,
269                    negotiated: None,
270                });
271            }
272        }
273    }
274
275    let (broker_stream, negotiated) = broker_hello(&request)?;
276    if request.adopt_handed_off_connection && handoff_negotiated(&negotiated) {
277        if let Some(adopted) = await_handoff_ready(
278            broker_stream,
279            negotiated.handle_passed_token.clone(),
280            request.handoff_ready_timeout,
281        ) {
282            return Ok(BackendConnection {
283                endpoint: negotiated.backend_pipe.clone(),
284                stream: adopted,
285                route: BackendConnectionRoute::HandlePassed,
286                negotiated: Some(negotiated),
287            });
288        }
289    }
290
291    if negotiated.backend_pipe.is_empty() {
292        return Err(BrokerClientError::EmptyBackendPipe);
293    }
294    let stream = connect_local_socket(&negotiated.backend_pipe)
295        .map_err(BrokerClientError::BackendConnect)?;
296    Ok(BackendConnection {
297        endpoint: negotiated.backend_pipe.clone(),
298        stream,
299        route: BackendConnectionRoute::BrokerNegotiated,
300        negotiated: Some(negotiated),
301    })
302}
303
/// Resolve the [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] test seam, if active.
///
/// Yields `Some(endpoint)` only when the variable holds a non-empty value AND
/// the canonical disable hatch is not engaged: `RUNNING_PROCESS_DISABLE=1`
/// takes precedence, so a disabled broker ignores the fake seam too,
/// mirroring the consumer-side disable contract. An invalid
/// `RUNNING_PROCESS_DISABLE` value is a configuration error that
/// [`broker_disabled_by_env`] surfaces to consumers before they reach
/// `connect_to_backend`; it does not suppress the seam here.
///
/// Gated behind the off-by-default `test-seams` feature (#433 R4) so the test
/// backdoor is physically absent from every production build of
/// [`connect_to_backend`]. Consumers depend on `running-process` with
/// `features = ["client", ...]`; `test-seams` is never in that set.
#[cfg(feature = "test-seams")]
fn fake_backend_endpoint_from_env() -> Option<String> {
    let endpoint = std::env::var_os(RUNNING_PROCESS_FAKE_BACKEND_ENV)?
        .to_string_lossy()
        .into_owned();
    if endpoint.is_empty() {
        return None;
    }
    match broker_disabled_by_env() {
        // The disable hatch wins over the seam.
        Ok(true) => None,
        // Not disabled, or the disable value is invalid: the seam stays active.
        Ok(false) | Err(_) => Some(endpoint),
    }
}
330
331/// True when the broker negotiated handle passing for this connection: the
332/// server capability bit is set AND a one-time token was issued.
333fn handoff_negotiated(negotiated: &Negotiated) -> bool {
334    negotiated.server_capabilities & CAP_HANDLE_PASSING == CAP_HANDLE_PASSING
335        && !negotiated.handle_passed_token.is_empty()
336}
337
338/// Wait (bounded) for the broker's handoff-ready relay on the Hello
339/// connection and return the stream when adoption is confirmed.
340///
341/// The relay is an EVENT frame under the handoff payload protocol
342/// (`0xD0FF`) whose payload is the backend's `HandoffAck`; the client
343/// requires the token echo to match its negotiated one-time token and
344/// `accepted = true`. The blocking framed read runs on a helper thread so
345/// the wait is strictly deadline-bounded even though local-socket streams
346/// have no portable read timeout; on timeout the stream stays with the
347/// helper thread (which exits as soon as the abandoned read resolves) and
348/// the caller falls back to reconnect. Every failure returns `None` —
349/// adoption is best-effort by contract.
350fn await_handoff_ready(
351    stream: interprocess::local_socket::Stream,
352    expected_token: Vec<u8>,
353    timeout: Duration,
354) -> Option<interprocess::local_socket::Stream> {
355    let (result_tx, result_rx) = mpsc::channel();
356    thread::spawn(move || {
357        let mut stream = stream;
358        let outcome = read_handoff_ready(&mut stream, &expected_token).map(|()| stream);
359        let _ = result_tx.send(outcome);
360    });
361    match result_rx.recv_timeout(timeout) {
362        Ok(Ok(stream)) => Some(stream),
363        Ok(Err(_)) | Err(_) => None,
364    }
365}
366
367/// Read and validate one handoff-ready relay frame.
368///
369/// Errors carry a static description for diagnostics, but the adoption
370/// contract maps every failure to the silent reconnect downgrade.
371fn read_handoff_ready(
372    stream: &mut interprocess::local_socket::Stream,
373    expected_token: &[u8],
374) -> Result<(), &'static str> {
375    let bytes = read_frame(stream).map_err(|_| "failed to read handoff-ready frame")?;
376    let frame =
377        Frame::decode(bytes.as_slice()).map_err(|_| "failed to decode handoff-ready Frame")?;
378    validate_handoff_frame(&frame, FrameKind::Event)?;
379    let ack = HandoffAck::decode(frame.payload.as_slice())
380        .map_err(|_| "failed to decode handoff-ready HandoffAck payload")?;
381    if ack.token != expected_token {
382        return Err("handoff-ready token echo does not match the negotiated token");
383    }
384    if !ack.accepted {
385        return Err("broker relayed a refused handoff");
386    }
387    Ok(())
388}
389
390/// Send one typed admin request to a broker endpoint and return its reply.
391pub fn send_admin_request(
392    broker_endpoint: &str,
393    request: AdminRequest,
394) -> Result<AdminReply, BrokerClientError> {
395    let mut stream =
396        connect_local_socket(broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
397    let request_frame = Frame {
398        envelope_version: PROTOCOL_VERSION,
399        kind: FrameKind::Request as i32,
400        payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
401        payload: request.encode_to_vec(),
402        request_id: 1,
403        payload_encoding: PayloadEncoding::None as i32,
404        deadline_unix_ms: 0,
405        traceparent: String::new(),
406        tracestate: String::new(),
407    };
408    write_frame(&mut stream, &request_frame.encode_to_vec())?;
409
410    let response_bytes = read_frame(&mut stream)?;
411    let response_frame =
412        Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
413    validate_response_frame(
414        &response_frame,
415        ADMIN_PAYLOAD_PROTOCOL,
416        "payload_protocol is not admin",
417    )?;
418    AdminReply::decode(response_frame.payload.as_slice())
419        .map_err(BrokerClientError::DecodeAdminReply)
420}
421
422/// Open a platform local socket by broker endpoint string.
423pub fn connect_local_socket(endpoint: &str) -> io::Result<interprocess::local_socket::Stream> {
424    let name = local_socket_name(endpoint)?;
425    LocalSocketStream::connect(name)
426}
427
428fn broker_hello(
429    request: &ConnectBackendRequest<'_>,
430) -> Result<(interprocess::local_socket::Stream, Negotiated), BrokerClientError> {
431    let mut stream =
432        connect_local_socket(request.broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
433    let hello = request.hello();
434    let request_frame = Frame {
435        envelope_version: PROTOCOL_VERSION,
436        kind: FrameKind::Request as i32,
437        payload_protocol: CONTROL_PAYLOAD_PROTOCOL,
438        payload: hello.encode_to_vec(),
439        request_id: 1,
440        payload_encoding: PayloadEncoding::None as i32,
441        deadline_unix_ms: 0,
442        traceparent: String::new(),
443        tracestate: String::new(),
444    };
445    write_frame(&mut stream, &request_frame.encode_to_vec())?;
446
447    let response_bytes = read_frame(&mut stream)?;
448    let response_frame =
449        Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
450    validate_response_frame(
451        &response_frame,
452        CONTROL_PAYLOAD_PROTOCOL,
453        "payload_protocol is not control-plane",
454    )?;
455    let reply = HelloReply::decode(response_frame.payload.as_slice())
456        .map_err(BrokerClientError::DecodeHelloReply)?;
457    match reply
458        .result
459        .ok_or(BrokerClientError::MissingHelloReplyResult)?
460    {
461        HelloReplyResult::Negotiated(negotiated) => Ok((stream, negotiated)),
462        HelloReplyResult::Refused(refused) => Err(BrokerClientError::Refused {
463            code: ErrorCode::try_from(refused.code).unwrap_or(ErrorCode::Unspecified),
464            reason: refused.reason,
465            retry_after_ms: refused.retry_after_ms,
466        }),
467    }
468}
469
470fn validate_response_frame(
471    frame: &Frame,
472    expected_payload_protocol: u32,
473    payload_protocol_error: &'static str,
474) -> Result<(), BrokerClientError> {
475    validate_frame_envelope(frame, FrameKind::Response, expected_payload_protocol).map_err(
476        |error| {
477            BrokerClientError::UnexpectedResponseFrame(match error {
478                FrameValidationError::EnvelopeVersion { .. } => "envelope_version is not v1",
479                FrameValidationError::Kind { .. } => "kind is not RESPONSE",
480                FrameValidationError::PayloadProtocol { .. } => payload_protocol_error,
481                FrameValidationError::PayloadEncoding { .. } => "payload is compressed",
482            })
483        },
484    )
485}
486
/// Invalid value for the canonical broker disable variable.
///
/// Returned by [`broker_disabled_by_env`] when the variable is set to
/// anything other than [`RUNNING_PROCESS_DISABLE_VALUE`].
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[error("RUNNING_PROCESS_DISABLE must be unset or 1, got {value:?}")]
pub struct BrokerDisableEnvError {
    /// Value read from `RUNNING_PROCESS_DISABLE`.
    ///
    /// Decoded lossily, so bytes that are not valid Unicode appear as
    /// replacement characters.
    pub value: String,
}
494
/// Errors produced by broker client helpers.
///
/// Use [`BrokerClientError::refusal_kind`] to classify a
/// [`BrokerClientError::Refused`] without matching on the raw code.
#[derive(Debug, thiserror::Error)]
pub enum BrokerClientError {
    /// Could not connect to the broker.
    ///
    /// Wraps the I/O error from opening the broker endpoint, both for the
    /// Hello exchange and for [`send_admin_request`].
    #[error("failed to connect to broker: {0}")]
    BrokerConnect(io::Error),
    /// Broker negotiation succeeded but the returned backend endpoint failed.
    ///
    /// Also returned, in `test-seams` builds, when connecting to the
    /// [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] endpoint fails.
    #[error("failed to connect to negotiated backend: {0}")]
    BackendConnect(io::Error),
    /// Frame read/write failed.
    ///
    /// Converted automatically from [`FramingError`] via `?`.
    #[error(transparent)]
    Framing(#[from] FramingError),
    /// Broker response frame was malformed.
    #[error("failed to decode broker response Frame: {0}")]
    DecodeFrame(prost::DecodeError),
    /// Broker response payload was not a valid `HelloReply`.
    #[error("failed to decode broker HelloReply: {0}")]
    DecodeHelloReply(prost::DecodeError),
    /// Broker response payload was not a valid `AdminReply`.
    #[error("failed to decode broker AdminReply: {0}")]
    DecodeAdminReply(prost::DecodeError),
    /// Broker returned an unexpected response envelope.
    ///
    /// The payload is a static description of which envelope field failed
    /// validation (version, kind, payload protocol, or payload encoding).
    #[error("unexpected broker response frame: {0}")]
    UnexpectedResponseFrame(&'static str),
    /// Broker returned `HelloReply` without a result.
    #[error("broker HelloReply did not contain a result")]
    MissingHelloReplyResult,
    /// Broker refused the Hello request.
    #[error("broker refused Hello: {reason} ({code:?}, retry_after_ms={retry_after_ms})")]
    Refused {
        /// Stable refusal code.
        ///
        /// A wire value that does not map to a known [`ErrorCode`] is
        /// reported as `ErrorCode::Unspecified`.
        code: ErrorCode,
        /// Human-readable reason.
        reason: String,
        /// Retry hint.
        retry_after_ms: u64,
    },
    /// Broker returned an empty backend endpoint.
    ///
    /// Only raised on the reconnect path of [`connect_to_backend`], i.e.
    /// when no handed-off connection was adopted.
    #[error("broker negotiated an empty backend endpoint")]
    EmptyBackendPipe,
}
536
537impl BrokerClientError {
538    /// Classify a broker refusal into a stable, matchable kind (#433 R7).
539    ///
540    /// Returns `Some` only for [`BrokerClientError::Refused`]; every other
541    /// (transport/decoding) error returns `None`. Consumers branch on
542    /// [`RefusalKind`] instead of pattern-matching the raw `i32`
543    /// [`ErrorCode`], so retry/escalate decisions stay readable and survive
544    /// the addition of future codes (mapped to [`RefusalKind::Other`]).
545    pub fn refusal_kind(&self) -> Option<RefusalKind> {
546        match self {
547            BrokerClientError::Refused { code, .. } => Some(RefusalKind::from_code(*code)),
548            _ => None,
549        }
550    }
551}
552
/// Stable, matchable classification of a broker `HelloReply::Refused` code.
///
/// This is the consumer-facing decision surface for the broker's refusal
/// codes: the wire carries an [`ErrorCode`] `i32`, but a future broker may add
/// codes a consumer's build predates. Matching on `RefusalKind` keeps consumer
/// retry logic exhaustive and forward-compatible — any unrecognized code lands
/// in [`RefusalKind::Other`] rather than silently mismatching.
///
/// Obtain a value with [`RefusalKind::from_code`] or
/// [`BrokerClientError::refusal_kind`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RefusalKind {
    /// The requested version is below the backend's `min_version` or otherwise
    /// not offered. Caller should upgrade/downgrade, not blindly retry.
    ///
    /// Mapped from `ErrorCode::ErrorVersionUnsupported`.
    VersionUnsupported,
    /// The requested version is explicitly blocked (e.g. yanked). Do not retry
    /// with the same version.
    ///
    /// Mapped from `ErrorCode::ErrorVersionBlocked`.
    VersionBlocked,
    /// The service name is unknown to this broker. A configuration error;
    /// retrying will not help.
    ///
    /// Mapped from `ErrorCode::ErrorServiceUnknown`.
    ServiceUnknown,
    /// The broker is rate-limiting this peer. Honour `retry_after_ms`.
    ///
    /// Mapped from `ErrorCode::ErrorRateLimited`.
    RateLimited,
    /// The broker is shutting down. Retry against a fresh broker.
    ///
    /// Mapped from `ErrorCode::ErrorShuttingDown`.
    ShuttingDown,
    /// Any other refusal code (peer rejected, internal, fd pressure, spawn
    /// failure, unspecified, or a code newer than this build understands).
    ///
    /// Carries the original [`ErrorCode`] unchanged.
    Other(ErrorCode),
}
579
580impl RefusalKind {
581    /// Map a wire [`ErrorCode`] to its [`RefusalKind`].
582    pub fn from_code(code: ErrorCode) -> Self {
583        match code {
584            ErrorCode::ErrorVersionUnsupported => RefusalKind::VersionUnsupported,
585            ErrorCode::ErrorVersionBlocked => RefusalKind::VersionBlocked,
586            ErrorCode::ErrorServiceUnknown => RefusalKind::ServiceUnknown,
587            ErrorCode::ErrorRateLimited => RefusalKind::RateLimited,
588            ErrorCode::ErrorShuttingDown => RefusalKind::ShuttingDown,
589            other => RefusalKind::Other(other),
590        }
591    }
592}