rtps_parser/rtps/messages/submessages/
data_frag.rs

1use std::io::BufRead;
2
3use crate::{
4    parameter_id_values::PID_SENTINEL,
5    rtps::{
6        messages::{
7            overall_structure::{
8                RtpsMap, Submessage, SubmessageHeader, SubmessageHeaderRead, SubmessageHeaderWrite,
9            },
10            submessage_elements::{ArcSlice, Data, ParameterList, SubmessageElement},
11            types::{FragmentNumber, ParameterId, SubmessageFlag, SubmessageKind},
12        },
13        types::{EntityId, SequenceNumber},
14    },
15};
16
17#[derive(Debug, PartialEq, Eq, Clone)]
18pub struct DataFragSubmessageRead {
19    data: ArcSlice,
20}
21
22impl SubmessageHeader for DataFragSubmessageRead {
23    fn submessage_header(&self) -> SubmessageHeaderRead {
24        SubmessageHeaderRead::new(self.data.as_slice())
25    }
26}
27
28impl DataFragSubmessageRead {
29    pub fn new(data: ArcSlice) -> Self {
30        Self { data }
31    }
32
33    fn octets_to_inline_qos(&self) -> u16 {
34        self.map(&self.data[6..])
35    }
36
37    fn inline_qos_len(&self) -> usize {
38        if self.inline_qos_flag() {
39            let mut parameter_list_buf = &self.data[8 + self.octets_to_inline_qos() as usize..];
40            let parameter_list_buf_length = parameter_list_buf.len();
41            loop {
42                let pid: ParameterId = self.map(parameter_list_buf);
43                parameter_list_buf.consume(2);
44                let length: i16 = self.map(parameter_list_buf);
45                parameter_list_buf.consume(2);
46                if pid == PID_SENTINEL {
47                    break;
48                } else {
49                    parameter_list_buf.consume(length as usize);
50                }
51            }
52            parameter_list_buf_length - parameter_list_buf.len()
53        } else {
54            0
55        }
56    }
57
58    pub fn _endianness_flag(&self) -> bool {
59        (self.data[1] & 0b_0000_0001) != 0
60    }
61
62    pub fn inline_qos_flag(&self) -> bool {
63        (self.data[1] & 0b_0000_0010) != 0
64    }
65
66    pub fn key_flag(&self) -> bool {
67        (self.data[1] & 0b_0000_0100) != 0
68    }
69
70    pub fn _non_standard_payload_flag(&self) -> bool {
71        (self.data[1] & 0b_0000_1000) != 0
72    }
73
74    pub fn reader_id(&self) -> EntityId {
75        self.map(&self.data[8..])
76    }
77
78    pub fn writer_id(&self) -> EntityId {
79        self.map(&self.data[12..])
80    }
81
82    pub fn writer_sn(&self) -> SequenceNumber {
83        self.map(&self.data[16..])
84    }
85
86    pub fn fragment_starting_num(&self) -> FragmentNumber {
87        self.map(&self.data[24..])
88    }
89
90    pub fn fragments_in_submessage(&self) -> u16 {
91        self.map(&self.data[28..])
92    }
93
94    pub fn fragment_size(&self) -> u16 {
95        self.map(&self.data[30..])
96    }
97
98    pub fn data_size(&self) -> u32 {
99        self.map(&self.data[32..])
100    }
101
102    pub fn inline_qos(&self) -> ParameterList {
103        if self.inline_qos_flag() {
104            self.map(&self.data[self.octets_to_inline_qos() as usize + 8..])
105        } else {
106            ParameterList::empty()
107        }
108    }
109
110    pub fn serialized_payload(&self) -> Data {
111        Data::new(
112            self.data
113                .sub_slice(8 + self.octets_to_inline_qos() as usize + self.inline_qos_len()..),
114        )
115    }
116}
117
118#[derive(Debug, PartialEq, Eq)]
119pub struct DataFragSubmessageWrite<'a> {
120    inline_qos_flag: SubmessageFlag,
121    non_standard_payload_flag: SubmessageFlag,
122    key_flag: SubmessageFlag,
123    submessage_elements: [SubmessageElement<'a>; 9],
124    inline_qos_submessage_element: Option<SubmessageElement<'a>>,
125    serialized_payload_submessage_element: SubmessageElement<'a>,
126}
127
128impl<'a> DataFragSubmessageWrite<'a> {
129    #[allow(clippy::too_many_arguments)]
130    pub fn new(
131        inline_qos_flag: SubmessageFlag,
132        non_standard_payload_flag: SubmessageFlag,
133        key_flag: SubmessageFlag,
134        reader_id: EntityId,
135        writer_id: EntityId,
136        writer_sn: SequenceNumber,
137        fragment_starting_num: FragmentNumber,
138        fragments_in_submessage: u16,
139        data_size: u32,
140        fragment_size: u16,
141        inline_qos: &'a ParameterList,
142        serialized_payload: &'a Data,
143    ) -> Self {
144        const EXTRA_FLAGS: u16 = 0;
145        const OCTETS_TO_INLINE_QOS: u16 = 28;
146        let submessage_elements = [
147            SubmessageElement::UShort(EXTRA_FLAGS),
148            SubmessageElement::UShort(OCTETS_TO_INLINE_QOS),
149            SubmessageElement::EntityId(reader_id),
150            SubmessageElement::EntityId(writer_id),
151            SubmessageElement::SequenceNumber(writer_sn),
152            SubmessageElement::FragmentNumber(fragment_starting_num),
153            SubmessageElement::UShort(fragments_in_submessage),
154            SubmessageElement::UShort(fragment_size),
155            SubmessageElement::ULong(data_size),
156        ];
157        let inline_qos_submessage_element = if inline_qos_flag {
158            Some(SubmessageElement::ParameterList(inline_qos))
159        } else {
160            None
161        };
162        let serialized_payload_submessage_element =
163            SubmessageElement::SerializedData(serialized_payload);
164
165        Self {
166            inline_qos_flag,
167            non_standard_payload_flag,
168            key_flag,
169            submessage_elements,
170            inline_qos_submessage_element,
171            serialized_payload_submessage_element,
172        }
173    }
174}
175
176impl<'a> Submessage<'a> for DataFragSubmessageWrite<'a> {
177    type SubmessageList = std::iter::Chain<
178        std::iter::Chain<
179            std::slice::Iter<'a, SubmessageElement<'a>>,
180            std::option::Iter<'a, SubmessageElement<'a>>,
181        >,
182        std::iter::Once<&'a SubmessageElement<'a>>,
183    >;
184
185    fn submessage_header(&self, octets_to_next_header: u16) -> SubmessageHeaderWrite {
186        SubmessageHeaderWrite::new(
187            SubmessageKind::DATA_FRAG,
188            &[
189                self.inline_qos_flag,
190                self.key_flag,
191                self.non_standard_payload_flag,
192            ],
193            octets_to_next_header,
194        )
195    }
196
197    fn submessage_elements(&'a self) -> Self::SubmessageList {
198        self.submessage_elements
199            .iter()
200            .chain(self.inline_qos_submessage_element.iter())
201            .chain(std::iter::once(&self.serialized_payload_submessage_element))
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208    use crate::rtps::{
209        messages::{
210            overall_structure::{into_bytes_vec, RtpsSubmessageWriteKind},
211            submessage_elements::Parameter,
212        },
213        types::{USER_DEFINED_READER_GROUP, USER_DEFINED_READER_NO_KEY},
214    };
215
216    #[test]
217    fn serialize_no_inline_qos_no_serialized_payload() {
218        let inline_qos = &ParameterList::empty();
219        let serialized_payload = &Data::new(vec![].into());
220        let submessage = RtpsSubmessageWriteKind::DataFrag(DataFragSubmessageWrite::new(
221            false,
222            false,
223            false,
224            EntityId::new([1, 2, 3], USER_DEFINED_READER_NO_KEY),
225            EntityId::new([6, 7, 8], USER_DEFINED_READER_GROUP),
226            SequenceNumber::from(5),
227            2,
228            3,
229            4,
230            5,
231            inline_qos,
232            serialized_payload,
233        ));
234        #[rustfmt::skip]
235        assert_eq!(into_bytes_vec(submessage), vec![
236                0x16_u8, 0b_0000_0001, 32, 0, // Submessage header
237                0, 0, 28, 0, // extraFlags, octetsToInlineQos
238                1, 2, 3, 4, // readerId: value[4]
239                6, 7, 8, 9, // writerId: value[4]
240                0, 0, 0, 0, // writerSN: high
241                5, 0, 0, 0, // writerSN: low
242                2, 0, 0, 0, // fragmentStartingNum
243                3, 0, 5, 0, // fragmentsInSubmessage | fragmentSize
244                4, 0, 0, 0, // sampleSize
245            ]
246        );
247    }
248
249    #[test]
250    fn serialize_with_inline_qos_with_serialized_payload() {
251        let inline_qos = ParameterList::new(vec![Parameter::new(8, vec![71, 72, 73, 74])]);
252        let serialized_payload = Data::new(vec![1, 2, 3].into());
253        let submessage = RtpsSubmessageWriteKind::DataFrag(DataFragSubmessageWrite::new(
254            true,
255            false,
256            false,
257            EntityId::new([1, 2, 3], USER_DEFINED_READER_NO_KEY),
258            EntityId::new([6, 7, 8], USER_DEFINED_READER_GROUP),
259            SequenceNumber::from(6),
260            2,
261            3,
262            8,
263            5,
264            &inline_qos,
265            &serialized_payload,
266        ));
267        #[rustfmt::skip]
268        assert_eq!(into_bytes_vec(submessage), vec![
269                0x16_u8, 0b_0000_0011, 48, 0, // Submessage header
270                0, 0, 28, 0, // extraFlags | octetsToInlineQos
271                1, 2, 3, 4, // readerId
272                6, 7, 8, 9, // writerId
273                0, 0, 0, 0, // writerSN: high
274                6, 0, 0, 0, // writerSN: low
275                2, 0, 0, 0, // fragmentStartingNum
276                3, 0, 5, 0, // fragmentsInSubmessage | fragmentSize
277                8, 0, 0, 0, // sampleSize
278                8, 0, 4, 0, // inlineQos: parameterId, length
279                71, 72, 73, 74, // inlineQos: value[length]
280                1, 0, 0, 0, // inlineQos: Sentinel
281                1, 2, 3, 0, // serializedPayload
282            ]
283        );
284    }
285
286    #[test]
287    fn deserialize_no_inline_qos_no_serialized_payload() {
288        #[rustfmt::skip]
289        let submessage = DataFragSubmessageRead::new(vec![
290            0x16_u8, 0b_0000_0001, 32, 0, // Submessage header
291            0, 0, 28, 0, // extraFlags, octetsToInlineQos
292            1, 2, 3, 4, // readerId: value[4]
293            6, 7, 8, 9, // writerId: value[4]
294            0, 0, 0, 0, // writerSN: high
295            5, 0, 0, 0, // writerSN: low
296            2, 0, 0, 0, // fragmentStartingNum
297            3, 0, 5, 0, // fragmentsInSubmessage | fragmentSize
298            4, 0, 0, 0, // sampleSize
299        ].into());
300
301        let expected_inline_qos_flag = false;
302        let expected_non_standard_payload_flag = false;
303        let expected_key_flag = false;
304        let expected_reader_id = EntityId::new([1, 2, 3], USER_DEFINED_READER_NO_KEY);
305        let expected_writer_id = EntityId::new([6, 7, 8], USER_DEFINED_READER_GROUP);
306        let expected_writer_sn = SequenceNumber::from(5);
307        let expected_fragment_starting_num = 2;
308        let expected_fragments_in_submessage = 3;
309        let expected_data_size = 4;
310        let expected_fragment_size = 5;
311        let expected_inline_qos = ParameterList::empty();
312        let expected_serialized_payload = Data::new(vec![].into());
313
314        assert_eq!(expected_inline_qos_flag, submessage.inline_qos_flag());
315        assert_eq!(
316            expected_non_standard_payload_flag,
317            submessage._non_standard_payload_flag()
318        );
319        assert_eq!(expected_key_flag, submessage.key_flag());
320        assert_eq!(expected_reader_id, submessage.reader_id());
321        assert_eq!(expected_writer_id, submessage.writer_id());
322        assert_eq!(expected_writer_sn, submessage.writer_sn());
323        assert_eq!(
324            expected_fragment_starting_num,
325            submessage.fragment_starting_num()
326        );
327        assert_eq!(
328            expected_fragments_in_submessage,
329            submessage.fragments_in_submessage()
330        );
331        assert_eq!(expected_data_size, submessage.data_size());
332        assert_eq!(expected_fragment_size, submessage.fragment_size());
333        assert_eq!(expected_inline_qos, submessage.inline_qos());
334        assert_eq!(expected_serialized_payload, submessage.serialized_payload());
335    }
336
337    #[test]
338    fn deserialize_with_inline_qos_with_serialized_payload() {
339        #[rustfmt::skip]
340        let submessage = DataFragSubmessageRead::new(vec![
341            0x16_u8, 0b_0000_0011, 48, 0, // Submessage header
342            0, 0, 28, 0, // extraFlags | octetsToInlineQos
343            1, 2, 3, 4, // readerId
344            6, 7, 8, 9, // writerId
345            0, 0, 0, 0, // writerSN: high
346            6, 0, 0, 0, // writerSN: low
347            2, 0, 0, 0, // fragmentStartingNum
348            3, 0, 5, 0, // fragmentsInSubmessage | fragmentSize
349            8, 0, 0, 0, // sampleSize
350            8, 0, 4, 0, // inlineQos: parameterId, length
351            71, 72, 73, 74, // inlineQos: value[length]
352            1, 0, 0, 0, // inlineQos: Sentinel
353            1, 2, 3, 0, // serializedPayload
354        ].into());
355
356        let expected_inline_qos_flag = true;
357        let expected_non_standard_payload_flag = false;
358        let expected_key_flag = false;
359        let expected_reader_id = EntityId::new([1, 2, 3], USER_DEFINED_READER_NO_KEY);
360        let expected_writer_id = EntityId::new([6, 7, 8], USER_DEFINED_READER_GROUP);
361        let expected_writer_sn = SequenceNumber::from(6);
362        let expected_fragment_starting_num = 2;
363        let expected_fragments_in_submessage = 3;
364        let expected_data_size = 8;
365        let expected_fragment_size = 5;
366        let expected_inline_qos = ParameterList::new(vec![Parameter::new(8, vec![71, 72, 73, 74])]);
367        let expected_serialized_payload = Data::new(vec![1, 2, 3, 0].into());
368
369        assert_eq!(expected_inline_qos_flag, submessage.inline_qos_flag());
370        assert_eq!(
371            expected_non_standard_payload_flag,
372            submessage._non_standard_payload_flag()
373        );
374        assert_eq!(expected_key_flag, submessage.key_flag());
375        assert_eq!(expected_reader_id, submessage.reader_id());
376        assert_eq!(expected_writer_id, submessage.writer_id());
377        assert_eq!(expected_writer_sn, submessage.writer_sn());
378        assert_eq!(
379            expected_fragment_starting_num,
380            submessage.fragment_starting_num()
381        );
382        assert_eq!(
383            expected_fragments_in_submessage,
384            submessage.fragments_in_submessage()
385        );
386        assert_eq!(expected_data_size, submessage.data_size());
387        assert_eq!(expected_fragment_size, submessage.fragment_size());
388        assert_eq!(expected_inline_qos, submessage.inline_qos());
389        assert_eq!(expected_serialized_payload, submessage.serialized_payload());
390    }
391}