Skip to main content

running_process/broker/protocol/
framing.rs

1//! v1 broker wire framing — `[u8 framing_version][u32 LE body_length][prost body]`.
2//!
3//! ## Frozen-forever
4//!
5//! This module implements THE truly-frozen-forever invariant of v1
6//! per #228 "Frozen-forever commitments": the framing byte is `1`,
7//! the body length is a little-endian `u32`, and the body is a prost
8//! payload. Any future protocol version is signalled by changing the
9//! framing byte, which lets a v1 broker recognize a v2 client and
10//! return `Refused{ERROR_VERSION_UNSUPPORTED}` instead of decoding
11//! garbage.
12//!
13//! ## Sizes
14//!
15//! - [`MAX_FRAME_BYTES`] caps an arbitrary frame at 16 MiB. Larger
16//!   frames cause the broker to disconnect.
17//! - [`MAX_HELLO_BYTES`] caps the initial Hello envelope at 64 KiB.
18//!   Larger Hello frames cause the broker to return `Refused` and
19//!   close. Hello-specific reads should pass [`MAX_HELLO_BYTES`] to
20//!   [`read_frame_with_cap`].
21//!
22//! ## Sync, not async
23//!
24//! Phase 1 ships a synchronous `std::io::{Read, Write}` implementation
25//! because the `client` cargo feature does not pull in tokio. The
26//! broker server (Phase 4) runs under `feature = "daemon"` (which
27//! does include tokio) and can wrap a `TcpStream`/`NamedPipeServer`
28//! either through `tokio::io::sync_bridge` or by re-implementing the
29//! wire layout on `AsyncRead`/`AsyncWrite`. The wire format is the
30//! same; only the surface API differs.
31
32use std::io::{self, Read, Write};
33
34use crate::broker::{FRAMING_VERSION_V1, MAX_FRAME_SIZE_BYTES, MAX_HELLO_SIZE_BYTES};
35
36/// Framing byte for v1. Alias of [`crate::broker::FRAMING_VERSION_V1`]
37/// to match the name used in the #228/#230 specs verbatim.
38pub const ENVELOPE_VERSION: u8 = FRAMING_VERSION_V1;
39
40/// Default per-frame size cap (16 MiB). Alias of
41/// [`crate::broker::MAX_FRAME_SIZE_BYTES`].
42pub const MAX_FRAME_BYTES: usize = MAX_FRAME_SIZE_BYTES;
43
44/// Hello-envelope size cap (64 KiB). Alias of
45/// [`crate::broker::MAX_HELLO_SIZE_BYTES`].
46pub const MAX_HELLO_BYTES: usize = MAX_HELLO_SIZE_BYTES;
47
48/// Errors produced by [`read_frame`]/[`write_frame`].
49///
50/// The error variants are deliberately distinct from `io::Error` so
51/// callers can map them onto the broker's wire-level error codes (e.g.
52/// `Refused{ERROR_VERSION_UNSUPPORTED}` for an
53/// [`FramingError::UnsupportedFramingVersion`]).
54#[derive(Debug, thiserror::Error)]
55pub enum FramingError {
56    /// Peer's framing byte did not match [`ENVELOPE_VERSION`].
57    ///
58    /// Per #228, the v1 broker writes `Refused{ERROR_VERSION_UNSUPPORTED}`
59    /// to the wire and closes the connection on this error.
60    #[error("unsupported framing version: got {got}, expected {expected}")]
61    UnsupportedFramingVersion {
62        /// The framing byte the peer actually sent.
63        got: u8,
64        /// The framing byte we expected (always
65        /// [`ENVELOPE_VERSION`] in v1).
66        expected: u8,
67    },
68
69    /// Body length exceeds the configured per-frame cap.
70    ///
71    /// The broker disconnects on this error per the wire-level
72    /// commitments in #228. Callers should not attempt to drain the
73    /// peer's payload — the socket is no longer in a known state.
74    #[error("frame body too large: {body_length} bytes exceeds cap {cap}")]
75    FrameTooLarge {
76        /// The length the peer claimed in the 4-byte LE header.
77        body_length: usize,
78        /// The cap the caller passed to [`read_frame_with_cap`].
79        cap: usize,
80    },
81
82    /// The underlying stream closed before the full frame arrived.
83    #[error("unexpected EOF while reading frame ({context})")]
84    UnexpectedEof {
85        /// Which part of the frame we were reading (e.g. "framing byte").
86        context: &'static str,
87    },
88
89    /// Raw I/O error from the underlying stream.
90    #[error("I/O error: {0}")]
91    Io(#[from] io::Error),
92
93    /// The frame body was not a valid prost `Frame` message.
94    ///
95    /// Produced by the buffer-level
96    /// [`try_decode_framed`](super::frame_ext::try_decode_framed)
97    /// codec (#412); the stream-level [`read_frame`] returns raw body
98    /// bytes and leaves prost decoding to the caller.
99    #[error("failed to decode Frame body: {0}")]
100    Decode(#[from] prost::DecodeError),
101}
102
103/// Read one v1 frame from `reader` with the default 16 MiB cap.
104///
105/// Equivalent to `read_frame_with_cap(reader, MAX_FRAME_BYTES)`. Use
106/// [`read_frame_with_cap`] with [`MAX_HELLO_BYTES`] for the initial
107/// Hello envelope.
108///
109/// # Errors
110///
111/// - [`FramingError::UnsupportedFramingVersion`] if the leading byte
112///   is not [`ENVELOPE_VERSION`]. The connection should be closed
113///   after writing `Refused{ERROR_VERSION_UNSUPPORTED}`.
114/// - [`FramingError::FrameTooLarge`] if the body-length header
115///   exceeds the cap. The connection should be closed.
116/// - [`FramingError::UnexpectedEof`] if the stream returned EOF before
117///   all expected bytes arrived.
118/// - [`FramingError::Io`] for any other I/O error.
119pub fn read_frame<R: Read>(reader: &mut R) -> Result<Vec<u8>, FramingError> {
120    read_frame_with_cap(reader, MAX_FRAME_BYTES)
121}
122
123/// Read one v1 frame from `reader`, rejecting bodies larger than `max_bytes`.
124///
125/// Pass [`MAX_HELLO_BYTES`] (64 KiB) for the initial Hello envelope,
126/// and [`MAX_FRAME_BYTES`] (16 MiB) — the default in
127/// [`read_frame`] — for all subsequent frames.
128pub fn read_frame_with_cap<R: Read>(
129    reader: &mut R,
130    max_bytes: usize,
131) -> Result<Vec<u8>, FramingError> {
132    // Step 1: framing version byte.
133    let mut version_buf = [0u8; 1];
134    read_exact_or_eof(reader, &mut version_buf, "framing byte")?;
135    let version = version_buf[0];
136    if version != ENVELOPE_VERSION {
137        return Err(FramingError::UnsupportedFramingVersion {
138            got: version,
139            expected: ENVELOPE_VERSION,
140        });
141    }
142
143    // Step 2: body length (4 bytes, little-endian).
144    let mut len_buf = [0u8; 4];
145    read_exact_or_eof(reader, &mut len_buf, "body length header")?;
146    let body_length = u32::from_le_bytes(len_buf) as usize;
147
148    // Step 3: enforce size cap *before* allocating.
149    if body_length > max_bytes {
150        return Err(FramingError::FrameTooLarge {
151            body_length,
152            cap: max_bytes,
153        });
154    }
155
156    // Step 4: read the body. A zero-length body is legal — Frame
157    // messages with an empty payload are explicitly allowed by the
158    // v1 schema (e.g. heartbeat-style probes).
159    let mut body = vec![0u8; body_length];
160    if body_length > 0 {
161        read_exact_or_eof(reader, &mut body, "frame body")?;
162    }
163    Ok(body)
164}
165
166/// Write one v1 frame to `writer`.
167///
168/// The frame is laid out as `[u8 framing_version=1][u32 LE
169/// body_length][body]`. Returns the number of bytes written on
170/// success (5 + body.len()).
171///
172/// # Errors
173///
174/// - [`FramingError::FrameTooLarge`] if `body.len()` exceeds
175///   [`MAX_FRAME_BYTES`]. The caller must not exceed
176///   [`MAX_HELLO_BYTES`] for Hello frames; this guard catches only
177///   the absolute ceiling.
178/// - [`FramingError::Io`] for any other I/O error.
179pub fn write_frame<W: Write>(writer: &mut W, body: &[u8]) -> Result<usize, FramingError> {
180    if body.len() > MAX_FRAME_BYTES {
181        return Err(FramingError::FrameTooLarge {
182            body_length: body.len(),
183            cap: MAX_FRAME_BYTES,
184        });
185    }
186
187    // u32 LE body length — `body.len()` fits in u32 because the cap
188    // (16 MiB) is well under u32::MAX.
189    let body_len_u32 = body.len() as u32;
190    let header: [u8; 5] = [
191        ENVELOPE_VERSION,
192        (body_len_u32 & 0xFF) as u8,
193        ((body_len_u32 >> 8) & 0xFF) as u8,
194        ((body_len_u32 >> 16) & 0xFF) as u8,
195        ((body_len_u32 >> 24) & 0xFF) as u8,
196    ];
197
198    writer.write_all(&header)?;
199    if !body.is_empty() {
200        writer.write_all(body)?;
201    }
202    writer.flush()?;
203    Ok(header.len() + body.len())
204}
205
206fn read_exact_or_eof<R: Read>(
207    reader: &mut R,
208    buf: &mut [u8],
209    context: &'static str,
210) -> Result<(), FramingError> {
211    match reader.read_exact(buf) {
212        Ok(()) => Ok(()),
213        Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => {
214            Err(FramingError::UnexpectedEof { context })
215        }
216        Err(err) => Err(FramingError::Io(err)),
217    }
218}