1use byteorder::ReadBytesExt;
2
3use crate::{
4 parameter_id_values::PID_SENTINEL,
5 rtps::{
6 messages::{
7 overall_structure::{
8 RtpsMap, RtpsSubmessageWriteKind, Submessage, SubmessageHeader,
9 SubmessageHeaderRead, SubmessageHeaderWrite, WriteBytes,
10 },
11 submessage_elements::{ArcSlice, Data, ParameterList, SubmessageElement},
12 types::{SubmessageFlag, SubmessageKind},
13 },
14 types::{EntityId, SequenceNumber},
15 },
16};
17
18#[derive(Debug, PartialEq, Eq)]
19pub struct DataSubmessageRead {
20 data: ArcSlice,
21}
22
23impl SubmessageHeader for DataSubmessageRead {
24 fn submessage_header(&self) -> SubmessageHeaderRead {
25 SubmessageHeaderRead::new(self.data.as_slice())
26 }
27}
28
29impl From<ArcSlice> for DataSubmessageRead {
30 fn from(value: ArcSlice) -> Self {
31 Self { data: value }
32 }
33}
34
35impl DataSubmessageRead {
36 #[allow(clippy::too_many_arguments)]
37 pub fn new(
38 inline_qos_flag: SubmessageFlag,
39 data_flag: SubmessageFlag,
40 key_flag: SubmessageFlag,
41 non_standard_payload_flag: SubmessageFlag,
42 reader_id: EntityId,
43 writer_id: EntityId,
44 writer_sn: SequenceNumber,
45 inline_qos: &ParameterList,
46 serialized_payload: &Data,
47 ) -> Self {
48 let empty_serialized_payload = Data::new(ArcSlice::from(vec![]));
49 let data_write = RtpsSubmessageWriteKind::Data(DataSubmessageWrite::new(
50 inline_qos_flag,
51 data_flag,
52 key_flag,
53 non_standard_payload_flag,
54 reader_id,
55 writer_id,
56 writer_sn,
57 inline_qos,
58 &empty_serialized_payload,
59 ));
60 let mut data = vec![0; 1500];
61 let length_sofar = data_write.write_bytes(&mut data);
62 data.truncate(length_sofar);
63 data.extend_from_slice(serialized_payload.as_ref());
64
65 Self {
66 data: ArcSlice::from(data),
67 }
68 }
69
70 fn octets_to_inline_qos(&self) -> usize {
71 (&self.data[6..])
72 .read_u16::<byteorder::LittleEndian>()
73 .unwrap() as usize
74 }
75
76 fn inline_qos_len(&self) -> usize {
77 let mut parameter_list_buf = &self.data[8 + self.octets_to_inline_qos()..];
78 let parameter_list_buf_length = parameter_list_buf.len();
79
80 if self.inline_qos_flag() {
81 loop {
82 let pid = parameter_list_buf
83 .read_i16::<byteorder::LittleEndian>()
84 .expect("pid read failed");
85 let length = parameter_list_buf
86 .read_i16::<byteorder::LittleEndian>()
87 .expect("length read failed");
88 if pid == PID_SENTINEL {
89 break;
90 } else {
91 (_, parameter_list_buf) = parameter_list_buf.split_at(length as usize);
92 }
93 }
94 parameter_list_buf_length - parameter_list_buf.len()
95 } else {
96 0
97 }
98 }
99
100 pub fn inline_qos_flag(&self) -> bool {
101 self.submessage_header().flags()[1]
102 }
103
104 pub fn _data_flag(&self) -> bool {
105 self.submessage_header().flags()[2]
106 }
107
108 pub fn key_flag(&self) -> bool {
109 self.submessage_header().flags()[3]
110 }
111
112 pub fn _non_standard_payload_flag(&self) -> bool {
113 self.submessage_header().flags()[4]
114 }
115
116 pub fn reader_id(&self) -> EntityId {
117 self.map(&self.data[8..])
118 }
119
120 pub fn writer_id(&self) -> EntityId {
121 self.map(&self.data[12..])
122 }
123
124 pub fn writer_sn(&self) -> SequenceNumber {
125 self.map(&self.data[16..])
126 }
127
128 pub fn inline_qos(&self) -> ParameterList {
129 if self.inline_qos_flag() {
130 self.map(&self.data[self.octets_to_inline_qos() + 8..])
131 } else {
132 ParameterList::empty()
133 }
134 }
135
136 pub fn serialized_payload(&self) -> Data {
137 Data::new(
138 self.data
139 .sub_slice(8 + self.octets_to_inline_qos() + self.inline_qos_len()..),
140 )
141 }
142}
143
144#[derive(Debug, PartialEq, Eq)]
145pub struct DataSubmessageWrite<'a> {
146 inline_qos_flag: SubmessageFlag,
147 data_flag: SubmessageFlag,
148 key_flag: SubmessageFlag,
149 non_standard_payload_flag: SubmessageFlag,
150 submessage_elements: [SubmessageElement<'a>; 5],
151 inline_qos_submessage_element: Option<SubmessageElement<'a>>,
152 serialized_payload_submessage_element: Option<SubmessageElement<'a>>,
153}
154
155impl<'a> DataSubmessageWrite<'a> {
156 #[allow(clippy::too_many_arguments)]
157 pub fn new(
158 inline_qos_flag: SubmessageFlag,
159 data_flag: SubmessageFlag,
160 key_flag: SubmessageFlag,
161 non_standard_payload_flag: SubmessageFlag,
162 reader_id: EntityId,
163 writer_id: EntityId,
164 writer_sn: SequenceNumber,
165 inline_qos: &'a ParameterList,
166 serialized_payload: &'a Data,
167 ) -> Self {
168 const EXTRA_FLAGS: u16 = 0;
169 const OCTETS_TO_INLINE_QOS: u16 = 16;
170 let submessage_elements = [
171 SubmessageElement::UShort(EXTRA_FLAGS),
172 SubmessageElement::UShort(OCTETS_TO_INLINE_QOS),
173 SubmessageElement::EntityId(reader_id),
174 SubmessageElement::EntityId(writer_id),
175 SubmessageElement::SequenceNumber(writer_sn),
176 ];
177 let inline_qos_submessage_element = if inline_qos_flag {
178 Some(SubmessageElement::ParameterList(inline_qos))
179 } else {
180 None
181 };
182
183 let serialized_payload_submessage_element = if data_flag || key_flag {
184 Some(SubmessageElement::SerializedData(serialized_payload))
185 } else {
186 None
187 };
188 Self {
189 inline_qos_flag,
190 data_flag,
191 key_flag,
192 non_standard_payload_flag,
193 submessage_elements,
194 inline_qos_submessage_element,
195 serialized_payload_submessage_element,
196 }
197 }
198}
199
200impl<'a> Submessage<'a> for DataSubmessageWrite<'a> {
201 type SubmessageList = std::iter::Chain<
202 std::iter::Chain<
203 std::slice::Iter<'a, SubmessageElement<'a>>,
204 std::option::Iter<'a, SubmessageElement<'a>>,
205 >,
206 std::option::Iter<'a, SubmessageElement<'a>>,
207 >;
208
209 fn submessage_header(&self, octets_to_next_header: u16) -> SubmessageHeaderWrite {
210 SubmessageHeaderWrite::new(
211 SubmessageKind::DATA,
212 &[
213 self.inline_qos_flag,
214 self.data_flag,
215 self.key_flag,
216 self.non_standard_payload_flag,
217 ],
218 octets_to_next_header,
219 )
220 }
221
222 fn submessage_elements(&'a self) -> Self::SubmessageList {
223 self.submessage_elements
224 .iter()
225 .chain(self.inline_qos_submessage_element.iter())
226 .chain(self.serialized_payload_submessage_element.iter())
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;
    use crate::rtps::{
        messages::{
            overall_structure::{into_bytes_vec, RtpsSubmessageWriteKind},
            submessage_elements::Parameter,
        },
        types::{USER_DEFINED_READER_GROUP, USER_DEFINED_READER_NO_KEY},
    };

    /// Reader entity id used by every test vector.
    fn reader_id() -> EntityId {
        EntityId::new([1, 2, 3], USER_DEFINED_READER_NO_KEY)
    }

    /// Writer entity id used by every test vector.
    fn writer_id() -> EntityId {
        EntityId::new([6, 7, 8], USER_DEFINED_READER_GROUP)
    }

    /// Writer sequence number used by every test vector.
    fn writer_sn() -> SequenceNumber {
        SequenceNumber::from(5)
    }

    /// Inline QoS made of two four-byte parameters (ids 6 and 7).
    fn two_parameters() -> ParameterList {
        let parameter_1 = Parameter::new(6, vec![10, 11, 12, 13]);
        let parameter_2 = Parameter::new(7, vec![20, 21, 22, 23]);
        ParameterList::new(vec![parameter_1, parameter_2])
    }

    /// Serializes a DATA submessage with the shared ids and the key and
    /// non-standard-payload flags cleared.
    fn serialize(
        inline_qos_flag: bool,
        data_flag: bool,
        inline_qos: &ParameterList,
        serialized_payload: &Data,
    ) -> Vec<u8> {
        let key_flag = false;
        let non_standard_payload_flag = false;
        let submessage = RtpsSubmessageWriteKind::Data(DataSubmessageWrite::new(
            inline_qos_flag,
            data_flag,
            key_flag,
            non_standard_payload_flag,
            reader_id(),
            writer_id(),
            writer_sn(),
            inline_qos,
            serialized_payload,
        ));
        into_bytes_vec(submessage)
    }

    /// Wraps raw bytes as a received DATA submessage.
    fn parse(bytes: Vec<u8>) -> DataSubmessageRead {
        DataSubmessageRead::from(ArcSlice::from(bytes))
    }

    #[test]
    fn serialize_no_inline_qos_no_serialized_payload() {
        let inline_qos = ParameterList::empty();
        let serialized_payload = Data::new(vec![].into());

        #[rustfmt::skip]
        let expected: Vec<u8> = vec![
            0x15, 0b_0000_0001, 20, 0, // submessage header
            0, 0, 16, 0, // extra flags, octets to inline QoS
            1, 2, 3, 4, // reader id
            6, 7, 8, 9, // writer id
            0, 0, 0, 0, // writer sequence number
            5, 0, 0, 0,
        ];
        assert_eq!(
            serialize(false, false, &inline_qos, &serialized_payload),
            expected
        );
    }

    #[test]
    fn serialize_with_inline_qos_no_serialized_payload() {
        let inline_qos = two_parameters();
        let serialized_payload = Data::new(vec![].into());

        #[rustfmt::skip]
        let expected: Vec<u8> = vec![
            0x15, 0b_0000_0011, 40, 0, // submessage header
            0, 0, 16, 0, // extra flags, octets to inline QoS
            1, 2, 3, 4, // reader id
            6, 7, 8, 9, // writer id
            0, 0, 0, 0, // writer sequence number
            5, 0, 0, 0,
            6, 0, 4, 0, // parameter 1: id, length
            10, 11, 12, 13, // parameter 1: value
            7, 0, 4, 0, // parameter 2: id, length
            20, 21, 22, 23, // parameter 2: value
            1, 0, 0, 0, // sentinel
        ];
        assert_eq!(
            serialize(true, false, &inline_qos, &serialized_payload),
            expected
        );
    }

    #[test]
    fn serialize_no_inline_qos_with_serialized_payload() {
        let inline_qos = ParameterList::empty();
        let serialized_payload = Data::new(vec![1, 2, 3, 4].into());

        #[rustfmt::skip]
        let expected: Vec<u8> = vec![
            0x15, 0b_0000_0101, 24, 0, // submessage header
            0, 0, 16, 0, // extra flags, octets to inline QoS
            1, 2, 3, 4, // reader id
            6, 7, 8, 9, // writer id
            0, 0, 0, 0, // writer sequence number
            5, 0, 0, 0,
            1, 2, 3, 4, // serialized payload
        ];
        assert_eq!(
            serialize(false, true, &inline_qos, &serialized_payload),
            expected
        );
    }

    #[test]
    fn serialize_no_inline_qos_with_serialized_payload_non_multiple_of_4() {
        let inline_qos = ParameterList::empty();
        let serialized_payload = Data::new(vec![1, 2, 3].into());

        #[rustfmt::skip]
        let expected: Vec<u8> = vec![
            0x15, 0b_0000_0101, 24, 0, // submessage header
            0, 0, 16, 0, // extra flags, octets to inline QoS
            1, 2, 3, 4, // reader id
            6, 7, 8, 9, // writer id
            0, 0, 0, 0, // writer sequence number
            5, 0, 0, 0,
            1, 2, 3, 0, // serialized payload followed by one padding byte
        ];
        assert_eq!(
            serialize(false, true, &inline_qos, &serialized_payload),
            expected
        );
    }

    #[test]
    fn deserialize_no_inline_qos_no_serialized_payload() {
        let expected_inline_qos = ParameterList::empty();
        let expected_payload = Data::new(vec![].into());

        #[rustfmt::skip]
        let data_submessage = parse(vec![
            0x15, 0b_0000_0001, 20, 0, // submessage header
            0, 0, 16, 0, // extra flags, octets to inline QoS
            1, 2, 3, 4, // reader id
            6, 7, 8, 9, // writer id
            0, 0, 0, 0, // writer sequence number
            5, 0, 0, 0,
        ]);

        assert!(!data_submessage.inline_qos_flag());
        assert!(!data_submessage._data_flag());
        assert!(!data_submessage.key_flag());
        assert!(!data_submessage._non_standard_payload_flag());
        assert_eq!(reader_id(), data_submessage.reader_id());
        assert_eq!(writer_id(), data_submessage.writer_id());
        assert_eq!(writer_sn(), data_submessage.writer_sn());
        assert_eq!(expected_inline_qos, data_submessage.inline_qos());
        assert_eq!(expected_payload, data_submessage.serialized_payload());
    }

    #[test]
    fn deserialize_no_inline_qos_with_serialized_payload() {
        let expected_inline_qos = ParameterList::empty();
        let expected_payload = Data::new(vec![1, 2, 3, 4].into());

        #[rustfmt::skip]
        let data_submessage = parse(vec![
            0x15, 0b_0000_0101, 24, 0, // submessage header
            0, 0, 16, 0, // extra flags, octets to inline QoS
            1, 2, 3, 4, // reader id
            6, 7, 8, 9, // writer id
            0, 0, 0, 0, // writer sequence number
            5, 0, 0, 0,
            1, 2, 3, 4, // serialized payload
        ]);

        assert_eq!(expected_inline_qos, data_submessage.inline_qos());
        assert_eq!(expected_payload, data_submessage.serialized_payload());
    }

    #[test]
    fn deserialize_with_inline_qos_no_serialized_payload() {
        let expected_inline_qos = two_parameters();
        let expected_payload = Data::new(vec![].into());

        #[rustfmt::skip]
        let data_submessage = parse(vec![
            0x15, 0b_0000_0011, 40, 0, // submessage header
            0, 0, 16, 0, // extra flags, octets to inline QoS
            1, 2, 3, 4, // reader id
            6, 7, 8, 9, // writer id
            0, 0, 0, 0, // writer sequence number
            5, 0, 0, 0,
            6, 0, 4, 0, // parameter 1: id, length
            10, 11, 12, 13, // parameter 1: value
            7, 0, 4, 0, // parameter 2: id, length
            20, 21, 22, 23, // parameter 2: value
            1, 0, 1, 0, // sentinel
        ]);

        assert_eq!(expected_inline_qos, data_submessage.inline_qos());
        assert_eq!(expected_payload, data_submessage.serialized_payload());
    }

    #[test]
    fn deserialize_with_inline_qos_with_serialized_payload() {
        let expected_inline_qos = two_parameters();
        let expected_payload = Data::new(vec![1, 2, 3, 4].into());

        #[rustfmt::skip]
        let data_submessage = parse(vec![
            0x15, 0b_0000_0111, 40, 0, // submessage header
            0, 0, 16, 0, // extra flags, octets to inline QoS
            1, 2, 3, 4, // reader id
            6, 7, 8, 9, // writer id
            0, 0, 0, 0, // writer sequence number
            5, 0, 0, 0,
            6, 0, 4, 0, // parameter 1: id, length
            10, 11, 12, 13, // parameter 1: value
            7, 0, 4, 0, // parameter 2: id, length
            20, 21, 22, 23, // parameter 2: value
            1, 0, 1, 0, // sentinel
            1, 2, 3, 4, // serialized payload
        ]);

        assert_eq!(expected_inline_qos, data_submessage.inline_qos());
        assert_eq!(expected_payload, data_submessage.serialized_payload());
    }
}