Skip to main content

running_process/broker/protocol/
frame_ext.rs

1//! Ergonomic constructors and buffer-level codecs for the v1 `Frame`
2//! envelope (#412).
3//!
4//! The broker v1 post-mortem found every consumer hand-building
5//! `Frame { envelope_version: 1, kind, payload_protocol, payload,
6//! request_id, payload_encoding: None, deadline_unix_ms: 0,
7//! traceparent: "", tracestate: "" }` plus the outer
8//! `[u8 1][u32 LE len]` header, and re-deriving the
9//! peek-without-consume decode against a growable read buffer. This
10//! module owns those four pieces:
11//!
12//! - [`Frame::request`] / [`Frame::response_to`] — envelope
13//!   construction with correct v1 defaults.
14//! - [`encode_framed`] — one `Frame` to complete wire bytes
15//!   (`[1][len][prost]`).
16//! - [`try_decode_framed`] — incremental decode from a byte buffer,
17//!   returning how many bytes the caller must consume.
18//! - [`Endpoint::windows_pipe`] / [`Endpoint::unix_socket`] — endpoint
19//!   identities that respect the platform naming rules (notably: on
20//!   Windows the path is the **bare** pipe name, never
21//!   `\\.\pipe\`-prefixed, because running-process resolves endpoint
22//!   paths through interprocess's `GenericNamespaced` name type which
23//!   prepends the prefix itself).
24//!
25//! The sync-stream helpers stay in [`super::framing`]; this module is
26//! the buffer-level twin for consumers with their own buffered I/O
27//! (async daemons accumulating into a `BytesMut`-style buffer).
28
29use prost::Message;
30
31use crate::broker::protocol::{
32    Endpoint, Frame, FrameKind, FramingError, PayloadEncoding, ENVELOPE_VERSION, MAX_FRAME_BYTES,
33    PROTOCOL_VERSION,
34};
35
36/// Length of the outer wire header: `[u8 framing_version][u32 LE body_len]`.
37pub const FRAME_HEADER_BYTES: usize = 5;
38
39impl Frame {
40    /// Build a v1 request frame with correct envelope defaults.
41    ///
42    /// Sets `envelope_version` to [`PROTOCOL_VERSION`], `kind` to
43    /// `REQUEST`, `payload_encoding` to `NONE`, no deadline, and empty
44    /// trace context. Callers correlate request/response pairs through
45    /// `request_id` (see [`Frame::with_request_id`]).
46    ///
47    /// ```
48    /// use running_process::broker::protocol::Frame;
49    ///
50    /// let frame = Frame::request(0x7A63, b"payload".to_vec()).with_request_id(7);
51    /// assert_eq!(frame.envelope_version, 1);
52    /// assert_eq!(frame.request_id, 7);
53    /// ```
54    pub fn request(payload_protocol: u32, payload: Vec<u8>) -> Self {
55        Self {
56            envelope_version: PROTOCOL_VERSION,
57            kind: FrameKind::Request as i32,
58            payload_protocol,
59            payload,
60            request_id: 0,
61            payload_encoding: PayloadEncoding::None as i32,
62            deadline_unix_ms: 0,
63            traceparent: String::new(),
64            tracestate: String::new(),
65        }
66    }
67
68    /// Build the v1 response frame for `request`.
69    ///
70    /// Echoes the request's `payload_protocol`, `request_id`, and trace
71    /// context, sets `kind` to `RESPONSE`, and carries `payload`.
72    ///
73    /// ```
74    /// use running_process::broker::protocol::Frame;
75    ///
76    /// let request = Frame::request(0x7A63, b"ping".to_vec()).with_request_id(9);
77    /// let response = Frame::response_to(&request, b"pong".to_vec());
78    /// assert_eq!(response.request_id, 9);
79    /// assert_eq!(response.payload_protocol, 0x7A63);
80    /// ```
81    pub fn response_to(request: &Self, payload: Vec<u8>) -> Self {
82        Self {
83            envelope_version: PROTOCOL_VERSION,
84            kind: FrameKind::Response as i32,
85            payload_protocol: request.payload_protocol,
86            payload,
87            request_id: request.request_id,
88            payload_encoding: PayloadEncoding::None as i32,
89            deadline_unix_ms: 0,
90            traceparent: request.traceparent.clone(),
91            tracestate: request.tracestate.clone(),
92        }
93    }
94
95    /// Set the correlation `request_id` (builder style).
96    #[must_use]
97    pub fn with_request_id(mut self, request_id: u64) -> Self {
98        self.request_id = request_id;
99        self
100    }
101}
102
103/// Encode one `Frame` into complete wire bytes:
104/// `[u8 framing_version=1][u32 LE body_len][prost Frame]`.
105///
106/// # Errors
107///
108/// [`FramingError::FrameTooLarge`] when the encoded body exceeds
109/// [`MAX_FRAME_BYTES`].
110pub fn encode_framed(frame: &Frame) -> Result<Vec<u8>, FramingError> {
111    let body_len = frame.encoded_len();
112    if body_len > MAX_FRAME_BYTES {
113        return Err(FramingError::FrameTooLarge {
114            body_length: body_len,
115            cap: MAX_FRAME_BYTES,
116        });
117    }
118    let mut wire = Vec::with_capacity(FRAME_HEADER_BYTES + body_len);
119    wire.push(ENVELOPE_VERSION);
120    wire.extend_from_slice(&(body_len as u32).to_le_bytes());
121    frame
122        .encode(&mut wire)
123        .expect("prost encoding into Vec cannot fail because Vec writes are infallible");
124    Ok(wire)
125}
126
127/// One frame decoded from a byte buffer by [`try_decode_framed`].
128#[derive(Debug, Clone, PartialEq)]
129pub struct DecodedFramed {
130    /// The decoded frame.
131    pub frame: Frame,
132    /// Total wire bytes the frame occupied (header + body). The caller
133    /// must consume exactly this many bytes from the front of its buffer.
134    pub consumed: usize,
135}
136
137/// Incrementally decode one `Frame` from the front of `buf`.
138///
139/// Returns `Ok(None)` when the buffer does not yet hold a complete
140/// frame (read more bytes and retry); the buffer is never logically
141/// consumed — on `Ok(Some(decoded))` the caller advances its buffer by
142/// `decoded.consumed` bytes.
143///
144/// # Errors
145///
146/// - [`FramingError::UnsupportedFramingVersion`] when the leading byte
147///   is not [`ENVELOPE_VERSION`]. Consumers multiplexing a legacy wire
148///   on the same endpoint should classify the buffer first (see
149///   [`crate::broker::backend_sdk::BackendEndpointMux`]).
150/// - [`FramingError::FrameTooLarge`] when the advertised body length
151///   exceeds [`MAX_FRAME_BYTES`].
152/// - [`FramingError::Decode`] when the body bytes are not a valid
153///   prost `Frame`.
154pub fn try_decode_framed(buf: &[u8]) -> Result<Option<DecodedFramed>, FramingError> {
155    if buf.is_empty() {
156        return Ok(None);
157    }
158    if buf[0] != ENVELOPE_VERSION {
159        return Err(FramingError::UnsupportedFramingVersion {
160            got: buf[0],
161            expected: ENVELOPE_VERSION,
162        });
163    }
164    if buf.len() < FRAME_HEADER_BYTES {
165        return Ok(None);
166    }
167    let body_len = u32::from_le_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
168    if body_len > MAX_FRAME_BYTES {
169        return Err(FramingError::FrameTooLarge {
170            body_length: body_len,
171            cap: MAX_FRAME_BYTES,
172        });
173    }
174    let total = FRAME_HEADER_BYTES + body_len;
175    if buf.len() < total {
176        return Ok(None);
177    }
178    let frame = Frame::decode(&buf[FRAME_HEADER_BYTES..total]).map_err(FramingError::Decode)?;
179    Ok(Some(DecodedFramed {
180        frame,
181        consumed: total,
182    }))
183}
184
185impl Endpoint {
186    /// Build a Windows named-pipe endpoint identity from a **bare**
187    /// pipe name.
188    ///
189    /// running-process resolves endpoint paths through interprocess's
190    /// `GenericNamespaced` name type, which prepends `\\.\pipe\`
191    /// itself. Passing an already-prefixed path silently addresses the
192    /// wrong pipe, so this constructor rejects it.
193    ///
194    /// Available on every platform so cross-platform consumers can
195    /// construct endpoint identities for manifests and diagnostics.
196    ///
197    /// ```
198    /// use running_process::broker::protocol::Endpoint;
199    ///
200    /// let endpoint = Endpoint::windows_pipe("my-daemon", "my-daemon-pipe").unwrap();
201    /// assert_eq!(endpoint.path, "my-daemon-pipe");
202    /// assert!(Endpoint::windows_pipe("my-daemon", r"\\.\pipe\my-daemon-pipe").is_err());
203    /// ```
204    ///
205    /// # Errors
206    ///
207    /// [`EndpointNameError::PrefixedPipeName`] when `pipe_name` starts
208    /// with `\\.\pipe\` (or the forward-slash spelling), and
209    /// [`EndpointNameError::Empty`] for an empty name.
210    pub fn windows_pipe(
211        namespace_id: impl Into<String>,
212        pipe_name: impl Into<String>,
213    ) -> Result<Self, EndpointNameError> {
214        let pipe_name = pipe_name.into();
215        if pipe_name.is_empty() {
216            return Err(EndpointNameError::Empty);
217        }
218        let lowered = pipe_name.to_ascii_lowercase().replace('/', "\\");
219        if lowered.starts_with("\\\\.\\pipe\\") {
220            return Err(EndpointNameError::PrefixedPipeName { got: pipe_name });
221        }
222        Ok(Self {
223            namespace_id: namespace_id.into(),
224            path: pipe_name,
225        })
226    }
227
228    /// Build a Unix-domain-socket endpoint identity from a filesystem
229    /// path.
230    ///
231    /// Available on every platform so cross-platform consumers can
232    /// construct endpoint identities for manifests and diagnostics.
233    ///
234    /// # Errors
235    ///
236    /// [`EndpointNameError::Empty`] for an empty path.
237    pub fn unix_socket(
238        namespace_id: impl Into<String>,
239        socket_path: impl Into<String>,
240    ) -> Result<Self, EndpointNameError> {
241        let socket_path = socket_path.into();
242        if socket_path.is_empty() {
243            return Err(EndpointNameError::Empty);
244        }
245        Ok(Self {
246            namespace_id: namespace_id.into(),
247            path: socket_path,
248        })
249    }
250}
251
252/// Errors from the [`Endpoint`] smart constructors.
253#[derive(Debug, thiserror::Error, PartialEq, Eq)]
254pub enum EndpointNameError {
255    /// The endpoint name or path was empty.
256    #[error("endpoint name must not be empty")]
257    Empty,
258    /// A Windows pipe name carried the `\\.\pipe\` prefix; endpoint
259    /// paths must be the bare pipe name.
260    #[error(
261        "windows pipe name must be bare (no \\\\.\\pipe\\ prefix), got {got:?}: \
262         running-process prepends the prefix when resolving the endpoint"
263    )]
264    PrefixedPipeName {
265        /// The rejected, already-prefixed name.
266        got: String,
267    },
268}
269
270#[cfg(test)]
271mod tests {
272    use super::*;
273
274    #[test]
275    fn request_and_response_round_trip_through_buffer_codecs() {
276        let request = Frame::request(0x7A63, b"ping".to_vec()).with_request_id(42);
277        assert_eq!(request.envelope_version, PROTOCOL_VERSION);
278        assert_eq!(FrameKind::try_from(request.kind), Ok(FrameKind::Request));
279        assert_eq!(
280            PayloadEncoding::try_from(request.payload_encoding),
281            Ok(PayloadEncoding::None)
282        );
283
284        let response = Frame::response_to(&request, b"pong".to_vec());
285        assert_eq!(FrameKind::try_from(response.kind), Ok(FrameKind::Response));
286        assert_eq!(response.request_id, 42);
287        assert_eq!(response.payload_protocol, 0x7A63);
288
289        let wire = encode_framed(&request).expect("encode");
290        assert_eq!(wire[0], ENVELOPE_VERSION);
291        let decoded = try_decode_framed(&wire)
292            .expect("decode")
293            .expect("complete frame");
294        assert_eq!(decoded.frame, request);
295        assert_eq!(decoded.consumed, wire.len());
296    }
297
298    #[test]
299    fn response_echoes_trace_context() {
300        let mut request = Frame::request(0x7A63, Vec::new()).with_request_id(1);
301        request.traceparent = "00-abc-def-01".to_owned();
302        request.tracestate = "vendor=1".to_owned();
303        let response = Frame::response_to(&request, Vec::new());
304        assert_eq!(response.traceparent, request.traceparent);
305        assert_eq!(response.tracestate, request.tracestate);
306    }
307
308    #[test]
309    fn try_decode_framed_waits_for_complete_frames() {
310        let wire = encode_framed(&Frame::request(0x7001, b"abc".to_vec())).expect("encode");
311        assert!(try_decode_framed(&[]).expect("empty").is_none());
312        for cut in 1..wire.len() {
313            assert!(
314                try_decode_framed(&wire[..cut]).expect("partial").is_none(),
315                "partial frame of {cut} bytes must not decode"
316            );
317        }
318        // Trailing bytes after a complete frame are left for the next decode.
319        let mut two = wire.clone();
320        two.extend_from_slice(&wire);
321        let first = try_decode_framed(&two).expect("decode").expect("complete");
322        assert_eq!(first.consumed, wire.len());
323    }
324
325    #[test]
326    fn try_decode_framed_rejects_foreign_version_and_oversize() {
327        assert!(matches!(
328            try_decode_framed(&[2, 0, 0, 0, 0]),
329            Err(FramingError::UnsupportedFramingVersion { got: 2, .. })
330        ));
331        let mut oversize = vec![ENVELOPE_VERSION];
332        oversize.extend_from_slice(&(MAX_FRAME_BYTES as u32 + 1).to_le_bytes());
333        assert!(matches!(
334            try_decode_framed(&oversize),
335            Err(FramingError::FrameTooLarge { .. })
336        ));
337    }
338
339    #[test]
340    fn endpoint_constructors_enforce_naming_rules() {
341        let pipe = Endpoint::windows_pipe("svc", "svc-pipe").expect("bare name");
342        assert_eq!(pipe.namespace_id, "svc");
343        assert_eq!(pipe.path, "svc-pipe");
344
345        assert_eq!(
346            Endpoint::windows_pipe("svc", r"\\.\pipe\svc-pipe"),
347            Err(EndpointNameError::PrefixedPipeName {
348                got: r"\\.\pipe\svc-pipe".to_owned()
349            })
350        );
351        assert_eq!(
352            Endpoint::windows_pipe("svc", "//./pipe/svc-pipe"),
353            Err(EndpointNameError::PrefixedPipeName {
354                got: "//./pipe/svc-pipe".to_owned()
355            })
356        );
357        assert_eq!(
358            Endpoint::windows_pipe("svc", ""),
359            Err(EndpointNameError::Empty)
360        );
361
362        let sock = Endpoint::unix_socket("svc", "/tmp/svc.sock").expect("path");
363        assert_eq!(sock.path, "/tmp/svc.sock");
364        assert_eq!(
365            Endpoint::unix_socket("svc", ""),
366            Err(EndpointNameError::Empty)
367        );
368    }
369}