1use std::{cmp::min, collections::BTreeSet, io};
2
3#[allow(unused_imports)]
4use log::{debug, error, trace, warn};
5use speedy::{Context, Endianness, Readable, Writable, Writer};
6use enumflags2::BitFlags;
7use bytes::Bytes;
8
9use crate::{
10 dds::ddsdata::DDSData,
11 messages::{
12 header::Header,
13 protocol_id::ProtocolId,
14 protocol_version::ProtocolVersion,
15 submessages::{
16 elements::{parameter::Parameter, parameter_list::ParameterList},
17 submessages::*,
18 },
19 validity_trait::Validity,
20 vendor_id::VendorId,
21 },
22 rtps::{Submessage, SubmessageBody},
23 structure::{
24 cache_change::CacheChange,
25 guid::{EntityId, GuidPrefix, GUID},
26 parameter_id::ParameterId,
27 sequence_number::{FragmentNumber, SequenceNumber, SequenceNumberSet},
28 time::Timestamp,
29 },
30};
31#[cfg(feature = "security")]
32use crate::{
33 create_security_error_and_log,
34 security::{security_plugins::SecurityPluginsHandle, SecurityError},
35};
36#[cfg(not(feature = "security"))]
37use crate::no_security::SecurityPluginsHandle;
38
/// A complete RTPS message: one [`Header`] followed by a list of
/// [`Submessage`]s.
///
/// When written with `speedy`, the header is emitted first and the submessages
/// follow in vector order.
#[derive(Debug, Clone)]
pub struct Message {
  /// Message header. `read_from_buffer` expects the submessages to start at
  /// byte offset 20, right after it.
  pub header: Header,
  /// Submessages in the order they were parsed or added.
  pub submessages: Vec<Submessage>,
}
44
45impl Message {
46 pub fn add_submessage(&mut self, submessage: Submessage) {
47 self.submessages.push(submessage);
48 }
49
50 #[cfg(test)]
51 pub fn submessages(self) -> Vec<Submessage> {
52 self.submessages
53 }
54
55 #[cfg(test)]
56 pub fn set_header(&mut self, header: Header) {
57 self.header = header;
58 }
59
60 pub fn read_from_buffer(buffer: &Bytes) -> io::Result<Self> {
65 let rtps_header =
67 Header::read_from_buffer(buffer).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
68 if !rtps_header.valid() {
69 return Err(io::Error::new(io::ErrorKind::Other, "Invalid RTPS header"));
70 }
71 let mut message = Self::new(rtps_header);
72 let mut submessages_left: Bytes = buffer.slice(20..); while !submessages_left.is_empty() {
75 if let Some(submessage) = Submessage::read_from_buffer(&mut submessages_left)? {
76 message.submessages.push(submessage);
77 }
78 } Ok(message)
81 }
82}
83
84impl Message {
85 pub fn new(header: Header) -> Self {
86 Self {
87 header,
88 submessages: vec![],
89 }
90 }
91}
92
93impl Default for Message {
94 fn default() -> Self {
95 Self {
96 header: Header::new(GuidPrefix::UNKNOWN),
97 submessages: vec![],
98 }
99 }
100}
101
102impl<C: Context> Writable<C> for Message {
103 fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
104 writer.write_value(&self.header)?;
105 for x in &self.submessages {
106 writer.write_value(&x)?;
107 }
108 Ok(())
109 }
110}
111
/// Collects submessages one builder call at a time; `add_header_and_build`
/// finally wraps them into a [`Message`].
#[derive(Default, Clone)]
pub(crate) struct MessageBuilder {
  /// Submessages accumulated so far, in the order the builder methods were called.
  submessages: Vec<Submessage>,
}
116
117impl MessageBuilder {
118 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn dst_submessage(mut self, endianness: Endianness, guid_prefix: GuidPrefix) -> Self {
123 let flags = BitFlags::<INFODESTINATION_Flags>::from_endianness(endianness);
124 let submessage_header = SubmessageHeader {
125 kind: SubmessageKind::INFO_DST,
126 flags: flags.bits(),
127 content_length: 12u16,
128 };
130 let dst_submessage = Submessage {
131 header: submessage_header,
132 body: SubmessageBody::Interpreter(InterpreterSubmessage::InfoDestination(
133 InfoDestination { guid_prefix },
134 flags,
135 )),
136 original_bytes: None,
137 };
138
139 self.submessages.push(dst_submessage);
140 self
141 }
142
143 pub fn ts_msg(mut self, endianness: Endianness, timestamp: Option<Timestamp>) -> Self {
147 let mut flags = BitFlags::<INFOTIMESTAMP_Flags>::from_endianness(endianness);
148 if timestamp.is_none() {
149 flags |= INFOTIMESTAMP_Flags::Invalidate;
150 }
151
152 let content_length = match timestamp {
153 Some(_) => 8, None => 0, };
156
157 let submessage_header = SubmessageHeader {
158 kind: SubmessageKind::INFO_TS,
159 flags: flags.bits(),
160 content_length,
161 };
162
163 let submessage = Submessage {
164 header: submessage_header,
165 body: SubmessageBody::Interpreter(InterpreterSubmessage::InfoTimestamp(
166 InfoTimestamp { timestamp },
167 flags,
168 )),
169 original_bytes: None,
170 };
171
172 self.submessages.push(submessage);
173 self
174 }
175
176 pub fn data_msg(
177 mut self,
178 cache_change: &CacheChange,
179 reader_entity_id: EntityId, writer_guid: GUID,
181 endianness: Endianness,
182 security_plugins: Option<&SecurityPluginsHandle>,
183 ) -> Self {
184 #[cfg(not(feature = "security"))]
185 let _ = security_plugins;
187
188 let writer_entity_id = writer_guid.entity_id;
189
190 let mut param_list = ParameterList::new(); match cache_change.data_value {
202 DDSData::Data { .. } => (), DDSData::DisposeByKey { .. } => {
205 param_list.push(Parameter::create_pid_status_info_parameter(
206 true, true, false,
207 ));
208 }
209 DDSData::DisposeByKeyHash { key_hash, .. } => {
210 param_list.push(Parameter {
212 parameter_id: ParameterId::PID_KEY_HASH,
213 value: key_hash.to_vec(),
214 });
215 let status_info = Parameter::create_pid_status_info_parameter(
217 true, true, false,
218 );
219 param_list.push(status_info);
220 }
221 }
222
223 if let Some(si) = cache_change.write_options.related_sample_identity() {
225 let related_sample_identity_serialized = si.write_to_vec_with_ctx(endianness).unwrap();
226 param_list.push(Parameter {
230 parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY,
231 value: related_sample_identity_serialized.clone(),
232 });
233 param_list.push(Parameter {
234 parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY_CUSTOM,
235 value: related_sample_identity_serialized,
236 });
237 }
238
239 let serialized_payload = match cache_change.data_value {
240 DDSData::Data {
241 ref serialized_payload,
242 } => Some(serialized_payload.clone()), DDSData::DisposeByKey { ref key, .. } => Some(key.clone()),
244 DDSData::DisposeByKeyHash { .. } => None,
245 };
246
247 #[cfg(not(feature = "security"))]
248 let encoded_payload = serialized_payload;
249
250 #[cfg(feature = "security")]
251 let encoded_payload = match serialized_payload
252 .map(|serialized_payload| {
254 serialized_payload
255 .write_to_vec()
257 .map_err(|e| create_security_error_and_log!("{e:?}"))
258 .and_then(|serialized_payload| {
259 match security_plugins.map(SecurityPluginsHandle::get_plugins) {
260 Some(security_plugins) => {
261 security_plugins
262 .encode_serialized_payload(serialized_payload, &writer_guid)
263 .map(|(encoded_payload, extra_inline_qos)| {
265 param_list.concat(extra_inline_qos);
266 encoded_payload
267 })
268 }
269 None => Ok(serialized_payload),
270 }
271 })
272 })
273 .transpose()
274 {
275 Ok(encoded_payload) => encoded_payload,
276 Err(e) => {
277 error!("{e:?}");
278 return self;
279 }
280 }; let have_inline_qos = !param_list.is_empty(); let inline_qos = if have_inline_qos {
284 Some(param_list)
285 } else {
286 None
287 };
288
289 let data_message = Data {
290 reader_id: reader_entity_id,
291 writer_id: writer_entity_id,
292 writer_sn: cache_change.sequence_number,
293 inline_qos,
294 serialized_payload: encoded_payload.map(Bytes::from),
295 };
296
297 let flags: BitFlags<DATA_Flags> = BitFlags::<DATA_Flags>::from_endianness(endianness)
298 | (match cache_change.data_value {
299 DDSData::Data { .. } => BitFlags::<DATA_Flags>::from_flag(DATA_Flags::Data),
300 DDSData::DisposeByKey { .. } => BitFlags::<DATA_Flags>::from_flag(DATA_Flags::Key),
301 DDSData::DisposeByKeyHash { .. } => {
302 BitFlags::<DATA_Flags>::from_flag(DATA_Flags::InlineQos)
303 }
304 })
305 | (if have_inline_qos {
306 BitFlags::<DATA_Flags>::from_flag(DATA_Flags::InlineQos)
307 } else {
308 BitFlags::<DATA_Flags>::empty()
309 });
310
311 self.submessages.push(Submessage {
312 header: SubmessageHeader {
313 kind: SubmessageKind::DATA,
314 flags: flags.bits(),
315 content_length: data_message.len_serialized() as u16, },
317 body: SubmessageBody::Writer(WriterSubmessage::Data(data_message, flags)),
318 original_bytes: None,
319 });
320 self
321 }
322
323 #[allow(clippy::too_many_arguments)]
326 pub fn data_frag_msg(
327 mut self,
328 cache_change: &CacheChange,
329 reader_entity_id: EntityId,
330 writer_guid: GUID,
331 fragment_number: FragmentNumber, fragment_size: u16,
333 sample_size: u32, endianness: Endianness,
335 security_plugins: Option<&SecurityPluginsHandle>,
336 ) -> Self {
337 #[cfg(not(feature = "security"))]
338 let _ = security_plugins;
340
341 let writer_entity_id = writer_guid.entity_id;
342
343 let mut param_list = ParameterList::new(); match cache_change.data_value {
347 DDSData::Data { .. } | DDSData::DisposeByKey { .. } => (), DDSData::DisposeByKeyHash { .. } => {
349 error!(
350 "data_frag_msg: Called with DDSData::DisposeByKeyHash. This is not legit! Discarding."
351 );
352 return self;
357 }
358 }
359
360 if let Some(si) = cache_change.write_options.related_sample_identity() {
362 let related_sample_identity_serialized = si.write_to_vec_with_ctx(endianness).unwrap();
363 param_list.push(Parameter {
367 parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY,
368 value: related_sample_identity_serialized.clone(),
369 });
370 param_list.push(Parameter {
371 parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY_CUSTOM,
372 value: related_sample_identity_serialized,
373 });
374 }
375
376 let have_inline_qos = !param_list.is_empty(); let from_byte: usize = (usize::from(fragment_number) - 1) * usize::from(fragment_size);
380 let up_to_before_byte: usize = min(
381 usize::from(fragment_number) * usize::from(fragment_size),
382 sample_size.try_into().unwrap(),
383 );
384
385 let serialized_payload = Vec::from(
386 cache_change
387 .data_value
388 .bytes_slice(from_byte, up_to_before_byte),
389 );
390
391 #[cfg(not(feature = "security"))]
392 let encoded_payload = serialized_payload;
393
394 #[cfg(feature = "security")]
395 let encoded_payload = {
396 let encode_result = match security_plugins.map(SecurityPluginsHandle::get_plugins) {
397 Some(security_plugins) => {
398 security_plugins
399 .encode_serialized_payload(serialized_payload, &writer_guid)
400 .map(|(encoded_payload, extra_inline_qos)| {
402 param_list.concat(extra_inline_qos);
403 encoded_payload
404 })
405 }
406 None =>
407 {
409 Ok(serialized_payload)
410 }
411 };
412
413 match encode_result {
414 Ok(encoded_payload) => encoded_payload,
415 Err(e) => {
416 error!("{e:?}");
417 return self;
418 }
419 }
420 }; let data_message = DataFrag {
423 reader_id: reader_entity_id,
424 writer_id: writer_entity_id,
425 writer_sn: cache_change.sequence_number,
426 fragment_starting_num: fragment_number,
427 fragments_in_submessage: 1,
428 data_size: sample_size, fragment_size,
430 inline_qos: if have_inline_qos {
431 Some(param_list)
432 } else {
433 None
434 },
435 serialized_payload: Bytes::from(encoded_payload),
436 };
437
438 let flags: BitFlags<DATAFRAG_Flags> =
439 BitFlags::<DATAFRAG_Flags>::from_endianness(endianness)
441 | (match cache_change.data_value {
443 DDSData::Data { .. } => BitFlags::<DATAFRAG_Flags>::empty(),
444 DDSData::DisposeByKey { .. } => BitFlags::<DATAFRAG_Flags>::from_flag(DATAFRAG_Flags::Key),
445 DDSData::DisposeByKeyHash { .. } => unreachable!(),
446 })
447 | (if have_inline_qos {
449 BitFlags::<DATAFRAG_Flags>::from_flag(DATAFRAG_Flags::InlineQos)
450 } else {
451 BitFlags::<DATAFRAG_Flags>::empty()
452 });
453
454 self.submessages.push(Submessage {
455 header: SubmessageHeader {
456 kind: SubmessageKind::DATA_FRAG,
457 flags: flags.bits(),
458 content_length: data_message.len_serialized() as u16, },
460 body: SubmessageBody::Writer(WriterSubmessage::DataFrag(data_message, flags)),
461 original_bytes: None,
462 });
463 self
464 }
465
466 pub fn gap_msg(
467 mut self,
468 irrelevant_sns: &BTreeSet<SequenceNumber>,
469 writer_entity_id: EntityId,
470 writer_endianness: Endianness,
471 reader_guid: GUID,
472 ) -> Self {
473 match (irrelevant_sns.first(), irrelevant_sns.last()) {
474 (Some(&gap_start), Some(&_last_sn)) => {
475 let mut range_endpoint_excl = gap_start.plus_1();
480 while irrelevant_sns.contains(&range_endpoint_excl) {
481 range_endpoint_excl = range_endpoint_excl.plus_1();
482 }
483 let list_base = range_endpoint_excl;
485 let list_set = irrelevant_sns.clone().split_off(&list_base);
486 let gap_list = SequenceNumberSet::from_base_and_set(list_base, &list_set);
487
488 let gap = Gap {
489 reader_id: reader_guid.entity_id,
490 writer_id: writer_entity_id,
491 gap_start,
492 gap_list,
493 };
494 let gap_flags = BitFlags::<GAP_Flags>::from_endianness(writer_endianness);
495 gap
496 .create_submessage(gap_flags)
497 .map(|s| self.submessages.push(s));
498 }
499 (_, _) => error!("gap_msg called with empty SN set. Skipping GAP submessage"),
500 }
501 self
502 }
503
504 pub fn gap_msg_before(
506 mut self,
507 irrelevant_sns_before: SequenceNumber,
508 writer_entity_id: EntityId,
509 writer_endianness: Endianness,
510 reader_guid: GUID,
511 ) -> Self {
512 let gap_list = SequenceNumberSet::new_empty(irrelevant_sns_before);
513 let gap = Gap {
514 reader_id: reader_guid.entity_id,
515 writer_id: writer_entity_id,
516 gap_start: SequenceNumber::from(1),
517 gap_list,
518 };
519
520 let gap_flags = BitFlags::<GAP_Flags>::from_endianness(writer_endianness);
521 gap
522 .create_submessage(gap_flags)
523 .map(|s| self.submessages.push(s));
524 self
525 }
526
527 #[allow(clippy::too_many_arguments)] pub fn heartbeat_msg(
529 mut self,
530 writer_entity_id: EntityId,
531 first: SequenceNumber,
532 last: SequenceNumber,
533 heartbeat_count: i32,
534 endianness: Endianness,
535 reader_entity_id: EntityId,
536 set_final_flag: bool,
537 set_liveliness_flag: bool,
538 ) -> Self {
539 let heartbeat = Heartbeat {
540 reader_id: reader_entity_id,
541 writer_id: writer_entity_id,
542 first_sn: first,
543 last_sn: last,
544 count: heartbeat_count,
545 };
546
547 let mut flags = BitFlags::<HEARTBEAT_Flags>::from_endianness(endianness);
548
549 if set_final_flag {
550 flags.insert(HEARTBEAT_Flags::Final);
551 }
552 if set_liveliness_flag {
553 flags.insert(HEARTBEAT_Flags::Liveliness);
554 }
555
556 let submessage = heartbeat.create_submessage(flags);
557 match submessage {
558 Some(sm) => self.submessages.push(sm),
559 None => return self,
560 }
561 self
562 }
563
564 pub fn add_header_and_build(self, guid_prefix: GuidPrefix) -> Message {
565 Message {
566 header: Header {
567 protocol_id: ProtocolId::default(),
568 protocol_version: ProtocolVersion::THIS_IMPLEMENTATION,
569 vendor_id: VendorId::THIS_IMPLEMENTATION,
570 guid_prefix,
571 },
572 submessages: self.submessages,
573 }
574 }
575}
576
577#[cfg(test)]
578#[allow(non_snake_case)]
579mod tests {
580 use log::info;
581
582 use super::*;
583
  /// Round-trip check: parses the fixed byte buffer below, serializes the
  /// resulting message little-endian, and expects the identical bytes back.
  #[test]

  fn rtps_message_test_shapes_demo_message_deserialization() {
    // The buffer starts with the ASCII bytes "RTPS" (0x52 0x54 0x50 0x53).
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d,
      0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x09, 0x01, 0x08, 0x00, 0x1a, 0x15, 0xf3, 0x5e, 0x00,
      0xcc, 0xfb, 0x13, 0x15, 0x05, 0x2c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x07,
      0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x5b, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x52, 0x45, 0x44, 0x00, 0x69, 0x00, 0x00, 0x00, 0x17, 0x00,
      0x00, 0x00, 0x1e, 0x00, 0x00, 0x00, 0x07, 0x01, 0x1c, 0x00, 0x00, 0x00, 0x00, 0x07, 0x00,
      0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x5b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x5b, 0x00, 0x00, 0x00, 0x1f, 0x00, 0x00, 0x00,
    ]);
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
  /// Round-trip check on a second fixed buffer: parse, serialize little-endian,
  /// and compare with the input bytes.
  #[test]
  fn rtps_message_test_shapes_demo_DataP() {
    let bits2 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x04, 0x01, 0x03, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31,
      0xa2, 0x28, 0x20, 0x02, 0x08, 0x15, 0x05, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x23, 0x00, 0x00, 0x00, 0x00,
      0x03, 0x00, 0x00, 0x77, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x00, 0x04, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x15, 0x00, 0x04, 0x00, 0x02, 0x04, 0x00, 0x00, 0x50, 0x00, 0x10,
      0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x00, 0x00,
      0x01, 0xc1, 0x16, 0x00, 0x04, 0x00, 0x01, 0x03, 0x00, 0x00, 0x44, 0x00, 0x04, 0x00, 0x3f,
      0x0c, 0x00, 0x00, 0x58, 0x00, 0x04, 0x00, 0x3f, 0x0c, 0x00, 0x00, 0x32, 0x00, 0x18, 0x00,
      0x01, 0x00, 0x00, 0x00, 0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50, 0x8e, 0xc9, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0xc0, 0xa8, 0x45, 0x14, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0xac, 0x11, 0x00, 0x01, 0x33, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xea, 0x1c,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xef,
      0xff, 0x00, 0x01, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0x39, 0x30, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7f, 0x00, 0x00,
      0x01, 0x48, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0x39, 0x30, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7f, 0x00, 0x00, 0x01, 0x34,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0xb0, 0x04, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x02, 0x00, 0x08, 0x00, 0x2c, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
      0x00,
    ]);

    let rtps_data = Message::read_from_buffer(&bits2).unwrap();

    let serialized_data = Bytes::from(
      rtps_data
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits2, serialized_data);
  }
649
  /// Round-trip check on a fixed buffer: parse, log the parsed message,
  /// serialize little-endian, and compare with the input bytes.
  #[test]
  fn rtps_message_test_shapes_demo_info_TS_dataP() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x08, 0x00, 0x0e, 0x15, 0xf3, 0x5e, 0x00, 0x28,
      0x74, 0xd2, 0x15, 0x05, 0xa8, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x01, 0x00, 0xc7, 0x00,
      0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
      0x15, 0x00, 0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00,
      0x00, 0x50, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x01, 0xc1, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf4,
      0x1c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x0a, 0x50, 0x8e, 0x68, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50,
      0x8e, 0x68, 0x02, 0x00, 0x08, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x58,
      0x00, 0x04, 0x00, 0x3f, 0x0c, 0x3f, 0x0c, 0x62, 0x00, 0x18, 0x00, 0x14, 0x00, 0x00, 0x00,
      0x66, 0x61, 0x73, 0x74, 0x72, 0x74, 0x70, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69,
      0x70, 0x61, 0x6e, 0x74, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);

    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
681
  /// Round-trip check on a fixed buffer: parse, log the parsed message,
  /// serialize little-endian, and compare with the input bytes.
  #[test]
  fn rtps_message_test_shapes_demo_info_TS_AckNack() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x04, 0x01, 0x03, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31,
      0xa2, 0x28, 0x20, 0x02, 0x08, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34,
      0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x00, 0x03, 0xc7, 0x00,
      0x00, 0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x01, 0x00, 0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x00, 0x04, 0xc7, 0x00, 0x00, 0x04,
      0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x02, 0x00, 0xc7, 0x00, 0x02, 0x00, 0xc2, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);

    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
707
  /// Round-trip check on a fixed buffer: parse, log the parsed message,
  /// serialize little-endian, and compare with the input bytes.
  #[test]
  fn rtps_message_info_ts_and_dataP() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x08, 0x00, 0x0e, 0x15, 0xf3, 0x5e, 0x00, 0x28,
      0x74, 0xd2, 0x15, 0x05, 0xa8, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x01, 0x00, 0xc7, 0x00,
      0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
      0x15, 0x00, 0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00,
      0x00, 0x50, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x01, 0xc1, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf4,
      0x1c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x0a, 0x50, 0x8e, 0x68, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50,
      0x8e, 0x68, 0x02, 0x00, 0x08, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x58,
      0x00, 0x04, 0x00, 0x3f, 0x0c, 0x3f, 0x0c, 0x62, 0x00, 0x18, 0x00, 0x14, 0x00, 0x00, 0x00,
      0x66, 0x61, 0x73, 0x74, 0x72, 0x74, 0x70, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69,
      0x70, 0x61, 0x6e, 0x74, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);

    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
739
  /// Parses a fixed buffer, requires the submessage at index 2 to be a writer
  /// `Data` submessage (panicking otherwise), logs its serialized payload, and
  /// finally checks that little-endian serialization reproduces the input bytes.
  #[test]
  fn rtps_message_infoDST_infoTS_Data_w_heartbeat() {
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d,
      0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x09, 0x01, 0x08, 0x00, 0x12, 0x15, 0xf3, 0x5e, 0x00,
      0xc8, 0xa9, 0xfa, 0x15, 0x05, 0x0c, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x03, 0xc7,
      0x00, 0x00, 0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00,
      0x00, 0x2f, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50, 0x8e, 0x68, 0x50,
      0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x01, 0xc1, 0x05, 0x00, 0x0c, 0x00, 0x07, 0x00, 0x00, 0x00, 0x53, 0x71, 0x75,
      0x61, 0x72, 0x65, 0x00, 0x00, 0x07, 0x00, 0x10, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x53, 0x68,
      0x61, 0x70, 0x65, 0x54, 0x79, 0x70, 0x65, 0x00, 0x00, 0x00, 0x70, 0x00, 0x10, 0x00, 0x01,
      0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02,
      0x5a, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x02, 0x60, 0x00, 0x04, 0x00, 0x5f, 0x01, 0x00, 0x00, 0x15, 0x00,
      0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00, 0x00, 0x1d,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x23, 0x00, 0x08, 0x00, 0xff, 0xff, 0xff, 0x7f,
      0xff, 0xff, 0xff, 0xff, 0x27, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x1b, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0x7f, 0xff, 0xff,
      0xff, 0xff, 0x1a, 0x00, 0x0c, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x9a,
      0x99, 0x99, 0x19, 0x2b, 0x00, 0x08, 0x00, 0xff, 0xff, 0xff, 0x7f, 0xff, 0xff, 0xff, 0xff,
      0x1f, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x07, 0x01, 0x1c, 0x00, 0x00, 0x00, 0x03, 0xc7, 0x00, 0x00,
      0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
      0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
    ]);

    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    // The third parsed submessage must be a DATA submessage.
    let data_submessage = match &rtps.submessages[2] {
      Submessage {
        header: _,
        body: SubmessageBody::Writer(WriterSubmessage::Data(d, _flags)),
        ..
      } => d,
      wtf => panic!("Unexpected message structure {wtf:?}"),
    };

    let serialized_payload = data_submessage.unwrap_serialized_payload_value().clone();
    info!("{serialized_payload:x?}");

    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
795
  /// Feeds a fixed byte sequence to `Message::read_from_buffer` and only logs
  /// the outcome. There is no assertion: the test passes as long as parsing
  /// returns (with `Ok` or `Err`) instead of panicking.
  #[test]
  fn fuzz_rtps() {
    use hex_literal::hex;

    let bits = Bytes::copy_from_slice(&hex!(
      "
      52 54 50 53
      02 02 ff ff 01 0f 45 d2 b3 f5 58 b9 01 00 00 00
      15 0b 18 00 00 00 00 00 00 00 02 c2 00 00 00 00
      7d 00 00 00 00 01 00 00
      "
    ));
    info!("bytes = {bits:?}");
    // Result is deliberately not unwrapped; an `Err` is an acceptable outcome.
    let rtps = Message::read_from_buffer(&bits);
    info!("read_from_buffer() --> {rtps:?}");
  }
814}