1use super::types::{EntityId, SequenceNumber};
12use crate::error::{Error, Result};
13
/// Submessage identifiers understood by this module.
///
/// Each variant's discriminant is the ID byte itself, so `kind as u8` yields
/// the value that identifies the submessage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SubmessageKind {
    /// DATA submessage, ID `0x15`.
    Data = 0x15,
    /// HEARTBEAT submessage, ID `0x07`.
    Heartbeat = 0x07,
    /// ACKNACK submessage, ID `0x06`.
    AckNack = 0x06,
}

impl SubmessageKind {
    /// Maps a raw ID byte back to its [`SubmessageKind`].
    ///
    /// Returns `None` when `value` is not one of the three known IDs.
    pub fn from_u8(value: u8) -> Option<Self> {
        // Every variant this enum declares; the lookup compares discriminants.
        const KNOWN: [SubmessageKind; 3] = [
            SubmessageKind::Data,
            SubmessageKind::Heartbeat,
            SubmessageKind::AckNack,
        ];

        KNOWN.iter().copied().find(|kind| *kind as u8 == value)
    }
}
37
38const _: () = {
40 assert!(
41 SubmessageKind::Data as u8 == 0x15,
42 "DATA submessage ID must be 0x15"
43 );
44 assert!(
45 SubmessageKind::Heartbeat as u8 == 0x07,
46 "HEARTBEAT submessage ID must be 0x07"
47 );
48 assert!(
49 SubmessageKind::AckNack as u8 == 0x06,
50 "ACKNACK submessage ID must be 0x06"
51 );
52};
53
/// Raw flag byte of a submessage header.
///
/// Only bit 0 is interpreted here: when set, the submessage is little-endian.
/// The remaining bits are carried through untouched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SubmessageFlags(pub u8);

impl SubmessageFlags {
    /// Bit mask of the little-endian flag.
    pub const LITTLE_ENDIAN: u8 = 0x01;

    /// Reports whether the little-endian bit is set.
    pub const fn is_little_endian(&self) -> bool {
        // The mask is a single bit, so "equals the mask" is the same as "non-zero".
        (self.0 & Self::LITTLE_ENDIAN) == Self::LITTLE_ENDIAN
    }

    /// Flags with only the little-endian bit set.
    pub const fn little_endian() -> Self {
        SubmessageFlags(Self::LITTLE_ENDIAN)
    }

    /// Flags with no bit set, i.e. big-endian.
    pub const fn big_endian() -> Self {
        SubmessageFlags(0x00)
    }
}

impl Default for SubmessageFlags {
    /// Little-endian is the default byte order.
    fn default() -> Self {
        SubmessageFlags(Self::LITTLE_ENDIAN)
    }
}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub struct SubmessageHeader {
90 pub kind: SubmessageKind,
92 pub flags: SubmessageFlags,
94 pub octets_to_next: u16,
96}
97
98impl SubmessageHeader {
99 pub const SIZE: usize = 4;
101
102 pub const fn new(kind: SubmessageKind, flags: SubmessageFlags, octets_to_next: u16) -> Self {
104 Self {
105 kind,
106 flags,
107 octets_to_next,
108 }
109 }
110
111 pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
113 if buf.len() < Self::SIZE {
114 return Err(Error::BufferTooSmall);
115 }
116
117 buf[0] = match self.kind {
119 SubmessageKind::Data => 0x15,
120 SubmessageKind::Heartbeat => 0x07,
121 SubmessageKind::AckNack => 0x06,
122 };
123 buf[1] = self.flags.0;
124
125 buf[2] = (self.octets_to_next & 0xff) as u8;
127 buf[3] = ((self.octets_to_next >> 8) & 0xff) as u8;
128
129 Ok(Self::SIZE)
130 }
131
132 pub fn decode(buf: &[u8]) -> Result<Self> {
134 if buf.len() < Self::SIZE {
135 return Err(Error::BufferTooSmall);
136 }
137
138 let kind = SubmessageKind::from_u8(buf[0]).ok_or(Error::InvalidSubmessage)?;
139 let flags = SubmessageFlags(buf[1]);
140
141 let octets_to_next = u16::from_le_bytes([buf[2], buf[3]]);
143
144 Ok(Self {
145 kind,
146 flags,
147 octets_to_next,
148 })
149 }
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq)]
156pub struct Data {
157 pub reader_id: EntityId,
159 pub writer_id: EntityId,
161 pub writer_sn: SequenceNumber,
163}
164
165impl Data {
166 pub const MIN_SIZE: usize = 24;
169
170 pub const fn new(reader_id: EntityId, writer_id: EntityId, writer_sn: SequenceNumber) -> Self {
172 Self {
173 reader_id,
174 writer_id,
175 writer_sn,
176 }
177 }
178
179 pub fn encode_header(&self, buf: &mut [u8]) -> Result<usize> {
191 if buf.len() < Self::MIN_SIZE {
192 return Err(Error::BufferTooSmall);
193 }
194
195 let header = SubmessageHeader::new(
199 SubmessageKind::Data,
200 SubmessageFlags(0x05), 20, );
203 header.encode(&mut buf[0..4])?;
204
205 buf[4] = 0x00;
207 buf[5] = 0x00;
208
209 buf[6] = 0x10; buf[7] = 0x00;
213
214 buf[8..12].copy_from_slice(self.reader_id.as_bytes());
216
217 buf[12..16].copy_from_slice(self.writer_id.as_bytes());
219
220 let sn = self.writer_sn.value();
223 let sn_high = (sn >> 32) as i32;
224 let sn_low = sn as u32;
225 buf[16..20].copy_from_slice(&sn_high.to_le_bytes());
226 buf[20..24].copy_from_slice(&sn_low.to_le_bytes());
227
228 Ok(Self::MIN_SIZE)
229 }
230
231 pub fn decode(buf: &[u8]) -> Result<(Self, usize)> {
237 if buf.len() < Self::MIN_SIZE {
238 return Err(Error::BufferTooSmall);
239 }
240
241 let header = SubmessageHeader::decode(&buf[0..4])?;
243 if header.kind != SubmessageKind::Data {
244 return Err(Error::InvalidSubmessage);
245 }
246
247 let octets_to_inline_qos = u16::from_le_bytes([buf[6], buf[7]]) as usize;
250
251 let mut reader_id_bytes = [0u8; 4];
253 reader_id_bytes.copy_from_slice(&buf[8..12]);
254 let reader_id = EntityId::new(reader_id_bytes);
255
256 let mut writer_id_bytes = [0u8; 4];
258 writer_id_bytes.copy_from_slice(&buf[12..16]);
259 let writer_id = EntityId::new(writer_id_bytes);
260
261 let sn_high = i32::from_le_bytes([buf[16], buf[17], buf[18], buf[19]]);
264 let sn_low = u32::from_le_bytes([buf[20], buf[21], buf[22], buf[23]]);
265 let sn_value = ((sn_high as i64) << 32) | (sn_low as i64);
266 let writer_sn = SequenceNumber::new(sn_value);
267
268 let data = Self {
269 reader_id,
270 writer_id,
271 writer_sn,
272 };
273
274 let payload_offset = 8 + octets_to_inline_qos;
277
278 Ok((data, payload_offset))
279 }
280}
281
282#[derive(Debug, Clone, Copy, PartialEq, Eq)]
286pub struct Heartbeat {
287 pub reader_id: EntityId,
289 pub writer_id: EntityId,
291 pub first_sn: SequenceNumber,
293 pub last_sn: SequenceNumber,
295 pub count: u32,
297}
298
299impl Heartbeat {
300 pub const SIZE: usize = 32; pub const fn new(
305 reader_id: EntityId,
306 writer_id: EntityId,
307 first_sn: SequenceNumber,
308 last_sn: SequenceNumber,
309 count: u32,
310 ) -> Self {
311 Self {
312 reader_id,
313 writer_id,
314 first_sn,
315 last_sn,
316 count,
317 }
318 }
319
320 pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
322 if buf.len() < Self::SIZE {
323 return Err(Error::BufferTooSmall);
324 }
325
326 let header =
328 SubmessageHeader::new(SubmessageKind::Heartbeat, SubmessageFlags::default(), 28);
329 header.encode(&mut buf[0..4])?;
330
331 buf[4..8].copy_from_slice(self.reader_id.as_bytes());
333
334 buf[8..12].copy_from_slice(self.writer_id.as_bytes());
336
337 let first_sn_bytes = self.first_sn.value().to_le_bytes();
339 buf[12..20].copy_from_slice(&first_sn_bytes);
340
341 let last_sn_bytes = self.last_sn.value().to_le_bytes();
343 buf[20..28].copy_from_slice(&last_sn_bytes);
344
345 let count_bytes = self.count.to_le_bytes();
347 buf[28..32].copy_from_slice(&count_bytes);
348
349 Ok(Self::SIZE)
350 }
351
352 pub fn decode(buf: &[u8]) -> Result<Self> {
354 if buf.len() < Self::SIZE {
355 return Err(Error::BufferTooSmall);
356 }
357
358 let header = SubmessageHeader::decode(&buf[0..4])?;
360 if header.kind != SubmessageKind::Heartbeat {
361 return Err(Error::InvalidSubmessage);
362 }
363
364 let mut reader_id_bytes = [0u8; 4];
366 reader_id_bytes.copy_from_slice(&buf[4..8]);
367 let reader_id = EntityId::new(reader_id_bytes);
368
369 let mut writer_id_bytes = [0u8; 4];
371 writer_id_bytes.copy_from_slice(&buf[8..12]);
372 let writer_id = EntityId::new(writer_id_bytes);
373
374 let mut first_sn_bytes = [0u8; 8];
376 first_sn_bytes.copy_from_slice(&buf[12..20]);
377 let first_sn = SequenceNumber::new(i64::from_le_bytes(first_sn_bytes));
378
379 let mut last_sn_bytes = [0u8; 8];
381 last_sn_bytes.copy_from_slice(&buf[20..28]);
382 let last_sn = SequenceNumber::new(i64::from_le_bytes(last_sn_bytes));
383
384 let mut count_bytes = [0u8; 4];
386 count_bytes.copy_from_slice(&buf[28..32]);
387 let count = u32::from_le_bytes(count_bytes);
388
389 Ok(Self {
390 reader_id,
391 writer_id,
392 first_sn,
393 last_sn,
394 count,
395 })
396 }
397}
398
399#[derive(Debug, Clone, Copy, PartialEq, Eq)]
403pub struct AckNack {
404 pub reader_id: EntityId,
406 pub writer_id: EntityId,
408 pub reader_sn_state_base: SequenceNumber,
410 pub count: u32,
412}
413
414impl AckNack {
415 pub const MIN_SIZE: usize = 24; pub const fn new(
420 reader_id: EntityId,
421 writer_id: EntityId,
422 reader_sn_state_base: SequenceNumber,
423 count: u32,
424 ) -> Self {
425 Self {
426 reader_id,
427 writer_id,
428 reader_sn_state_base,
429 count,
430 }
431 }
432
433 pub fn encode(&self, buf: &mut [u8]) -> Result<usize> {
435 if buf.len() < Self::MIN_SIZE {
436 return Err(Error::BufferTooSmall);
437 }
438
439 let header = SubmessageHeader::new(SubmessageKind::AckNack, SubmessageFlags::default(), 20);
441 header.encode(&mut buf[0..4])?;
442
443 buf[4..8].copy_from_slice(self.reader_id.as_bytes());
445
446 buf[8..12].copy_from_slice(self.writer_id.as_bytes());
448
449 let base_sn_bytes = self.reader_sn_state_base.value().to_le_bytes();
451 buf[12..20].copy_from_slice(&base_sn_bytes);
452
453 let count_bytes = self.count.to_le_bytes();
455 buf[20..24].copy_from_slice(&count_bytes);
456
457 Ok(Self::MIN_SIZE)
458 }
459
460 pub fn decode(buf: &[u8]) -> Result<Self> {
462 if buf.len() < Self::MIN_SIZE {
463 return Err(Error::BufferTooSmall);
464 }
465
466 let header = SubmessageHeader::decode(&buf[0..4])?;
468 if header.kind != SubmessageKind::AckNack {
469 return Err(Error::InvalidSubmessage);
470 }
471
472 let mut reader_id_bytes = [0u8; 4];
474 reader_id_bytes.copy_from_slice(&buf[4..8]);
475 let reader_id = EntityId::new(reader_id_bytes);
476
477 let mut writer_id_bytes = [0u8; 4];
479 writer_id_bytes.copy_from_slice(&buf[8..12]);
480 let writer_id = EntityId::new(writer_id_bytes);
481
482 let mut base_sn_bytes = [0u8; 8];
484 base_sn_bytes.copy_from_slice(&buf[12..20]);
485 let reader_sn_state_base = SequenceNumber::new(i64::from_le_bytes(base_sn_bytes));
486
487 let mut count_bytes = [0u8; 4];
489 count_bytes.copy_from_slice(&buf[20..24]);
490 let count = u32::from_le_bytes(count_bytes);
491
492 Ok(Self {
493 reader_id,
494 writer_id,
495 reader_sn_state_base,
496 count,
497 })
498 }
499}
500
/// One of the three submessage types defined in this module, wrapped in a
/// single enum so callers can handle them through one type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Submessage {
    /// Holds a [`Data`] value.
    Data(Data),
    /// Holds a [`Heartbeat`] value.
    Heartbeat(Heartbeat),
    /// Holds an [`AckNack`] value.
    AckNack(AckNack),
}
511
#[cfg(test)]
mod tests {
    use super::*;

    /// A header written by `encode` is read back with the same kind and length.
    #[test]
    fn test_submessage_header_encode_decode() {
        let mut wire = [0u8; 16];
        let original =
            SubmessageHeader::new(SubmessageKind::Data, SubmessageFlags::little_endian(), 100);
        original.encode(&mut wire).unwrap();

        let parsed = SubmessageHeader::decode(&wire).unwrap();
        assert_eq!(parsed.kind, SubmessageKind::Data);
        assert_eq!(parsed.octets_to_next, 100);
    }

    /// DATA header fields survive a round trip, and the reported payload
    /// offset equals the fixed header size.
    #[test]
    fn test_data_encode_decode() {
        let reader = EntityId::new([0, 0, 0, 1]);
        let writer = EntityId::new([0, 0, 0, 2]);
        let original = Data::new(reader, writer, SequenceNumber::new(42));

        let mut wire = [0u8; 64];
        let written = original.encode_header(&mut wire).unwrap();
        assert_eq!(written, Data::MIN_SIZE);

        let (parsed, payload_offset) = Data::decode(&wire).unwrap();
        assert_eq!(parsed, original);
        assert_eq!(payload_offset, Data::MIN_SIZE);
    }

    /// Every HEARTBEAT field survives a round trip.
    #[test]
    fn test_heartbeat_encode_decode() {
        let reader = EntityId::new([0, 0, 0, 1]);
        let writer = EntityId::new([0, 0, 0, 2]);
        let original = Heartbeat::new(
            reader,
            writer,
            SequenceNumber::new(1),
            SequenceNumber::new(10),
            5,
        );

        let mut wire = [0u8; 64];
        let written = original.encode(&mut wire).unwrap();
        assert_eq!(written, Heartbeat::SIZE);

        let parsed = Heartbeat::decode(&wire).unwrap();
        assert_eq!(parsed, original);
    }

    /// Every ACKNACK field survives a round trip.
    #[test]
    fn test_acknack_encode_decode() {
        let reader = EntityId::new([0, 0, 0, 1]);
        let writer = EntityId::new([0, 0, 0, 2]);
        let original = AckNack::new(reader, writer, SequenceNumber::new(5), 3);

        let mut wire = [0u8; 64];
        let written = original.encode(&mut wire).unwrap();
        assert_eq!(written, AckNack::MIN_SIZE);

        let parsed = AckNack::decode(&wire).unwrap();
        assert_eq!(parsed, original);
    }
}