1use super::{
2 submessage_elements::SubmessageElement,
3 submessages::{
4 ack_nack::AckNackSubmessageWrite, data::DataSubmessageWrite,
5 data_frag::DataFragSubmessageWrite, gap::GapSubmessageWrite,
6 heartbeat::HeartbeatSubmessageWrite, heartbeat_frag::HeartbeatFragSubmessageWrite,
7 info_destination::InfoDestinationSubmessageWrite, info_reply::InfoReplySubmessageWrite,
8 info_source::InfoSourceSubmessageWrite, info_timestamp::InfoTimestampSubmessageWrite,
9 nack_frag::NackFragSubmessageWrite, pad::PadSubmessageWrite,
10 },
11 types::{ProtocolId, SubmessageFlag, SubmessageKind},
12};
13use crate::rtps::{
14 messages::{
15 submessage_elements::ArcSlice,
16 submessages::{
17 ack_nack::AckNackSubmessageRead, data::DataSubmessageRead,
18 data_frag::DataFragSubmessageRead, gap::GapSubmessageRead,
19 heartbeat::HeartbeatSubmessageRead, heartbeat_frag::HeartbeatFragSubmessageRead,
20 info_destination::InfoDestinationSubmessageRead, info_reply::InfoReplySubmessageRead,
21 info_source::InfoSourceSubmessageRead, info_timestamp::InfoTimestampSubmessageRead,
22 nack_frag::NackFragSubmessageRead, pad::PadSubmessageRead,
23 },
24 types::{
25 ACKNACK, DATA, DATA_FRAG, GAP, HEARTBEAT, HEARTBEAT_FRAG, INFO_DST, INFO_REPLY,
26 INFO_SRC, INFO_TS, NACK_FRAG, PAD,
27 },
28 },
29 types::{GuidPrefix, ProtocolVersion, VendorId},
30};
31use std::{marker::PhantomData, sync::Arc};
32
33pub type WriteEndianness = byteorder::LittleEndian;
34const BUFFER_SIZE: usize = 65000;
35
36pub trait Submessage<'a> {
37 type SubmessageList;
38
39 fn submessage_header(&self, octets_to_next_header: u16) -> SubmessageHeaderWrite;
40 fn submessage_elements(&'a self) -> Self::SubmessageList;
41}
42
43#[inline]
44fn write_submessage_bytes<'a>(
45 submessage: &'a impl Submessage<
46 'a,
47 SubmessageList = impl IntoIterator<Item = &'a SubmessageElement<'a>>,
48 >,
49 buf: &mut [u8],
50) -> usize {
51 let (header, body) = buf.split_at_mut(4);
52 let mut len = 0;
53 for submessage_element in submessage.submessage_elements().into_iter() {
54 len += submessage_element.write_bytes(&mut body[len..]);
55 }
56 let submessage_header = submessage.submessage_header(len as u16);
57 submessage_header.write_bytes(header) + len
58}
59
60pub trait FromBytes {
61 fn from_bytes<E: byteorder::ByteOrder>(v: &[u8]) -> Self;
62}
63
64pub trait SubmessageHeader {
65 fn submessage_header(&self) -> SubmessageHeaderRead;
66}
67
68pub trait RtpsMap: SubmessageHeader {
69 fn map<T: FromBytes>(&self, data: &[u8]) -> T {
70 if self.submessage_header().flags()[0] {
71 T::from_bytes::<byteorder::LittleEndian>(data)
72 } else {
73 T::from_bytes::<byteorder::BigEndian>(data)
74 }
75 }
76}
77
78impl<T: SubmessageHeader> RtpsMap for T {}
79
80#[derive(Debug, PartialEq, Eq, Clone)]
81pub struct RtpsMessageRead {
82 data: Arc<[u8]>,
83}
84
85impl RtpsMessageRead {
86 pub fn new(data: Arc<[u8]>) -> Self {
87 Self { data }
88 }
89
90 pub fn header(&self) -> RtpsMessageHeader {
91 let v = &self.data;
92 let protocol = ProtocolId::PROTOCOL_RTPS; let major = v[4];
94 let minor = v[5];
95 let version = ProtocolVersion::new(major, minor);
96 let vendor_id = [v[6], v[7]];
97 let guid_prefix = [
98 v[8], v[9], v[10], v[11], v[12], v[13], v[14], v[15], v[16], v[17], v[18], v[19],
99 ];
100 RtpsMessageHeader {
101 protocol,
102 version,
103 vendor_id,
104 guid_prefix,
105 }
106 }
107
108 pub fn submessages(&self) -> Vec<RtpsSubmessageReadKind> {
109 let mut offset = 20;
110 const MAX_SUBMESSAGES: usize = 2_usize.pow(16);
111 let mut submessages = vec![];
112 for _ in 0..MAX_SUBMESSAGES {
113 let buf = &self.data[offset..];
114 if buf.len() < 4 {
115 break;
116 }
117 let submessage_id = buf[0];
118 let endianness_flag = (buf[1] & 0b_0000_0001) != 0;
119 let submessage_length = if endianness_flag {
120 u16::from_le_bytes([buf[2], buf[3]])
121 } else {
122 u16::from_be_bytes([buf[2], buf[3]])
123 } as usize
124 + 4;
125
126 let submessage_data = &buf[..submessage_length];
127 let submessage_arc_slice =
128 ArcSlice::new(self.data.clone(), offset..offset + submessage_length);
129
130 let submessage = match submessage_id {
131 ACKNACK => {
132 RtpsSubmessageReadKind::AckNack(AckNackSubmessageRead::new(submessage_data))
133 }
134 DATA => {
135 RtpsSubmessageReadKind::Data(DataSubmessageRead::from(submessage_arc_slice))
136 }
137 DATA_FRAG => RtpsSubmessageReadKind::DataFrag(DataFragSubmessageRead::new(
138 submessage_arc_slice,
139 )),
140 GAP => RtpsSubmessageReadKind::Gap(GapSubmessageRead::new(submessage_data)),
141 HEARTBEAT => {
142 RtpsSubmessageReadKind::Heartbeat(HeartbeatSubmessageRead::new(submessage_data))
143 }
144 HEARTBEAT_FRAG => RtpsSubmessageReadKind::HeartbeatFrag(
145 HeartbeatFragSubmessageRead::new(submessage_data),
146 ),
147 INFO_DST => RtpsSubmessageReadKind::InfoDestination(
148 InfoDestinationSubmessageRead::new(submessage_data),
149 ),
150 INFO_REPLY => {
151 RtpsSubmessageReadKind::InfoReply(InfoReplySubmessageRead::new(submessage_data))
152 }
153 INFO_SRC => RtpsSubmessageReadKind::InfoSource(InfoSourceSubmessageRead::new(
154 submessage_data,
155 )),
156 INFO_TS => RtpsSubmessageReadKind::InfoTimestamp(InfoTimestampSubmessageRead::new(
157 submessage_data,
158 )),
159 NACK_FRAG => {
160 RtpsSubmessageReadKind::NackFrag(NackFragSubmessageRead::new(submessage_data))
161 }
162 PAD => RtpsSubmessageReadKind::Pad(PadSubmessageRead::new(submessage_data)),
163 _ => {
164 offset += submessage_length;
165 continue;
166 }
167 };
168 offset += submessage_length;
169 submessages.push(submessage);
170 }
171 submessages
172 }
173}
174
/// Types that can serialize themselves into a caller-provided byte buffer.
pub trait WriteBytes {
    /// Writes `self` at the start of `buf` and returns the number of bytes written.
    fn write_bytes(&self, buf: &mut [u8]) -> usize;
}
178
179#[allow(dead_code)] pub fn into_bytes_vec<T: WriteBytes>(value: T) -> Vec<u8> {
181 let mut buf = [0u8; BUFFER_SIZE];
182 let len = value.write_bytes(buf.as_mut_slice());
183 Vec::from(&buf[0..len])
184}
185
186#[derive(Debug, PartialEq, Eq)]
187pub struct RtpsMessageWrite {
188 buffer: [u8; BUFFER_SIZE],
189 len: usize,
190}
191
192impl RtpsMessageWrite {
193 pub fn new(header: &RtpsMessageHeader, submessages: &[RtpsSubmessageWriteKind<'_>]) -> Self {
194 let mut buffer = [0; BUFFER_SIZE];
195 let mut len = header.write_bytes(&mut buffer[0..]);
196 for submessage in submessages {
197 len += submessage.write_bytes(&mut buffer[len..]);
198 }
199 Self { buffer, len }
200 }
201
202 pub fn buffer(&self) -> &[u8] {
203 &self.buffer[0..self.len]
204 }
205}
206
207#[derive(Debug, PartialEq, Eq)]
208pub enum RtpsSubmessageReadKind<'a> {
209 AckNack(AckNackSubmessageRead<'a>),
210 Data(DataSubmessageRead),
211 DataFrag(DataFragSubmessageRead),
212 Gap(GapSubmessageRead<'a>),
213 Heartbeat(HeartbeatSubmessageRead<'a>),
214 HeartbeatFrag(HeartbeatFragSubmessageRead<'a>),
215 InfoDestination(InfoDestinationSubmessageRead<'a>),
216 InfoReply(InfoReplySubmessageRead<'a>),
217 InfoSource(InfoSourceSubmessageRead<'a>),
218 InfoTimestamp(InfoTimestampSubmessageRead<'a>),
219 NackFrag(NackFragSubmessageRead<'a>),
220 Pad(PadSubmessageRead<'a>),
221}
222
223#[allow(dead_code)]
224#[derive(Debug, PartialEq, Eq)]
225pub enum RtpsSubmessageWriteKind<'a> {
226 AckNack(AckNackSubmessageWrite<'a>),
227 Data(DataSubmessageWrite<'a>),
228 DataFrag(DataFragSubmessageWrite<'a>),
229 Gap(GapSubmessageWrite<'a>),
230 Heartbeat(HeartbeatSubmessageWrite<'a>),
231 HeartbeatFrag(HeartbeatFragSubmessageWrite<'a>),
232 InfoDestination(InfoDestinationSubmessageWrite<'a>),
233 InfoReply(InfoReplySubmessageWrite<'a>),
234 InfoSource(InfoSourceSubmessageWrite<'a>),
235 InfoTimestamp(InfoTimestampSubmessageWrite<'a>),
236 NackFrag(NackFragSubmessageWrite<'a>),
237 Pad(PadSubmessageWrite),
238}
239
240impl WriteBytes for RtpsSubmessageWriteKind<'_> {
241 #[inline]
242 fn write_bytes(&self, buf: &mut [u8]) -> usize {
243 match self {
244 RtpsSubmessageWriteKind::AckNack(s) => write_submessage_bytes(s, buf),
245 RtpsSubmessageWriteKind::Data(s) => write_submessage_bytes(s, buf),
246 RtpsSubmessageWriteKind::DataFrag(s) => write_submessage_bytes(s, buf),
247 RtpsSubmessageWriteKind::Gap(s) => write_submessage_bytes(s, buf),
248 RtpsSubmessageWriteKind::Heartbeat(s) => write_submessage_bytes(s, buf),
249 RtpsSubmessageWriteKind::HeartbeatFrag(s) => write_submessage_bytes(s, buf),
250 RtpsSubmessageWriteKind::InfoDestination(s) => write_submessage_bytes(s, buf),
251 RtpsSubmessageWriteKind::InfoReply(s) => write_submessage_bytes(s, buf),
252 RtpsSubmessageWriteKind::InfoSource(s) => write_submessage_bytes(s, buf),
253 RtpsSubmessageWriteKind::InfoTimestamp(s) => write_submessage_bytes(s, buf),
254 RtpsSubmessageWriteKind::NackFrag(s) => write_submessage_bytes(s, buf),
255 RtpsSubmessageWriteKind::Pad(s) => write_submessage_bytes(s, buf),
256 }
257 }
258}
259
260#[derive(Clone, Debug, PartialEq, Eq, Copy)]
261pub struct RtpsMessageHeader {
262 protocol: ProtocolId,
263 version: ProtocolVersion,
264 vendor_id: VendorId,
265 guid_prefix: GuidPrefix,
266}
267
268impl RtpsMessageHeader {
269 pub fn new(version: ProtocolVersion, vendor_id: VendorId, guid_prefix: GuidPrefix) -> Self {
270 Self {
271 protocol: ProtocolId::PROTOCOL_RTPS,
272 version,
273 vendor_id,
274 guid_prefix,
275 }
276 }
277
278 pub fn _protocol(&self) -> ProtocolId {
279 self.protocol
280 }
281
282 pub fn version(&self) -> ProtocolVersion {
283 self.version
284 }
285
286 pub fn vendor_id(&self) -> VendorId {
287 self.vendor_id
288 }
289
290 pub fn guid_prefix(&self) -> GuidPrefix {
291 self.guid_prefix
292 }
293}
294
295impl WriteBytes for RtpsMessageHeader {
296 #[inline]
297 fn write_bytes(&self, buf: &mut [u8]) -> usize {
298 self.protocol.write_bytes(&mut buf[0..]);
299 self.version.write_bytes(&mut buf[4..]);
300 self.vendor_id.write_bytes(&mut buf[6..]);
301 self.guid_prefix.write_bytes(&mut buf[8..]);
302 20
303 }
304}
305
306#[derive(Debug, PartialEq, Eq)]
307pub struct SubmessageHeaderWrite {
308 submessage_id: SubmessageKind,
309 flags: [SubmessageFlag; 7],
311 submessage_length: u16,
312}
313
314impl SubmessageHeaderWrite {
315 pub fn new(
316 submessage_id: SubmessageKind,
317 flags: &[SubmessageFlag],
319 submessage_length: u16,
320 ) -> Self {
321 let mut flags_array = [false; 7];
322 flags_array[..flags.len()].copy_from_slice(flags);
323
324 Self {
325 submessage_id,
326 flags: flags_array,
327 submessage_length,
328 }
329 }
330}
331
332struct EndiannessFlag<E> {
333 endianness: PhantomData<E>,
334}
335impl EndiannessFlag<byteorder::LittleEndian> {
336 fn get() -> bool {
337 true
338 }
339}
340
341impl WriteBytes for SubmessageHeaderWrite {
342 #[inline]
343 fn write_bytes(&self, buf: &mut [u8]) -> usize {
344 self.submessage_id.write_bytes(&mut buf[0..]);
345 let flags = [
346 EndiannessFlag::<WriteEndianness>::get(),
347 self.flags[0],
348 self.flags[1],
349 self.flags[2],
350 self.flags[3],
351 self.flags[4],
352 self.flags[5],
353 self.flags[6],
354 ];
355 flags.write_bytes(&mut buf[1..]);
356 self.submessage_length.write_bytes(&mut buf[2..]);
357 4
358 }
359}
360
361#[derive(Debug, PartialEq, Eq)]
362pub struct SubmessageHeaderRead<'a> {
363 data: &'a [u8],
364}
365
366impl<'a> SubmessageHeaderRead<'a> {
367 pub fn new(data: &'a [u8]) -> Self {
368 Self { data }
369 }
370
371 pub fn flags(&self) -> [SubmessageFlag; 8] {
372 let flags_byte = self.data[1];
373 [
374 flags_byte & 0b_0000_0001 != 0,
375 flags_byte & 0b_0000_0010 != 0,
376 flags_byte & 0b_0000_0100 != 0,
377 flags_byte & 0b_0000_1000 != 0,
378 flags_byte & 0b_0001_0000 != 0,
379 flags_byte & 0b_0010_0000 != 0,
380 flags_byte & 0b_0100_0000 != 0,
381 flags_byte & 0b_1000_0000 != 0,
382 ]
383 }
384}
385
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rtps::{
        messages::{
            submessage_elements::{Data, Parameter, ParameterList},
            submessages::{data::DataSubmessageRead, heartbeat::HeartbeatSubmessageRead},
        },
        types::{EntityId, SequenceNumber, USER_DEFINED_READER_GROUP, USER_DEFINED_READER_NO_KEY},
    };

    /// Header used by every test: version 2.3, vendor `[9, 8]`, GUID prefix of twelve `3`s.
    fn reference_header() -> RtpsMessageHeader {
        RtpsMessageHeader::new(ProtocolVersion::new(2, 3), [9, 8], [3; 12])
    }

    #[test]
    fn serialize_rtps_message_no_submessage() {
        let message = RtpsMessageWrite::new(&reference_header(), &[]);

        #[rustfmt::skip]
        let expected = vec![
            b'R', b'T', b'P', b'S', // protocol id
            2, 3, 9, 8,             // version, vendor id
            3, 3, 3, 3,             // GUID prefix
            3, 3, 3, 3,
            3, 3, 3, 3,
        ];
        assert_eq!(message.buffer(), expected);
    }

    #[test]
    fn serialize_rtps_message() {
        let inline_qos_flag = true;
        let data_flag = false;
        let key_flag = false;
        let non_standard_payload_flag = false;
        let reader_id = EntityId::new([1, 2, 3], USER_DEFINED_READER_NO_KEY);
        let writer_id = EntityId::new([6, 7, 8], USER_DEFINED_READER_GROUP);
        let writer_sn = SequenceNumber::from(5);
        let inline_qos = ParameterList::new(vec![
            Parameter::new(6, vec![10, 11, 12, 13]),
            Parameter::new(7, vec![20, 21, 22, 23]),
        ]);
        let serialized_payload = Data::new(vec![].into());

        let submessage = RtpsSubmessageWriteKind::Data(DataSubmessageWrite::new(
            inline_qos_flag,
            data_flag,
            key_flag,
            non_standard_payload_flag,
            reader_id,
            writer_id,
            writer_sn,
            &inline_qos,
            &serialized_payload,
        ));
        let message = RtpsMessageWrite::new(&reference_header(), &[submessage]);

        #[rustfmt::skip]
        let expected = vec![
            // Message header
            b'R', b'T', b'P', b'S',
            2, 3, 9, 8,
            3, 3, 3, 3,
            3, 3, 3, 3,
            3, 3, 3, 3,
            // Data submessage
            0x15, 0b_0000_0011, 40, 0,
            0, 0, 16, 0,
            1, 2, 3, 4,
            6, 7, 8, 9,
            0, 0, 0, 0,
            5, 0, 0, 0,
            6, 0, 4, 0,
            10, 11, 12, 13,
            7, 0, 4, 0,
            20, 21, 22, 23,
            1, 0, 0, 0,
        ];
        assert_eq!(message.buffer(), expected);
    }

    #[test]
    fn deserialize_rtps_message_no_submessage() {
        #[rustfmt::skip]
        let bytes = Arc::new([
            b'R', b'T', b'P', b'S',
            2, 3, 9, 8,
            3, 3, 3, 3,
            3, 3, 3, 3,
            3, 3, 3, 3,
        ]);

        let rtps_message = RtpsMessageRead::new(bytes);
        assert_eq!(reference_header(), rtps_message.header());
    }

    #[test]
    fn deserialize_rtps_message() {
        #[rustfmt::skip]
        let data_submessage = RtpsSubmessageReadKind::Data(DataSubmessageRead::from(
            ArcSlice::from(vec![
                0x15, 0b_0000_0011, 40, 0,
                0, 0, 16, 0,
                1, 2, 3, 4,
                6, 7, 8, 9,
                0, 0, 0, 0,
                5, 0, 0, 0,
                6, 0, 4, 0,
                10, 11, 12, 13,
                7, 0, 4, 0,
                20, 21, 22, 23,
                1, 0, 1, 0,
            ])));
        #[rustfmt::skip]
        let heartbeat_submessage = RtpsSubmessageReadKind::Heartbeat(HeartbeatSubmessageRead::new(&[
            0x07, 0b_0000_0101, 28, 0,
            1, 2, 3, 4,
            6, 7, 8, 9,
            0, 0, 0, 0,
            5, 0, 0, 0,
            0, 0, 0, 0,
            7, 0, 0, 0,
            2, 0, 0, 0,
        ]));
        let expected_submessages = vec![data_submessage, heartbeat_submessage];

        #[rustfmt::skip]
        let bytes = Arc::new([
            // Message header
            b'R', b'T', b'P', b'S',
            2, 3, 9, 8,
            3, 3, 3, 3,
            3, 3, 3, 3,
            3, 3, 3, 3,
            // Data submessage
            0x15, 0b_0000_0011, 40, 0,
            0, 0, 16, 0,
            1, 2, 3, 4,
            6, 7, 8, 9,
            0, 0, 0, 0,
            5, 0, 0, 0,
            6, 0, 4, 0,
            10, 11, 12, 13,
            7, 0, 4, 0,
            20, 21, 22, 23,
            1, 0, 1, 0,
            // Heartbeat submessage
            0x07, 0b_0000_0101, 28, 0,
            1, 2, 3, 4,
            6, 7, 8, 9,
            0, 0, 0, 0,
            5, 0, 0, 0,
            0, 0, 0, 0,
            7, 0, 0, 0,
            2, 0, 0, 0,
        ]);

        let rtps_message = RtpsMessageRead::new(bytes);
        assert_eq!(reference_header(), rtps_message.header());
        assert_eq!(expected_submessages, rtps_message.submessages());
    }

    #[test]
    fn deserialize_rtps_message_unknown_submessage() {
        #[rustfmt::skip]
        let data_submessage = RtpsSubmessageReadKind::Data(DataSubmessageRead::from(
            ArcSlice::from(vec![
                0x15, 0b_0000_0011, 40, 0,
                0, 0, 16, 0,
                1, 2, 3, 4,
                6, 7, 8, 9,
                0, 0, 0, 0,
                5, 0, 0, 0,
                6, 0, 4, 0,
                10, 11, 12, 13,
                7, 0, 4, 0,
                20, 21, 22, 23,
                1, 0, 0, 0,
            ])));
        let expected_submessages = vec![data_submessage];

        #[rustfmt::skip]
        let bytes = Arc::new([
            // Message header
            b'R', b'T', b'P', b'S',
            2, 3, 9, 8,
            3, 3, 3, 3,
            3, 3, 3, 3,
            3, 3, 3, 3,
            // Submessage with an unknown id, expected to be skipped
            0x99, 0b_0101_0011, 4, 0,
            9, 9, 9, 9,
            // Data submessage
            0x15, 0b_0000_0011, 40, 0,
            0, 0, 16, 0,
            1, 2, 3, 4,
            6, 7, 8, 9,
            0, 0, 0, 0,
            5, 0, 0, 0,
            6, 0, 4, 0,
            10, 11, 12, 13,
            7, 0, 4, 0,
            20, 21, 22, 23,
            1, 0, 0, 0,
        ]);

        let rtps_message = RtpsMessageRead::new(bytes);
        assert_eq!(expected_submessages, rtps_message.submessages());
    }
}
603}