rustdds/messages/submessages/
data.rs1use std::io;
2
3use bytes::Bytes;
4use speedy::{Context, Error, Readable, Writable, Writer};
5use enumflags2::BitFlags;
6
7use crate::{
8 messages::submessages::{elements::parameter_list::ParameterList, submessages::*},
9 serialization::{padding_needed_for_alignment_4, round_up_to_4},
10 structure::{guid::EntityId, sequence_number::SequenceNumber},
11};
12#[cfg(test)]
14use super::elements::serialized_payload::SerializedPayload;
15
/// Body of a DATA submessage as handled by this module: the addressing fields,
/// the writer sequence number, and the optional inline QoS and payload parts.
///
/// Produced by `Data::deserialize_data` and written back out through the
/// `Writable` implementation.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct Data {
  /// Entity id of the addressed reader; returned by
  /// `HasEntityIds::receiver_entity_id`.
  pub reader_id: EntityId,

  /// Entity id of the originating writer; returned by
  /// `HasEntityIds::sender_entity_id`.
  pub writer_id: EntityId,

  /// Sequence number that follows `writer_id` in the serialized form.
  pub writer_sn: SequenceNumber,

  /// Inline QoS parameter list. `deserialize_data` populates it only when the
  /// `InlineQos` flag is set; otherwise it is `None`.
  pub inline_qos: Option<ParameterList>,

  /// Raw payload bytes. `deserialize_data` populates it only when the `Data`
  /// or `Key` flag is set, taking everything that remains in the input buffer
  /// after the preceding fields.
  pub serialized_payload: Option<Bytes>,
}
52
53impl Data {
54 pub fn deserialize_data(buffer: &Bytes, flags: BitFlags<DATA_Flags>) -> io::Result<Self> {
58 let mut cursor = io::Cursor::new(&buffer);
59 let endianness = endianness_flag(flags.bits());
60 let map_speedy_err = |p: Error| io::Error::new(io::ErrorKind::Other, p);
61
62 let _extra_flags =
63 u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
64 let octets_to_inline_qos =
65 u16::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor).map_err(map_speedy_err)?;
66 let reader_id = EntityId::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
67 .map_err(map_speedy_err)?;
68 let writer_id = EntityId::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
69 .map_err(map_speedy_err)?;
70 let sequence_number =
71 SequenceNumber::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
72 .map_err(map_speedy_err)?;
73
74 let expect_qos = flags.contains(DATA_Flags::InlineQos);
75 let expect_data = flags.contains(DATA_Flags::Data) || flags.contains(DATA_Flags::Key);
76
77 let rtps_v23_data_header_size: u16 = 16;
82 if octets_to_inline_qos < rtps_v23_data_header_size {
87 return Err(io::Error::new(
88 io::ErrorKind::InvalidData,
89 format!("DATA submessage has invalid octets_to_inline_qos={octets_to_inline_qos}."),
90 ));
91 }
92
93 if octets_to_inline_qos > rtps_v23_data_header_size {
96 let extra_octets = octets_to_inline_qos - rtps_v23_data_header_size;
97 cursor.set_position(cursor.position() + u64::from(extra_octets));
103
104 if cursor.position() > buffer.len().try_into().unwrap() {
105 return Err(io::Error::new(
108 io::ErrorKind::InvalidData,
109 format!(
110 "DATA submessage octets_to_inline_qos points to byte {}, but message len={}.",
111 cursor.position(),
112 buffer.len()
113 ),
114 ));
115 }
116 }
117
118 let parameter_list = if expect_qos {
120 Some(
121 ParameterList::read_from_stream_unbuffered_with_ctx(endianness, &mut cursor)
122 .map_err(map_speedy_err)?,
123 )
124 } else {
125 None
126 };
127
128 let serialized_payload = if expect_data {
132 Some(buffer.clone().split_off(cursor.position() as usize))
133 } else {
134 None
135 };
136
137 Ok(Self {
138 reader_id,
139 writer_id,
140 writer_sn: sequence_number,
141 inline_qos: parameter_list,
142 serialized_payload,
143 })
144 }
145
146 pub fn len_serialized(&self) -> usize {
151 round_up_to_4(
152 2 + 2 + 4 + 4 + 8 + self.inline_qos.as_ref().map(|q| q.len_serialized() ).unwrap_or(0) + self.serialized_payload.as_ref().map(|q| q.len()).unwrap_or(0),
159 )
160 }
161
162 #[cfg(test)]
163 pub(crate) fn unwrap_serialized_payload(&self) -> SerializedPayload {
164 self
165 .serialized_payload
166 .as_ref()
167 .map(SerializedPayload::from_bytes)
168 .unwrap()
169 .unwrap()
170 }
171
172 #[cfg(test)]
173 pub(crate) fn unwrap_serialized_payload_value(&self) -> Bytes {
174 self.unwrap_serialized_payload().value
175 }
176
177 #[cfg(test)]
178 pub(crate) fn update_serialized_payload_value(&mut self, new_value: Bytes) {
179 let mut payload = self.unwrap_serialized_payload();
180 payload.value = new_value;
181 self.serialized_payload = Some(payload.into());
182 }
183}
184
185impl<C: Context> Writable<C> for Data {
186 fn write_to<T: ?Sized + Writer<C>>(&self, writer: &mut T) -> Result<(), C::Error> {
187 writer.write_u16(0)?;
190 writer.write_u16(16)?;
196
197 writer.write_value(&self.reader_id)?;
198 writer.write_value(&self.writer_id)?;
199 writer.write_value(&self.writer_sn)?;
200 if let Some(inline_qos) = self.inline_qos.as_ref() {
201 writer.write_value(inline_qos)?;
202 }
203
204 if let Some(serialized_payload) = self.serialized_payload.as_ref() {
205 writer.write_bytes(serialized_payload)?;
206 for _ in 0..padding_needed_for_alignment_4(serialized_payload.len()) {
207 writer.write_u8(0)?;
208 }
209 }
210
211 Ok(())
212 }
213}
214
215impl HasEntityIds for Data {
216 fn receiver_entity_id(&self) -> EntityId {
217 self.reader_id
218 }
219 fn sender_entity_id(&self) -> EntityId {
220 self.writer_id
221 }
222}