running_process/broker/client.rs
1//! Client-side helpers for the v1 broker Hello path.
2
3use std::io;
4use std::sync::mpsc;
5use std::thread;
6use std::time::Duration;
7
8use interprocess::local_socket::prelude::*;
9use prost::Message;
10
11use crate::broker::capabilities::{handoff_transport_available, CAP_HANDLE_PASSING};
12use crate::broker::protocol::{
13 hello_reply::Result as HelloReplyResult, read_frame, validate_frame_envelope, write_frame,
14 AdminReply, AdminRequest, ErrorCode, Frame, FrameKind, FrameValidationError, FramingError,
15 HandoffAck, Hello, HelloReply, Negotiated, PayloadEncoding, ADMIN_PAYLOAD_PROTOCOL,
16 CONTROL_PAYLOAD_PROTOCOL, PROTOCOL_VERSION,
17};
18use crate::broker::server::handoff::validate_handoff_frame;
19use crate::broker::server::local_socket_name;
20
/// Default wall-clock bound on waiting for the broker's handoff-ready relay
/// before silently downgrading to the `backend_pipe` reconnect path.
///
/// [`ConnectBackendRequest::new`] installs this value as
/// [`ConnectBackendRequest::handoff_ready_timeout`]; callers may override the
/// field after construction.
pub const DEFAULT_HANDOFF_READY_TIMEOUT: Duration = Duration::from_secs(2);
24
/// Canonical emergency escape hatch for participating broker consumers.
///
/// Parsed by [`broker_disabled_by_env`]: an unset variable means "not
/// disabled", and the only accepted value is
/// [`RUNNING_PROCESS_DISABLE_VALUE`].
pub const RUNNING_PROCESS_DISABLE_ENV: &str = "RUNNING_PROCESS_DISABLE";
/// Value that disables broker usage and keeps the consumer on its direct path.
///
/// Any other set value is rejected with [`BrokerDisableEnvError`] rather than
/// being interpreted as "enabled".
pub const RUNNING_PROCESS_DISABLE_VALUE: &str = "1";
/// TEST-ONLY seam that points the client at a fake backend endpoint (#354).
///
/// When set and non-empty, [`connect_to_backend`] connects directly to the
/// given endpoint (same local-socket transport as the Hello-skip cache path)
/// and skips broker discovery, Hello negotiation, and version checks
/// entirely. A connect failure is returned as-is — there is no fallback to
/// the real broker path, so tests that set this seam stay deterministic.
///
/// The seam is only compiled into [`connect_to_backend`] when the crate is
/// built with the `test-seams` feature; without that feature this module
/// never reads the variable.
///
/// **Never set this in production.** It bypasses every broker safety check.
/// The canonical escape hatch [`RUNNING_PROCESS_DISABLE_ENV`]`=1` takes
/// precedence: when the broker is disabled, the fake-backend seam is ignored
/// too.
pub const RUNNING_PROCESS_FAKE_BACKEND_ENV: &str = "RUNNING_PROCESS_FAKE_BACKEND";
42
43/// Return whether the canonical broker escape hatch is enabled.
44///
45/// This helper only parses the shared environment contract. Consumers still
46/// own the direct fallback path they should use when this returns `true`.
47pub fn broker_disabled_by_env() -> Result<bool, BrokerDisableEnvError> {
48 let Some(value) = std::env::var_os(RUNNING_PROCESS_DISABLE_ENV) else {
49 return Ok(false);
50 };
51 let value = value.to_string_lossy();
52 if value == RUNNING_PROCESS_DISABLE_VALUE {
53 Ok(true)
54 } else {
55 Err(BrokerDisableEnvError {
56 value: value.into_owned(),
57 })
58 }
59}
60
/// Inputs for [`connect_to_backend`].
///
/// All string fields are borrowed for `'a`; build one with
/// [`ConnectBackendRequest::new`] and then overwrite the optional fields.
#[derive(Clone, Debug)]
pub struct ConnectBackendRequest<'a> {
    /// Broker pipe/socket endpoint.
    pub broker_endpoint: &'a str,
    /// Logical service name, such as `zccache`.
    pub service_name: &'a str,
    /// Backend version the caller wants.
    ///
    /// Sent to the broker in `Hello.wanted_version`; it must equal
    /// [`Self::self_version`] for the cached-endpoint fast path to be tried.
    pub wanted_version: &'a str,
    /// Version of the caller's own service binary.
    pub self_version: &'a str,
    /// Previously negotiated backend endpoint, if the caller has one.
    ///
    /// Only tried when the wanted and self versions match; a failed connect
    /// to it is not an error and falls through to the broker Hello path.
    pub cached_backend_endpoint: Option<&'a str>,
    /// Informational client version.
    pub client_version: &'a str,
    /// Client library name for diagnostics.
    pub client_lib_name: &'a str,
    /// Client library version for diagnostics.
    pub client_lib_version: &'a str,
    /// Proposed keepalive interval.
    ///
    /// Forwarded unchanged as `Hello.client_keepalive_secs`.
    pub client_keepalive_secs: u64,
    /// Opt in to adopting a handed-off backend connection (#354, slice 7).
    ///
    /// Default `false`: the client always reconnects to
    /// `Negotiated.backend_pipe`, exactly as before. When `true` AND the
    /// broker negotiated [`CAP_HANDLE_PASSING`] AND issued a non-empty
    /// `Negotiated.handle_passed_token`, the client waits up to
    /// [`Self::handoff_ready_timeout`] for the broker's handoff-ready relay
    /// (an EVENT frame under the `0xD0FF` handoff payload protocol carrying
    /// the backend's `HandoffAck`) on the SAME broker connection. On a valid
    /// accepted relay with a matching token echo, the client keeps that
    /// connection as the backend connection
    /// ([`BackendConnectionRoute::HandlePassed`]). Any failure — missing
    /// relay, timeout, refused or malformed ACK, token mismatch — silently
    /// downgrades to the `backend_pipe` reconnect; adoption failure is never
    /// an error by itself.
    pub adopt_handed_off_connection: bool,
    /// Deadline for the handoff-ready relay when
    /// [`Self::adopt_handed_off_connection`] is set.
    ///
    /// Not consulted when adoption is not attempted.
    pub handoff_ready_timeout: Duration,
}
102
103impl<'a> ConnectBackendRequest<'a> {
104 /// Build a request with running-process defaults.
105 pub fn new(
106 broker_endpoint: &'a str,
107 service_name: &'a str,
108 wanted_version: &'a str,
109 self_version: &'a str,
110 ) -> Self {
111 Self {
112 broker_endpoint,
113 service_name,
114 wanted_version,
115 self_version,
116 cached_backend_endpoint: None,
117 client_version: "",
118 client_lib_name: "running-process",
119 client_lib_version: env!("CARGO_PKG_VERSION"),
120 client_keepalive_secs: 0,
121 adopt_handed_off_connection: false,
122 handoff_ready_timeout: DEFAULT_HANDOFF_READY_TIMEOUT,
123 }
124 }
125
126 fn can_hello_skip(&self) -> bool {
127 self.cached_backend_endpoint.is_some() && self.wanted_version == self.self_version
128 }
129
130 fn hello(&self) -> Hello {
131 Hello {
132 client_min_protocol: PROTOCOL_VERSION,
133 client_max_protocol: PROTOCOL_VERSION,
134 service_name: self.service_name.into(),
135 wanted_version: self.wanted_version.into(),
136 client_version: self.client_version.into(),
137 client_capabilities: client_capabilities(),
138 auth_token: Vec::new(),
139 request_id: "hello".into(),
140 connection_id: 0,
141 peer_pid: std::process::id(),
142 client_lib_name: self.client_lib_name.into(),
143 client_lib_version: self.client_lib_version.into(),
144 peer_attestation_nonce: Vec::new(),
145 capability_token: Vec::new(),
146 client_keepalive_secs: self.client_keepalive_secs,
147 }
148 }
149}
150
151/// Capability bitmap this client advertises in `Hello.client_capabilities`.
152///
153/// [`CAP_HANDLE_PASSING`] is advertised only when the build carries a
154/// platform handoff transport (Windows `DuplicateHandle`, Unix
155/// `SCM_RIGHTS`) — currently both, but kept explicit so an exotic target
156/// degrades cleanly to the reconnect path.
157fn client_capabilities() -> u64 {
158 if handoff_transport_available() {
159 CAP_HANDLE_PASSING
160 } else {
161 0
162 }
163}
164
/// How [`connect_to_backend`] reached the returned backend endpoint.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendConnectionRoute {
    /// Connected directly to a known backend endpoint, skipping Hello.
    ///
    /// Used for the cached-endpoint fast path and, deliberately reused to
    /// avoid a new enum variant (a semver hazard for exhaustive matches),
    /// for the [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] test seam. A
    /// fake-backend connection is distinguishable because the caller set the
    /// env var and [`BackendConnection::endpoint`] equals its value.
    ///
    /// [`BackendConnection::negotiated`] is `None` on this route because no
    /// Hello exchange took place.
    HelloSkip,
    /// Asked the broker via Hello, then connected to the negotiated endpoint.
    ///
    /// [`BackendConnection::negotiated`] carries the broker's reply.
    BrokerNegotiated,
    /// Adopted the existing broker connection after a confirmed handoff.
    ///
    /// The broker handed the client's connection to the backend
    /// (`DuplicateHandle`/`SCM_RIGHTS`) and relayed the backend's accepted
    /// `HandoffAck` back to the client, so the socket that carried Hello is
    /// now served by the backend. No connection to `backend_pipe` was
    /// opened; [`BackendConnection::endpoint`] still reports the negotiated
    /// `backend_pipe` so callers can cache it for future Hello-skip.
    HandlePassed,
}
188
/// Open backend connection returned by [`connect_to_backend`].
#[derive(Debug)]
pub struct BackendConnection {
    /// Connected local socket stream.
    pub stream: interprocess::local_socket::Stream,
    /// Endpoint that was connected.
    ///
    /// For [`BackendConnectionRoute::HandlePassed`] this is the negotiated
    /// `backend_pipe` — useful as the Hello-skip cache key — even though the
    /// stream is the original broker connection rather than a fresh connect
    /// to that endpoint.
    pub endpoint: String,
    /// Route used to establish the connection.
    pub route: BackendConnectionRoute,
    /// Broker negotiation metadata when the broker path was used.
    ///
    /// `None` for [`BackendConnectionRoute::HelloSkip`]; `Some` for the
    /// broker-negotiated and handle-passed routes.
    pub negotiated: Option<Negotiated>,
}
206
207impl BackendConnection {
208 /// Pending one-time handoff token issued by the broker, if any.
209 ///
210 /// Non-empty only when both sides negotiated `CAP_HANDLE_PASSING`. By
211 /// default the client still connects via `Negotiated.backend_pipe` and
212 /// the route stays [`BackendConnectionRoute::BrokerNegotiated`]; when the
213 /// caller opted in via
214 /// [`ConnectBackendRequest::adopt_handed_off_connection`] and the broker
215 /// confirmed the handoff, the route is
216 /// [`BackendConnectionRoute::HandlePassed`] and this token is the one the
217 /// confirmation echoed (#354).
218 pub fn handoff_token(&self) -> Option<&[u8]> {
219 self.negotiated
220 .as_ref()
221 .map(|negotiated| negotiated.handle_passed_token.as_slice())
222 .filter(|token| !token.is_empty())
223 }
224}
225
226/// Connect to a backend with the v1 Hello-skip fast path.
227///
228/// TEST seam: when [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] is set to a
229/// non-empty endpoint (and `RUNNING_PROCESS_DISABLE=1` is not engaged), the
230/// client connects directly to that endpoint and returns
231/// [`BackendConnectionRoute::HelloSkip`] with no negotiation. A connect
232/// failure is returned ([`BrokerClientError::BackendConnect`]) without
233/// falling back to the broker path. Never set the seam in production.
234///
235/// When `cached_backend_endpoint` is present and `wanted_version ==
236/// self_version`, this tries the cached backend endpoint first. On miss,
237/// or when the versions differ, it sends a broker `Hello`, reads the
238/// broker `HelloReply`, and connects to `Negotiated.backend_pipe`.
239///
240/// With [`ConnectBackendRequest::adopt_handed_off_connection`] set and a
241/// negotiated handoff (capability bit + non-empty token), the client first
242/// waits — bounded by [`ConnectBackendRequest::handoff_ready_timeout`] — for
243/// the broker's handoff-ready relay on the same connection and, when the
244/// relay confirms the backend accepted, keeps that connection as the backend
245/// connection. Any adoption failure silently falls back to the
246/// `backend_pipe` reconnect below; reconnect remains the authoritative
247/// correctness path.
248pub fn connect_to_backend(
249 request: ConnectBackendRequest<'_>,
250) -> Result<BackendConnection, BrokerClientError> {
251 #[cfg(feature = "test-seams")]
252 if let Some(endpoint) = fake_backend_endpoint_from_env() {
253 let stream = connect_local_socket(&endpoint).map_err(BrokerClientError::BackendConnect)?;
254 return Ok(BackendConnection {
255 stream,
256 endpoint,
257 route: BackendConnectionRoute::HelloSkip,
258 negotiated: None,
259 });
260 }
261
262 if request.can_hello_skip() {
263 if let Some(endpoint) = request.cached_backend_endpoint {
264 if let Ok(stream) = connect_local_socket(endpoint) {
265 return Ok(BackendConnection {
266 stream,
267 endpoint: endpoint.into(),
268 route: BackendConnectionRoute::HelloSkip,
269 negotiated: None,
270 });
271 }
272 }
273 }
274
275 let (broker_stream, negotiated) = broker_hello(&request)?;
276 if request.adopt_handed_off_connection && handoff_negotiated(&negotiated) {
277 if let Some(adopted) = await_handoff_ready(
278 broker_stream,
279 negotiated.handle_passed_token.clone(),
280 request.handoff_ready_timeout,
281 ) {
282 return Ok(BackendConnection {
283 endpoint: negotiated.backend_pipe.clone(),
284 stream: adopted,
285 route: BackendConnectionRoute::HandlePassed,
286 negotiated: Some(negotiated),
287 });
288 }
289 }
290
291 if negotiated.backend_pipe.is_empty() {
292 return Err(BrokerClientError::EmptyBackendPipe);
293 }
294 let stream = connect_local_socket(&negotiated.backend_pipe)
295 .map_err(BrokerClientError::BackendConnect)?;
296 Ok(BackendConnection {
297 endpoint: negotiated.backend_pipe.clone(),
298 stream,
299 route: BackendConnectionRoute::BrokerNegotiated,
300 negotiated: Some(negotiated),
301 })
302}
303
/// Resolve the [`RUNNING_PROCESS_FAKE_BACKEND_ENV`] test seam, if active.
///
/// Yields `Some(endpoint)` only when the variable holds a non-empty value
/// AND the canonical disable hatch is not engaged
/// (`RUNNING_PROCESS_DISABLE=1` wins — a disabled broker ignores the fake
/// seam too, mirroring the consumer-side disable contract). An invalid
/// `RUNNING_PROCESS_DISABLE` value is a configuration error that
/// [`broker_disabled_by_env`] reports to consumers before they reach
/// `connect_to_backend`; it does not suppress the seam here.
///
/// Gated behind the off-by-default `test-seams` feature (#433 R4) so the test
/// backdoor is physically absent from every production build of
/// [`connect_to_backend`]. Consumers depend on `running-process` with
/// `features = ["client", ...]`; `test-seams` is never in that set.
#[cfg(feature = "test-seams")]
fn fake_backend_endpoint_from_env() -> Option<String> {
    let raw = std::env::var_os(RUNNING_PROCESS_FAKE_BACKEND_ENV)?;
    let endpoint = raw.to_string_lossy();
    // The disable hatch is only consulted once a non-empty endpoint is seen.
    let seam_active = !endpoint.is_empty() && !matches!(broker_disabled_by_env(), Ok(true));
    seam_active.then(|| endpoint.into_owned())
}
330
331/// True when the broker negotiated handle passing for this connection: the
332/// server capability bit is set AND a one-time token was issued.
333fn handoff_negotiated(negotiated: &Negotiated) -> bool {
334 negotiated.server_capabilities & CAP_HANDLE_PASSING == CAP_HANDLE_PASSING
335 && !negotiated.handle_passed_token.is_empty()
336}
337
338/// Wait (bounded) for the broker's handoff-ready relay on the Hello
339/// connection and return the stream when adoption is confirmed.
340///
341/// The relay is an EVENT frame under the handoff payload protocol
342/// (`0xD0FF`) whose payload is the backend's `HandoffAck`; the client
343/// requires the token echo to match its negotiated one-time token and
344/// `accepted = true`. The blocking framed read runs on a helper thread so
345/// the wait is strictly deadline-bounded even though local-socket streams
346/// have no portable read timeout; on timeout the stream stays with the
347/// helper thread (which exits as soon as the abandoned read resolves) and
348/// the caller falls back to reconnect. Every failure returns `None` —
349/// adoption is best-effort by contract.
350fn await_handoff_ready(
351 stream: interprocess::local_socket::Stream,
352 expected_token: Vec<u8>,
353 timeout: Duration,
354) -> Option<interprocess::local_socket::Stream> {
355 let (result_tx, result_rx) = mpsc::channel();
356 thread::spawn(move || {
357 let mut stream = stream;
358 let outcome = read_handoff_ready(&mut stream, &expected_token).map(|()| stream);
359 let _ = result_tx.send(outcome);
360 });
361 match result_rx.recv_timeout(timeout) {
362 Ok(Ok(stream)) => Some(stream),
363 Ok(Err(_)) | Err(_) => None,
364 }
365}
366
367/// Read and validate one handoff-ready relay frame.
368///
369/// Errors carry a static description for diagnostics, but the adoption
370/// contract maps every failure to the silent reconnect downgrade.
371fn read_handoff_ready(
372 stream: &mut interprocess::local_socket::Stream,
373 expected_token: &[u8],
374) -> Result<(), &'static str> {
375 let bytes = read_frame(stream).map_err(|_| "failed to read handoff-ready frame")?;
376 let frame =
377 Frame::decode(bytes.as_slice()).map_err(|_| "failed to decode handoff-ready Frame")?;
378 validate_handoff_frame(&frame, FrameKind::Event)?;
379 let ack = HandoffAck::decode(frame.payload.as_slice())
380 .map_err(|_| "failed to decode handoff-ready HandoffAck payload")?;
381 if ack.token != expected_token {
382 return Err("handoff-ready token echo does not match the negotiated token");
383 }
384 if !ack.accepted {
385 return Err("broker relayed a refused handoff");
386 }
387 Ok(())
388}
389
390/// Send one typed admin request to a broker endpoint and return its reply.
391pub fn send_admin_request(
392 broker_endpoint: &str,
393 request: AdminRequest,
394) -> Result<AdminReply, BrokerClientError> {
395 let mut stream =
396 connect_local_socket(broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
397 let request_frame = Frame {
398 envelope_version: PROTOCOL_VERSION,
399 kind: FrameKind::Request as i32,
400 payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
401 payload: request.encode_to_vec(),
402 request_id: 1,
403 payload_encoding: PayloadEncoding::None as i32,
404 deadline_unix_ms: 0,
405 traceparent: String::new(),
406 tracestate: String::new(),
407 };
408 write_frame(&mut stream, &request_frame.encode_to_vec())?;
409
410 let response_bytes = read_frame(&mut stream)?;
411 let response_frame =
412 Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
413 validate_response_frame(
414 &response_frame,
415 ADMIN_PAYLOAD_PROTOCOL,
416 "payload_protocol is not admin",
417 )?;
418 AdminReply::decode(response_frame.payload.as_slice())
419 .map_err(BrokerClientError::DecodeAdminReply)
420}
421
422/// Open a platform local socket by broker endpoint string.
423pub fn connect_local_socket(endpoint: &str) -> io::Result<interprocess::local_socket::Stream> {
424 let name = local_socket_name(endpoint)?;
425 LocalSocketStream::connect(name)
426}
427
428fn broker_hello(
429 request: &ConnectBackendRequest<'_>,
430) -> Result<(interprocess::local_socket::Stream, Negotiated), BrokerClientError> {
431 let mut stream =
432 connect_local_socket(request.broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
433 let hello = request.hello();
434 let request_frame = Frame {
435 envelope_version: PROTOCOL_VERSION,
436 kind: FrameKind::Request as i32,
437 payload_protocol: CONTROL_PAYLOAD_PROTOCOL,
438 payload: hello.encode_to_vec(),
439 request_id: 1,
440 payload_encoding: PayloadEncoding::None as i32,
441 deadline_unix_ms: 0,
442 traceparent: String::new(),
443 tracestate: String::new(),
444 };
445 write_frame(&mut stream, &request_frame.encode_to_vec())?;
446
447 let response_bytes = read_frame(&mut stream)?;
448 let response_frame =
449 Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
450 validate_response_frame(
451 &response_frame,
452 CONTROL_PAYLOAD_PROTOCOL,
453 "payload_protocol is not control-plane",
454 )?;
455 let reply = HelloReply::decode(response_frame.payload.as_slice())
456 .map_err(BrokerClientError::DecodeHelloReply)?;
457 match reply
458 .result
459 .ok_or(BrokerClientError::MissingHelloReplyResult)?
460 {
461 HelloReplyResult::Negotiated(negotiated) => Ok((stream, negotiated)),
462 HelloReplyResult::Refused(refused) => Err(BrokerClientError::Refused {
463 code: ErrorCode::try_from(refused.code).unwrap_or(ErrorCode::Unspecified),
464 reason: refused.reason,
465 retry_after_ms: refused.retry_after_ms,
466 }),
467 }
468}
469
470fn validate_response_frame(
471 frame: &Frame,
472 expected_payload_protocol: u32,
473 payload_protocol_error: &'static str,
474) -> Result<(), BrokerClientError> {
475 validate_frame_envelope(frame, FrameKind::Response, expected_payload_protocol).map_err(
476 |error| {
477 BrokerClientError::UnexpectedResponseFrame(match error {
478 FrameValidationError::EnvelopeVersion { .. } => "envelope_version is not v1",
479 FrameValidationError::Kind { .. } => "kind is not RESPONSE",
480 FrameValidationError::PayloadProtocol { .. } => payload_protocol_error,
481 FrameValidationError::PayloadEncoding { .. } => "payload is compressed",
482 })
483 },
484 )
485}
486
/// Invalid value for the canonical broker disable variable.
///
/// Produced by [`broker_disabled_by_env`] when the variable is set to
/// anything other than [`RUNNING_PROCESS_DISABLE_VALUE`].
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
#[error("RUNNING_PROCESS_DISABLE must be unset or 1, got {value:?}")]
pub struct BrokerDisableEnvError {
    /// Value read from `RUNNING_PROCESS_DISABLE`.
    ///
    /// Decoded lossily, so invalid Unicode in the raw value appears as
    /// replacement characters.
    pub value: String,
}
494
/// Errors produced by broker client helpers.
#[derive(Debug, thiserror::Error)]
pub enum BrokerClientError {
    /// Could not connect to the broker.
    #[error("failed to connect to broker: {0}")]
    BrokerConnect(io::Error),
    /// Broker negotiation succeeded but the returned backend endpoint failed.
    ///
    /// Also returned when connecting to the test-seam fake backend fails. A
    /// failed connect to a cached endpoint never produces this error; that
    /// path falls through to the broker instead.
    #[error("failed to connect to negotiated backend: {0}")]
    BackendConnect(io::Error),
    /// Frame read/write failed.
    #[error(transparent)]
    Framing(#[from] FramingError),
    /// Broker response frame was malformed.
    #[error("failed to decode broker response Frame: {0}")]
    DecodeFrame(prost::DecodeError),
    /// Broker response payload was not a valid `HelloReply`.
    #[error("failed to decode broker HelloReply: {0}")]
    DecodeHelloReply(prost::DecodeError),
    /// Broker response payload was not a valid `AdminReply`.
    #[error("failed to decode broker AdminReply: {0}")]
    DecodeAdminReply(prost::DecodeError),
    /// Broker returned an unexpected response envelope.
    ///
    /// The payload is a static reason naming the envelope field that failed
    /// validation.
    #[error("unexpected broker response frame: {0}")]
    UnexpectedResponseFrame(&'static str),
    /// Broker returned `HelloReply` without a result.
    #[error("broker HelloReply did not contain a result")]
    MissingHelloReplyResult,
    /// Broker refused the Hello request.
    ///
    /// Use [`BrokerClientError::refusal_kind`] to branch on the refusal.
    #[error("broker refused Hello: {reason} ({code:?}, retry_after_ms={retry_after_ms})")]
    Refused {
        /// Stable refusal code.
        ///
        /// A wire value this build cannot map is reported as
        /// `ErrorCode::Unspecified`.
        code: ErrorCode,
        /// Human-readable reason.
        reason: String,
        /// Retry hint.
        retry_after_ms: u64,
    },
    /// Broker returned an empty backend endpoint.
    #[error("broker negotiated an empty backend endpoint")]
    EmptyBackendPipe,
}
536
537impl BrokerClientError {
538 /// Classify a broker refusal into a stable, matchable kind (#433 R7).
539 ///
540 /// Returns `Some` only for [`BrokerClientError::Refused`]; every other
541 /// (transport/decoding) error returns `None`. Consumers branch on
542 /// [`RefusalKind`] instead of pattern-matching the raw `i32`
543 /// [`ErrorCode`], so retry/escalate decisions stay readable and survive
544 /// the addition of future codes (mapped to [`RefusalKind::Other`]).
545 pub fn refusal_kind(&self) -> Option<RefusalKind> {
546 match self {
547 BrokerClientError::Refused { code, .. } => Some(RefusalKind::from_code(*code)),
548 _ => None,
549 }
550 }
551}
552
/// Stable, matchable classification of a broker `HelloReply::Refused` code.
///
/// This is the consumer-facing decision surface for the broker's refusal
/// codes: the wire carries an [`ErrorCode`] `i32`, but a future broker may add
/// codes a consumer's build predates. Matching on `RefusalKind` keeps consumer
/// retry logic exhaustive and forward-compatible — any unrecognized code lands
/// in [`RefusalKind::Other`] rather than silently mismatching.
///
/// Obtain one through [`BrokerClientError::refusal_kind`] or
/// [`RefusalKind::from_code`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RefusalKind {
    /// The requested version is below the backend's `min_version` or otherwise
    /// not offered. Caller should upgrade/downgrade, not blindly retry.
    ///
    /// Mapped from `ErrorCode::ErrorVersionUnsupported`.
    VersionUnsupported,
    /// The requested version is explicitly blocked (e.g. yanked). Do not retry
    /// with the same version.
    ///
    /// Mapped from `ErrorCode::ErrorVersionBlocked`.
    VersionBlocked,
    /// The service name is unknown to this broker. A configuration error;
    /// retrying will not help.
    ///
    /// Mapped from `ErrorCode::ErrorServiceUnknown`.
    ServiceUnknown,
    /// The broker is rate-limiting this peer. Honour `retry_after_ms`.
    ///
    /// Mapped from `ErrorCode::ErrorRateLimited`.
    RateLimited,
    /// The broker is shutting down. Retry against a fresh broker.
    ///
    /// Mapped from `ErrorCode::ErrorShuttingDown`.
    ShuttingDown,
    /// Any other refusal code (peer rejected, internal, fd pressure, spawn
    /// failure, unspecified, or a code newer than this build understands).
    ///
    /// Carries the original code so callers can still log or inspect it.
    Other(ErrorCode),
}
579
580impl RefusalKind {
581 /// Map a wire [`ErrorCode`] to its [`RefusalKind`].
582 pub fn from_code(code: ErrorCode) -> Self {
583 match code {
584 ErrorCode::ErrorVersionUnsupported => RefusalKind::VersionUnsupported,
585 ErrorCode::ErrorVersionBlocked => RefusalKind::VersionBlocked,
586 ErrorCode::ErrorServiceUnknown => RefusalKind::ServiceUnknown,
587 ErrorCode::ErrorRateLimited => RefusalKind::RateLimited,
588 ErrorCode::ErrorShuttingDown => RefusalKind::ShuttingDown,
589 other => RefusalKind::Other(other),
590 }
591 }
592}