running_process/broker/protocol/frame_ext.rs
1//! Ergonomic constructors and buffer-level codecs for the v1 `Frame`
2//! envelope (#412).
3//!
4//! The broker v1 post-mortem found every consumer hand-building
5//! `Frame { envelope_version: 1, kind, payload_protocol, payload,
6//! request_id, payload_encoding: None, deadline_unix_ms: 0,
7//! traceparent: "", tracestate: "" }` plus the outer
8//! `[u8 1][u32 LE len]` header, and re-deriving the
9//! peek-without-consume decode against a growable read buffer. This
10//! module owns those four pieces:
11//!
12//! - [`Frame::request`] / [`Frame::response_to`] — envelope
13//! construction with correct v1 defaults.
14//! - [`encode_framed`] — one `Frame` to complete wire bytes
15//! (`[1][len][prost]`).
16//! - [`try_decode_framed`] — incremental decode from a byte buffer,
17//! returning how many bytes the caller must consume.
18//! - [`Endpoint::windows_pipe`] / [`Endpoint::unix_socket`] — endpoint
19//! identities that respect the platform naming rules (notably: on
20//! Windows the path is the **bare** pipe name, never
21//! `\\.\pipe\`-prefixed, because running-process resolves endpoint
22//! paths through interprocess's `GenericNamespaced` name type which
23//! prepends the prefix itself).
24//!
25//! The sync-stream helpers stay in [`super::framing`]; this module is
26//! the buffer-level twin for consumers with their own buffered I/O
27//! (async daemons accumulating into a `BytesMut`-style buffer).
28
29use prost::Message;
30
31use crate::broker::protocol::{
32 Endpoint, Frame, FrameKind, FramingError, PayloadEncoding, ENVELOPE_VERSION, MAX_FRAME_BYTES,
33 PROTOCOL_VERSION,
34};
35
/// Length of the outer wire header: `[u8 framing_version][u32 LE body_len]`.
///
/// One framing-version byte followed by a four-byte little-endian body
/// length, i.e. `1 + 4` bytes. The prost-encoded `Frame` body starts at
/// this offset in a framed buffer.
pub const FRAME_HEADER_BYTES: usize = 5;
38
39impl Frame {
40 /// Build a v1 request frame with correct envelope defaults.
41 ///
42 /// Sets `envelope_version` to [`PROTOCOL_VERSION`], `kind` to
43 /// `REQUEST`, `payload_encoding` to `NONE`, no deadline, and empty
44 /// trace context. Callers correlate request/response pairs through
45 /// `request_id` (see [`Frame::with_request_id`]).
46 ///
47 /// ```
48 /// use running_process::broker::protocol::Frame;
49 ///
50 /// let frame = Frame::request(0x7A63, b"payload".to_vec()).with_request_id(7);
51 /// assert_eq!(frame.envelope_version, 1);
52 /// assert_eq!(frame.request_id, 7);
53 /// ```
54 pub fn request(payload_protocol: u32, payload: Vec<u8>) -> Self {
55 Self {
56 envelope_version: PROTOCOL_VERSION,
57 kind: FrameKind::Request as i32,
58 payload_protocol,
59 payload,
60 request_id: 0,
61 payload_encoding: PayloadEncoding::None as i32,
62 deadline_unix_ms: 0,
63 traceparent: String::new(),
64 tracestate: String::new(),
65 }
66 }
67
68 /// Build the v1 response frame for `request`.
69 ///
70 /// Echoes the request's `payload_protocol`, `request_id`, and trace
71 /// context, sets `kind` to `RESPONSE`, and carries `payload`.
72 ///
73 /// ```
74 /// use running_process::broker::protocol::Frame;
75 ///
76 /// let request = Frame::request(0x7A63, b"ping".to_vec()).with_request_id(9);
77 /// let response = Frame::response_to(&request, b"pong".to_vec());
78 /// assert_eq!(response.request_id, 9);
79 /// assert_eq!(response.payload_protocol, 0x7A63);
80 /// ```
81 pub fn response_to(request: &Self, payload: Vec<u8>) -> Self {
82 Self {
83 envelope_version: PROTOCOL_VERSION,
84 kind: FrameKind::Response as i32,
85 payload_protocol: request.payload_protocol,
86 payload,
87 request_id: request.request_id,
88 payload_encoding: PayloadEncoding::None as i32,
89 deadline_unix_ms: 0,
90 traceparent: request.traceparent.clone(),
91 tracestate: request.tracestate.clone(),
92 }
93 }
94
95 /// Set the correlation `request_id` (builder style).
96 #[must_use]
97 pub fn with_request_id(mut self, request_id: u64) -> Self {
98 self.request_id = request_id;
99 self
100 }
101}
102
103/// Encode one `Frame` into complete wire bytes:
104/// `[u8 framing_version=1][u32 LE body_len][prost Frame]`.
105///
106/// # Errors
107///
108/// [`FramingError::FrameTooLarge`] when the encoded body exceeds
109/// [`MAX_FRAME_BYTES`].
110pub fn encode_framed(frame: &Frame) -> Result<Vec<u8>, FramingError> {
111 let body_len = frame.encoded_len();
112 if body_len > MAX_FRAME_BYTES {
113 return Err(FramingError::FrameTooLarge {
114 body_length: body_len,
115 cap: MAX_FRAME_BYTES,
116 });
117 }
118 let mut wire = Vec::with_capacity(FRAME_HEADER_BYTES + body_len);
119 wire.push(ENVELOPE_VERSION);
120 wire.extend_from_slice(&(body_len as u32).to_le_bytes());
121 frame
122 .encode(&mut wire)
123 .expect("prost encoding into Vec cannot fail because Vec writes are infallible");
124 Ok(wire)
125}
126
/// One frame decoded from a byte buffer by [`try_decode_framed`].
///
/// Pairs the decoded envelope with the number of wire bytes it
/// occupied, so a caller that owns the read buffer knows how far to
/// advance it.
#[derive(Debug, Clone, PartialEq)]
pub struct DecodedFramed {
    /// The decoded frame.
    pub frame: Frame,
    /// Total wire bytes the frame occupied (header + body). The caller
    /// must consume exactly this many bytes from the front of its buffer;
    /// any bytes after that belong to the next frame.
    pub consumed: usize,
}
136
137/// Incrementally decode one `Frame` from the front of `buf`.
138///
139/// Returns `Ok(None)` when the buffer does not yet hold a complete
140/// frame (read more bytes and retry); the buffer is never logically
141/// consumed — on `Ok(Some(decoded))` the caller advances its buffer by
142/// `decoded.consumed` bytes.
143///
144/// # Errors
145///
146/// - [`FramingError::UnsupportedFramingVersion`] when the leading byte
147/// is not [`ENVELOPE_VERSION`]. Consumers multiplexing a legacy wire
148/// on the same endpoint should classify the buffer first (see
149/// [`crate::broker::backend_sdk::BackendEndpointMux`]).
150/// - [`FramingError::FrameTooLarge`] when the advertised body length
151/// exceeds [`MAX_FRAME_BYTES`].
152/// - [`FramingError::Decode`] when the body bytes are not a valid
153/// prost `Frame`.
154pub fn try_decode_framed(buf: &[u8]) -> Result<Option<DecodedFramed>, FramingError> {
155 if buf.is_empty() {
156 return Ok(None);
157 }
158 if buf[0] != ENVELOPE_VERSION {
159 return Err(FramingError::UnsupportedFramingVersion {
160 got: buf[0],
161 expected: ENVELOPE_VERSION,
162 });
163 }
164 if buf.len() < FRAME_HEADER_BYTES {
165 return Ok(None);
166 }
167 let body_len = u32::from_le_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
168 if body_len > MAX_FRAME_BYTES {
169 return Err(FramingError::FrameTooLarge {
170 body_length: body_len,
171 cap: MAX_FRAME_BYTES,
172 });
173 }
174 let total = FRAME_HEADER_BYTES + body_len;
175 if buf.len() < total {
176 return Ok(None);
177 }
178 let frame = Frame::decode(&buf[FRAME_HEADER_BYTES..total]).map_err(FramingError::Decode)?;
179 Ok(Some(DecodedFramed {
180 frame,
181 consumed: total,
182 }))
183}
184
185impl Endpoint {
186 /// Build a Windows named-pipe endpoint identity from a **bare**
187 /// pipe name.
188 ///
189 /// running-process resolves endpoint paths through interprocess's
190 /// `GenericNamespaced` name type, which prepends `\\.\pipe\`
191 /// itself. Passing an already-prefixed path silently addresses the
192 /// wrong pipe, so this constructor rejects it.
193 ///
194 /// Available on every platform so cross-platform consumers can
195 /// construct endpoint identities for manifests and diagnostics.
196 ///
197 /// ```
198 /// use running_process::broker::protocol::Endpoint;
199 ///
200 /// let endpoint = Endpoint::windows_pipe("my-daemon", "my-daemon-pipe").unwrap();
201 /// assert_eq!(endpoint.path, "my-daemon-pipe");
202 /// assert!(Endpoint::windows_pipe("my-daemon", r"\\.\pipe\my-daemon-pipe").is_err());
203 /// ```
204 ///
205 /// # Errors
206 ///
207 /// [`EndpointNameError::PrefixedPipeName`] when `pipe_name` starts
208 /// with `\\.\pipe\` (or the forward-slash spelling), and
209 /// [`EndpointNameError::Empty`] for an empty name.
210 pub fn windows_pipe(
211 namespace_id: impl Into<String>,
212 pipe_name: impl Into<String>,
213 ) -> Result<Self, EndpointNameError> {
214 let pipe_name = pipe_name.into();
215 if pipe_name.is_empty() {
216 return Err(EndpointNameError::Empty);
217 }
218 let lowered = pipe_name.to_ascii_lowercase().replace('/', "\\");
219 if lowered.starts_with("\\\\.\\pipe\\") {
220 return Err(EndpointNameError::PrefixedPipeName { got: pipe_name });
221 }
222 Ok(Self {
223 namespace_id: namespace_id.into(),
224 path: pipe_name,
225 })
226 }
227
228 /// Build a Unix-domain-socket endpoint identity from a filesystem
229 /// path.
230 ///
231 /// Available on every platform so cross-platform consumers can
232 /// construct endpoint identities for manifests and diagnostics.
233 ///
234 /// # Errors
235 ///
236 /// [`EndpointNameError::Empty`] for an empty path.
237 pub fn unix_socket(
238 namespace_id: impl Into<String>,
239 socket_path: impl Into<String>,
240 ) -> Result<Self, EndpointNameError> {
241 let socket_path = socket_path.into();
242 if socket_path.is_empty() {
243 return Err(EndpointNameError::Empty);
244 }
245 Ok(Self {
246 namespace_id: namespace_id.into(),
247 path: socket_path,
248 })
249 }
250}
251
/// Errors from the [`Endpoint`] smart constructors
/// ([`Endpoint::windows_pipe`] and [`Endpoint::unix_socket`]).
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum EndpointNameError {
    /// The endpoint name or path was empty.
    #[error("endpoint name must not be empty")]
    Empty,
    /// A Windows pipe name carried the `\\.\pipe\` prefix; endpoint
    /// paths must be the bare pipe name. Returned only by
    /// [`Endpoint::windows_pipe`].
    #[error(
        "windows pipe name must be bare (no \\\\.\\pipe\\ prefix), got {got:?}: \
         running-process prepends the prefix when resolving the endpoint"
    )]
    PrefixedPipeName {
        /// The rejected, already-prefixed name, exactly as the caller
        /// supplied it.
        got: String,
    },
}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructors apply the v1 defaults, and an encoded request
    /// decodes back to an equal frame that accounts for every wire byte.
    #[test]
    fn request_and_response_round_trip_through_buffer_codecs() {
        let ping = Frame::request(0x7A63, b"ping".to_vec()).with_request_id(42);
        assert_eq!(ping.envelope_version, PROTOCOL_VERSION);
        assert_eq!(FrameKind::try_from(ping.kind), Ok(FrameKind::Request));
        assert_eq!(
            PayloadEncoding::try_from(ping.payload_encoding),
            Ok(PayloadEncoding::None)
        );

        let pong = Frame::response_to(&ping, b"pong".to_vec());
        assert_eq!(FrameKind::try_from(pong.kind), Ok(FrameKind::Response));
        assert_eq!(pong.request_id, 42);
        assert_eq!(pong.payload_protocol, 0x7A63);

        let bytes = encode_framed(&ping).expect("encode");
        assert_eq!(bytes[0], ENVELOPE_VERSION);
        let DecodedFramed { frame, consumed } = try_decode_framed(&bytes)
            .expect("decode")
            .expect("complete frame");
        assert_eq!(frame, ping);
        assert_eq!(consumed, bytes.len());
    }

    /// A response carries the request's trace context verbatim.
    #[test]
    fn response_echoes_trace_context() {
        let mut traced = Frame::request(0x7A63, Vec::new()).with_request_id(1);
        traced.traceparent = "00-abc-def-01".to_owned();
        traced.tracestate = "vendor=1".to_owned();

        let reply = Frame::response_to(&traced, Vec::new());
        assert_eq!(reply.traceparent, traced.traceparent);
        assert_eq!(reply.tracestate, traced.tracestate);
    }

    /// Every strict prefix of a frame yields `Ok(None)`; a buffer holding
    /// two frames decodes only the first.
    #[test]
    fn try_decode_framed_waits_for_complete_frames() {
        let wire = encode_framed(&Frame::request(0x7001, b"abc".to_vec())).expect("encode");

        assert!(try_decode_framed(&[]).expect("empty").is_none());
        for cut in 1..wire.len() {
            let partial = try_decode_framed(&wire[..cut]).expect("partial");
            assert!(
                partial.is_none(),
                "partial frame of {cut} bytes must not decode"
            );
        }

        // Trailing bytes after a complete frame are left for the next decode.
        let two = wire.repeat(2);
        let first = try_decode_framed(&two).expect("decode").expect("complete");
        assert_eq!(first.consumed, wire.len());
    }

    /// A foreign framing version and an over-cap length prefix are both
    /// rejected from the header alone.
    #[test]
    fn try_decode_framed_rejects_foreign_version_and_oversize() {
        let foreign = try_decode_framed(&[2, 0, 0, 0, 0]);
        assert!(matches!(
            foreign,
            Err(FramingError::UnsupportedFramingVersion { got: 2, .. })
        ));

        let over_cap = (MAX_FRAME_BYTES as u32 + 1).to_le_bytes();
        let oversize: Vec<u8> = std::iter::once(ENVELOPE_VERSION).chain(over_cap).collect();
        assert!(matches!(
            try_decode_framed(&oversize),
            Err(FramingError::FrameTooLarge { .. })
        ));
    }

    /// Bare names are accepted as-is; prefixed pipe names (either slash
    /// spelling) and empty names are rejected.
    #[test]
    fn endpoint_constructors_enforce_naming_rules() {
        let pipe = Endpoint::windows_pipe("svc", "svc-pipe").expect("bare name");
        assert_eq!(pipe.namespace_id, "svc");
        assert_eq!(pipe.path, "svc-pipe");

        for prefixed in [r"\\.\pipe\svc-pipe", "//./pipe/svc-pipe"] {
            assert_eq!(
                Endpoint::windows_pipe("svc", prefixed),
                Err(EndpointNameError::PrefixedPipeName {
                    got: prefixed.to_owned()
                })
            );
        }
        assert_eq!(
            Endpoint::windows_pipe("svc", ""),
            Err(EndpointNameError::Empty)
        );

        let sock = Endpoint::unix_socket("svc", "/tmp/svc.sock").expect("path");
        assert_eq!(sock.path, "/tmp/svc.sock");
        assert_eq!(
            Endpoint::unix_socket("svc", ""),
            Err(EndpointNameError::Empty)
        );
    }
}
369}