1extern crate alloc;
31use alloc::rc::Rc;
32use alloc::vec::Vec;
33
34use crate::header::RtpsHeader;
35use crate::submessage_header::{FLAG_E_LITTLE_ENDIAN, SubmessageHeader, SubmessageId};
36use crate::wire_types::Locator;
37
/// Default byte budget for a single outbound datagram (1472 bytes).
///
/// The value equals a 1500-byte Ethernet MTU minus a 20-byte IPv4 header and
/// an 8-byte UDP header.
pub const DEFAULT_MTU: usize = 1500 - 20 - 8;
40
41#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct OutboundDatagram {
48 pub bytes: Vec<u8>,
50 pub targets: Rc<Vec<Locator>>,
52}
53
/// Reason why `MessageBuilder::try_add_submessage` refused a submessage.
///
/// Implements [`core::fmt::Display`] so the error can be logged or shown
/// directly without going through `Debug`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddError {
    /// The submessage (header plus body) does not fit into the space left
    /// in the builder.
    WouldExceedMtu {
        /// Bytes the submessage would occupy: submessage header + body.
        needed: usize,
        /// Bytes that were still available in the builder when it was rejected.
        remaining: usize,
    },
    /// The body length cannot be represented in the 16-bit length field of
    /// the submessage header.
    BodyTooLarge,
}

impl core::fmt::Display for AddError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match *self {
            Self::WouldExceedMtu { needed, remaining } => write!(
                f,
                "submessage needs {needed} bytes but only {remaining} remain within the MTU"
            ),
            Self::BodyTooLarge => write!(
                f,
                "submessage body exceeds the {} bytes addressable by the 16-bit length field",
                u16::MAX
            ),
        }
    }
}
67
68#[derive(Debug)]
74pub struct MessageBuilder {
75 bytes: Vec<u8>,
76 targets: Rc<Vec<Locator>>,
77 mtu: usize,
78 submsg_count: usize,
79}
80
81impl MessageBuilder {
82 #[must_use]
87 pub fn open(header: RtpsHeader, targets: Rc<Vec<Locator>>, mtu: usize) -> Self {
88 assert!(
89 mtu >= 20,
90 "MTU must accommodate at least the 20-byte RTPS header"
91 );
92 let mut bytes = Vec::with_capacity(mtu);
93 bytes.extend_from_slice(&header.to_bytes());
94 Self {
95 bytes,
96 targets,
97 mtu,
98 submsg_count: 0,
99 }
100 }
101
102 #[must_use]
104 pub fn submsg_count(&self) -> usize {
105 self.submsg_count
106 }
107
108 #[must_use]
110 pub fn is_empty(&self) -> bool {
111 self.submsg_count == 0
112 }
113
114 #[must_use]
117 pub fn len(&self) -> usize {
118 self.bytes.len()
119 }
120
121 #[must_use]
123 pub fn remaining(&self) -> usize {
124 self.mtu.saturating_sub(self.bytes.len())
125 }
126
127 pub fn try_add_submessage(
139 &mut self,
140 id: SubmessageId,
141 flags: u8,
142 body: &[u8],
143 ) -> Result<(), AddError> {
144 let body_len = u16::try_from(body.len()).map_err(|_| AddError::BodyTooLarge)?;
145 let needed = SubmessageHeader::WIRE_SIZE + body.len();
146 if self.bytes.len() + needed > self.mtu {
147 return Err(AddError::WouldExceedMtu {
148 needed,
149 remaining: self.remaining(),
150 });
151 }
152 let sh = SubmessageHeader {
153 submessage_id: id,
154 flags: flags | FLAG_E_LITTLE_ENDIAN,
155 octets_to_next_header: body_len,
156 };
157 self.bytes.extend_from_slice(&sh.to_bytes());
158 self.bytes.extend_from_slice(body);
159 self.submsg_count += 1;
160 Ok(())
161 }
162
163 #[must_use]
170 pub fn finish(self) -> Option<OutboundDatagram> {
171 if self.submsg_count == 0 {
172 return None;
173 }
174 Some(OutboundDatagram {
175 bytes: self.bytes,
176 targets: self.targets,
177 })
178 }
179}
180
// Unit tests for `MessageBuilder`: aggregation, MTU overflow handling,
// flag propagation and the error paths.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::datagram::{ParsedSubmessage, decode_datagram};
    use crate::submessages::{DataSubmessage, HeartbeatSubmessage};
    use crate::wire_types::{EntityId, GuidPrefix, Locator, SequenceNumber, VendorId};

    /// RTPS header with a fixed GUID prefix, shared by all tests.
    fn sample_header() -> RtpsHeader {
        RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([1; 12]))
    }

    /// DATA submessage with sequence number `sn` and a `payload_len`-byte
    /// payload filled with `0xAB`.
    fn sample_data(sn: i64, payload_len: usize) -> DataSubmessage {
        DataSubmessage {
            extra_flags: 0,
            reader_id: EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
            writer_id: EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
            writer_sn: SequenceNumber(sn),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: alloc::sync::Arc::from(alloc::vec![0xAB; payload_len]),
        }
    }

    /// Single loopback UDPv4 locator.
    fn targets() -> Rc<Vec<Locator>> {
        Rc::new(alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7400)])
    }

    #[test]
    fn fresh_builder_contains_only_rtps_header() {
        let b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        assert!(b.is_empty());
        assert_eq!(b.len(), 20, "only RTPS header");
        assert_eq!(b.submsg_count(), 0);
        assert_eq!(b.remaining(), DEFAULT_MTU - 20);
    }

    #[test]
    fn single_data_submessage_fits_and_decodes() {
        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        let (body, flags) = sample_data(1, 10).write_body(true);
        b.try_add_submessage(SubmessageId::Data, flags, &body)
            .unwrap();
        let dg = b.finish().unwrap();
        assert_eq!(dg.targets.len(), 1);
        let parsed = decode_datagram(&dg.bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 1);
        assert!(matches!(&parsed.submessages[0], ParsedSubmessage::Data(_)));
    }

    #[test]
    fn four_small_datas_aggregate_into_one_datagram() {
        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        for sn in 1..=4i64 {
            let (body, flags) = sample_data(sn, 10).write_body(true);
            b.try_add_submessage(SubmessageId::Data, flags, &body)
                .unwrap();
        }
        let dg = b.finish().unwrap();
        let parsed = decode_datagram(&dg.bytes).unwrap();
        let data_count = parsed
            .submessages
            .iter()
            .filter(|s| matches!(s, ParsedSubmessage::Data(_)))
            .count();
        assert_eq!(data_count, 4);
    }

    #[test]
    fn overflow_rejects_with_would_exceed_mtu() {
        let mtu = 100;
        let mut b = MessageBuilder::open(sample_header(), targets(), mtu);
        let (body, flags) = sample_data(1, 50).write_body(true);
        b.try_add_submessage(SubmessageId::Data, flags, &body)
            .unwrap();

        let len_before = b.len();
        let (body2, flags2) = sample_data(2, 50).write_body(true);
        let res = b.try_add_submessage(SubmessageId::Data, flags2, &body2);
        match res {
            Err(AddError::WouldExceedMtu { needed, remaining }) => {
                // The error must report exactly what was asked for and what was left.
                assert_eq!(needed, SubmessageHeader::WIRE_SIZE + body2.len());
                assert_eq!(remaining, b.remaining());
                assert!(needed > remaining);
            }
            other => panic!("expected WouldExceedMtu, got {other:?}"),
        }
        // A rejected add must leave the builder untouched.
        assert_eq!(b.len(), len_before);
        assert_eq!(b.submsg_count(), 1, "first add must still be counted");
    }

    #[test]
    fn overflow_allows_caller_to_open_new_builder() {
        let mtu = 100;
        let mut out: Vec<OutboundDatagram> = Vec::new();
        let mut b = MessageBuilder::open(sample_header(), targets(), mtu);

        for sn in 1..=3i64 {
            let (body_n, flags_n) = sample_data(sn, 50).write_body(true);
            if b.try_add_submessage(SubmessageId::Data, flags_n, &body_n)
                .is_err()
            {
                // Flush the full builder and retry on a fresh one.
                out.push(b.finish().unwrap());
                b = MessageBuilder::open(sample_header(), targets(), mtu);
                b.try_add_submessage(SubmessageId::Data, flags_n, &body_n)
                    .unwrap();
            }
        }
        if !b.is_empty() {
            out.push(b.finish().unwrap());
        }
        assert_eq!(out.len(), 3);
    }

    #[test]
    fn finish_on_empty_builder_returns_none() {
        let b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        assert!(b.finish().is_none());
    }

    #[test]
    fn piggyback_heartbeat_after_data_aggregates() {
        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        let (body, flags) = sample_data(1, 10).write_body(true);
        b.try_add_submessage(SubmessageId::Data, flags, &body)
            .unwrap();
        let hb = HeartbeatSubmessage {
            reader_id: EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
            writer_id: EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
            first_sn: SequenceNumber(1),
            last_sn: SequenceNumber(1),
            count: 1,
            final_flag: true,
            liveliness_flag: false,
            group_info: None,
        };
        let (hb_body, hb_flags) = hb.write_body(true);
        b.try_add_submessage(SubmessageId::Heartbeat, hb_flags, &hb_body)
            .unwrap();
        let dg = b.finish().unwrap();
        let parsed = decode_datagram(&dg.bytes).unwrap();
        assert_eq!(parsed.submessages.len(), 2);
        assert!(matches!(&parsed.submessages[0], ParsedSubmessage::Data(_)));
        assert!(matches!(
            &parsed.submessages[1],
            ParsedSubmessage::Heartbeat(h) if h.final_flag
        ));
    }

    #[test]
    fn builder_propagates_little_endian_flag_e() {
        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
        // Deliberately pass flags = 0: the builder itself must set the E bit.
        let (body, _flags_from_write) = sample_data(1, 10).write_body(true);
        b.try_add_submessage(SubmessageId::Data, 0, &body).unwrap();
        let dg = b.finish().unwrap();
        // Byte 20 is the first byte after the 20-byte RTPS header; the flags
        // byte of the first submessage header is read from offset 21.
        let sub_header_flags = dg.bytes[21];
        assert_eq!(
            sub_header_flags & FLAG_E_LITTLE_ENDIAN,
            FLAG_E_LITTLE_ENDIAN
        );
    }

    #[test]
    #[should_panic(expected = "MTU must accommodate")]
    fn open_panics_on_mtu_below_header() {
        let _ = MessageBuilder::open(sample_header(), targets(), 10);
    }

    #[test]
    fn body_too_large_rejected() {
        // MTU is large enough that only the u16 length limit can trigger.
        let mut b = MessageBuilder::open(sample_header(), targets(), 100_000);
        let oversize = alloc::vec![0u8; u16::MAX as usize + 1];
        let res = b.try_add_submessage(SubmessageId::Data, 0, &oversize);
        assert!(matches!(res, Err(AddError::BodyTooLarge)));
    }
}