use bytes::{Buf, BufMut};
pub const SUBPROTOCOL_STREAM_WINDOW: u16 = 0x0B00;
pub const SUBPROTOCOL_STREAM_NACK: u16 = 0x0B01;
pub const SUBPROTOCOL_STREAM_RESET: u16 = 0x0B02;
pub const STREAM_RESET_SIZE: usize = 8;
pub const STREAM_WINDOW_SIZE: usize = 24;
pub const STREAM_NACK_SIZE: usize = 24;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamWindow {
pub stream_id: u64,
pub total_consumed: u64,
pub ack_seq: u64,
}
#[derive(Debug, thiserror::Error)]
pub enum StreamWindowCodecError {
#[error("truncated stream-window message: {0} bytes (need 16)")]
Truncated(usize),
#[error("oversize stream-window message: {0} bytes (need 16)")]
Oversize(usize),
}
impl StreamWindow {
#[inline]
pub fn encode(&self) -> [u8; STREAM_WINDOW_SIZE] {
let mut buf = [0u8; STREAM_WINDOW_SIZE];
(&mut buf[..8]).put_u64_le(self.stream_id);
(&mut buf[8..16]).put_u64_le(self.total_consumed);
(&mut buf[16..]).put_u64_le(self.ack_seq);
buf
}
pub fn decode(data: &[u8]) -> Result<Self, StreamWindowCodecError> {
match data.len() {
n if n < STREAM_WINDOW_SIZE => Err(StreamWindowCodecError::Truncated(n)),
n if n > STREAM_WINDOW_SIZE => Err(StreamWindowCodecError::Oversize(n)),
_ => {
let mut cur = std::io::Cursor::new(data);
let stream_id = cur.get_u64_le();
let total_consumed = cur.get_u64_le();
let ack_seq = cur.get_u64_le();
Ok(Self {
stream_id,
total_consumed,
ack_seq,
})
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamNack {
pub stream_id: u64,
pub next_expected: u64,
pub missing_bitmap: u64,
}
impl StreamNack {
#[inline]
pub fn encode(&self) -> [u8; STREAM_NACK_SIZE] {
let mut buf = [0u8; STREAM_NACK_SIZE];
(&mut buf[..8]).put_u64_le(self.stream_id);
(&mut buf[8..16]).put_u64_le(self.next_expected);
(&mut buf[16..]).put_u64_le(self.missing_bitmap);
buf
}
pub fn decode(data: &[u8]) -> Result<Self, StreamWindowCodecError> {
match data.len() {
n if n < STREAM_NACK_SIZE => Err(StreamWindowCodecError::Truncated(n)),
n if n > STREAM_NACK_SIZE => Err(StreamWindowCodecError::Oversize(n)),
_ => {
let mut cur = std::io::Cursor::new(data);
let stream_id = cur.get_u64_le();
let next_expected = cur.get_u64_le();
let missing_bitmap = cur.get_u64_le();
Ok(Self {
stream_id,
next_expected,
missing_bitmap,
})
}
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StreamReset {
pub stream_id: u64,
}
impl StreamReset {
#[inline]
pub fn encode(&self) -> [u8; STREAM_RESET_SIZE] {
self.stream_id.to_le_bytes()
}
pub fn decode(data: &[u8]) -> Result<Self, StreamWindowCodecError> {
match data.len() {
n if n < STREAM_RESET_SIZE => Err(StreamWindowCodecError::Truncated(n)),
n if n > STREAM_RESET_SIZE => Err(StreamWindowCodecError::Oversize(n)),
_ => {
let mut cur = std::io::Cursor::new(data);
Ok(Self {
stream_id: cur.get_u64_le(),
})
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_round_trip() {
let msg = StreamWindow {
stream_id: 0xDEAD_BEEF_CAFE_F00D,
total_consumed: 0x0102_0304_0506_0708,
ack_seq: 0x1122_3344_5566_7788,
};
let bytes = msg.encode();
assert_eq!(bytes.len(), STREAM_WINDOW_SIZE);
let parsed = StreamWindow::decode(&bytes).unwrap();
assert_eq!(parsed, msg);
}
#[test]
fn test_decode_truncated_rejected() {
let err = StreamWindow::decode(&[0u8; STREAM_WINDOW_SIZE - 1]).unwrap_err();
assert!(matches!(err, StreamWindowCodecError::Truncated(_)));
}
#[test]
fn test_decode_oversize_rejected() {
let err = StreamWindow::decode(&[0u8; STREAM_WINDOW_SIZE + 1]).unwrap_err();
assert!(matches!(err, StreamWindowCodecError::Oversize(_)));
}
#[test]
fn test_decode_empty_rejected() {
let err = StreamWindow::decode(&[]).unwrap_err();
assert!(matches!(err, StreamWindowCodecError::Truncated(0)));
}
#[test]
fn test_endianness_is_little_endian() {
let msg = StreamWindow {
stream_id: 1,
total_consumed: 1,
ack_seq: 1,
};
let bytes = msg.encode();
assert_eq!(bytes[0], 0x01);
assert_eq!(bytes[1], 0x00);
assert_eq!(bytes[8], 0x01);
assert_eq!(bytes[9], 0x00);
assert_eq!(bytes[16], 0x01);
assert_eq!(bytes[17], 0x00);
}
#[test]
fn stream_nack_round_trip() {
let msg = StreamNack {
stream_id: 0xABCD,
next_expected: 7,
missing_bitmap: 0b1010,
};
assert_eq!(StreamNack::decode(&msg.encode()).unwrap(), msg);
}
#[test]
fn stream_reset_round_trip() {
let msg = StreamReset {
stream_id: 0x2000_0000_0000_0001,
};
assert_eq!(StreamReset::decode(&msg.encode()).unwrap(), msg);
}
}