extern crate alloc;
use alloc::rc::Rc;
use alloc::vec::Vec;
use crate::header::RtpsHeader;
use crate::submessage_header::{FLAG_E_LITTLE_ENDIAN, SubmessageHeader, SubmessageId};
use crate::wire_types::Locator;
/// Default value, in bytes, for the `mtu` argument of [`MessageBuilder::open`].
///
/// NOTE(review): 1472 presumably corresponds to a 1500-byte Ethernet MTU minus the
/// 20-byte IPv4 and 8-byte UDP headers — confirm against the transport layer.
pub const DEFAULT_MTU: usize = 1472;
/// A finished RTPS message as produced by [`MessageBuilder::finish`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OutboundDatagram {
/// Serialized message: the RTPS header followed by every submessage that was added,
/// each as a submessage header immediately followed by its body.
pub bytes: Vec<u8>,
/// The locator list given to [`MessageBuilder::open`], passed through unchanged.
/// Presumably the destinations this datagram should be sent to — confirm with the sender.
pub targets: Rc<Vec<Locator>>,
}
/// Reasons why [`MessageBuilder::try_add_submessage`] refuses a submessage.
///
/// Implements [`core::fmt::Display`] and [`core::error::Error`] so it can be logged and
/// propagated with `?` like any other error type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddError {
    /// Appending the submessage would push the message past the builder's MTU.
    WouldExceedMtu {
        /// Bytes the submessage would occupy: submessage header plus body.
        needed: usize,
        /// Bytes that were still free in the builder when the add was attempted.
        remaining: usize,
    },
    /// The body length does not fit into the 16-bit `octets_to_next_header` field.
    BodyTooLarge,
}

impl core::fmt::Display for AddError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match *self {
            Self::WouldExceedMtu { needed, remaining } => write!(
                f,
                "submessage needs {needed} bytes but only {remaining} remain within the MTU"
            ),
            Self::BodyTooLarge => {
                f.write_str("submessage body does not fit the 16-bit length field (max 65535 bytes)")
            }
        }
    }
}

impl core::error::Error for AddError {}
/// Incrementally assembles one RTPS message (header plus submessages) while keeping the
/// total size within a caller-supplied MTU.
#[derive(Debug)]
pub struct MessageBuilder {
// Serialized message so far; starts out holding only the RTPS header.
bytes: Vec<u8>,
// Locator list supplied at `open`, handed to the finished `OutboundDatagram` as-is.
targets: Rc<Vec<Locator>>,
// Maximum total length of `bytes`, in bytes.
mtu: usize,
// Number of submessages successfully appended.
submsg_count: usize,
}
impl MessageBuilder {
    /// Starts a new message that contains only the serialized `header`.
    ///
    /// `targets` is stored untouched and returned inside the [`OutboundDatagram`] created by
    /// [`MessageBuilder::finish`]. `mtu` is the maximum total message size in bytes,
    /// RTPS header included.
    ///
    /// # Panics
    ///
    /// Panics if `mtu` is smaller than 20, the size of the RTPS header.
    #[must_use]
    pub fn open(header: RtpsHeader, targets: Rc<Vec<Locator>>, mtu: usize) -> Self {
        // Upper bound for the eager reservation only. `mtu` still governs how large the
        // message may grow; the buffer simply reallocates on demand beyond this point.
        // Without the cap, a huge `mtu` (e.g. `usize::MAX` meaning "unlimited") would make
        // `Vec::with_capacity` panic or allocate an absurd amount of memory up front.
        const MAX_PREALLOC: usize = 65_536;

        assert!(
            mtu >= 20,
            "MTU must accommodate at least the 20-byte RTPS header"
        );
        let mut bytes = Vec::with_capacity(mtu.min(MAX_PREALLOC));
        bytes.extend_from_slice(&header.to_bytes());
        Self {
            bytes,
            targets,
            mtu,
            submsg_count: 0,
        }
    }

    /// Number of submessages appended so far.
    #[must_use]
    pub fn submsg_count(&self) -> usize {
        self.submsg_count
    }

    /// Returns `true` while no submessage has been appended.
    ///
    /// This is about submessages, not bytes: a fresh builder is "empty" even though
    /// [`MessageBuilder::len`] already reports the RTPS header.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.submsg_count == 0
    }

    /// Current size of the message in bytes, including the RTPS header.
    #[must_use]
    pub fn len(&self) -> usize {
        self.bytes.len()
    }

    /// Bytes that can still be appended before the MTU is reached.
    #[must_use]
    pub fn remaining(&self) -> usize {
        self.mtu.saturating_sub(self.bytes.len())
    }

    /// Appends one submessage (submessage header followed by `body`) if it fits.
    ///
    /// The little-endian flag (`FLAG_E_LITTLE_ENDIAN`) is always OR-ed into `flags`, and
    /// `octets_to_next_header` is set to `body.len()`.
    ///
    /// # Errors
    ///
    /// * [`AddError::BodyTooLarge`] if `body.len()` does not fit in a `u16`. This is checked
    ///   first, independently of the MTU.
    /// * [`AddError::WouldExceedMtu`] if submessage header plus body exceed
    ///   [`MessageBuilder::remaining`].
    ///
    /// On error the builder is left exactly as it was.
    pub fn try_add_submessage(
        &mut self,
        id: SubmessageId,
        flags: u8,
        body: &[u8],
    ) -> Result<(), AddError> {
        let body_len = u16::try_from(body.len()).map_err(|_| AddError::BodyTooLarge)?;

        // `body.len()` is known to fit in a u16 here, so this sum cannot overflow.
        let needed = SubmessageHeader::WIRE_SIZE + body.len();
        // Compare against the free space instead of adding to the current length, so the
        // check stays overflow-free even for MTUs close to `usize::MAX`.
        let remaining = self.remaining();
        if needed > remaining {
            return Err(AddError::WouldExceedMtu { needed, remaining });
        }

        let sh = SubmessageHeader {
            submessage_id: id,
            flags: flags | FLAG_E_LITTLE_ENDIAN,
            octets_to_next_header: body_len,
        };
        self.bytes.extend_from_slice(&sh.to_bytes());
        self.bytes.extend_from_slice(body);
        self.submsg_count += 1;
        Ok(())
    }

    /// Consumes the builder and returns the assembled datagram.
    ///
    /// Returns `None` when no submessage was added, so that header-only messages are never
    /// emitted.
    #[must_use]
    pub fn finish(self) -> Option<OutboundDatagram> {
        if self.submsg_count == 0 {
            return None;
        }
        Some(OutboundDatagram {
            bytes: self.bytes,
            targets: self.targets,
        })
    }
}
#[cfg(test)]
// Test code may unwrap/expect/panic freely: a panic is exactly how a test reports failure.
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::datagram::{ParsedSubmessage, decode_datagram};
    use crate::submessages::{DataSubmessage, HeartbeatSubmessage};
    use crate::wire_types::{EntityId, GuidPrefix, Locator, SequenceNumber, VendorId};

    /// RTPS header with a fixed vendor id and GUID prefix.
    fn sample_header() -> RtpsHeader {
        RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([1; 12]))
    }

    /// DATA submessage with sequence number `sn` and `payload_len` bytes of `0xAB` payload.
    fn sample_data(sn: i64, payload_len: usize) -> DataSubmessage {
        DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
            writer_id: EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
            writer_sn: SequenceNumber(sn),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: alloc::sync::Arc::from(alloc::vec![0xAB; payload_len]),
        }
    }

    /// Single loopback UDPv4 locator.
    fn targets() -> Rc<Vec<Locator>> {
        Rc::new(alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7400)])
    }

    #[test]
    fn fresh_builder_contains_only_rtps_header() {
        let b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        assert!(b.is_empty());
        assert_eq!(b.len(), 20, "only RTPS header");
        assert_eq!(b.submsg_count(), 0);
        assert_eq!(b.remaining(), DEFAULT_MTU - 20);
    }

    #[test]
    fn single_data_submessage_fits_and_decodes() {
        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        let (body, flags) = sample_data(1, 10).write_body(true);
        b.try_add_submessage(SubmessageId::Data, flags, &body)
            .unwrap();
        let dg = b.finish().unwrap();
        assert_eq!(dg.targets.len(), 1);
        let parsed = decode_datagram(&dg.bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 1);
        assert!(matches!(&parsed.submessages[0], ParsedSubmessage::Data(_)));
    }

    #[test]
    fn four_small_datas_aggregate_into_one_datagram() {
        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        for sn in 1..=4i64 {
            let (body, flags) = sample_data(sn, 10).write_body(true);
            b.try_add_submessage(SubmessageId::Data, flags, &body)
                .unwrap();
        }
        let dg = b.finish().unwrap();
        let parsed = decode_datagram(&dg.bytes).unwrap();
        let data_count = parsed
            .submessages
            .iter()
            .filter(|s| matches!(s, ParsedSubmessage::Data(_)))
            .count();
        assert_eq!(data_count, 4);
    }

    #[test]
    fn overflow_rejects_with_would_exceed_mtu() {
        // Small MTU: one DATA with a 50-byte payload fits, a second one must not.
        let mtu = 100;
        let mut b = MessageBuilder::open(sample_header(), targets(), mtu);
        let (body, flags) = sample_data(1, 50).write_body(true);
        b.try_add_submessage(SubmessageId::Data, flags, &body)
            .unwrap();
        let len_before = b.len();
        let remaining_before = b.remaining();

        let (body2, flags2) = sample_data(2, 50).write_body(true);
        let res = b.try_add_submessage(SubmessageId::Data, flags2, &body2);

        // Pin the reported numbers, not just the variant.
        assert_eq!(
            res,
            Err(AddError::WouldExceedMtu {
                needed: SubmessageHeader::WIRE_SIZE + body2.len(),
                remaining: remaining_before,
            })
        );
        assert_eq!(b.submsg_count(), 1, "first add must still be counted");
        assert_eq!(b.len(), len_before, "rejected add must not append bytes");
    }

    #[test]
    fn overflow_allows_caller_to_open_new_builder() {
        // With this MTU only one 50-byte DATA fits per datagram, so three DATAs must
        // end up in three datagrams.
        let mtu = 100;
        let mut out: Vec<OutboundDatagram> = Vec::new();
        let mut b = MessageBuilder::open(sample_header(), targets(), mtu);
        for sn in 1..=3i64 {
            let (body, flags) = sample_data(sn, 50).write_body(true);
            if b.try_add_submessage(SubmessageId::Data, flags, &body)
                .is_err()
            {
                // Flush the full builder and retry the same submessage in a fresh one.
                out.push(b.finish().unwrap());
                b = MessageBuilder::open(sample_header(), targets(), mtu);
                b.try_add_submessage(SubmessageId::Data, flags, &body)
                    .unwrap();
            }
        }
        if !b.is_empty() {
            out.push(b.finish().unwrap());
        }
        assert_eq!(out.len(), 3);
    }

    #[test]
    fn finish_on_empty_builder_returns_none() {
        let b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        assert!(b.finish().is_none());
    }

    #[test]
    fn piggyback_heartbeat_after_data_aggregates() {
        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        let (body, flags) = sample_data(1, 10).write_body(true);
        b.try_add_submessage(SubmessageId::Data, flags, &body)
            .unwrap();
        let hb = HeartbeatSubmessage {
            reader_id: EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
            writer_id: EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
            first_sn: SequenceNumber(1),
            last_sn: SequenceNumber(1),
            count: 1,
            final_flag: true,
            liveliness_flag: false,
            group_info: None,
        };
        let (hb_body, hb_flags) = hb.write_body(true);
        b.try_add_submessage(SubmessageId::Heartbeat, hb_flags, &hb_body)
            .unwrap();
        let dg = b.finish().unwrap();
        let parsed = decode_datagram(&dg.bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 2);
        assert!(matches!(&parsed.submessages[0], ParsedSubmessage::Data(_)));
        assert!(matches!(
            &parsed.submessages[1],
            ParsedSubmessage::Heartbeat(h) if h.final_flag
        ));
    }

    #[test]
    fn builder_propagates_little_endian_flag_e() {
        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        let (body, _flags_from_write) = sample_data(1, 10).write_body(true);
        // Pass 0 as flags on purpose: the builder itself must set the E flag.
        b.try_add_submessage(SubmessageId::Data, 0, &body).unwrap();
        let dg = b.finish().unwrap();
        // The 20-byte RTPS header is followed by the first submessage header; index 21 is
        // presumably its flags octet (id at 20, flags at 21) — confirm against
        // `SubmessageHeader::to_bytes`.
        let sub_header_flags = dg.bytes[21];
        assert_eq!(
            sub_header_flags & FLAG_E_LITTLE_ENDIAN,
            FLAG_E_LITTLE_ENDIAN
        );
    }

    #[test]
    #[should_panic(expected = "MTU must accommodate")]
    fn open_panics_on_mtu_below_header() {
        let _ = MessageBuilder::open(sample_header(), targets(), 10);
    }

    #[test]
    fn body_too_large_rejected() {
        // MTU is generous, so the only reason to reject is the u16 length field.
        let mut b = MessageBuilder::open(sample_header(), targets(), 100_000);
        let oversize = alloc::vec![0u8; u16::MAX as usize + 1];
        let res = b.try_add_submessage(SubmessageId::Data, 0, &oversize);
        assert!(matches!(res, Err(AddError::BodyTooLarge)));
    }
}