Skip to main content

running_process/broker/protocol/
framing.rs

1//! v1 broker wire framing — `[u8 framing_version][u32 LE body_length][prost body]`.
2//!
3//! ## Frozen-forever
4//!
5//! This module implements THE truly-frozen-forever invariant of v1
6//! per #228 "Frozen-forever commitments": the framing byte is `1`,
7//! the body length is a little-endian `u32`, and the body is a prost
8//! payload. Any future protocol version is signalled by changing the
9//! framing byte, which lets a v1 broker recognize a v2 client and
10//! return `Refused{ERROR_VERSION_UNSUPPORTED}` instead of decoding
11//! garbage.
12//!
13//! ## Sizes
14//!
15//! - [`MAX_FRAME_BYTES`] caps an arbitrary frame at 16 MiB. Larger
16//!   frames cause the broker to disconnect.
17//! - [`MAX_HELLO_BYTES`] caps the initial Hello envelope at 64 KiB.
18//!   Larger Hello frames cause the broker to return `Refused` and
19//!   close. Hello-specific reads should pass [`MAX_HELLO_BYTES`] to
20//!   [`read_frame_with_cap`].
21//!
22//! ## Sync, not async
23//!
24//! Phase 1 ships a synchronous `std::io::{Read, Write}` implementation
25//! because the `client` cargo feature does not pull in tokio. The
26//! broker server (Phase 4) runs under `feature = "daemon"` (which
27//! does include tokio) and can wrap a `TcpStream`/`NamedPipeServer`
28//! either through `tokio::io::sync_bridge` or by re-implementing the
29//! wire layout on `AsyncRead`/`AsyncWrite`. The wire format is the
30//! same; only the surface API differs.
31
32use std::io::{self, Read, Write};
33
34use crate::broker::{FRAMING_VERSION_V1, MAX_FRAME_SIZE_BYTES, MAX_HELLO_SIZE_BYTES};
35
36/// Framing byte for v1. Alias of [`crate::broker::FRAMING_VERSION_V1`]
37/// to match the name used in the #228/#230 specs verbatim.
38pub const ENVELOPE_VERSION: u8 = FRAMING_VERSION_V1;
39
40/// Default per-frame size cap (16 MiB). Alias of
41/// [`crate::broker::MAX_FRAME_SIZE_BYTES`].
42pub const MAX_FRAME_BYTES: usize = MAX_FRAME_SIZE_BYTES;
43
44/// Hello-envelope size cap (64 KiB). Alias of
45/// [`crate::broker::MAX_HELLO_SIZE_BYTES`].
46pub const MAX_HELLO_BYTES: usize = MAX_HELLO_SIZE_BYTES;
47
48/// Errors produced by [`read_frame`]/[`write_frame`].
49///
50/// The error variants are deliberately distinct from `io::Error` so
51/// callers can map them onto the broker's wire-level error codes (e.g.
52/// `Refused{ERROR_VERSION_UNSUPPORTED}` for an
53/// [`FramingError::UnsupportedFramingVersion`]).
54#[derive(Debug, thiserror::Error)]
55pub enum FramingError {
56    /// Peer's framing byte did not match [`ENVELOPE_VERSION`].
57    ///
58    /// Per #228, the v1 broker writes `Refused{ERROR_VERSION_UNSUPPORTED}`
59    /// to the wire and closes the connection on this error.
60    #[error("unsupported framing version: got {got}, expected {expected}")]
61    UnsupportedFramingVersion {
62        /// The framing byte the peer actually sent.
63        got: u8,
64        /// The framing byte we expected (always
65        /// [`ENVELOPE_VERSION`] in v1).
66        expected: u8,
67    },
68
69    /// Body length exceeds the configured per-frame cap.
70    ///
71    /// The broker disconnects on this error per the wire-level
72    /// commitments in #228. Callers should not attempt to drain the
73    /// peer's payload — the socket is no longer in a known state.
74    #[error("frame body too large: {body_length} bytes exceeds cap {cap}")]
75    FrameTooLarge {
76        /// The length the peer claimed in the 4-byte LE header.
77        body_length: usize,
78        /// The cap the caller passed to [`read_frame_with_cap`].
79        cap: usize,
80    },
81
82    /// The underlying stream closed before the full frame arrived.
83    #[error("unexpected EOF while reading frame ({context})")]
84    UnexpectedEof {
85        /// Which part of the frame we were reading (e.g. "framing byte").
86        context: &'static str,
87    },
88
89    /// Raw I/O error from the underlying stream.
90    #[error("I/O error: {0}")]
91    Io(#[from] io::Error),
92}
93
94/// Read one v1 frame from `reader` with the default 16 MiB cap.
95///
96/// Equivalent to `read_frame_with_cap(reader, MAX_FRAME_BYTES)`. Use
97/// [`read_frame_with_cap`] with [`MAX_HELLO_BYTES`] for the initial
98/// Hello envelope.
99///
100/// # Errors
101///
102/// - [`FramingError::UnsupportedFramingVersion`] if the leading byte
103///   is not [`ENVELOPE_VERSION`]. The connection should be closed
104///   after writing `Refused{ERROR_VERSION_UNSUPPORTED}`.
105/// - [`FramingError::FrameTooLarge`] if the body-length header
106///   exceeds the cap. The connection should be closed.
107/// - [`FramingError::UnexpectedEof`] if the stream returned EOF before
108///   all expected bytes arrived.
109/// - [`FramingError::Io`] for any other I/O error.
110pub fn read_frame<R: Read>(reader: &mut R) -> Result<Vec<u8>, FramingError> {
111    read_frame_with_cap(reader, MAX_FRAME_BYTES)
112}
113
114/// Read one v1 frame from `reader`, rejecting bodies larger than `max_bytes`.
115///
116/// Pass [`MAX_HELLO_BYTES`] (64 KiB) for the initial Hello envelope,
117/// and [`MAX_FRAME_BYTES`] (16 MiB) — the default in
118/// [`read_frame`] — for all subsequent frames.
119pub fn read_frame_with_cap<R: Read>(
120    reader: &mut R,
121    max_bytes: usize,
122) -> Result<Vec<u8>, FramingError> {
123    // Step 1: framing version byte.
124    let mut version_buf = [0u8; 1];
125    read_exact_or_eof(reader, &mut version_buf, "framing byte")?;
126    let version = version_buf[0];
127    if version != ENVELOPE_VERSION {
128        return Err(FramingError::UnsupportedFramingVersion {
129            got: version,
130            expected: ENVELOPE_VERSION,
131        });
132    }
133
134    // Step 2: body length (4 bytes, little-endian).
135    let mut len_buf = [0u8; 4];
136    read_exact_or_eof(reader, &mut len_buf, "body length header")?;
137    let body_length = u32::from_le_bytes(len_buf) as usize;
138
139    // Step 3: enforce size cap *before* allocating.
140    if body_length > max_bytes {
141        return Err(FramingError::FrameTooLarge {
142            body_length,
143            cap: max_bytes,
144        });
145    }
146
147    // Step 4: read the body. A zero-length body is legal — Frame
148    // messages with an empty payload are explicitly allowed by the
149    // v1 schema (e.g. heartbeat-style probes).
150    let mut body = vec![0u8; body_length];
151    if body_length > 0 {
152        read_exact_or_eof(reader, &mut body, "frame body")?;
153    }
154    Ok(body)
155}
156
157/// Write one v1 frame to `writer`.
158///
159/// The frame is laid out as `[u8 framing_version=1][u32 LE
160/// body_length][body]`. Returns the number of bytes written on
161/// success (5 + body.len()).
162///
163/// # Errors
164///
165/// - [`FramingError::FrameTooLarge`] if `body.len()` exceeds
166///   [`MAX_FRAME_BYTES`]. The caller must not exceed
167///   [`MAX_HELLO_BYTES`] for Hello frames; this guard catches only
168///   the absolute ceiling.
169/// - [`FramingError::Io`] for any other I/O error.
170pub fn write_frame<W: Write>(writer: &mut W, body: &[u8]) -> Result<usize, FramingError> {
171    if body.len() > MAX_FRAME_BYTES {
172        return Err(FramingError::FrameTooLarge {
173            body_length: body.len(),
174            cap: MAX_FRAME_BYTES,
175        });
176    }
177
178    // u32 LE body length — `body.len()` fits in u32 because the cap
179    // (16 MiB) is well under u32::MAX.
180    let body_len_u32 = body.len() as u32;
181    let header: [u8; 5] = [
182        ENVELOPE_VERSION,
183        (body_len_u32 & 0xFF) as u8,
184        ((body_len_u32 >> 8) & 0xFF) as u8,
185        ((body_len_u32 >> 16) & 0xFF) as u8,
186        ((body_len_u32 >> 24) & 0xFF) as u8,
187    ];
188
189    writer.write_all(&header)?;
190    if !body.is_empty() {
191        writer.write_all(body)?;
192    }
193    writer.flush()?;
194    Ok(header.len() + body.len())
195}
196
197fn read_exact_or_eof<R: Read>(
198    reader: &mut R,
199    buf: &mut [u8],
200    context: &'static str,
201) -> Result<(), FramingError> {
202    match reader.read_exact(buf) {
203        Ok(()) => Ok(()),
204        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
205            Err(FramingError::UnexpectedEof { context })
206        }
207        Err(err) => Err(FramingError::Io(err)),
208    }
209}