rustdds/messages/submessages/
data.rs

1use std::io;
2
3use bytes::Bytes;
4use speedy::{Context, Error, Readable, Writable, Writer};
5use enumflags2::BitFlags;
6
7use crate::{
8  messages::submessages::{elements::parameter_list::ParameterList, submessages::*},
9  serialization::{padding_needed_for_alignment_4, round_up_to_4},
10  structure::{guid::EntityId, sequence_number::SequenceNumber},
11};
12// use log::debug;
13#[cfg(test)]
14use super::elements::serialized_payload::SerializedPayload;
15
16/// This Submessage is sent from an RTPS Writer (NO_KEY or WITH_KEY)
17/// to an RTPS Reader (NO_KEY or WITH_KEY)
18///
19/// The Submessage notifies the RTPS Reader of a change to
20/// a data-object belonging to the RTPS Writer. The possible changes
21/// include both changes in value as well as changes to the lifecycle
22/// of the data-object.
23#[derive(Debug, PartialEq, Eq, Clone)]
24pub struct Data {
25  /// Identifies the RTPS Reader entity that is being informed of the change
26  /// to the data-object.
27  pub reader_id: EntityId,
28
29  /// Identifies the RTPS Writer entity that made the change to the
30  /// data-object.
31  pub writer_id: EntityId,
32
33  /// Uniquely identifies the change and the relative order for all changes
34  /// made by the RTPS Writer identified by the writerGuid. Each change
35  /// gets a consecutive sequence number. Each RTPS Writer maintains is
36  /// own sequence number.
37  pub writer_sn: SequenceNumber,
38
39  /// Contains QoS that may affect the interpretation of the message.
40  /// Present only if the InlineQosFlag is set in the header.
41  pub inline_qos: Option<ParameterList>,
42
43  /// If the DataFlag is set, then serialized_payload contains the encapsulation
44  /// of the new value of the data-object after the change.
45  /// If the KeyFlag is set, then it contains the encapsulation of
46  /// the key of the data-object the message refers to.
47  ///
48  /// In case of submessage protection, this payload contains the
49  /// encoded version of the original payload
50  pub serialized_payload: Option<Bytes>,
51}
52
53impl Data {
54  /// DATA submessage cannot be speedy Readable because deserializing this
55  /// requires info from submessage header. Required information is  expect_qos
56  /// and expect_payload, which are told on submessage header flags.
57  pub fn deserialize_data(buffer: &Bytes, flags: BitFlags<DATA_Flags>) -> io::Result<Self> {
58    let mut cursor = io::Cursor::new(&buffer);
59    let endianness = endianness_flag(flags.bits());
60    let map_speedy_err = |p: Error| io::Error::new(io::ErrorKind::Other, p);
61
62    let _extra_flags =
63      u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
64    let octets_to_inline_qos =
65      u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
66    let reader_id = EntityId::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
67      .map_err(map_speedy_err)?;
68    let writer_id = EntityId::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
69      .map_err(map_speedy_err)?;
70    let sequence_number =
71      SequenceNumber::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
72        .map_err(map_speedy_err)?;
73
74    let expect_qos = flags.contains(DATA_Flags::InlineQos);
75    let expect_data = flags.contains(DATA_Flags::Data) || flags.contains(DATA_Flags::Key);
76
77    // size of DATA-specific header above is
78    // extraFlags (2) + octetsToInlineQos (2) + readerId (4) + writerId (4) +
79    // writerSN (8) = 20 bytes
80    // of which 16 bytes is after octetsToInlineQos field.
81    let rtps_v23_data_header_size: u16 = 16;
82    // ... and octets_to_inline_qos must be at least this much, or otherwise inline
83    // Qos (or in case it is absent, the following SerializedPayload) would
84    // overlap with the rtps_v23_data_header fields (readerId, writerId, and
85    // writerSN).
86    if octets_to_inline_qos < rtps_v23_data_header_size {
87      return Err(io::Error::new(
88        io::ErrorKind::InvalidData,
89        format!("DATA submessage has invalid octets_to_inline_qos={octets_to_inline_qos}."),
90      ));
91    }
92
93    // We need to check to avoid subtract overflow
94    // https://github.com/jhelovuo/RustDDS/issues/277
95    if octets_to_inline_qos > rtps_v23_data_header_size {
96      let extra_octets = octets_to_inline_qos - rtps_v23_data_header_size;
97      // There may be some extra data between writerSN and inlineQos, if the header is
98      // extended in future versions. But as of RTPS v2.3 , extra_octets should be
99      // always zero.
100
101      // Nevertheless, skip over that extra data, if we are told such exists.
102      cursor.set_position(cursor.position() + u64::from(extra_octets));
103
104      if cursor.position() > buffer.len().try_into().unwrap() {
105        // octets_to_inline_qos told us to skip past the end of the message.
106        // This is a malformed message.
107        return Err(io::Error::new(
108          io::ErrorKind::InvalidData,
109          format!(
110            "DATA submessage octets_to_inline_qos points to byte {}, but message len={}.",
111            cursor.position(),
112            buffer.len()
113          ),
114        ));
115      }
116    }
117
118    // read the inline Qos
119    let parameter_list = if expect_qos {
120      Some(
121        ParameterList::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
122          .map_err(map_speedy_err)?,
123      )
124    } else {
125      None
126    };
127
128    // Serialized data is the rest of the submessage. It may contain some
129    // alignment padding, but at least a CDR decoder should be able to cope with
130    // that.
131    let serialized_payload = if expect_data {
132      Some(buffer.clone().split_off(cursor.position() as usize))
133    } else {
134      None
135    };
136
137    Ok(Self {
138      reader_id,
139      writer_id,
140      writer_sn: sequence_number,
141      inline_qos: parameter_list,
142      serialized_payload,
143    })
144  }
145
146  // Serialized length of Data submessage without submessage header.
147  // This is compatible with the definition of the definition of
148  // "octetsToNextHeader" field in RTPS spec v2.5 Section "9.4.5.1 Submessage
149  // Header".
150  pub fn len_serialized(&self) -> usize {
151    round_up_to_4(
152      2 + // extraFlags
153      2 + // octetsToInlineSos
154      4 + // readerId
155      4 + // writerId
156      8 + // writerSN
157      self.inline_qos.as_ref().map(|q| q.len_serialized() ).unwrap_or(0) + // QoS ParameterList
158      self.serialized_payload.as_ref().map(|q| q.len()).unwrap_or(0),
159    )
160  }
161
162  #[cfg(test)]
163  pub(crate) fn unwrap_serialized_payload(&self) -> SerializedPayload {
164    self
165      .serialized_payload
166      .as_ref()
167      .map(SerializedPayload::from_bytes)
168      .unwrap()
169      .unwrap()
170  }
171
172  #[cfg(test)]
173  pub(crate) fn unwrap_serialized_payload_value(&self) -> Bytes {
174    self.unwrap_serialized_payload().value
175  }
176
177  #[cfg(test)]
178  pub(crate) fn update_serialized_payload_value(&mut self, new_value: Bytes) {
179    let mut payload = self.unwrap_serialized_payload();
180    payload.value = new_value;
181    self.serialized_payload = Some(payload.into());
182  }
183}
184
185impl<C: Context> Writable<C> for Data {
186  fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
187    // This version of the protocol (2.3) should set all the bits in the extraFlags
188    // to zero
189    writer.write_u16(0)?;
190    // The octetsToInlineQos field contains the number of octets starting from the
191    // first octet immediately following this field until the first octet of the
192    // inlineQos SubmessageElement. If the inlineQos SubmessageElement is not
193    // present (i.e., the InlineQosFlag is not set), then octetsToInlineQos contains
194    // the offset to the next field after the inlineQos.
195    writer.write_u16(16)?;
196
197    writer.write_value(&self.reader_id)?;
198    writer.write_value(&self.writer_id)?;
199    writer.write_value(&self.writer_sn)?;
200    if let Some(inline_qos) = self.inline_qos.as_ref() {
201      writer.write_value(inline_qos)?;
202    }
203
204    if let Some(serialized_payload) = self.serialized_payload.as_ref() {
205      writer.write_bytes(serialized_payload)?;
206      for _ in 0..padding_needed_for_alignment_4(serialized_payload.len()) {
207        writer.write_u8(0)?;
208      }
209    }
210
211    Ok(())
212  }
213}
214
215impl HasEntityIds for Data {
216  fn receiver_entity_id(&self) -> EntityId {
217    self.reader_id
218  }
219  fn sender_entity_id(&self) -> EntityId {
220    self.writer_id
221  }
222}