pub fn encode(payload: &[u8], out: &mut Vec<u8>) {
out.push(0); out.extend_from_slice(&(payload.len() as u32).to_be_bytes());
out.extend_from_slice(payload);
}
pub fn encode_compressed(compressed_payload: &[u8], out: &mut Vec<u8>) {
out.push(1); out.extend_from_slice(&(compressed_payload.len() as u32).to_be_bytes());
out.extend_from_slice(compressed_payload);
}
#[derive(Debug, PartialEq, Eq)]
pub enum DecodeResult {
Complete {
payload: Vec<u8>,
compressed: bool,
consumed: usize,
},
Incomplete(usize),
}
pub fn decode(buf: &[u8]) -> DecodeResult {
if buf.len() < 5 {
return DecodeResult::Incomplete(5 - buf.len());
}
let compressed = buf[0] != 0;
let length = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
let total = 5 + length;
if buf.len() < total {
return DecodeResult::Incomplete(total - buf.len());
}
DecodeResult::Complete {
payload: buf[5..total].to_vec(),
compressed,
consumed: total,
}
}
#[derive(Debug, Default)]
pub struct MessageBuffer {
buf: Vec<u8>,
}
impl MessageBuffer {
pub fn new() -> Self {
Self { buf: Vec::new() }
}
pub fn push(&mut self, data: &[u8]) {
self.buf.extend_from_slice(data);
}
pub fn try_decode(&mut self) -> Option<(Vec<u8>, bool)> {
match decode(&self.buf) {
DecodeResult::Complete {
payload,
compressed,
consumed,
} => {
self.buf.drain(..consumed);
Some((payload, compressed))
}
DecodeResult::Incomplete(_) => None,
}
}
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn encode_decode_round_trip() {
let payload = b"hello grpc";
let mut buf = Vec::new();
encode(payload, &mut buf);
assert_eq!(buf.len(), 5 + payload.len());
assert_eq!(buf[0], 0); assert_eq!(
u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]),
payload.len() as u32
);
match decode(&buf) {
DecodeResult::Complete {
payload: decoded,
compressed,
consumed,
} => {
assert_eq!(decoded, payload);
assert!(!compressed);
assert_eq!(consumed, buf.len());
}
other => panic!("expected Complete, got {other:?}"),
}
}
#[test]
fn decode_incomplete_header() {
assert_eq!(decode(&[]), DecodeResult::Incomplete(5));
assert_eq!(decode(&[0, 0]), DecodeResult::Incomplete(3));
assert_eq!(decode(&[0, 0, 0, 0]), DecodeResult::Incomplete(1));
}
#[test]
fn decode_incomplete_payload() {
let mut buf = Vec::new();
encode(b"hello", &mut buf);
buf.truncate(7);
assert_eq!(decode(&buf), DecodeResult::Incomplete(3));
}
#[test]
fn encode_empty_message() {
let mut buf = Vec::new();
encode(b"", &mut buf);
assert_eq!(buf, &[0, 0, 0, 0, 0]);
match decode(&buf) {
DecodeResult::Complete {
payload, consumed, ..
} => {
assert!(payload.is_empty());
assert_eq!(consumed, 5);
}
other => panic!("expected Complete, got {other:?}"),
}
}
#[test]
fn message_buffer_reassembly() {
let payload = b"reassembled message";
let mut encoded = Vec::new();
encode(payload, &mut encoded);
let mut mb = MessageBuffer::new();
assert!(mb.is_empty());
mb.push(&encoded[..3]);
assert!(mb.try_decode().is_none());
mb.push(&encoded[3..8]);
assert!(mb.try_decode().is_none());
mb.push(&encoded[8..]);
let (decoded, compressed) = mb.try_decode().unwrap();
assert_eq!(decoded, payload);
assert!(!compressed);
assert!(mb.is_empty());
}
#[test]
fn message_buffer_multiple_messages() {
let mut encoded = Vec::new();
encode(b"first", &mut encoded);
encode(b"second", &mut encoded);
let mut mb = MessageBuffer::new();
mb.push(&encoded);
assert_eq!(mb.try_decode().unwrap().0, b"first");
assert_eq!(mb.try_decode().unwrap().0, b"second");
assert!(mb.try_decode().is_none());
assert!(mb.is_empty());
}
}