Skip to main content

running_process/broker/backend_sdk/
mux.rs

//! Sans-io endpoint multiplexer for backend daemons (#412).
//!
//! A consumer daemon's IPC endpoint serves three kinds of traffic:
//!
//! 1. its own **legacy wire** (whatever framing the daemon spoke before
//!    adopting running-process),
//! 2. **`BackendHandle` identity probes** (nonce challenge frames sent
//!    by [`crate::broker::backend_handle::BackendHandle::probe_with_service`]),
//! 3. **consumer payload frames** — the daemon's own requests carried
//!    opaquely in v1 `Frame` envelopes under a registered payload
//!    protocol.
//!
//! Before #412, every consumer hand-rolled the byte-level
//! disambiguation (including the genuinely ambiguous case where a
//! legacy length header makes byte 0 equal the v1 framing byte), probe
//! validation, and probe-response construction. `BackendEndpointMux`
//! owns all of it as a pure function of the read buffer: the daemon
//! keeps its own sockets, runtime, and buffered reads, and calls
//! [`BackendEndpointMux::poll`] whenever bytes arrive.
20
21use crate::broker::backend_lifecycle::identity::DaemonProcess;
22use crate::broker::backend_lifecycle::probe::{
23    endpoint_probe_request_from_frame, endpoint_probe_response_frame, EndpointProbeServerError,
24};
25use crate::broker::protocol::{
26    encode_framed, registry, try_decode_framed, Frame, FrameKind, FramingError, ENVELOPE_VERSION,
27};
28
/// Consumer verdict on whether buffered bytes belong to its legacy wire.
///
/// The detector runs **before** any frame decoding, so a legacy header
/// whose first byte happens to equal the v1 framing byte
/// ([`ENVELOPE_VERSION`]) is classified by the consumer, which knows
/// its own header layout, not by running-process.
///
/// [`BackendEndpointMux::poll`] acts on the verdict as follows:
/// `Legacy` and `NeedMoreBytes` are returned to the caller immediately
/// (as the like-named [`MuxPoll`] variants); only `NotLegacy` lets the
/// mux go on to inspect the framing byte and decode a frame.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegacyClassification {
    /// The bytes start a legacy-wire message; the mux steps aside.
    Legacy,
    /// The bytes are not legacy wire; the mux continues frame handling.
    NotLegacy,
    /// Too few bytes to decide; read more and poll again.
    NeedMoreBytes,
}
44
45/// What the daemon's accept loop should do with its buffered bytes.
46#[derive(Debug)]
47pub enum MuxPoll {
48    /// Not enough bytes to classify or to complete a frame. Read more
49    /// bytes into the buffer and poll again. Nothing is consumed.
50    NeedMoreBytes,
51    /// The buffer starts a legacy-wire message. Nothing is consumed;
52    /// hand the buffer to the daemon's legacy decoder.
53    Legacy,
54    /// A `BackendHandle` identity probe was answered. Write `reply` to
55    /// the peer verbatim, then advance the read buffer by `consumed`.
56    ProbeAnswered {
57        /// Complete wire bytes (`[1][len][prost Frame]`) to send back.
58        reply: Vec<u8>,
59        /// Bytes of the probe request to consume from the buffer front.
60        consumed: usize,
61    },
62    /// A consumer payload frame arrived. Dispatch `frame.payload`
63    /// through the daemon's request handler (correlate on
64    /// `frame.request_id`), then advance the read buffer by `consumed`.
65    Payload {
66        /// The decoded consumer frame (kind, payload protocol, payload,
67        /// request id, trace context).
68        frame: Frame,
69        /// Bytes the frame occupied; consume from the buffer front.
70        consumed: usize,
71    },
72}
73
74/// Errors surfaced by [`BackendEndpointMux::poll`].
75///
76/// All of them mean the connection is in an unknown state; the daemon
77/// should drop it (matching broker behavior for framing violations).
78#[derive(Debug, thiserror::Error)]
79pub enum MuxError {
80    /// Outer framing violation (oversize body, undecodable frame).
81    #[error(transparent)]
82    Framing(#[from] FramingError),
83    /// A frame with the probe payload protocol failed probe validation.
84    #[error("malformed BackendHandle probe: {0}")]
85    MalformedProbe(#[from] EndpointProbeServerError),
86    /// A frame carried a first-party payload protocol this endpoint
87    /// does not serve (e.g. broker Hello sent to a backend daemon).
88    #[error(
89        "unexpected first-party frame on backend endpoint \
90         (payload_protocol {payload_protocol:#06X})"
91    )]
92    UnexpectedFirstPartyFrame {
93        /// The first-party payload protocol the peer used.
94        payload_protocol: u32,
95    },
96    /// A frame carried a payload protocol other than the ones this mux
97    /// was configured to accept.
98    #[error("frame for unserved payload protocol {payload_protocol:#06X}")]
99    UnservedPayloadProtocol {
100        /// The payload protocol the peer used.
101        payload_protocol: u32,
102    },
103}
104
105/// Sans-io classifier for a backend daemon endpoint.
106///
107/// Construct one per daemon (it is cheap and immutable; sharing behind
108/// an `Arc` is fine) with the daemon's own [`DaemonProcess`] identity,
109/// the consumer payload protocols it serves, and a legacy detector
110/// closure. In the accept loop, buffer reads as usual and call
111/// [`Self::poll`]:
112///
113/// ```
114/// use running_process::broker::backend_sdk::{
115///     BackendEndpointMux, LegacyClassification, MuxPoll,
116/// };
117/// use running_process::broker::backend_handle::DaemonProcess;
118/// use running_process::broker::protocol::Endpoint;
119///
120/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
121/// let endpoint = Endpoint::unix_socket("my-daemon", "/tmp/my-daemon.sock")?;
122/// let daemon = DaemonProcess::current_process(endpoint, None)?;
123/// let mux = BackendEndpointMux::new(daemon, &[0x7A63], |buf| {
124///     // First byte of my legacy wire is always the ASCII 'L' magic.
125///     match buf.first() {
126///         None => LegacyClassification::NeedMoreBytes,
127///         Some(b'L') => LegacyClassification::Legacy,
128///         Some(_) => LegacyClassification::NotLegacy,
129///     }
130/// });
131///
132/// let mut read_buf: Vec<u8> = Vec::new();
133/// // ... read bytes into read_buf, then:
134/// match mux.poll(&read_buf)? {
135///     MuxPoll::NeedMoreBytes => { /* read more */ }
136///     MuxPoll::Legacy => { /* my own decoder takes over */ }
137///     MuxPoll::ProbeAnswered { reply, consumed } => {
138///         // write `reply`, then read_buf.drain(..consumed);
139///     }
140///     MuxPoll::Payload { frame, consumed } => {
141///         // dispatch frame.payload, then read_buf.drain(..consumed);
142///     }
143/// }
144/// # Ok(())
145/// # }
146/// ```
147///
148/// Daemons with no legacy wire pass a detector that always returns
149/// [`LegacyClassification::NotLegacy`].
150pub struct BackendEndpointMux<F> {
151    daemon: DaemonProcess,
152    served_payload_protocols: Vec<u32>,
153    legacy_detector: F,
154}
155
156impl<F> BackendEndpointMux<F>
157where
158    F: Fn(&[u8]) -> LegacyClassification,
159{
160    /// Build a mux for `daemon`, serving the given consumer payload
161    /// protocols, with a consumer-provided legacy-wire detector.
162    pub fn new(
163        daemon: DaemonProcess,
164        served_payload_protocols: &[u32],
165        legacy_detector: F,
166    ) -> Self {
167        Self {
168            daemon,
169            served_payload_protocols: served_payload_protocols.to_vec(),
170            legacy_detector,
171        }
172    }
173
174    /// Classify the front of `buf` and, for probes, build the reply.
175    ///
176    /// Pure with respect to I/O: never reads or writes a socket and
177    /// never consumes from `buf` — the returned `consumed` counts tell
178    /// the caller how far to advance. See [`MuxPoll`] for the contract
179    /// of each verdict and [`MuxError`] for connection-fatal outcomes.
180    pub fn poll(&self, buf: &[u8]) -> Result<MuxPoll, MuxError> {
181        if buf.is_empty() {
182            return Ok(MuxPoll::NeedMoreBytes);
183        }
184
185        // The consumer's own wire wins ties: only it knows whether a
186        // leading 0x01 is a legacy length byte or our framing byte.
187        match (self.legacy_detector)(buf) {
188            LegacyClassification::Legacy => return Ok(MuxPoll::Legacy),
189            LegacyClassification::NeedMoreBytes => return Ok(MuxPoll::NeedMoreBytes),
190            LegacyClassification::NotLegacy => {}
191        }
192
193        // Not legacy and not our framing byte: still the consumer's
194        // problem (its decoder owns the error path for garbage).
195        if buf[0] != ENVELOPE_VERSION {
196            return Ok(MuxPoll::Legacy);
197        }
198
199        let Some(decoded) = try_decode_framed(buf)? else {
200            return Ok(MuxPoll::NeedMoreBytes);
201        };
202        let frame = decoded.frame;
203        let consumed = decoded.consumed;
204
205        if frame.payload_protocol == registry::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL {
206            let request = endpoint_probe_request_from_frame(&frame)?;
207            let response = endpoint_probe_response_frame(&request, &self.daemon);
208            let reply = encode_framed(&response)?;
209            return Ok(MuxPoll::ProbeAnswered { reply, consumed });
210        }
211
212        if registry::is_first_party(frame.payload_protocol) {
213            return Err(MuxError::UnexpectedFirstPartyFrame {
214                payload_protocol: frame.payload_protocol,
215            });
216        }
217
218        if !self
219            .served_payload_protocols
220            .contains(&frame.payload_protocol)
221        {
222            return Err(MuxError::UnservedPayloadProtocol {
223                payload_protocol: frame.payload_protocol,
224            });
225        }
226
227        // Cancel/event frames are reserved for v1.x; requests and
228        // responses both pass through so daemons can also act as
229        // frame clients on reused connections.
230        let _ = FrameKind::try_from(frame.kind);
231        Ok(MuxPoll::Payload { frame, consumed })
232    }
233
234    /// The daemon identity this mux answers probes with.
235    pub fn daemon(&self) -> &DaemonProcess {
236        &self.daemon
237    }
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::backend_lifecycle::probe::PROBE_NONCE_BYTES;
    use crate::broker::protocol::{registry, Endpoint, PayloadEncoding};
    use prost::Message;

    /// The single consumer payload protocol the test mux serves.
    const TEST_PROTOCOL: u32 = 0x7001;

    /// Identity of the test process itself, used to answer probes.
    fn test_daemon() -> DaemonProcess {
        let endpoint = Endpoint::unix_socket("mux-test", "/tmp/mux-test.sock").expect("endpoint");
        DaemonProcess::current_process(endpoint, Some(30)).expect("identity")
    }

    /// Mux serving [`TEST_PROTOCOL`] with a toy legacy detector that
    /// needs eight bytes before it can decide.
    fn test_mux() -> BackendEndpointMux<impl Fn(&[u8]) -> LegacyClassification> {
        BackendEndpointMux::new(test_daemon(), &[TEST_PROTOCOL], |buf: &[u8]| {
            // Toy legacy wire: 4-byte LE length, then 4-byte LE version 15.
            if buf.len() < 8 {
                return LegacyClassification::NeedMoreBytes;
            }
            let version = u32::from_le_bytes([buf[4], buf[5], buf[6], buf[7]]);
            if version == 15 {
                LegacyClassification::Legacy
            } else {
                LegacyClassification::NotLegacy
            }
        })
    }

    /// Wire bytes of a probe request carrying `nonce`, request id 7.
    fn probe_request_wire(nonce: [u8; PROBE_NONCE_BYTES]) -> Vec<u8> {
        let frame = Frame::request(
            registry::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL,
            nonce.to_vec(),
        )
        .with_request_id(7);
        encode_framed(&frame).expect("encode probe")
    }

    #[test]
    fn empty_and_short_buffers_need_more_bytes() {
        let mux = test_mux();
        assert!(matches!(mux.poll(&[]), Ok(MuxPoll::NeedMoreBytes)));
        // 0x01 leading byte is ambiguous until the detector can rule.
        assert!(matches!(mux.poll(&[1, 0, 0]), Ok(MuxPoll::NeedMoreBytes)));
    }

    #[test]
    fn legacy_header_wins_even_with_frame_version_first_byte() {
        let mux = test_mux();
        // Legacy message of length 0x...01 — byte 0 collides with the
        // v1 framing byte. The detector sees version 15 and claims it.
        let mut legacy = 257u32.to_le_bytes().to_vec();
        legacy.extend_from_slice(&15u32.to_le_bytes());
        assert_eq!(legacy[0], ENVELOPE_VERSION);
        assert!(matches!(mux.poll(&legacy), Ok(MuxPoll::Legacy)));

        // Non-frame first byte goes to the consumer too.
        assert!(matches!(
            mux.poll(&[42, 0, 0, 0, 0, 16, 0, 0, 0]),
            Ok(MuxPoll::Legacy)
        ));
    }

    #[test]
    fn probe_request_is_answered_with_identity_echo() {
        let mux = test_mux();
        let nonce = [9u8; PROBE_NONCE_BYTES];
        let wire = probe_request_wire(nonce);

        // Partial probe frames wait for more bytes.
        assert!(matches!(
            mux.poll(&wire[..wire.len() - 1]),
            Ok(MuxPoll::NeedMoreBytes)
        ));

        let MuxPoll::ProbeAnswered { reply, consumed } = mux.poll(&wire).expect("poll") else {
            panic!("expected ProbeAnswered");
        };
        assert_eq!(consumed, wire.len());

        // The reply must itself be a complete, decodable frame that
        // echoes the nonce and request id, followed by our identity.
        let decoded = try_decode_framed(&reply)
            .expect("decode")
            .expect("complete");
        let frame = decoded.frame;
        assert_eq!(
            frame.payload_protocol,
            registry::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL
        );
        assert_eq!(frame.request_id, 7);
        assert_eq!(&frame.payload[..PROBE_NONCE_BYTES], &nonce);
        let identity =
            crate::broker::protocol::DaemonProcess::decode(&frame.payload[PROBE_NONCE_BYTES..])
                .expect("identity payload");
        assert_eq!(identity.pid, std::process::id());
    }

    #[test]
    fn consumer_payload_frame_passes_through() {
        let mux = test_mux();
        let request = Frame::request(TEST_PROTOCOL, b"ping".to_vec()).with_request_id(3);
        let wire = encode_framed(&request).expect("encode");
        let MuxPoll::Payload { frame, consumed } = mux.poll(&wire).expect("poll") else {
            panic!("expected Payload");
        };
        assert_eq!(frame, request);
        assert_eq!(consumed, wire.len());
    }

    #[test]
    fn first_party_and_unserved_protocols_are_rejected() {
        let mux = test_mux();
        // Payload bytes keep the wire ≥ 8 bytes so the toy legacy
        // detector can classify (a 7-byte frame is legitimately
        // ambiguous and yields NeedMoreBytes instead).
        let hello = Frame::request(registry::CONTROL_PAYLOAD_PROTOCOL, b"hello".to_vec());
        let wire = encode_framed(&hello).expect("encode");
        assert!(matches!(
            mux.poll(&wire),
            Err(MuxError::UnexpectedFirstPartyFrame {
                payload_protocol: 0
            })
        ));

        let other = Frame::request(0x7002, Vec::new());
        let wire = encode_framed(&other).expect("encode");
        assert!(matches!(
            mux.poll(&wire),
            Err(MuxError::UnservedPayloadProtocol {
                payload_protocol: 0x7002
            })
        ));
    }

    #[test]
    fn malformed_probe_is_connection_fatal() {
        let mux = test_mux();
        // One byte short of a full nonce: must fail probe validation.
        let mut bad = Frame::request(
            registry::BACKEND_HANDLE_PROBE_PAYLOAD_PROTOCOL,
            vec![0u8; PROBE_NONCE_BYTES - 1],
        );
        bad.payload_encoding = PayloadEncoding::None as i32;
        let wire = encode_framed(&bad).expect("encode");
        assert!(matches!(mux.poll(&wire), Err(MuxError::MalformedProbe(_))));
    }
}