1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
use std::io;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use speedy::{Context, Error, Readable, Writable, Writer};
use enumflags2::BitFlags;
use bytes::Bytes;
use crate::{
messages::submessages::{elements::parameter_list::ParameterList, submessages::*},
structure::{
guid::EntityId,
sequence_number::{FragmentNumber, SequenceNumber},
},
};
/// The DataFrag Submessage extends the Data Submessage by enabling the
/// serializedData to be fragmented and sent as multiple DataFrag Submessages.
/// The fragments contained in the DataFrag Submessages are then re-assembled by
/// the RTPS Reader.
#[derive(Debug, PartialEq, Eq, Clone)]
#[cfg_attr(test, derive(Default))]
pub struct DataFrag {
/// Identifies the RTPS Reader entity that is being informed of the change
/// to the data-object.
pub reader_id: EntityId,
/// Identifies the RTPS Writer entity that made the change to the
/// data-object.
pub writer_id: EntityId,
/// Uniquely identifies the change and the relative order for all changes
/// made by the RTPS Writer identified by the writerGuid.
/// Each change gets a consecutive sequence number.
/// Each RTPS Writer maintains is own sequence number.
pub writer_sn: SequenceNumber,
/// Indicates the starting fragment for the series of fragments in
/// serialized_data. Fragment numbering starts with number 1.
pub fragment_starting_num: FragmentNumber,
/// The number of consecutive fragments contained in this Submessage,
/// starting at fragment_starting_num.
pub fragments_in_submessage: u16,
/// The total size in bytes of the original data before fragmentation.
pub data_size: u32,
/// The size of an individual fragment in bytes. The maximum fragment size
/// equals 64K.
pub fragment_size: u16,
/// Contains QoS that may affect the interpretation of the message.
/// Present only if the InlineQosFlag is set in the header.
pub inline_qos: Option<ParameterList>,
/// Encapsulation of a consecutive series of fragments, starting at
/// fragment_starting_num for a total of fragments_in_submessage.
/// Represents part of the new value of the data-object
/// after the change.
///
/// If payloads are protected, contains the buffer that decodes to the series
/// of fragments. In particular, a CryptoHeader, a plaintext buffer or
/// CryptoContent depending on the transformation kind, and
/// CryptoFooter, which have been serialized and concatenated.
///
/// Note: RTPS spec says the serialized_payload is of type SerializedPayload,
/// but that is technically incorrect. It is a fragment of
/// SerializedPayload. The headers at the beginning of SerializedPayload
/// appear only at the first fragment. The fragmentation mechanism here
/// should treat serialized_payload as an opaque stream of bytes.
pub serialized_payload: Bytes,
}
impl DataFrag {
// Serialized length of DataFrag submessage without submessage header.
// This is compatible with the definition of the definition of
// "octetsToNextHeader" field in RTPS spec v2.5 Section "9.4.5.1 Submessage
// Header".
pub fn len_serialized(&self) -> usize {
2 + // extraFlags (unused in RTPS v2.5)
2 + // octetsToInlineSos
4 + // readerId
4 + // writerId
8 + // writerSN
4 + // fragmentStartingNum
2 + // fragmentsInSubmessage
2 + // fragmentSize
4 + // sampleSize
self.inline_qos.as_ref().map(|q| q.len_serialized() ).unwrap_or(0) + // QoS ParameterList
self.serialized_payload.len()
}
/// Spec talks about (expected) total number of fragments.
/// This is technically the last (Expected) fragment number, which should be
/// the same value.
pub fn total_number_of_fragments(&self) -> FragmentNumber {
// RTPS spec v2.5 Section "8.3.8.3.5 Logical Interpretation" defines this as
// follows "The total number of fragments to expect equals:
// (dataSize / fragmentSize) + ((dataSize % fragmentSize) ? 1 : 0) "
//
// Note: The above formula is a bit suspect, since fragmentSize == 0 seems to
// be allowed by the spec.
// This is a integer division with rounding up.
let frag_size = self.fragment_size as u32;
if frag_size < 1 {
FragmentNumber::INVALID
} else {
FragmentNumber::new(self.data_size.div_ceil(frag_size))
}
}
pub fn deserialize(buffer: &Bytes, flags: BitFlags<DATAFRAG_Flags>) -> io::Result<Self> {
let mut cursor = io::Cursor::new(&buffer);
let endianness = endianness_flag(flags.bits());
let map_speedy_err = |p: Error| io::Error::new(io::ErrorKind::Other, p);
let _extra_flags =
u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
let octets_to_inline_qos =
u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
let reader_id = EntityId::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
.map_err(map_speedy_err)?;
let writer_id = EntityId::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
.map_err(map_speedy_err)?;
let writer_sn = SequenceNumber::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
.map_err(map_speedy_err)?;
let fragment_starting_num =
FragmentNumber::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
.map_err(map_speedy_err)?;
let fragments_in_submessage =
u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
let fragment_size =
u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
let data_size =
u32::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
let expect_qos = flags.contains(DATAFRAG_Flags::InlineQos);
// let expect_key = flags.contains(DATAFRAG_Flags::Key);
// Size of header after "octets_to_inline_qos" field:
// reader_id: 4
// writer_id: 4
// writer_sn: 8
// fragment_starting_num: 4
// fragments_in_submessage: 2
// fragment_size: 2
// data_size: 4
//
// Total: 28
// Skip any possible fields we do not know about.
let rtps_v25_header_size: u16 = 28;
if octets_to_inline_qos < rtps_v25_header_size {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("DataFrag has too low octetsToInlineQos = {octets_to_inline_qos}"),
));
}
// condition to avoid subtract overflow
if octets_to_inline_qos > rtps_v25_header_size {
let extra_octets = octets_to_inline_qos - rtps_v25_header_size;
cursor.set_position(cursor.position() + u64::from(extra_octets));
if cursor.position() > buffer.len().try_into().unwrap() {
// octets_to_inline_qos told us to skip past the end of the message.
// This is a malformed message.
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"DATAFRAG submessage octets_to_inline_qos points to byte {}, but message len={}.",
cursor.position(),
buffer.len()
),
));
}
}
let inline_qos = if expect_qos {
Some(
ParameterList::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
.map_err(map_speedy_err)?,
)
} else {
None
};
// Validity checks from RTPS spec v2.5 Section 8.3.8.3.3 "Validity"
// writer_sn strictly positive
if writer_sn < SequenceNumber::new(1) {
return Err(io::Error::new(
io::ErrorKind::Other,
"DataFrag SequenceNumber < 1. Discarding as invalid.",
));
}
// Fragment size == 0 is not strictly forbidden in the RTPS spec, but
// it smells so fishy that we just drop it to avoid confusion later.
// TODO:
// Is there any valid use case for sending zero-sized fragments?
// Sending zero-sized data may be ok, but it could be sent as zero fragments of
// some positive size, or preferably as non-fragmented DATA submessage.
if fragment_size < 1 || (fragment_size as u32) > data_size {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Invalid DataFrag. fragment_size={fragment_size} data_size={data_size} Expected 1 <= \
fragment_size <= data_size."
),
));
}
// Payload should be always present, be it data or key fragments.
let serialized_payload = buffer.clone().split_off(cursor.position() as usize);
let datafrag = Self {
reader_id,
writer_id,
writer_sn,
fragment_starting_num,
fragments_in_submessage,
data_size,
fragment_size,
inline_qos,
serialized_payload,
};
// fragment_starting_num strictly positive and must not exceed total number of
// fragments
let expected_total = datafrag.total_number_of_fragments();
if fragment_starting_num < FragmentNumber::new(1) || fragment_starting_num > expected_total {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"DataFrag fragmentStartingNum={fragment_starting_num:?} \
expected_total={expected_total:?}. Expected 1 <= fragmentStartingNum <= \
expected_total. Discarding as invalid."
),
));
}
Ok(datafrag)
}
}
impl<C: Context> Writable<C> for DataFrag {
fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
writer.write_u16(0)?; // extraflags
writer.write_u16(28)?; // See calculation of this value in deserialization above.
// We always write constant 28 here, because this implementation does not (yet)
// write any fields between sampleSize ( = data_size)
// and inline QoS. If some future protocol version adds fields there, then this
// must be changed.
writer.write_value(&self.reader_id)?;
writer.write_value(&self.writer_id)?;
writer.write_value(&self.writer_sn)?;
writer.write_value(&self.fragment_starting_num)?;
writer.write_value(&self.fragments_in_submessage)?;
writer.write_value(&self.fragment_size)?;
writer.write_value(&self.data_size)?;
if self.inline_qos.is_some() && !self.inline_qos.as_ref().unwrap().parameters.is_empty() {
writer.write_value(&self.inline_qos)?;
}
writer.write_bytes(&self.serialized_payload)?;
Ok(())
}
}
impl HasEntityIds for DataFrag {
fn receiver_entity_id(&self) -> EntityId {
self.reader_id
}
fn sender_entity_id(&self) -> EntityId {
self.writer_id
}
}