use super::types::{EntityId, SequenceNumber};
use crate::error::{Error, Result};
/// Identifies which submessage a header introduces.
///
/// Each variant's discriminant is the submessage ID byte written to and read
/// from the first octet of a submessage header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SubmessageKind {
    /// DATA submessage, ID `0x15`.
    Data = 0x15,
    /// HEARTBEAT submessage, ID `0x07`.
    Heartbeat = 0x07,
    /// ACKNACK submessage, ID `0x06`.
    AckNack = 0x06,
}

impl SubmessageKind {
    /// Maps a raw submessage ID byte to its kind.
    ///
    /// Returns `None` for any byte that is not the discriminant of one of the
    /// variants.
    pub fn from_u8(value: u8) -> Option<Self> {
        [Self::Data, Self::Heartbeat, Self::AckNack]
            .iter()
            .copied()
            .find(|kind| *kind as u8 == value)
    }
}

// Compile-time guards: the build fails if a discriminant drifts away from the
// ID it is documented to carry.
const _: () = assert!(
    SubmessageKind::Data as u8 == 0x15,
    "DATA submessage ID must be 0x15"
);
const _: () = assert!(
    SubmessageKind::Heartbeat as u8 == 0x07,
    "HEARTBEAT submessage ID must be 0x07"
);
const _: () = assert!(
    SubmessageKind::AckNack as u8 == 0x06,
    "ACKNACK submessage ID must be 0x06"
);
/// The raw flags octet of a submessage header.
///
/// The wrapped byte is public; this type only gives a name to the lowest bit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubmessageFlags(pub u8);

impl SubmessageFlags {
    /// Bit mask (`0x01`) of the endianness flag.
    pub const LITTLE_ENDIAN: u8 = 0x01;

    /// Returns `true` when the [`Self::LITTLE_ENDIAN`] bit is set; all other
    /// bits are ignored.
    pub const fn is_little_endian(&self) -> bool {
        (self.0 & Self::LITTLE_ENDIAN) == Self::LITTLE_ENDIAN
    }

    /// Flags with only the little-endian bit set.
    pub const fn little_endian() -> Self {
        SubmessageFlags(Self::LITTLE_ENDIAN)
    }

    /// Flags with every bit cleared, i.e. the little-endian bit is not set.
    pub const fn big_endian() -> Self {
        SubmessageFlags(0x00)
    }
}

/// The default flags are little-endian.
impl Default for SubmessageFlags {
    fn default() -> Self {
        SubmessageFlags(Self::LITTLE_ENDIAN)
    }
}
/// The fixed four-byte header that starts every submessage.
///
/// Wire layout:
///
/// | bytes | content                                             |
/// |-------|-----------------------------------------------------|
/// | 0     | submessage ID (`kind as u8`)                        |
/// | 1     | flags octet                                         |
/// | 2..4  | `octets_to_next`, byte order selected by the flags  |
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubmessageHeader {
    /// Which submessage follows this header.
    pub kind: SubmessageKind,
    /// Raw flags octet; bit 0 selects the byte order of `octets_to_next`.
    pub flags: SubmessageFlags,
    /// Value of the 16-bit length field stored in bytes 2..4.
    pub octets_to_next: u16,
}

impl SubmessageHeader {
    /// Encoded size of a header in bytes.
    pub const SIZE: usize = 4;

    /// Builds a header from its three fields without any validation.
    pub const fn new(kind: SubmessageKind, flags: SubmessageFlags, octets_to_next: u16) -> Self {
        Self {
            kind,
            flags,
            octets_to_next,
        }
    }

    /// Writes the header into the first [`Self::SIZE`] bytes of `buf`.
    ///
    /// `octets_to_next` is serialised little-endian when the endianness bit of
    /// `flags` is set and big-endian otherwise, so the bytes always agree with
    /// the flag that is written next to them.
    ///
    /// Returns the number of bytes written (always [`Self::SIZE`]).
    ///
    /// # Errors
    ///
    /// Returns [`Error::BufferTooSmall`] if `buf` holds fewer than
    /// [`Self::SIZE`] bytes; nothing is written in that case.
    pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
        if buf.len() < Self::SIZE {
            return Err(Error::BufferTooSmall);
        }
        let length_bytes = if self.flags.is_little_endian() {
            self.octets_to_next.to_le_bytes()
        } else {
            self.octets_to_next.to_be_bytes()
        };
        // The enum is `repr(u8)` with the wire IDs as discriminants.
        buf[0] = self.kind as u8;
        buf[1] = self.flags.0;
        buf[2..Self::SIZE].copy_from_slice(&length_bytes);
        Ok(Self::SIZE)
    }

    /// Parses a header from the first [`Self::SIZE`] bytes of `buf`.
    ///
    /// The length field is interpreted using the byte order announced by the
    /// flags octet that precedes it.
    ///
    /// # Errors
    ///
    /// * [`Error::BufferTooSmall`] if `buf` holds fewer than [`Self::SIZE`]
    ///   bytes.
    /// * [`Error::InvalidSubmessage`] if byte 0 is not a known submessage ID.
    pub fn decode(buf: &[u8]) -> Result<Self> {
        if buf.len() < Self::SIZE {
            return Err(Error::BufferTooSmall);
        }
        let kind = SubmessageKind::from_u8(buf[0]).ok_or(Error::InvalidSubmessage)?;
        let flags = SubmessageFlags(buf[1]);
        let length_bytes = [buf[2], buf[3]];
        let octets_to_next = if flags.is_little_endian() {
            u16::from_le_bytes(length_bytes)
        } else {
            u16::from_be_bytes(length_bytes)
        };
        Ok(Self {
            kind,
            flags,
            octets_to_next,
        })
    }
}
/// The fixed leading fields of a DATA submessage.
///
/// Only the reader id, writer id and writer sequence number are modelled;
/// whatever follows the fixed part is left to the caller, who locates it with
/// the offset returned by [`Data::decode`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Data {
    /// Entity id stored in bytes 8..12.
    pub reader_id: EntityId,
    /// Entity id stored in bytes 12..16.
    pub writer_id: EntityId,
    /// Sequence number stored in bytes 16..24 (high word first).
    pub writer_sn: SequenceNumber,
}

impl Data {
    /// Size in bytes of the submessage header plus the fixed DATA fields.
    pub const MIN_SIZE: usize = 24;

    /// Builds a `Data` from its three fields without any validation.
    pub const fn new(reader_id: EntityId, writer_id: EntityId, writer_sn: SequenceNumber) -> Self {
        Self {
            reader_id,
            writer_id,
            writer_sn,
        }
    }

    /// Writes the submessage header and the fixed DATA fields into `buf`.
    ///
    /// Layout produced (all multi-byte integers little-endian):
    ///
    /// | bytes  | content                                              |
    /// |--------|------------------------------------------------------|
    /// | 0..4   | header: kind `Data`, flags `0x05`, octets-to-next 20 |
    /// | 4..6   | zero                                                 |
    /// | 6..8   | the value 16 (octets to inline QoS)                  |
    /// | 8..12  | `reader_id`                                          |
    /// | 12..16 | `writer_id`                                          |
    /// | 16..20 | high 32 bits of `writer_sn` (signed)                 |
    /// | 20..24 | low 32 bits of `writer_sn` (unsigned)                |
    ///
    /// Returns the number of bytes written (always [`Self::MIN_SIZE`]).
    ///
    /// NOTE(review): the header's octets-to-next is fixed at 20, which covers
    /// only the fixed fields; presumably a caller that appends a payload
    /// patches bytes 2..4 afterwards. Confirm against the callers.
    ///
    /// # Errors
    ///
    /// Returns [`Error::BufferTooSmall`] if `buf` holds fewer than
    /// [`Self::MIN_SIZE`] bytes.
    pub fn encode_header(&self, buf: &mut [u8]) -> Result<usize> {
        if buf.len() < Self::MIN_SIZE {
            return Err(Error::BufferTooSmall);
        }
        // 0x05 = little-endian bit (0x01) plus bit 0x04, presumably the RTPS
        // "data present" flag.
        let header = SubmessageHeader::new(SubmessageKind::Data, SubmessageFlags(0x05), 20);
        header.encode(&mut buf[0..4])?;
        // Bytes 4..6 are zero; bytes 6..8 hold 16 as a little-endian u16.
        buf[4..8].copy_from_slice(&[0x00, 0x00, 0x10, 0x00]);
        buf[8..12].copy_from_slice(self.reader_id.as_bytes());
        buf[12..16].copy_from_slice(self.writer_id.as_bytes());
        // The sequence number goes out as two 32-bit words, high word first;
        // the casts deliberately keep only the relevant 32 bits.
        let sn = self.writer_sn.value();
        let high_word = (sn >> 32) as i32;
        let low_word = sn as u32;
        buf[16..20].copy_from_slice(&high_word.to_le_bytes());
        buf[20..24].copy_from_slice(&low_word.to_le_bytes());
        Ok(Self::MIN_SIZE)
    }

    /// Parses the fixed DATA fields from the start of `buf`.
    ///
    /// On success returns the decoded value together with the offset in `buf`
    /// at which the content after the fixed fields begins. The offset is
    /// `8 + octets_to_inline_qos`, where `octets_to_inline_qos` is the
    /// little-endian `u16` stored in bytes 6..8. The returned offset is
    /// guaranteed to satisfy `MIN_SIZE <= offset <= buf.len()`, so
    /// `&buf[offset..]` never panics.
    ///
    /// Body fields are always read as little-endian; the header's endianness
    /// flag is not consulted here.
    ///
    /// # Errors
    ///
    /// * [`Error::BufferTooSmall`] if `buf` holds fewer than
    ///   [`Self::MIN_SIZE`] bytes, or if the announced offset lies beyond the
    ///   end of `buf`.
    /// * [`Error::InvalidSubmessage`] if the header is not a valid DATA
    ///   header, or if the announced offset would fall inside the fixed
    ///   fields.
    pub fn decode(buf: &[u8]) -> Result<(Self, usize)> {
        if buf.len() < Self::MIN_SIZE {
            return Err(Error::BufferTooSmall);
        }
        let header = SubmessageHeader::decode(&buf[0..4])?;
        if header.kind != SubmessageKind::Data {
            return Err(Error::InvalidSubmessage);
        }

        // The offset field comes straight off the wire, so it must be
        // validated before it is handed to a caller who will slice with it.
        let octets_to_inline_qos = usize::from(u16::from_le_bytes([buf[6], buf[7]]));
        let payload_offset = 8 + octets_to_inline_qos;
        if payload_offset < Self::MIN_SIZE {
            // Would overlap the entity ids / sequence number parsed below.
            return Err(Error::InvalidSubmessage);
        }
        if payload_offset > buf.len() {
            return Err(Error::BufferTooSmall);
        }

        let reader_id = EntityId::new([buf[8], buf[9], buf[10], buf[11]]);
        let writer_id = EntityId::new([buf[12], buf[13], buf[14], buf[15]]);
        let high_word = i32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
        let low_word = u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]);
        let writer_sn = SequenceNumber::new((i64::from(high_word) << 32) | i64::from(low_word));

        let data = Self {
            reader_id,
            writer_id,
            writer_sn,
        };
        Ok((data, payload_offset))
    }
}
/// A HEARTBEAT submessage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Heartbeat {
    /// Entity id stored in bytes 4..8.
    pub reader_id: EntityId,
    /// Entity id stored in bytes 8..12.
    pub writer_id: EntityId,
    /// Sequence number stored in bytes 12..20.
    pub first_sn: SequenceNumber,
    /// Sequence number stored in bytes 20..28.
    pub last_sn: SequenceNumber,
    /// Counter stored in bytes 28..32.
    pub count: u32,
}

impl Heartbeat {
    /// Encoded size in bytes, including the four-byte submessage header.
    pub const SIZE: usize = 32;

    /// Builds a `Heartbeat` from its fields without any validation.
    pub const fn new(
        reader_id: EntityId,
        writer_id: EntityId,
        first_sn: SequenceNumber,
        last_sn: SequenceNumber,
        count: u32,
    ) -> Self {
        Self {
            reader_id,
            writer_id,
            first_sn,
            last_sn,
            count,
        }
    }

    /// Writes `sn` into the first eight bytes of `dst` as two little-endian
    /// 32-bit words, high (signed) word first.
    fn put_sequence_number(dst: &mut [u8], sn: SequenceNumber) {
        let value = sn.value();
        // The casts deliberately keep only the relevant 32 bits of each half.
        dst[..4].copy_from_slice(&((value >> 32) as i32).to_le_bytes());
        dst[4..8].copy_from_slice(&(value as u32).to_le_bytes());
    }

    /// Reads a sequence number laid out by [`Self::put_sequence_number`] from
    /// the first eight bytes of `src`.
    fn get_sequence_number(src: &[u8]) -> SequenceNumber {
        let high_word = i32::from_le_bytes([src[0], src[1], src[2], src[3]]);
        let low_word = u32::from_le_bytes([src[4], src[5], src[6], src[7]]);
        SequenceNumber::new((i64::from(high_word) << 32) | i64::from(low_word))
    }

    /// Serialises the heartbeat into the first [`Self::SIZE`] bytes of `buf`.
    ///
    /// Layout produced (all integers little-endian):
    ///
    /// | bytes  | content                                                     |
    /// |--------|-------------------------------------------------------------|
    /// | 0..4   | header: kind `Heartbeat`, default flags, octets-to-next 28  |
    /// | 4..8   | `reader_id`                                                 |
    /// | 8..12  | `writer_id`                                                 |
    /// | 12..20 | `first_sn`, high word then low word                         |
    /// | 20..28 | `last_sn`, high word then low word                          |
    /// | 28..32 | `count`                                                     |
    ///
    /// Sequence numbers use the same high-word-first layout as the DATA
    /// submessage in this file, rather than a single 64-bit little-endian
    /// integer (which would put the low word first).
    ///
    /// Returns the number of bytes written (always [`Self::SIZE`]).
    ///
    /// # Errors
    ///
    /// Returns [`Error::BufferTooSmall`] if `buf` holds fewer than
    /// [`Self::SIZE`] bytes.
    pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
        if buf.len() < Self::SIZE {
            return Err(Error::BufferTooSmall);
        }
        let header =
            SubmessageHeader::new(SubmessageKind::Heartbeat, SubmessageFlags::default(), 28);
        header.encode(&mut buf[0..4])?;
        buf[4..8].copy_from_slice(self.reader_id.as_bytes());
        buf[8..12].copy_from_slice(self.writer_id.as_bytes());
        Self::put_sequence_number(&mut buf[12..20], self.first_sn);
        Self::put_sequence_number(&mut buf[20..28], self.last_sn);
        buf[28..32].copy_from_slice(&self.count.to_le_bytes());
        Ok(Self::SIZE)
    }

    /// Parses a heartbeat from the first [`Self::SIZE`] bytes of `buf`.
    ///
    /// Body fields are always read as little-endian; the header's endianness
    /// flag and octets-to-next value are not consulted here.
    ///
    /// # Errors
    ///
    /// * [`Error::BufferTooSmall`] if `buf` holds fewer than [`Self::SIZE`]
    ///   bytes.
    /// * [`Error::InvalidSubmessage`] if the header is invalid or its kind is
    ///   not `Heartbeat`.
    pub fn decode(buf: &[u8]) -> Result<Self> {
        if buf.len() < Self::SIZE {
            return Err(Error::BufferTooSmall);
        }
        let header = SubmessageHeader::decode(&buf[0..4])?;
        if header.kind != SubmessageKind::Heartbeat {
            return Err(Error::InvalidSubmessage);
        }
        Ok(Self {
            reader_id: EntityId::new([buf[4], buf[5], buf[6], buf[7]]),
            writer_id: EntityId::new([buf[8], buf[9], buf[10], buf[11]]),
            first_sn: Self::get_sequence_number(&buf[12..20]),
            last_sn: Self::get_sequence_number(&buf[20..28]),
            count: u32::from_le_bytes([buf[28], buf[29], buf[30], buf[31]]),
        })
    }
}
/// An ACKNACK submessage carrying only the base of the reader's sequence
/// number state.
///
/// NOTE(review): the RTPS `SequenceNumberSet` also carries a `numBits` word
/// (and an optional bitmap) between the base and the count; this layout does
/// not write or read one. Adding it would change [`AckNack::MIN_SIZE`], so it
/// is left as-is here — confirm whether wire compatibility with other RTPS
/// implementations is required.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AckNack {
    /// Entity id stored in bytes 4..8.
    pub reader_id: EntityId,
    /// Entity id stored in bytes 8..12.
    pub writer_id: EntityId,
    /// Sequence number stored in bytes 12..20.
    pub reader_sn_state_base: SequenceNumber,
    /// Counter stored in bytes 20..24.
    pub count: u32,
}

impl AckNack {
    /// Encoded size in bytes, including the four-byte submessage header.
    pub const MIN_SIZE: usize = 24;

    /// Builds an `AckNack` from its fields without any validation.
    pub const fn new(
        reader_id: EntityId,
        writer_id: EntityId,
        reader_sn_state_base: SequenceNumber,
        count: u32,
    ) -> Self {
        Self {
            reader_id,
            writer_id,
            reader_sn_state_base,
            count,
        }
    }

    /// Writes `sn` into the first eight bytes of `dst` as two little-endian
    /// 32-bit words, high (signed) word first.
    fn put_sequence_number(dst: &mut [u8], sn: SequenceNumber) {
        let value = sn.value();
        // The casts deliberately keep only the relevant 32 bits of each half.
        dst[..4].copy_from_slice(&((value >> 32) as i32).to_le_bytes());
        dst[4..8].copy_from_slice(&(value as u32).to_le_bytes());
    }

    /// Reads a sequence number laid out by [`Self::put_sequence_number`] from
    /// the first eight bytes of `src`.
    fn get_sequence_number(src: &[u8]) -> SequenceNumber {
        let high_word = i32::from_le_bytes([src[0], src[1], src[2], src[3]]);
        let low_word = u32::from_le_bytes([src[4], src[5], src[6], src[7]]);
        SequenceNumber::new((i64::from(high_word) << 32) | i64::from(low_word))
    }

    /// Serialises the acknack into the first [`Self::MIN_SIZE`] bytes of
    /// `buf`.
    ///
    /// Layout produced (all integers little-endian):
    ///
    /// | bytes  | content                                                   |
    /// |--------|-----------------------------------------------------------|
    /// | 0..4   | header: kind `AckNack`, default flags, octets-to-next 20  |
    /// | 4..8   | `reader_id`                                               |
    /// | 8..12  | `writer_id`                                               |
    /// | 12..20 | `reader_sn_state_base`, high word then low word           |
    /// | 20..24 | `count`                                                   |
    ///
    /// The sequence number uses the same high-word-first layout as the DATA
    /// submessage in this file, rather than a single 64-bit little-endian
    /// integer (which would put the low word first).
    ///
    /// Returns the number of bytes written (always [`Self::MIN_SIZE`]).
    ///
    /// # Errors
    ///
    /// Returns [`Error::BufferTooSmall`] if `buf` holds fewer than
    /// [`Self::MIN_SIZE`] bytes.
    pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
        if buf.len() < Self::MIN_SIZE {
            return Err(Error::BufferTooSmall);
        }
        let header = SubmessageHeader::new(SubmessageKind::AckNack, SubmessageFlags::default(), 20);
        header.encode(&mut buf[0..4])?;
        buf[4..8].copy_from_slice(self.reader_id.as_bytes());
        buf[8..12].copy_from_slice(self.writer_id.as_bytes());
        Self::put_sequence_number(&mut buf[12..20], self.reader_sn_state_base);
        buf[20..24].copy_from_slice(&self.count.to_le_bytes());
        Ok(Self::MIN_SIZE)
    }

    /// Parses an acknack from the first [`Self::MIN_SIZE`] bytes of `buf`.
    ///
    /// Body fields are always read as little-endian; the header's endianness
    /// flag and octets-to-next value are not consulted here.
    ///
    /// # Errors
    ///
    /// * [`Error::BufferTooSmall`] if `buf` holds fewer than
    ///   [`Self::MIN_SIZE`] bytes.
    /// * [`Error::InvalidSubmessage`] if the header is invalid or its kind is
    ///   not `AckNack`.
    pub fn decode(buf: &[u8]) -> Result<Self> {
        if buf.len() < Self::MIN_SIZE {
            return Err(Error::BufferTooSmall);
        }
        let header = SubmessageHeader::decode(&buf[0..4])?;
        if header.kind != SubmessageKind::AckNack {
            return Err(Error::InvalidSubmessage);
        }
        Ok(Self {
            reader_id: EntityId::new([buf[4], buf[5], buf[6], buf[7]]),
            writer_id: EntityId::new([buf[8], buf[9], buf[10], buf[11]]),
            reader_sn_state_base: Self::get_sequence_number(&buf[12..20]),
            count: u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]),
        })
    }
}
/// A decoded submessage of any of the three kinds defined in this file.
///
/// Each variant wraps the struct of the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Submessage {
/// A DATA submessage's fixed fields.
Data(Data),
/// A HEARTBEAT submessage.
Heartbeat(Heartbeat),
/// An ACKNACK submessage.
AckNack(AckNack),
}
#[cfg(test)]
mod tests {
    //! Round-trip and error-path tests for the submessage codecs.

    use super::*;

    #[test]
    fn test_submessage_header_encode_decode() {
        let header =
            SubmessageHeader::new(SubmessageKind::Data, SubmessageFlags::little_endian(), 100);
        let mut buf = [0u8; 16];
        let written = header.encode(&mut buf).unwrap();
        assert_eq!(written, SubmessageHeader::SIZE);
        let decoded = SubmessageHeader::decode(&buf).unwrap();
        // Compare the whole struct so the flags octet is covered as well.
        assert_eq!(decoded, header);
        assert_eq!(decoded.kind, SubmessageKind::Data);
        assert_eq!(decoded.octets_to_next, 100);
    }

    #[test]
    fn test_submessage_header_big_endian_round_trip() {
        let header =
            SubmessageHeader::new(SubmessageKind::Heartbeat, SubmessageFlags::big_endian(), 0x1234);
        let mut buf = [0u8; SubmessageHeader::SIZE];
        header.encode(&mut buf).unwrap();
        assert_eq!(SubmessageHeader::decode(&buf).unwrap(), header);
    }

    #[test]
    fn test_submessage_header_rejects_bad_input() {
        let header = SubmessageHeader::new(SubmessageKind::Data, SubmessageFlags::default(), 0);
        let mut short = [0u8; SubmessageHeader::SIZE - 1];
        assert!(header.encode(&mut short).is_err());
        assert!(SubmessageHeader::decode(&short).is_err());
        // 0xff is not a known submessage ID.
        assert!(SubmessageHeader::decode(&[0xff, 0x01, 0x00, 0x00]).is_err());
    }

    #[test]
    fn test_data_encode_decode() {
        let data = Data::new(
            EntityId::new([0, 0, 0, 1]),
            EntityId::new([0, 0, 0, 2]),
            SequenceNumber::new(42),
        );
        let mut buf = [0u8; 64];
        let written = data.encode_header(&mut buf).unwrap();
        assert_eq!(written, Data::MIN_SIZE);
        let (decoded, offset) = Data::decode(&buf).unwrap();
        assert_eq!(decoded, data);
        assert_eq!(offset, Data::MIN_SIZE);
    }

    #[test]
    fn test_data_round_trips_high_word_of_sequence_number() {
        let data = Data::new(
            EntityId::new([1, 2, 3, 4]),
            EntityId::new([5, 6, 7, 8]),
            SequenceNumber::new((1i64 << 40) | 7),
        );
        let mut buf = [0u8; 64];
        data.encode_header(&mut buf).unwrap();
        let (decoded, _) = Data::decode(&buf).unwrap();
        assert_eq!(decoded, data);
    }

    #[test]
    fn test_heartbeat_encode_decode() {
        let hb = Heartbeat::new(
            EntityId::new([0, 0, 0, 1]),
            EntityId::new([0, 0, 0, 2]),
            SequenceNumber::new(1),
            SequenceNumber::new(10),
            5,
        );
        let mut buf = [0u8; 64];
        let written = hb.encode(&mut buf).unwrap();
        assert_eq!(written, Heartbeat::SIZE);
        let decoded = Heartbeat::decode(&buf).unwrap();
        assert_eq!(decoded, hb);
    }

    #[test]
    fn test_heartbeat_round_trips_high_word_of_sequence_numbers() {
        let hb = Heartbeat::new(
            EntityId::new([0, 0, 0, 1]),
            EntityId::new([0, 0, 0, 2]),
            SequenceNumber::new((1i64 << 33) | 1),
            SequenceNumber::new((1i64 << 40) | 9),
            u32::MAX,
        );
        let mut buf = [0u8; Heartbeat::SIZE];
        hb.encode(&mut buf).unwrap();
        assert_eq!(Heartbeat::decode(&buf).unwrap(), hb);
    }

    #[test]
    fn test_acknack_encode_decode() {
        let acknack = AckNack::new(
            EntityId::new([0, 0, 0, 1]),
            EntityId::new([0, 0, 0, 2]),
            SequenceNumber::new(5),
            3,
        );
        let mut buf = [0u8; 64];
        let written = acknack.encode(&mut buf).unwrap();
        assert_eq!(written, AckNack::MIN_SIZE);
        let decoded = AckNack::decode(&buf).unwrap();
        assert_eq!(decoded, acknack);
    }

    #[test]
    fn test_short_buffers_are_rejected() {
        let data = Data::new(
            EntityId::new([0, 0, 0, 1]),
            EntityId::new([0, 0, 0, 2]),
            SequenceNumber::new(1),
        );
        let hb = Heartbeat::new(
            EntityId::new([0, 0, 0, 1]),
            EntityId::new([0, 0, 0, 2]),
            SequenceNumber::new(1),
            SequenceNumber::new(2),
            0,
        );
        let acknack = AckNack::new(
            EntityId::new([0, 0, 0, 1]),
            EntityId::new([0, 0, 0, 2]),
            SequenceNumber::new(1),
            0,
        );

        let mut data_buf = [0u8; Data::MIN_SIZE - 1];
        assert!(data.encode_header(&mut data_buf).is_err());
        assert!(Data::decode(&data_buf).is_err());

        let mut hb_buf = [0u8; Heartbeat::SIZE - 1];
        assert!(hb.encode(&mut hb_buf).is_err());
        assert!(Heartbeat::decode(&hb_buf).is_err());

        let mut acknack_buf = [0u8; AckNack::MIN_SIZE - 1];
        assert!(acknack.encode(&mut acknack_buf).is_err());
        assert!(AckNack::decode(&acknack_buf).is_err());
    }

    #[test]
    fn test_decoders_reject_other_submessage_kinds() {
        let hb = Heartbeat::new(
            EntityId::new([0, 0, 0, 1]),
            EntityId::new([0, 0, 0, 2]),
            SequenceNumber::new(1),
            SequenceNumber::new(2),
            0,
        );
        let mut buf = [0u8; 64];
        hb.encode(&mut buf).unwrap();
        // The buffer starts with a HEARTBEAT header, so the other decoders
        // must refuse it.
        assert!(Data::decode(&buf).is_err());
        assert!(AckNack::decode(&buf).is_err());
    }
}