1use std::io;
4use std::sync::mpsc;
5use std::thread;
6use std::time::Duration;
7
8use interprocess::local_socket::prelude::*;
9use prost::Message;
10
11use crate::broker::capabilities::{handoff_transport_available, CAP_HANDLE_PASSING};
12use crate::broker::protocol::{
13 hello_reply::Result as HelloReplyResult, read_frame, write_frame, AdminReply, AdminRequest,
14 ErrorCode, Frame, FrameKind, FramingError, HandoffAck, Hello, HelloReply, Negotiated,
15 PayloadEncoding,
16};
17use crate::broker::server::handoff::validate_handoff_frame;
18use crate::broker::server::{local_socket_name, ADMIN_PAYLOAD_PROTOCOL};
19
/// Protocol/envelope version this client speaks.
///
/// Used as both the minimum and maximum protocol in the `Hello`, as the
/// `envelope_version` of every outgoing `Frame`, and as the only envelope
/// version accepted on responses.
const PROTOCOL_VERSION: u32 = 1;

/// `payload_protocol` value carried by the Hello request/reply frames.
const CONTROL_PAYLOAD_PROTOCOL: u32 = 0;

/// Default time to wait for the broker's handoff-ready frame before giving
/// up on adopting the broker connection.
pub const DEFAULT_HANDOFF_READY_TIMEOUT: Duration = Duration::from_secs(2);

/// Name of the environment variable inspected by [`broker_disabled_by_env`].
pub const RUNNING_PROCESS_DISABLE_ENV: &str = "RUNNING_PROCESS_DISABLE";

/// The only value of [`RUNNING_PROCESS_DISABLE_ENV`] that is accepted when
/// the variable is set.
pub const RUNNING_PROCESS_DISABLE_VALUE: &str = "1";
31
32pub fn broker_disabled_by_env() -> Result<bool, BrokerDisableEnvError> {
37 let Some(value) = std::env::var_os(RUNNING_PROCESS_DISABLE_ENV) else {
38 return Ok(false);
39 };
40 let value = value.to_string_lossy();
41 if value == RUNNING_PROCESS_DISABLE_VALUE {
42 Ok(true)
43 } else {
44 Err(BrokerDisableEnvError {
45 value: value.into_owned(),
46 })
47 }
48}
49
/// Everything [`connect_to_backend`] needs to reach a backend, either
/// directly or through the broker.
#[derive(Debug, Clone)]
pub struct ConnectBackendRequest<'a> {
    /// Local-socket endpoint of the broker that receives the `Hello`.
    pub broker_endpoint: &'a str,
    /// Service name sent to the broker in the `Hello`.
    pub service_name: &'a str,
    /// Backend version being requested; sent in the `Hello` and compared
    /// against `self_version` when deciding whether the broker can be skipped.
    pub wanted_version: &'a str,
    /// Version compared against `wanted_version` for the hello-skip decision.
    pub self_version: &'a str,
    /// Previously known backend endpoint. When present and the versions
    /// match, it is tried directly before contacting the broker.
    pub cached_backend_endpoint: Option<&'a str>,
    /// Client version string forwarded in the `Hello`.
    pub client_version: &'a str,
    /// Client library name forwarded in the `Hello`.
    pub client_lib_name: &'a str,
    /// Client library version forwarded in the `Hello`.
    pub client_lib_version: &'a str,
    /// Keepalive value (seconds) forwarded in the `Hello`.
    pub client_keepalive_secs: u64,
    /// When `true` and the broker negotiated handle passing, wait for the
    /// handoff-ready frame and keep using the broker connection instead of
    /// opening a new one to the backend.
    pub adopt_handed_off_connection: bool,
    /// Upper bound on how long to wait for the handoff-ready frame.
    pub handoff_ready_timeout: Duration,
}
91
92impl<'a> ConnectBackendRequest<'a> {
93 pub fn new(
95 broker_endpoint: &'a str,
96 service_name: &'a str,
97 wanted_version: &'a str,
98 self_version: &'a str,
99 ) -> Self {
100 Self {
101 broker_endpoint,
102 service_name,
103 wanted_version,
104 self_version,
105 cached_backend_endpoint: None,
106 client_version: "",
107 client_lib_name: "running-process",
108 client_lib_version: env!("CARGO_PKG_VERSION"),
109 client_keepalive_secs: 0,
110 adopt_handed_off_connection: false,
111 handoff_ready_timeout: DEFAULT_HANDOFF_READY_TIMEOUT,
112 }
113 }
114
115 fn can_hello_skip(&self) -> bool {
116 self.cached_backend_endpoint.is_some() && self.wanted_version == self.self_version
117 }
118
119 fn hello(&self) -> Hello {
120 Hello {
121 client_min_protocol: PROTOCOL_VERSION,
122 client_max_protocol: PROTOCOL_VERSION,
123 service_name: self.service_name.into(),
124 wanted_version: self.wanted_version.into(),
125 client_version: self.client_version.into(),
126 client_capabilities: client_capabilities(),
127 auth_token: Vec::new(),
128 request_id: "hello".into(),
129 connection_id: 0,
130 peer_pid: std::process::id(),
131 client_lib_name: self.client_lib_name.into(),
132 client_lib_version: self.client_lib_version.into(),
133 peer_attestation_nonce: Vec::new(),
134 capability_token: Vec::new(),
135 client_keepalive_secs: self.client_keepalive_secs,
136 }
137 }
138}
139
140fn client_capabilities() -> u64 {
147 if handoff_transport_available() {
148 CAP_HANDLE_PASSING
149 } else {
150 0
151 }
152}
153
/// How a [`BackendConnection`] was established by [`connect_to_backend`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendConnectionRoute {
    /// Connected straight to the cached backend endpoint; the broker was
    /// never contacted.
    HelloSkip,
    /// The broker answered the `Hello` and a new connection was opened to
    /// the backend endpoint it negotiated.
    BrokerNegotiated,
    /// The broker connection itself was kept after a valid handoff-ready
    /// frame arrived on it.
    HandlePassed,
}
171
172#[derive(Debug)]
174pub struct BackendConnection {
175 pub stream: interprocess::local_socket::Stream,
177 pub endpoint: String,
184 pub route: BackendConnectionRoute,
186 pub negotiated: Option<Negotiated>,
188}
189
190impl BackendConnection {
191 pub fn handoff_token(&self) -> Option<&[u8]> {
202 self.negotiated
203 .as_ref()
204 .map(|negotiated| negotiated.handle_passed_token.as_slice())
205 .filter(|token| !token.is_empty())
206 }
207}
208
209pub fn connect_to_backend(
225 request: ConnectBackendRequest<'_>,
226) -> Result<BackendConnection, BrokerClientError> {
227 if request.can_hello_skip() {
228 if let Some(endpoint) = request.cached_backend_endpoint {
229 if let Ok(stream) = connect_local_socket(endpoint) {
230 return Ok(BackendConnection {
231 stream,
232 endpoint: endpoint.into(),
233 route: BackendConnectionRoute::HelloSkip,
234 negotiated: None,
235 });
236 }
237 }
238 }
239
240 let (broker_stream, negotiated) = broker_hello(&request)?;
241 if request.adopt_handed_off_connection && handoff_negotiated(&negotiated) {
242 if let Some(adopted) = await_handoff_ready(
243 broker_stream,
244 negotiated.handle_passed_token.clone(),
245 request.handoff_ready_timeout,
246 ) {
247 return Ok(BackendConnection {
248 endpoint: negotiated.backend_pipe.clone(),
249 stream: adopted,
250 route: BackendConnectionRoute::HandlePassed,
251 negotiated: Some(negotiated),
252 });
253 }
254 }
255
256 if negotiated.backend_pipe.is_empty() {
257 return Err(BrokerClientError::EmptyBackendPipe);
258 }
259 let stream = connect_local_socket(&negotiated.backend_pipe)
260 .map_err(BrokerClientError::BackendConnect)?;
261 Ok(BackendConnection {
262 endpoint: negotiated.backend_pipe.clone(),
263 stream,
264 route: BackendConnectionRoute::BrokerNegotiated,
265 negotiated: Some(negotiated),
266 })
267}
268
269fn handoff_negotiated(negotiated: &Negotiated) -> bool {
272 negotiated.server_capabilities & CAP_HANDLE_PASSING == CAP_HANDLE_PASSING
273 && !negotiated.handle_passed_token.is_empty()
274}
275
276fn await_handoff_ready(
289 stream: interprocess::local_socket::Stream,
290 expected_token: Vec<u8>,
291 timeout: Duration,
292) -> Option<interprocess::local_socket::Stream> {
293 let (result_tx, result_rx) = mpsc::channel();
294 thread::spawn(move || {
295 let mut stream = stream;
296 let outcome = read_handoff_ready(&mut stream, &expected_token).map(|()| stream);
297 let _ = result_tx.send(outcome);
298 });
299 match result_rx.recv_timeout(timeout) {
300 Ok(Ok(stream)) => Some(stream),
301 Ok(Err(_)) | Err(_) => None,
302 }
303}
304
305fn read_handoff_ready(
310 stream: &mut interprocess::local_socket::Stream,
311 expected_token: &[u8],
312) -> Result<(), &'static str> {
313 let bytes = read_frame(stream).map_err(|_| "failed to read handoff-ready frame")?;
314 let frame =
315 Frame::decode(bytes.as_slice()).map_err(|_| "failed to decode handoff-ready Frame")?;
316 validate_handoff_frame(&frame, FrameKind::Event)?;
317 let ack = HandoffAck::decode(frame.payload.as_slice())
318 .map_err(|_| "failed to decode handoff-ready HandoffAck payload")?;
319 if ack.token != expected_token {
320 return Err("handoff-ready token echo does not match the negotiated token");
321 }
322 if !ack.accepted {
323 return Err("broker relayed a refused handoff");
324 }
325 Ok(())
326}
327
328pub fn send_admin_request(
330 broker_endpoint: &str,
331 request: AdminRequest,
332) -> Result<AdminReply, BrokerClientError> {
333 let mut stream =
334 connect_local_socket(broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
335 let request_frame = Frame {
336 envelope_version: PROTOCOL_VERSION,
337 kind: FrameKind::Request as i32,
338 payload_protocol: ADMIN_PAYLOAD_PROTOCOL,
339 payload: request.encode_to_vec(),
340 request_id: 1,
341 payload_encoding: PayloadEncoding::None as i32,
342 deadline_unix_ms: 0,
343 traceparent: String::new(),
344 tracestate: String::new(),
345 };
346 write_frame(&mut stream, &request_frame.encode_to_vec())?;
347
348 let response_bytes = read_frame(&mut stream)?;
349 let response_frame =
350 Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
351 validate_response_frame(
352 &response_frame,
353 ADMIN_PAYLOAD_PROTOCOL,
354 "payload_protocol is not admin",
355 )?;
356 AdminReply::decode(response_frame.payload.as_slice())
357 .map_err(BrokerClientError::DecodeAdminReply)
358}
359
360pub fn connect_local_socket(endpoint: &str) -> io::Result<interprocess::local_socket::Stream> {
362 let name = local_socket_name(endpoint)?;
363 LocalSocketStream::connect(name)
364}
365
366fn broker_hello(
367 request: &ConnectBackendRequest<'_>,
368) -> Result<(interprocess::local_socket::Stream, Negotiated), BrokerClientError> {
369 let mut stream =
370 connect_local_socket(request.broker_endpoint).map_err(BrokerClientError::BrokerConnect)?;
371 let hello = request.hello();
372 let request_frame = Frame {
373 envelope_version: PROTOCOL_VERSION,
374 kind: FrameKind::Request as i32,
375 payload_protocol: CONTROL_PAYLOAD_PROTOCOL,
376 payload: hello.encode_to_vec(),
377 request_id: 1,
378 payload_encoding: PayloadEncoding::None as i32,
379 deadline_unix_ms: 0,
380 traceparent: String::new(),
381 tracestate: String::new(),
382 };
383 write_frame(&mut stream, &request_frame.encode_to_vec())?;
384
385 let response_bytes = read_frame(&mut stream)?;
386 let response_frame =
387 Frame::decode(response_bytes.as_slice()).map_err(BrokerClientError::DecodeFrame)?;
388 validate_response_frame(
389 &response_frame,
390 CONTROL_PAYLOAD_PROTOCOL,
391 "payload_protocol is not control-plane",
392 )?;
393 let reply = HelloReply::decode(response_frame.payload.as_slice())
394 .map_err(BrokerClientError::DecodeHelloReply)?;
395 match reply
396 .result
397 .ok_or(BrokerClientError::MissingHelloReplyResult)?
398 {
399 HelloReplyResult::Negotiated(negotiated) => Ok((stream, negotiated)),
400 HelloReplyResult::Refused(refused) => Err(BrokerClientError::Refused {
401 code: ErrorCode::try_from(refused.code).unwrap_or(ErrorCode::Unspecified),
402 reason: refused.reason,
403 retry_after_ms: refused.retry_after_ms,
404 }),
405 }
406}
407
408fn validate_response_frame(
409 frame: &Frame,
410 expected_payload_protocol: u32,
411 payload_protocol_error: &'static str,
412) -> Result<(), BrokerClientError> {
413 if frame.envelope_version != PROTOCOL_VERSION {
414 return Err(BrokerClientError::UnexpectedResponseFrame(
415 "envelope_version is not v1",
416 ));
417 }
418 if FrameKind::try_from(frame.kind) != Ok(FrameKind::Response) {
419 return Err(BrokerClientError::UnexpectedResponseFrame(
420 "kind is not RESPONSE",
421 ));
422 }
423 if frame.payload_protocol != expected_payload_protocol {
424 return Err(BrokerClientError::UnexpectedResponseFrame(
425 payload_protocol_error,
426 ));
427 }
428 if PayloadEncoding::try_from(frame.payload_encoding) != Ok(PayloadEncoding::None) {
429 return Err(BrokerClientError::UnexpectedResponseFrame(
430 "payload is compressed",
431 ));
432 }
433 Ok(())
434}
435
436#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
438#[error("RUNNING_PROCESS_DISABLE must be unset or 1, got {value:?}")]
439pub struct BrokerDisableEnvError {
440 pub value: String,
442}
443
444#[derive(Debug, thiserror::Error)]
446pub enum BrokerClientError {
447 #[error("failed to connect to broker: {0}")]
449 BrokerConnect(io::Error),
450 #[error("failed to connect to negotiated backend: {0}")]
452 BackendConnect(io::Error),
453 #[error(transparent)]
455 Framing(#[from] FramingError),
456 #[error("failed to decode broker response Frame: {0}")]
458 DecodeFrame(prost::DecodeError),
459 #[error("failed to decode broker HelloReply: {0}")]
461 DecodeHelloReply(prost::DecodeError),
462 #[error("failed to decode broker AdminReply: {0}")]
464 DecodeAdminReply(prost::DecodeError),
465 #[error("unexpected broker response frame: {0}")]
467 UnexpectedResponseFrame(&'static str),
468 #[error("broker HelloReply did not contain a result")]
470 MissingHelloReplyResult,
471 #[error("broker refused Hello: {reason} ({code:?}, retry_after_ms={retry_after_ms})")]
473 Refused {
474 code: ErrorCode,
476 reason: String,
478 retry_after_ms: u64,
480 },
481 #[error("broker negotiated an empty backend endpoint")]
483 EmptyBackendPipe,
484}