running_process/broker/backend_sdk/mux.rs
1//! Sans-io endpoint multiplexer for backend daemons (#412).
2//!
3//! A consumer daemon's IPC endpoint serves three kinds of traffic:
4//!
5//! 1. its own **legacy wire** (whatever framing the daemon spoke before
6//! adopting running-process),
7//! 2. **`BackendHandle` identity probes** (nonce challenge frames sent
8//! by [`crate::broker::backend_handle::BackendHandle::probe_with_service`]),
9//! 3. **consumer payload frames** — the daemon's own requests carried
10//! opaquely in v1 `Frame` envelopes under a registered payload
11//! protocol.
12//!
13//! Before #412, every consumer hand-rolled the byte-level
14//! disambiguation (including the genuinely ambiguous case where a
15//! legacy length header makes byte 0 equal the v1 framing byte), probe
16//! validation, and probe-response construction. `BackendEndpointMux`
17//! owns all of it as a pure function of the read buffer: the daemon
18//! keeps its own sockets, runtime, and buffered reads, and calls
19//! [`BackendEndpointMux::poll`] whenever bytes arrive.
20
21use crate::broker::backend_lifecycle::identity::DaemonProcess;
22use crate::broker::backend_lifecycle::probe::{
23 endpoint_probe_request_from_frame, endpoint_probe_response_frame, EndpointProbeServerError,
24};
25use crate::broker::protocol::{
26 encode_framed, registry, try_decode_framed, Frame, FrameKind, FramingError, ENVELOPE_VERSION,
27};
28
/// Answer given by a consumer's legacy detector about the buffered bytes.
///
/// [`BackendEndpointMux::poll`] asks the detector **before** it attempts
/// any frame decoding. That ordering matters when a legacy header's
/// first byte happens to equal the v1 framing byte
/// ([`ENVELOPE_VERSION`]): the consumer knows its own header layout, so
/// it — not running-process — settles the ambiguity.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LegacyClassification {
    /// The buffer begins a legacy-wire message, so the mux stays out of
    /// the way.
    Legacy,
    /// The buffer is not legacy wire; the mux proceeds to frame handling.
    NotLegacy,
    /// The detector cannot decide yet; buffer more bytes and poll again.
    NeedMoreBytes,
}
44
45/// What the daemon's accept loop should do with its buffered bytes.
46#[derive(Debug)]
47pub enum MuxPoll {
48 /// Not enough bytes to classify or to complete a frame. Read more
49 /// bytes into the buffer and poll again. Nothing is consumed.
50 NeedMoreBytes,
51 /// The buffer starts a legacy-wire message. Nothing is consumed;
52 /// hand the buffer to the daemon's legacy decoder.
53 Legacy,
54 /// A `BackendHandle` identity probe was answered. Write `reply` to
55 /// the peer verbatim, then advance the read buffer by `consumed`.
56 ProbeAnswered {
57 /// Complete wire bytes (`[1][len][prost Frame]`) to send back.
58 reply: Vec<u8>,
59 /// Bytes of the probe request to consume from the buffer front.
60 consumed: usize,
61 },
62 /// A consumer payload frame arrived. Dispatch `frame.payload`
63 /// through the daemon's request handler (correlate on
64 /// `frame.request_id`), then advance the read buffer by `consumed`.
65 Payload {
66 /// The decoded consumer frame (kind, payload protocol, payload,
67 /// request id, trace context).
68 frame: Frame,
69 /// Bytes the frame occupied; consume from the buffer front.
70 consumed: usize,
71 },
72}
73
74/// Errors surfaced by [`BackendEndpointMux::poll`].
75///
76/// All of them mean the connection is in an unknown state; the daemon
77/// should drop it (matching broker behavior for framing violations).
78#[derive(Debug, thiserror::Error)]
79pub enum MuxError {
80 /// Outer framing violation (oversize body, undecodable frame).
81 #[error(transparent)]
82 Framing(#[from] FramingError),
83 /// A frame with the probe payload protocol failed probe validation.
84 #[error("malformed BackendHandle probe: {0}")]
85 MalformedProbe(#[from] EndpointProbeServerError),
86 /// A frame carried a first-party payload protocol this endpoint
87 /// does not serve (e.g. broker Hello sent to a backend daemon).
88 #[error(
89 "unexpected first-party frame on backend endpoint \
90 (payload_protocol {payload_protocol:#06X})"
91 )]
92 UnexpectedFirstPartyFrame {
93 /// The first-party payload protocol the peer used.
94 payload_protocol: u32,
95 },
96 /// A frame carried a payload protocol other than the ones this mux
97 /// was configured to accept.
98 #[error("frame for unserved payload protocol {payload_protocol:#06X}")]
99 UnservedPayloadProtocol {
100 /// The payload protocol the peer used.
101 payload_protocol: u32,
102 },
103}
104
105/// Sans-io classifier for a backend daemon endpoint.
106///
107/// Construct one per daemon (it is cheap and immutable; sharing behind
108/// an `Arc` is fine) with the daemon's own [`DaemonProcess`] identity,
109/// the consumer payload protocols it serves, and a legacy detector
110/// closure. In the accept loop, buffer reads as usual and call
111/// [`Self::poll`]:
112///
113/// ```
114/// use running_process::broker::backend_sdk::{
115/// BackendEndpointMux, LegacyClassification, MuxPoll,
116/// };
117/// use running_process::broker::backend_handle::DaemonProcess;
118/// use running_process::broker::protocol::Endpoint;
119///
120/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
121/// let endpoint = Endpoint::unix_socket("my-daemon", "/tmp/my-daemon.sock")?;
122/// let daemon = DaemonProcess::current_process(endpoint, None)?;
123/// let mux = BackendEndpointMux::new(daemon, &[0x7A63], |buf| {
124/// // First byte of my legacy wire is always the ASCII 'L' magic.
125/// match buf.first() {
126/// None => LegacyClassification::NeedMoreBytes,
127/// Some(b'L') => LegacyClassification::Legacy,
128/// Some(_) => LegacyClassification::NotLegacy,
129/// }
130/// });
131///
132/// let mut read_buf: Vec<u8> = Vec::new();
133/// // ... read bytes into read_buf, then:
134/// match mux.poll(&read_buf)? {
135/// MuxPoll::NeedMoreBytes => { /* read more */ }
136/// MuxPoll::Legacy => { /* my own decoder takes over */ }
137/// MuxPoll::ProbeAnswered { reply, consumed } => {
138/// // write `reply`, then read_buf.drain(..consumed);
139/// }
140/// MuxPoll::Payload { frame, consumed } => {
141/// // dispatch frame.payload, then read_buf.drain(..consumed);
142/// }
143/// }
144/// # Ok(())
145/// # }
146/// ```
147///
148/// Daemons with no legacy wire pass a detector that always returns
149/// [`LegacyClassification::NotLegacy`].
150pub struct BackendEndpointMux<F> {
151 daemon: DaemonProcess,
152 served_payload_protocols: Vec<u32>,
153 legacy_detector: F,
154}
155
156impl<F> BackendEndpointMux<F>
157where
158 F: Fn(&[u8]) -> LegacyClassification,
159{
160 /// Build a mux for `daemon`, serving the given consumer payload
161 /// protocols, with a consumer-provided legacy-wire detector.
162 pub fn new(
163 daemon: DaemonProcess,
164 served_payload_protocols: &[u32],
165 legacy_detector: F,
166 ) -> Self {
167 Self {
168 daemon,
169 served_payload_protocols: served_payload_protocols.to_vec(),
170 legacy_detector,
171 }
172 }
173
174 /// Classify the front of `buf` and, for probes, build the reply.
175 ///
176 /// Pure with respect to I/O: never reads or writes a socket and
177 /// never consumes from `buf` — the returned `consumed` counts tell
178 /// the caller how far to advance. See [`MuxPoll`] for the contract
179 /// of each verdict and [`MuxError`] for connection-fatal outcomes.
180 pub fn poll(&self, buf: &[u8]) -> Result<MuxPoll, MuxError> {
181 if buf.is_empty() {
182 return Ok(MuxPoll::NeedMoreBytes);
183 }
184
185 // The consumer's own wire wins ties: only it knows whether a
186 // leading 0x01 is a legacy length byte or our framing byte.
187 match (self.legacy_detector)(buf) {
188 LegacyClassification::Legacy => return Ok(MuxPoll::Legacy),
189 LegacyClassification::NeedMoreBytes => return Ok(MuxPoll::NeedMoreBytes),
190 LegacyClassification::NotLegacy => {}
191 }
192
193 // Not legacy and not our framing byte: still the consumer's
194 // problem (its decoder owns the error path for garbage).
195 if buf[0] != ENVELOPE_VERSION {
196 return Ok(MuxPoll::Legacy);
197 }
198
199 let Some(decoded) = try_decode_framed(buf)? else {
200 return Ok(MuxPoll::NeedMoreBytes);
201 };
202 let frame = decoded.frame;
203 let consumed = decoded.consumed;
204
205 if frame.payload_protocol == registry::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL {
206 let request = endpoint_probe_request_from_frame(&frame)?;
207 let response = endpoint_probe_response_frame(&request, &self.daemon);
208 let reply = encode_framed(&response)?;
209 return Ok(MuxPoll::ProbeAnswered { reply, consumed });
210 }
211
212 if registry::is_first_party(frame.payload_protocol) {
213 return Err(MuxError::UnexpectedFirstPartyFrame {
214 payload_protocol: frame.payload_protocol,
215 });
216 }
217
218 if !self
219 .served_payload_protocols
220 .contains(&frame.payload_protocol)
221 {
222 return Err(MuxError::UnservedPayloadProtocol {
223 payload_protocol: frame.payload_protocol,
224 });
225 }
226
227 // Cancel/event frames are reserved for v1.x; requests and
228 // responses both pass through so daemons can also act as
229 // frame clients on reused connections.
230 let _ = FrameKind::try_from(frame.kind);
231 Ok(MuxPoll::Payload { frame, consumed })
232 }
233
234 /// The daemon identity this mux answers probes with.
235 pub fn daemon(&self) -> &DaemonProcess {
236 &self.daemon
237 }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::backend_lifecycle::probe::PROBE_NONCE_BYTES;
    use crate::broker::protocol::{registry, Endpoint, PayloadEncoding};
    use prost::Message;

    /// The only consumer payload protocol the test mux serves.
    const TEST_PROTOCOL: u32 = 0x7001;

    /// Identity of the current process bound to a throwaway unix-socket
    /// endpoint.
    fn test_daemon() -> DaemonProcess {
        let socket = Endpoint::unix_socket("mux-test", "/tmp/mux-test.sock").expect("endpoint");
        DaemonProcess::current_process(socket, Some(30)).expect("identity")
    }

    /// Mux serving [`TEST_PROTOCOL`] with a toy legacy wire: a 4-byte LE
    /// length followed by a 4-byte LE version that must equal 15.
    fn test_mux() -> BackendEndpointMux<impl Fn(&[u8]) -> LegacyClassification> {
        BackendEndpointMux::new(test_daemon(), &[TEST_PROTOCOL], |bytes: &[u8]| match bytes {
            // At least eight bytes: bytes 4..8 hold the version word.
            [_, _, _, _, v0, v1, v2, v3, ..] => {
                if u32::from_le_bytes([*v0, *v1, *v2, *v3]) == 15 {
                    LegacyClassification::Legacy
                } else {
                    LegacyClassification::NotLegacy
                }
            }
            // Fewer than eight bytes: the version is not readable yet.
            _ => LegacyClassification::NeedMoreBytes,
        })
    }

    /// Wire bytes of a probe request carrying `nonce` with request id 7.
    fn probe_request_wire(nonce: [u8; PROBE_NONCE_BYTES]) -> Vec<u8> {
        let probe = Frame::request(
            registry::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL,
            nonce.to_vec(),
        )
        .with_request_id(7);
        encode_framed(&probe).expect("encode probe")
    }

    #[test]
    fn empty_and_short_buffers_need_more_bytes() {
        let mux = test_mux();
        // The second case leads with 0x01, which stays ambiguous until
        // the detector has enough bytes to rule.
        let too_short: [&[u8]; 2] = [&[], &[1, 0, 0]];
        for bytes in too_short {
            assert!(matches!(mux.poll(bytes), Ok(MuxPoll::NeedMoreBytes)));
        }
    }

    #[test]
    fn legacy_header_wins_even_with_frame_version_first_byte() {
        let mux = test_mux();

        // A legacy length of 257 puts 0x01 in byte 0, colliding with the
        // v1 framing byte; the detector reads version 15 and claims it.
        let legacy = [257u32.to_le_bytes(), 15u32.to_le_bytes()].concat();
        assert_eq!(legacy[0], ENVELOPE_VERSION);
        assert!(matches!(mux.poll(&legacy), Ok(MuxPoll::Legacy)));

        // A first byte that is not the framing byte also goes to the
        // consumer, even though the detector says "not legacy".
        let not_framed = [42, 0, 0, 0, 0, 16, 0, 0, 0];
        assert!(matches!(mux.poll(&not_framed), Ok(MuxPoll::Legacy)));
    }

    #[test]
    fn probe_request_is_answered_with_identity_echo() {
        let mux = test_mux();
        let nonce = [9u8; PROBE_NONCE_BYTES];
        let wire = probe_request_wire(nonce);

        // A probe missing its last byte must wait for more input.
        let truncated = &wire[..wire.len() - 1];
        assert!(matches!(mux.poll(truncated), Ok(MuxPoll::NeedMoreBytes)));

        let (reply, consumed) = match mux.poll(&wire).expect("poll") {
            MuxPoll::ProbeAnswered { reply, consumed } => (reply, consumed),
            _ => panic!("expected ProbeAnswered"),
        };
        assert_eq!(consumed, wire.len());

        let response = try_decode_framed(&reply)
            .expect("decode")
            .expect("complete")
            .frame;
        assert_eq!(
            response.payload_protocol,
            registry::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL
        );
        assert_eq!(response.request_id, 7);

        // Reply payload layout: echoed nonce, then the encoded identity.
        let (echoed_nonce, identity_bytes) = response.payload.split_at(PROBE_NONCE_BYTES);
        assert_eq!(echoed_nonce, &nonce);
        let identity = crate::broker::protocol::DaemonProcess::decode(identity_bytes)
            .expect("identity payload");
        assert_eq!(identity.pid, std::process::id());
    }

    #[test]
    fn consumer_payload_frame_passes_through() {
        let mux = test_mux();
        let request = Frame::request(TEST_PROTOCOL, b"ping".to_vec()).with_request_id(3);
        let wire = encode_framed(&request).expect("encode");

        match mux.poll(&wire).expect("poll") {
            MuxPoll::Payload { frame, consumed } => {
                assert_eq!(frame, request);
                assert_eq!(consumed, wire.len());
            }
            _ => panic!("expected Payload"),
        }
    }

    #[test]
    fn first_party_and_unserved_protocols_are_rejected() {
        let mux = test_mux();

        // The payload keeps the wire at eight bytes or more so the toy
        // detector can classify it; a 7-byte frame is legitimately
        // ambiguous and would yield NeedMoreBytes instead.
        let hello = Frame::request(registry::CONTROL_PAYLOAD_PROTOCOL, b"hello".to_vec());
        let hello_wire = encode_framed(&hello).expect("encode");
        // The literal 0 pins the control protocol's numeric value.
        assert!(matches!(
            mux.poll(&hello_wire),
            Err(MuxError::UnexpectedFirstPartyFrame {
                payload_protocol: 0
            })
        ));

        let unserved = Frame::request(0x7002, Vec::new());
        let unserved_wire = encode_framed(&unserved).expect("encode");
        assert!(matches!(
            mux.poll(&unserved_wire),
            Err(MuxError::UnservedPayloadProtocol {
                payload_protocol: 0x7002
            })
        ));
    }

    #[test]
    fn malformed_probe_is_connection_fatal() {
        let mux = test_mux();
        // The nonce is one byte short of what a probe carries.
        let mut short_probe = Frame::request(
            registry::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL,
            vec![0u8; PROBE_NONCE_BYTES - 1],
        );
        short_probe.payload_encoding = PayloadEncoding::None as i32;
        let wire = encode_framed(&short_probe).expect("encode");
        assert!(matches!(mux.poll(&wire), Err(MuxError::MalformedProbe(_))));
    }
}