use std::io::{self, Read, Write};
use crate::broker::{FRAMING_VERSION_V1, MAX_FRAME_SIZE_BYTES, MAX_HELLO_SIZE_BYTES};
/// Framing version byte that opens every frame; an alias for
/// `FRAMING_VERSION_V1` from `crate::broker`.
pub const ENVELOPE_VERSION: u8 = FRAMING_VERSION_V1;
/// Default body-size cap, in bytes, enforced by `read_frame` and
/// `write_frame`; an alias for `MAX_FRAME_SIZE_BYTES` from `crate::broker`.
pub const MAX_FRAME_BYTES: usize = MAX_FRAME_SIZE_BYTES;
/// Alias for `MAX_HELLO_SIZE_BYTES` from `crate::broker`.
// NOTE(review): presumably a tighter cap for the initial hello frame, meant to
// be passed to `read_frame_with_cap` — confirm against callers.
pub const MAX_HELLO_BYTES: usize = MAX_HELLO_SIZE_BYTES;
/// Errors produced while reading or writing framed messages.
#[derive(Debug, thiserror::Error)]
pub enum FramingError {
/// The first byte of a frame did not match `ENVELOPE_VERSION`.
#[error("unsupported framing version: got {got}, expected {expected}")]
UnsupportedFramingVersion {
/// Version byte actually read from the stream.
got: u8,
/// Version byte this implementation accepts.
expected: u8,
},
/// A frame body (declared by the header on read, or supplied by the caller
/// on write) is larger than the applicable cap.
#[error("frame body too large: {body_length} bytes exceeds cap {cap}")]
FrameTooLarge {
/// Size of the offending body, in bytes.
body_length: usize,
/// The limit that was exceeded, in bytes.
cap: usize,
},
/// The reader ran out of data before a frame component was fully read.
#[error("unexpected EOF while reading frame ({context})")]
UnexpectedEof {
/// Which part of the frame was being read when the stream ended.
context: &'static str,
},
/// Any I/O failure other than an unexpected end of stream.
#[error("I/O error: {0}")]
Io(#[from] io::Error),
/// A protobuf decode failure converted from `prost::DecodeError`.
// NOTE(review): nothing in the visible code constructs this variant;
// presumably callers decode the returned body bytes — confirm.
#[error("failed to decode Frame body: {0}")]
Decode(#[from] prost::DecodeError),
}
/// Reads one frame from `reader` and returns its body bytes, using the
/// default cap `MAX_FRAME_BYTES`.
///
/// This is a thin wrapper around `read_frame_with_cap`; see that function
/// for the wire layout and the errors that can be returned.
pub fn read_frame<R: Read>(reader: &mut R) -> Result<Vec<u8>, FramingError> {
read_frame_with_cap(reader, MAX_FRAME_BYTES)
}
/// Reads a single frame from `reader`, rejecting bodies larger than `max_bytes`.
///
/// Wire layout, in order:
/// 1. one version byte, which must equal `ENVELOPE_VERSION`;
/// 2. a four-byte little-endian unsigned body length;
/// 3. exactly that many body bytes.
///
/// Returns the body bytes (possibly empty).
///
/// # Errors
///
/// * `FramingError::UnsupportedFramingVersion` if the version byte differs
///   from `ENVELOPE_VERSION`; the length header is not consumed in that case.
/// * `FramingError::FrameTooLarge` if the declared length exceeds `max_bytes`;
///   no body bytes are consumed and no body buffer is allocated.
/// * `FramingError::UnexpectedEof` if the stream ends before any of the three
///   parts is complete (including a stream that is already at EOF).
/// * `FramingError::Io` for any other read failure.
pub fn read_frame_with_cap<R: Read>(
    reader: &mut R,
    max_bytes: usize,
) -> Result<Vec<u8>, FramingError> {
    // Part 1: version tag. Validate it before touching the length header.
    let mut tag = [0u8; 1];
    read_exact_or_eof(reader, &mut tag, "framing byte")?;
    let [got] = tag;
    if got != ENVELOPE_VERSION {
        return Err(FramingError::UnsupportedFramingVersion {
            got,
            expected: ENVELOPE_VERSION,
        });
    }

    // Part 2: little-endian u32 body length, checked against the cap before
    // any allocation so a hostile header cannot force a huge buffer.
    let mut raw_len = [0u8; 4];
    read_exact_or_eof(reader, &mut raw_len, "body length header")?;
    let declared = u32::from_le_bytes(raw_len) as usize;
    if declared > max_bytes {
        return Err(FramingError::FrameTooLarge {
            body_length: declared,
            cap: max_bytes,
        });
    }

    // Part 3: the body itself; an empty body needs no further read.
    let mut payload = vec![0u8; declared];
    if !payload.is_empty() {
        read_exact_or_eof(reader, &mut payload, "frame body")?;
    }
    Ok(payload)
}
/// Writes `body` to `writer` as one frame and flushes the writer.
///
/// The frame is a five-byte header (the `ENVELOPE_VERSION` byte followed by
/// the body length as a little-endian `u32`) and then the body bytes.
///
/// Returns the total number of bytes written (header plus body).
///
/// # Errors
///
/// * `FramingError::FrameTooLarge` if `body` is longer than `MAX_FRAME_BYTES`,
///   or if its length cannot be represented in the 32-bit length field.
///   Nothing is written in either case.
/// * `FramingError::Io` if writing or flushing fails.
pub fn write_frame<W: Write>(writer: &mut W, body: &[u8]) -> Result<usize, FramingError> {
    // Function-local import keeps this block building on editions whose
    // prelude does not include `TryFrom`.
    use std::convert::TryFrom;

    if body.len() > MAX_FRAME_BYTES {
        return Err(FramingError::FrameTooLarge {
            body_length: body.len(),
            cap: MAX_FRAME_BYTES,
        });
    }

    // The wire length field is 32 bits wide. Convert with a check instead of
    // a truncating cast so that a cap configured above `u32::MAX` can never
    // produce a wrapped length prefix and a corrupted stream.
    let wire_len = u32::try_from(body.len()).map_err(|_| FramingError::FrameTooLarge {
        body_length: body.len(),
        // Widening conversion on 32- and 64-bit targets.
        cap: u32::MAX as usize,
    })?;

    let mut header = [0u8; 5];
    header[0] = ENVELOPE_VERSION;
    header[1..].copy_from_slice(&wire_len.to_le_bytes());

    writer.write_all(&header)?;
    if !body.is_empty() {
        writer.write_all(body)?;
    }
    writer.flush()?;
    Ok(header.len() + body.len())
}
/// Fills `buf` completely from `reader`, translating I/O failures into
/// `FramingError`.
///
/// An `io::ErrorKind::UnexpectedEof` failure becomes
/// `FramingError::UnexpectedEof` tagged with `context` (a short label for the
/// part of the frame being read); every other failure is wrapped unchanged in
/// `FramingError::Io`.
fn read_exact_or_eof<R: Read>(
    reader: &mut R,
    buf: &mut [u8],
    context: &'static str,
) -> Result<(), FramingError> {
    reader.read_exact(buf).map_err(|err| match err.kind() {
        io::ErrorKind::UnexpectedEof => FramingError::UnexpectedEof { context },
        _ => FramingError::Io(err),
    })
}