// rustdds/rtps/message.rs

1use std::{cmp::min, collections::BTreeSet, io};
2
3#[allow(unused_imports)]
4use log::{debug, error, trace, warn};
5use speedy::{Context, Endianness, Readable, Writable, Writer};
6use enumflags2::BitFlags;
7use bytes::Bytes;
8
9use crate::{
10  dds::ddsdata::DDSData,
11  messages::{
12    header::Header,
13    protocol_id::ProtocolId,
14    protocol_version::ProtocolVersion,
15    submessages::{
16      elements::{parameter::Parameter, parameter_list::ParameterList},
17      submessages::*,
18    },
19    validity_trait::Validity,
20    vendor_id::VendorId,
21  },
22  rtps::{Submessage, SubmessageBody},
23  structure::{
24    cache_change::CacheChange,
25    guid::{EntityId, GuidPrefix, GUID},
26    parameter_id::ParameterId,
27    sequence_number::{FragmentNumber, SequenceNumber, SequenceNumberSet},
28    time::Timestamp,
29  },
30};
31#[cfg(feature = "security")]
32use crate::{
33  create_security_error_and_log,
34  security::{security_plugins::SecurityPluginsHandle, SecurityError},
35};
36#[cfg(not(feature = "security"))]
37use crate::no_security::SecurityPluginsHandle;
38
/// A complete RTPS message: one [`Header`] followed by a list of
/// [`Submessage`]s.
///
/// Serialization is provided by the `Writable` implementation in this file,
/// which writes the header first and then the submessages in order.
/// Deserialization goes through `Message::read_from_buffer`.
#[derive(Debug, Clone)]
pub struct Message {
  /// The message header.
  pub header: Header,
  /// The submessages, in the order they were added or read.
  pub submessages: Vec<Submessage>,
}
44
45impl Message {
46  pub fn add_submessage(&mut self, submessage: Submessage) {
47    self.submessages.push(submessage);
48  }
49
50  #[cfg(test)]
51  pub fn submessages(self) -> Vec<Submessage> {
52    self.submessages
53  }
54
55  #[cfg(test)]
56  pub fn set_header(&mut self, header: Header) {
57    self.header = header;
58  }
59
60  // We implement this instead of Speedy trait Readable, because
61  // we need to run-time decide which endianness we input. Speedy requires the
62  // top level to fix that. And there seems to be no reasonable way to change
63  // endianness. TODO: The error type should be something better
64  pub fn read_from_buffer(buffer: &Bytes) -> io::Result<Self> {
65    // The Header deserializes the same
66    let rtps_header =
67      Header::read_from_buffer(buffer).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
68    if !rtps_header.valid() {
69      return Err(io::Error::new(io::ErrorKind::Other, "Invalid RTPS header"));
70    }
71    let mut message = Self::new(rtps_header);
72    let mut submessages_left: Bytes = buffer.slice(20..); // header is 20 bytes
73                                                          // submessage loop
74    while !submessages_left.is_empty() {
75      if let Some(submessage) = Submessage::read_from_buffer(&mut submessages_left)? {
76        message.submessages.push(submessage);
77      }
78    } // loop
79
80    Ok(message)
81  }
82}
83
84impl Message {
85  pub fn new(header: Header) -> Self {
86    Self {
87      header,
88      submessages: vec![],
89    }
90  }
91}
92
93impl Default for Message {
94  fn default() -> Self {
95    Self {
96      header: Header::new(GuidPrefix::UNKNOWN),
97      submessages: vec![],
98    }
99  }
100}
101
102impl<C: Context> Writable<C> for Message {
103  fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
104    writer.write_value(&self.header)?;
105    for x in &self.submessages {
106      writer.write_value(&x)?;
107    }
108    Ok(())
109  }
110}
111
/// Incrementally collects submessages for an outgoing [`Message`].
///
/// Each `*_msg` / `*_submessage` method takes the builder by value, appends
/// at most one submessage and returns the builder, so calls can be chained.
/// `add_header_and_build` finally wraps the collected submessages into a
/// [`Message`].
#[derive(Default, Clone)]
pub(crate) struct MessageBuilder {
  // Submessages collected so far, in the order they will be sent.
  submessages: Vec<Submessage>,
}
116
117impl MessageBuilder {
118  pub fn new() -> Self {
119    Self::default()
120  }
121
122  pub fn dst_submessage(mut self, endianness: Endianness, guid_prefix: GuidPrefix) -> Self {
123    let flags = BitFlags::<INFODESTINATION_Flags>::from_endianness(endianness);
124    let submessage_header = SubmessageHeader {
125      kind: SubmessageKind::INFO_DST,
126      flags: flags.bits(),
127      content_length: 12u16,
128      // InfoDST length is always 12 because message contains only GuidPrefix
129    };
130    let dst_submessage = Submessage {
131      header: submessage_header,
132      body: SubmessageBody::Interpreter(InterpreterSubmessage::InfoDestination(
133        InfoDestination { guid_prefix },
134        flags,
135      )),
136      original_bytes: None,
137    };
138
139    self.submessages.push(dst_submessage);
140    self
141  }
142
143  /// Argument Some(timestamp) means that a timestamp is sent.
144  /// Argument None means "invalidate", i.e. the previously sent
145  /// [`InfoTimestamp`] submessage no longer applies.
146  pub fn ts_msg(mut self, endianness: Endianness, timestamp: Option<Timestamp>) -> Self {
147    let mut flags = BitFlags::<INFOTIMESTAMP_Flags>::from_endianness(endianness);
148    if timestamp.is_none() {
149      flags |= INFOTIMESTAMP_Flags::Invalidate;
150    }
151
152    let content_length = match timestamp {
153      Some(_) => 8, // Timestamp is serialized as 2 x 32b words
154      None => 0,    // Not serialized at all
155    };
156
157    let submessage_header = SubmessageHeader {
158      kind: SubmessageKind::INFO_TS,
159      flags: flags.bits(),
160      content_length,
161    };
162
163    let submessage = Submessage {
164      header: submessage_header,
165      body: SubmessageBody::Interpreter(InterpreterSubmessage::InfoTimestamp(
166        InfoTimestamp { timestamp },
167        flags,
168      )),
169      original_bytes: None,
170    };
171
172    self.submessages.push(submessage);
173    self
174  }
175
176  pub fn data_msg(
177    mut self,
178    cache_change: &CacheChange,
179    reader_entity_id: EntityId, // The entity id to be included in the submessage
180    writer_guid: GUID,
181    endianness: Endianness,
182    security_plugins: Option<&SecurityPluginsHandle>,
183  ) -> Self {
184    #[cfg(not(feature = "security"))]
185    // Parameter not used
186    let _ = security_plugins;
187
188    let writer_entity_id = writer_guid.entity_id;
189
190    let mut param_list = ParameterList::new(); // inline QoS goes here
191
192    // Check if we are disposing (by key or by key hash).
193    // If yes, then Indicate Dispose by PID_STATUS_INFO in Inline QoS
194    // RTPS Spec v2.5 Section "9.6.4.9 StatusInfo_t (PID_STATUS_INFO)"
195    // Dispose must be indicated in Inline QoS:
196    // RTPS Spec v2.5 Section "8.7.4 Changes in the Instance State"
197    //
198    // When disposing, we also indicate "unregistered", although we do not
199    // support whole register/unregister mechanism at all. TODO: Does this
200    // make sense?
201    match cache_change.data_value {
202      DDSData::Data { .. } => (), // data sample, not dispose
203
204      DDSData::DisposeByKey { .. } => {
205        param_list.push(Parameter::create_pid_status_info_parameter(
206          /* disposed */ true, /* unregistered */ true, /* filtered */ false,
207        ));
208      }
209      DDSData::DisposeByKeyHash { key_hash, .. } => {
210        // yes, insert key hash to inline QoS
211        param_list.push(Parameter {
212          parameter_id: ParameterId::PID_KEY_HASH,
213          value: key_hash.to_vec(),
214        });
215        // ... and tell what the key_hash means
216        let status_info = Parameter::create_pid_status_info_parameter(
217          /* disposed */ true, /* unregistered */ true, /* filtered */ false,
218        );
219        param_list.push(status_info);
220      }
221    }
222
223    // If we are sending related sample identity, then insert that.
224    if let Some(si) = cache_change.write_options.related_sample_identity() {
225      let related_sample_identity_serialized = si.write_to_vec_with_ctx(endianness).unwrap();
226      // Insert two parameters, because we are not sure which one is the correct
227      // parameter id. Or what the receiver thinks is correct. This behaviour
228      // was observed from eProsima FastDDS on 2024-11-18.
229      param_list.push(Parameter {
230        parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY,
231        value: related_sample_identity_serialized.clone(),
232      });
233      param_list.push(Parameter {
234        parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY_CUSTOM,
235        value: related_sample_identity_serialized,
236      });
237    }
238
239    let serialized_payload = match cache_change.data_value {
240      DDSData::Data {
241        ref serialized_payload,
242      } => Some(serialized_payload.clone()), // contents is Bytes
243      DDSData::DisposeByKey { ref key, .. } => Some(key.clone()),
244      DDSData::DisposeByKeyHash { .. } => None,
245    };
246
247    #[cfg(not(feature = "security"))]
248    let encoded_payload = serialized_payload;
249
250    #[cfg(feature = "security")]
251    let encoded_payload = match serialized_payload
252      // Encode payload if it exists
253      .map(|serialized_payload| {
254        serialized_payload
255          // Serialize
256          .write_to_vec()
257          .map_err(|e| create_security_error_and_log!("{e:?}"))
258          .and_then(|serialized_payload| {
259            match security_plugins.map(SecurityPluginsHandle::get_plugins) {
260              Some(security_plugins) => {
261                security_plugins
262                  .encode_serialized_payload(serialized_payload, &writer_guid)
263                  // Add the extra qos
264                  .map(|(encoded_payload, extra_inline_qos)| {
265                    param_list.concat(extra_inline_qos);
266                    encoded_payload
267                  })
268              }
269              None => Ok(serialized_payload),
270            }
271          })
272      })
273      .transpose()
274    {
275      Ok(encoded_payload) => encoded_payload,
276      Err(e) => {
277        error!("{e:?}");
278        return self;
279      }
280    }; // end security
281
282    let have_inline_qos = !param_list.is_empty(); // we need this later also
283    let inline_qos = if have_inline_qos {
284      Some(param_list)
285    } else {
286      None
287    };
288
289    let data_message = Data {
290      reader_id: reader_entity_id,
291      writer_id: writer_entity_id,
292      writer_sn: cache_change.sequence_number,
293      inline_qos,
294      serialized_payload: encoded_payload.map(Bytes::from),
295    };
296
297    let flags: BitFlags<DATA_Flags> = BitFlags::<DATA_Flags>::from_endianness(endianness)
298      | (match cache_change.data_value {
299        DDSData::Data { .. } => BitFlags::<DATA_Flags>::from_flag(DATA_Flags::Data),
300        DDSData::DisposeByKey { .. } => BitFlags::<DATA_Flags>::from_flag(DATA_Flags::Key),
301        DDSData::DisposeByKeyHash { .. } => {
302          BitFlags::<DATA_Flags>::from_flag(DATA_Flags::InlineQos)
303        }
304      })
305      | (if have_inline_qos {
306        BitFlags::<DATA_Flags>::from_flag(DATA_Flags::InlineQos)
307      } else {
308        BitFlags::<DATA_Flags>::empty()
309      });
310
311    self.submessages.push(Submessage {
312      header: SubmessageHeader {
313        kind: SubmessageKind::DATA,
314        flags: flags.bits(),
315        content_length: data_message.len_serialized() as u16, // TODO: Handle overflow?
316      },
317      body: SubmessageBody::Writer(WriterSubmessage::Data(data_message, flags)),
318      original_bytes: None,
319    });
320    self
321  }
322
323  // This whole MessageBuilder structure should be refactored into something more
324  // coherent. Now it just looks messy.
325  #[allow(clippy::too_many_arguments)]
326  pub fn data_frag_msg(
327    mut self,
328    cache_change: &CacheChange,
329    reader_entity_id: EntityId,
330    writer_guid: GUID,
331    fragment_number: FragmentNumber, // We support only submessages with one fragment
332    fragment_size: u16,
333    sample_size: u32, // all fragments together
334    endianness: Endianness,
335    security_plugins: Option<&SecurityPluginsHandle>,
336  ) -> Self {
337    #[cfg(not(feature = "security"))]
338    // Parameter not used
339    let _ = security_plugins;
340
341    let writer_entity_id = writer_guid.entity_id;
342
343    let mut param_list = ParameterList::new(); // inline QoS goes here
344
345    // Check if we are disposing by key hash
346    match cache_change.data_value {
347      DDSData::Data { .. } | DDSData::DisposeByKey { .. } => (), // no => ok
348      DDSData::DisposeByKeyHash { .. } => {
349        error!(
350          "data_frag_msg: Called with DDSData::DisposeByKeyHash. This is not legit! Discarding."
351        );
352        // DataFrag must contain either data or key payload, disposing by key hash
353        // sent in inline QoS (without key or data) is not possible like in Data
354        // submessages. See e.g. RTPS spec v2.5 Table 8.42 in Section "8.3.8.3
355        // DataFrag"
356        return self;
357      }
358    }
359
360    // If we are sending related sample identity, then insert that.
361    if let Some(si) = cache_change.write_options.related_sample_identity() {
362      let related_sample_identity_serialized = si.write_to_vec_with_ctx(endianness).unwrap();
363      // Insert two parameters, because we are not sure which one is the correct
364      // parameter id. Or what the receiver thinks is correct. This behaviour
365      // was observed from eProsima FastDDS on 2024-11-18.
366      param_list.push(Parameter {
367        parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY,
368        value: related_sample_identity_serialized.clone(),
369      });
370      param_list.push(Parameter {
371        parameter_id: ParameterId::PID_RELATED_SAMPLE_IDENTITY_CUSTOM,
372        value: related_sample_identity_serialized,
373      });
374    }
375
376    let have_inline_qos = !param_list.is_empty(); // we need this later also
377
378    // fragments are numbered starting from 1, not 0.
379    let from_byte: usize = (usize::from(fragment_number) - 1) * usize::from(fragment_size);
380    let up_to_before_byte: usize = min(
381      usize::from(fragment_number) * usize::from(fragment_size),
382      sample_size.try_into().unwrap(),
383    );
384
385    let serialized_payload = Vec::from(
386      cache_change
387        .data_value
388        .bytes_slice(from_byte, up_to_before_byte),
389    );
390
391    #[cfg(not(feature = "security"))]
392    let encoded_payload = serialized_payload;
393
394    #[cfg(feature = "security")]
395    let encoded_payload = {
396      let encode_result = match security_plugins.map(SecurityPluginsHandle::get_plugins) {
397        Some(security_plugins) => {
398          security_plugins
399            .encode_serialized_payload(serialized_payload, &writer_guid)
400            // Add the extra qos
401            .map(|(encoded_payload, extra_inline_qos)| {
402              param_list.concat(extra_inline_qos);
403              encoded_payload
404            })
405        }
406        None =>
407        // If there are no security plugins, use plaintext
408        {
409          Ok(serialized_payload)
410        }
411      };
412
413      match encode_result {
414        Ok(encoded_payload) => encoded_payload,
415        Err(e) => {
416          error!("{e:?}");
417          return self;
418        }
419      }
420    }; // end security encoding
421
422    let data_message = DataFrag {
423      reader_id: reader_entity_id,
424      writer_id: writer_entity_id,
425      writer_sn: cache_change.sequence_number,
426      fragment_starting_num: fragment_number,
427      fragments_in_submessage: 1,
428      data_size: sample_size, // total, assembled data (SerializedPayload) size
429      fragment_size,
430      inline_qos: if have_inline_qos {
431        Some(param_list)
432      } else {
433        None
434      },
435      serialized_payload: Bytes::from(encoded_payload),
436    };
437
438    let flags: BitFlags<DATAFRAG_Flags> =
439      // endianness flag
440      BitFlags::<DATAFRAG_Flags>::from_endianness(endianness)
441      // key flag
442      | (match cache_change.data_value {
443        DDSData::Data { .. } => BitFlags::<DATAFRAG_Flags>::empty(),
444        DDSData::DisposeByKey { .. } => BitFlags::<DATAFRAG_Flags>::from_flag(DATAFRAG_Flags::Key),
445        DDSData::DisposeByKeyHash { .. } => unreachable!(),
446      })
447      // inline QoS flag
448      | (if have_inline_qos {
449        BitFlags::<DATAFRAG_Flags>::from_flag(DATAFRAG_Flags::InlineQos)
450      } else {
451        BitFlags::<DATAFRAG_Flags>::empty()
452      });
453
454    self.submessages.push(Submessage {
455      header: SubmessageHeader {
456        kind: SubmessageKind::DATA_FRAG,
457        flags: flags.bits(),
458        content_length: data_message.len_serialized() as u16, // TODO: Handle overflow
459      },
460      body: SubmessageBody::Writer(WriterSubmessage::DataFrag(data_message, flags)),
461      original_bytes: None,
462    });
463    self
464  }
465
466  pub fn gap_msg(
467    mut self,
468    irrelevant_sns: &BTreeSet<SequenceNumber>,
469    writer_entity_id: EntityId,
470    writer_endianness: Endianness,
471    reader_guid: GUID,
472  ) -> Self {
473    match (irrelevant_sns.first(), irrelevant_sns.last()) {
474      (Some(&gap_start), Some(&_last_sn)) => {
475        // Determine the contiguous range of irrelevant seqnums starting from gap_start.
476        // Do this by finding the first seqnum which is larger than gap_start but is not
477        // included in irrelevant_sns. That is, find the
478        // exclusive endpoint of the contiguous range.
479        let mut range_endpoint_excl = gap_start.plus_1();
480        while irrelevant_sns.contains(&range_endpoint_excl) {
481          range_endpoint_excl = range_endpoint_excl.plus_1();
482        }
483        // Set gapList.base as this exclusive endpoint of the range
484        let list_base = range_endpoint_excl;
485        let list_set = irrelevant_sns.clone().split_off(&list_base);
486        let gap_list = SequenceNumberSet::from_base_and_set(list_base, &list_set);
487
488        let gap = Gap {
489          reader_id: reader_guid.entity_id,
490          writer_id: writer_entity_id,
491          gap_start,
492          gap_list,
493        };
494        let gap_flags = BitFlags::<GAP_Flags>::from_endianness(writer_endianness);
495        gap
496          .create_submessage(gap_flags)
497          .map(|s| self.submessages.push(s));
498      }
499      (_, _) => error!("gap_msg called with empty SN set. Skipping GAP submessage"),
500    }
501    self
502  }
503
504  // GAP for range 1 <= SN < irrelevant_sns_before
505  pub fn gap_msg_before(
506    mut self,
507    irrelevant_sns_before: SequenceNumber,
508    writer_entity_id: EntityId,
509    writer_endianness: Endianness,
510    reader_guid: GUID,
511  ) -> Self {
512    let gap_list = SequenceNumberSet::new_empty(irrelevant_sns_before);
513    let gap = Gap {
514      reader_id: reader_guid.entity_id,
515      writer_id: writer_entity_id,
516      gap_start: SequenceNumber::from(1),
517      gap_list,
518    };
519
520    let gap_flags = BitFlags::<GAP_Flags>::from_endianness(writer_endianness);
521    gap
522      .create_submessage(gap_flags)
523      .map(|s| self.submessages.push(s));
524    self
525  }
526
527  #[allow(clippy::too_many_arguments)] // Heartbeat just is complicated.
528  pub fn heartbeat_msg(
529    mut self,
530    writer_entity_id: EntityId,
531    first: SequenceNumber,
532    last: SequenceNumber,
533    heartbeat_count: i32,
534    endianness: Endianness,
535    reader_entity_id: EntityId,
536    set_final_flag: bool,
537    set_liveliness_flag: bool,
538  ) -> Self {
539    let heartbeat = Heartbeat {
540      reader_id: reader_entity_id,
541      writer_id: writer_entity_id,
542      first_sn: first,
543      last_sn: last,
544      count: heartbeat_count,
545    };
546
547    let mut flags = BitFlags::<HEARTBEAT_Flags>::from_endianness(endianness);
548
549    if set_final_flag {
550      flags.insert(HEARTBEAT_Flags::Final);
551    }
552    if set_liveliness_flag {
553      flags.insert(HEARTBEAT_Flags::Liveliness);
554    }
555
556    let submessage = heartbeat.create_submessage(flags);
557    match submessage {
558      Some(sm) => self.submessages.push(sm),
559      None => return self,
560    }
561    self
562  }
563
564  pub fn add_header_and_build(self, guid_prefix: GuidPrefix) -> Message {
565    Message {
566      header: Header {
567        protocol_id: ProtocolId::default(),
568        protocol_version: ProtocolVersion::THIS_IMPLEMENTATION,
569        vendor_id: VendorId::THIS_IMPLEMENTATION,
570        guid_prefix,
571      },
572      submessages: self.submessages,
573    }
574  }
575}
576
577#[cfg(test)]
578#[allow(non_snake_case)]
579mod tests {
580  use log::info;
581
582  use super::*;
583
  // Round-trip test: parse a captured message, serialize it again as
  // little-endian and require the output to equal the captured bytes.
  #[test]

  fn rtps_message_test_shapes_demo_message_deserialization() {
    // Data message should contain Shapetype values.
    // captured with wireshark from shapes demo.
    // packet with INFO_DST, INFO_TS, DATA, HEARTBEAT
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d,
      0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x09, 0x01, 0x08, 0x00, 0x1a, 0x15, 0xf3, 0x5e, 0x00,
      0xcc, 0xfb, 0x13, 0x15, 0x05, 0x2c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x07,
      0x00, 0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x5b, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x52, 0x45, 0x44, 0x00, 0x69, 0x00, 0x00, 0x00, 0x17, 0x00,
      0x00, 0x00, 0x1e, 0x00, 0x00, 0x00, 0x07, 0x01, 0x1c, 0x00, 0x00, 0x00, 0x00, 0x07, 0x00,
      0x00, 0x01, 0x02, 0x00, 0x00, 0x00, 0x00, 0x5b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x5b, 0x00, 0x00, 0x00, 0x1f, 0x00, 0x00, 0x00,
    ]);
    // Parsing must succeed for this capture.
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    // Serializing the parsed message must reproduce the capture exactly.
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
  // Round-trip test: parse a captured message, serialize it again as
  // little-endian and require the output to equal the captured bytes.
  #[test]
  fn rtps_message_test_shapes_demo_DataP() {
    // captured with wireshark from shapes demo.
    // packet with DATA(p)
    let bits2 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x04, 0x01, 0x03, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31,
      0xa2, 0x28, 0x20, 0x02, 0x08, 0x15, 0x05, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x23, 0x00, 0x00, 0x00, 0x00,
      0x03, 0x00, 0x00, 0x77, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0f, 0x00, 0x04, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x15, 0x00, 0x04, 0x00, 0x02, 0x04, 0x00, 0x00, 0x50, 0x00, 0x10,
      0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x00, 0x00,
      0x01, 0xc1, 0x16, 0x00, 0x04, 0x00, 0x01, 0x03, 0x00, 0x00, 0x44, 0x00, 0x04, 0x00, 0x3f,
      0x0c, 0x00, 0x00, 0x58, 0x00, 0x04, 0x00, 0x3f, 0x0c, 0x00, 0x00, 0x32, 0x00, 0x18, 0x00,
      0x01, 0x00, 0x00, 0x00, 0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50, 0x8e, 0xc9, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0xc0, 0xa8, 0x45, 0x14, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x9f, 0xa4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0xac, 0x11, 0x00, 0x01, 0x33, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xea, 0x1c,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xef,
      0xff, 0x00, 0x01, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0x39, 0x30, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7f, 0x00, 0x00,
      0x01, 0x48, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0x39, 0x30, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x7f, 0x00, 0x00, 0x01, 0x34,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0xb0, 0x04, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x02, 0x00, 0x08, 0x00, 0x2c, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
      0x00,
    ]);

    // Parsing must succeed for this capture.
    let rtps_data = Message::read_from_buffer(&bits2).unwrap();

    // Serializing the parsed message must reproduce the capture exactly.
    let serialized_data = Bytes::from(
      rtps_data
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits2, serialized_data);
  }
649
  // Round-trip test: parse a captured message, serialize it again as
  // little-endian and require the output to equal the captured bytes.
  #[test]
  fn rtps_message_test_shapes_demo_info_TS_dataP() {
    // captured with wireshark from shapes demo.
    // rtps packet with info TS and Data(p)
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x08, 0x00, 0x0e, 0x15, 0xf3, 0x5e, 0x00, 0x28,
      0x74, 0xd2, 0x15, 0x05, 0xa8, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x01, 0x00, 0xc7, 0x00,
      0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
      0x15, 0x00, 0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00,
      0x00, 0x50, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x01, 0xc1, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf4,
      0x1c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x0a, 0x50, 0x8e, 0x68, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50,
      0x8e, 0x68, 0x02, 0x00, 0x08, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x58,
      0x00, 0x04, 0x00, 0x3f, 0x0c, 0x3f, 0x0c, 0x62, 0x00, 0x18, 0x00, 0x14, 0x00, 0x00, 0x00,
      0x66, 0x61, 0x73, 0x74, 0x72, 0x74, 0x70, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69,
      0x70, 0x61, 0x6e, 0x74, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);

    // Parsing must succeed for this capture.
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    // Serializing the parsed message must reproduce the capture exactly.
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
681
  // Round-trip test: parse a captured message, serialize it again as
  // little-endian and require the output to equal the captured bytes.
  #[test]
  fn rtps_message_test_shapes_demo_info_TS_AckNack() {
    // captured with wireshark from shapes demo.
    // rtps packet with three AckNacks after one leading info submessage.
    // NOTE(review): the test name says "info TS", but the first submessage
    // after the 20-byte header starts with 0x0e, 0x01, 0x0c, 0x00. That is the
    // id and length (12) used for INFO_DST in the other captures in this
    // module, not INFO_TS (0x09, length 8) - confirm and consider renaming.
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x04, 0x01, 0x03, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d, 0x31,
      0xa2, 0x28, 0x20, 0x02, 0x08, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34,
      0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x00, 0x03, 0xc7, 0x00,
      0x00, 0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x01, 0x00, 0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x00, 0x04, 0xc7, 0x00, 0x00, 0x04,
      0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x06, 0x03, 0x18, 0x00, 0x00, 0x02, 0x00, 0xc7, 0x00, 0x02, 0x00, 0xc2, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);

    // Parsing must succeed for this capture.
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    // Serializing the parsed message must reproduce the capture exactly.
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
707
  // Round-trip test: parse a captured message, serialize it again as
  // little-endian and require the output to equal the captured bytes.
  // NOTE(review): the capture below appears to be byte-for-byte the same as
  // the one in rtps_message_test_shapes_demo_info_TS_dataP - confirm whether
  // both tests are needed.
  #[test]
  fn rtps_message_info_ts_and_dataP() {
    // captured with wireshark from shapes demo.
    // rtps packet with info TS and data(p)
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x09, 0x01, 0x08, 0x00, 0x0e, 0x15, 0xf3, 0x5e, 0x00, 0x28,
      0x74, 0xd2, 0x15, 0x05, 0xa8, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x01, 0x00, 0xc7, 0x00,
      0x01, 0x00, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00,
      0x15, 0x00, 0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00,
      0x00, 0x50, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x01, 0xc1, 0x32, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf4,
      0x1c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x0a, 0x50, 0x8e, 0x68, 0x31, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50,
      0x8e, 0x68, 0x02, 0x00, 0x08, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x58,
      0x00, 0x04, 0x00, 0x3f, 0x0c, 0x3f, 0x0c, 0x62, 0x00, 0x18, 0x00, 0x14, 0x00, 0x00, 0x00,
      0x66, 0x61, 0x73, 0x74, 0x72, 0x74, 0x70, 0x73, 0x50, 0x61, 0x72, 0x74, 0x69, 0x63, 0x69,
      0x70, 0x61, 0x6e, 0x74, 0x00, 0x01, 0x00, 0x00, 0x00,
    ]);

    // Parsing must succeed for this capture.
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    // Serializing the parsed message must reproduce the capture exactly.
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
739
  // Round-trip test: parse a captured message, check that its third
  // submessage is a DATA submessage, then serialize the message again as
  // little-endian and require the output to equal the captured bytes.
  #[test]
  fn rtps_message_infoDST_infoTS_Data_w_heartbeat() {
    // captured with wireshark from shapes demo.
    // rtps packet with InfoDST InfoTS Data(w) Heartbeat
    // This datamessage serialized payload maybe contains topic name (square) and
    // its type (shapetype) look https://www.omg.org/spec/DDSI-RTPS/2.3/PDF page 185
    let bits1 = Bytes::from_static(&[
      0x52, 0x54, 0x50, 0x53, 0x02, 0x03, 0x01, 0x0f, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x0e, 0x01, 0x0c, 0x00, 0x01, 0x03, 0x00, 0x0c, 0x29, 0x2d,
      0x31, 0xa2, 0x28, 0x20, 0x02, 0x08, 0x09, 0x01, 0x08, 0x00, 0x12, 0x15, 0xf3, 0x5e, 0x00,
      0xc8, 0xa9, 0xfa, 0x15, 0x05, 0x0c, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x03, 0xc7,
      0x00, 0x00, 0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x03, 0x00,
      0x00, 0x2f, 0x00, 0x18, 0x00, 0x01, 0x00, 0x00, 0x00, 0xf5, 0x1c, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x50, 0x8e, 0x68, 0x50,
      0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
      0x00, 0x00, 0x01, 0xc1, 0x05, 0x00, 0x0c, 0x00, 0x07, 0x00, 0x00, 0x00, 0x53, 0x71, 0x75,
      0x61, 0x72, 0x65, 0x00, 0x00, 0x07, 0x00, 0x10, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x53, 0x68,
      0x61, 0x70, 0x65, 0x54, 0x79, 0x70, 0x65, 0x00, 0x00, 0x00, 0x70, 0x00, 0x10, 0x00, 0x01,
      0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02,
      0x5a, 0x00, 0x10, 0x00, 0x01, 0x0f, 0x99, 0x06, 0x78, 0x34, 0x00, 0x00, 0x01, 0x00, 0x00,
      0x00, 0x00, 0x00, 0x01, 0x02, 0x60, 0x00, 0x04, 0x00, 0x5f, 0x01, 0x00, 0x00, 0x15, 0x00,
      0x04, 0x00, 0x02, 0x03, 0x00, 0x00, 0x16, 0x00, 0x04, 0x00, 0x01, 0x0f, 0x00, 0x00, 0x1d,
      0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x23, 0x00, 0x08, 0x00, 0xff, 0xff, 0xff, 0x7f,
      0xff, 0xff, 0xff, 0xff, 0x27, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x1b, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0x7f, 0xff, 0xff,
      0xff, 0xff, 0x1a, 0x00, 0x0c, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x9a,
      0x99, 0x99, 0x19, 0x2b, 0x00, 0x08, 0x00, 0xff, 0xff, 0xff, 0x7f, 0xff, 0xff, 0xff, 0xff,
      0x1f, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x25, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00,
      0x00, 0x01, 0x00, 0x00, 0x00, 0x07, 0x01, 0x1c, 0x00, 0x00, 0x00, 0x03, 0xc7, 0x00, 0x00,
      0x03, 0xc2, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
      0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00,
    ]);

    // Parsing must succeed for this capture.
    let rtps = Message::read_from_buffer(&bits1).unwrap();
    info!("{rtps:?}");

    // The third submessage (index 2) must be a DATA submessage; anything else
    // fails the test.
    let data_submessage = match &rtps.submessages[2] {
      Submessage {
        header: _,
        body: SubmessageBody::Writer(WriterSubmessage::Data(d, _flags)),
        ..
      } => d,
      wtf => panic!("Unexpected message structure {wtf:?}"),
    };

    // The payload is only logged, not asserted on.
    let serialized_payload = data_submessage.unwrap_serialized_payload_value().clone();
    info!("{serialized_payload:x?}");

    // Serializing the parsed message must reproduce the capture exactly.
    let serialized = Bytes::from(
      rtps
        .write_to_vec_with_ctx(Endianness::LittleEndian)
        .unwrap(),
    );
    assert_eq!(bits1, serialized);
  }
795
  // Regression test for an input taken from the issue linked below: reading
  // it must not panic. The result of the read is only logged, so both Ok and
  // Err pass.
  #[test]
  fn fuzz_rtps() {
    // https://github.com/jhelovuo/RustDDS/issues/280
    use hex_literal::hex;

    let bits = Bytes::copy_from_slice(&hex!(
      "
      52 54 50 53
      02 02 ff ff 01 0f 45 d2 b3 f5 58 b9 01 00 00 00
      15 0b 18 00 00 00 00 00 00 00 02 c2 00 00 00 00
      7d 00 00 00 00 01 00 00
    "
    ));
    info!("bytes = {bits:?}");
    // Deliberately not unwrapped.
    let rtps = Message::read_from_buffer(&bits);
    info!("read_from_buffer() --> {rtps:?}");
    // if we get here without panic, the test passes
  }
814}