running_process/broker/protocol/framing.rs
1//! v1 broker wire framing — `[u8 framing_version][u32 LE body_length][prost body]`.
2//!
3//! ## Frozen-forever
4//!
5//! This module implements THE truly-frozen-forever invariant of v1
6//! per #228 "Frozen-forever commitments": the framing byte is `1`,
7//! the body length is a little-endian `u32`, and the body is a prost
8//! payload. Any future protocol version is signalled by changing the
9//! framing byte, which lets a v1 broker recognize a v2 client and
10//! return `Refused{ERROR_VERSION_UNSUPPORTED}` instead of decoding
11//! garbage.
12//!
13//! ## Sizes
14//!
15//! - [`MAX_FRAME_BYTES`] caps an arbitrary frame at 16 MiB. Larger
16//! frames cause the broker to disconnect.
17//! - [`MAX_HELLO_BYTES`] caps the initial Hello envelope at 64 KiB.
18//! Larger Hello frames cause the broker to return `Refused` and
19//! close. Hello-specific reads should pass [`MAX_HELLO_BYTES`] to
20//! [`read_frame_with_cap`].
21//!
22//! ## Sync, not async
23//!
24//! Phase 1 ships a synchronous `std::io::{Read, Write}` implementation
25//! because the `client` cargo feature does not pull in tokio. The
26//! broker server (Phase 4) runs under `feature = "daemon"` (which
27//! does include tokio) and can wrap a `TcpStream`/`NamedPipeServer`
28//! either through `tokio::io::sync_bridge` or by re-implementing the
29//! wire layout on `AsyncRead`/`AsyncWrite`. The wire format is the
30//! same; only the surface API differs.
31
32use std::io::{self, Read, Write};
33
34use crate::broker::{FRAMING_VERSION_V1, MAX_FRAME_SIZE_BYTES, MAX_HELLO_SIZE_BYTES};
35
36/// Framing byte for v1. Alias of [`crate::broker::FRAMING_VERSION_V1`]
37/// to match the name used in the #228/#230 specs verbatim.
38pub const ENVELOPE_VERSION: u8 = FRAMING_VERSION_V1;
39
40/// Default per-frame size cap (16 MiB). Alias of
41/// [`crate::broker::MAX_FRAME_SIZE_BYTES`].
42pub const MAX_FRAME_BYTES: usize = MAX_FRAME_SIZE_BYTES;
43
44/// Hello-envelope size cap (64 KiB). Alias of
45/// [`crate::broker::MAX_HELLO_SIZE_BYTES`].
46pub const MAX_HELLO_BYTES: usize = MAX_HELLO_SIZE_BYTES;
47
48/// Errors produced by [`read_frame`]/[`write_frame`].
49///
50/// The error variants are deliberately distinct from `io::Error` so
51/// callers can map them onto the broker's wire-level error codes (e.g.
52/// `Refused{ERROR_VERSION_UNSUPPORTED}` for an
53/// [`FramingError::UnsupportedFramingVersion`]).
54#[derive(Debug, thiserror::Error)]
55pub enum FramingError {
56 /// Peer's framing byte did not match [`ENVELOPE_VERSION`].
57 ///
58 /// Per #228, the v1 broker writes `Refused{ERROR_VERSION_UNSUPPORTED}`
59 /// to the wire and closes the connection on this error.
60 #[error("unsupported framing version: got {got}, expected {expected}")]
61 UnsupportedFramingVersion {
62 /// The framing byte the peer actually sent.
63 got: u8,
64 /// The framing byte we expected (always
65 /// [`ENVELOPE_VERSION`] in v1).
66 expected: u8,
67 },
68
69 /// Body length exceeds the configured per-frame cap.
70 ///
71 /// The broker disconnects on this error per the wire-level
72 /// commitments in #228. Callers should not attempt to drain the
73 /// peer's payload — the socket is no longer in a known state.
74 #[error("frame body too large: {body_length} bytes exceeds cap {cap}")]
75 FrameTooLarge {
76 /// The length the peer claimed in the 4-byte LE header.
77 body_length: usize,
78 /// The cap the caller passed to [`read_frame_with_cap`].
79 cap: usize,
80 },
81
82 /// The underlying stream closed before the full frame arrived.
83 #[error("unexpected EOF while reading frame ({context})")]
84 UnexpectedEof {
85 /// Which part of the frame we were reading (e.g. "framing byte").
86 context: &'static str,
87 },
88
89 /// Raw I/O error from the underlying stream.
90 #[error("I/O error: {0}")]
91 Io(#[from] io::Error),
92}
93
94/// Read one v1 frame from `reader` with the default 16 MiB cap.
95///
96/// Equivalent to `read_frame_with_cap(reader, MAX_FRAME_BYTES)`. Use
97/// [`read_frame_with_cap`] with [`MAX_HELLO_BYTES`] for the initial
98/// Hello envelope.
99///
100/// # Errors
101///
102/// - [`FramingError::UnsupportedFramingVersion`] if the leading byte
103/// is not [`ENVELOPE_VERSION`]. The connection should be closed
104/// after writing `Refused{ERROR_VERSION_UNSUPPORTED}`.
105/// - [`FramingError::FrameTooLarge`] if the body-length header
106/// exceeds the cap. The connection should be closed.
107/// - [`FramingError::UnexpectedEof`] if the stream returned EOF before
108/// all expected bytes arrived.
109/// - [`FramingError::Io`] for any other I/O error.
110pub fn read_frame<R: Read>(reader: &mut R) -> Result<Vec<u8>, FramingError> {
111 read_frame_with_cap(reader, MAX_FRAME_BYTES)
112}
113
114/// Read one v1 frame from `reader`, rejecting bodies larger than `max_bytes`.
115///
116/// Pass [`MAX_HELLO_BYTES`] (64 KiB) for the initial Hello envelope,
117/// and [`MAX_FRAME_BYTES`] (16 MiB) — the default in
118/// [`read_frame`] — for all subsequent frames.
119pub fn read_frame_with_cap<R: Read>(
120 reader: &mut R,
121 max_bytes: usize,
122) -> Result<Vec<u8>, FramingError> {
123 // Step 1: framing version byte.
124 let mut version_buf = [0u8; 1];
125 read_exact_or_eof(reader, &mut version_buf, "framing byte")?;
126 let version = version_buf[0];
127 if version != ENVELOPE_VERSION {
128 return Err(FramingError::UnsupportedFramingVersion {
129 got: version,
130 expected: ENVELOPE_VERSION,
131 });
132 }
133
134 // Step 2: body length (4 bytes, little-endian).
135 let mut len_buf = [0u8; 4];
136 read_exact_or_eof(reader, &mut len_buf, "body length header")?;
137 let body_length = u32::from_le_bytes(len_buf) as usize;
138
139 // Step 3: enforce size cap *before* allocating.
140 if body_length > max_bytes {
141 return Err(FramingError::FrameTooLarge {
142 body_length,
143 cap: max_bytes,
144 });
145 }
146
147 // Step 4: read the body. A zero-length body is legal — Frame
148 // messages with an empty payload are explicitly allowed by the
149 // v1 schema (e.g. heartbeat-style probes).
150 let mut body = vec![0u8; body_length];
151 if body_length > 0 {
152 read_exact_or_eof(reader, &mut body, "frame body")?;
153 }
154 Ok(body)
155}
156
157/// Write one v1 frame to `writer`.
158///
159/// The frame is laid out as `[u8 framing_version=1][u32 LE
160/// body_length][body]`. Returns the number of bytes written on
161/// success (5 + body.len()).
162///
163/// # Errors
164///
165/// - [`FramingError::FrameTooLarge`] if `body.len()` exceeds
166/// [`MAX_FRAME_BYTES`]. The caller must not exceed
167/// [`MAX_HELLO_BYTES`] for Hello frames; this guard catches only
168/// the absolute ceiling.
169/// - [`FramingError::Io`] for any other I/O error.
170pub fn write_frame<W: Write>(writer: &mut W, body: &[u8]) -> Result<usize, FramingError> {
171 if body.len() > MAX_FRAME_BYTES {
172 return Err(FramingError::FrameTooLarge {
173 body_length: body.len(),
174 cap: MAX_FRAME_BYTES,
175 });
176 }
177
178 // u32 LE body length — `body.len()` fits in u32 because the cap
179 // (16 MiB) is well under u32::MAX.
180 let body_len_u32 = body.len() as u32;
181 let header: [u8; 5] = [
182 ENVELOPE_VERSION,
183 (body_len_u32 & 0xFF) as u8,
184 ((body_len_u32 >> 8) & 0xFF) as u8,
185 ((body_len_u32 >> 16) & 0xFF) as u8,
186 ((body_len_u32 >> 24) & 0xFF) as u8,
187 ];
188
189 writer.write_all(&header)?;
190 if !body.is_empty() {
191 writer.write_all(body)?;
192 }
193 writer.flush()?;
194 Ok(header.len() + body.len())
195}
196
197fn read_exact_or_eof<R: Read>(
198 reader: &mut R,
199 buf: &mut [u8],
200 context: &'static str,
201) -> Result<(), FramingError> {
202 match reader.read_exact(buf) {
203 Ok(()) => Ok(()),
204 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
205 Err(FramingError::UnexpectedEof { context })
206 }
207 Err(err) => Err(FramingError::Io(err)),
208 }
209}