Skip to main content

zerodds_rtps/
message_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! MessageBuilder — Submessage-Aggregation in ein UDP-Datagramm.
4//!
5//! Analog zu Fast-DDS `RTPSMessageGroup` (Recherche WP 1.4). Der Writer
6//! oeffnet einen Builder pro Ziel-Locator-Set, haengt mehrere Submessages
7//! an, und finalisiert zu einem [`OutboundDatagram`]. Aggregation spart
8//! RTPS-Header + UDP-Overhead bei SEDP-Announce-all-Runden und kleinen
9//! Samples.
10//!
11//! # Flush-Regeln
12//!
13//! 1. **Size-Trigger**: `try_add_submessage` lehnt ab, wenn der Body
14//!    nicht mehr ins MTU passt. Caller muss `finish()` + neuen Builder.
15//! 2. **DATA_FRAG geht alleine**: Aufrufer soll DATA_FRAG-Submessages
16//!    nicht mit anderen bundlen (Fragment ist typisch MTU-nah).
17//! 3. **Piggyback-HEARTBEAT am Ende**: Aufrufer haengt HB nach allen
18//!    DATAs an, vor `finish()`.
19//! 4. **Kein INFO_DST**: Phase 1 baut ein Datagramm pro Proxy — GuidPrefix
20//!    ist statisch. INFO_DST wird fuer Multicast-Fan-out mit gemischten
21//!    Zielen in Phase 2 ergaenzt.
22//! 5. **Kein INFO_TS**: Writer hat heute keine Source-Timestamps.
23//!
24//! # Ziel-Locators
25//!
26//! Ein Datagramm geht an **alle** `targets`. Typisch: die Unicast-Locators
27//! des Remote-Readers, oder ein Multicast-Locator. Transport-Layer kippt
28//! es einmal pro Locator auf die Leitung.
29
30extern crate alloc;
31use alloc::rc::Rc;
32use alloc::vec::Vec;
33
34use crate::header::RtpsHeader;
35use crate::submessage_header::{FLAG_E_LITTLE_ENDIAN, SubmessageHeader, SubmessageId};
36use crate::wire_types::Locator;
37
38/// Default-MTU fuer Aggregation (Ethernet 1500 − 20 IP − 8 UDP).
39pub const DEFAULT_MTU: usize = 1472;
40
41/// Ein fertig aggregiertes Datagramm mit Zielen.
42///
43/// `targets` ist als `Rc<Vec<Locator>>` geteilt, um Allocation-Overhead
44/// bei Multi-Reader-tick-Loops zu vermeiden — der gleiche Proxy-
45/// Locator-Set wird ueber alle Submessages eines Proxies wiederverwendet.
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub struct OutboundDatagram {
48    /// Wire-Bytes (RTPS-Header + N Submessages).
49    pub bytes: Vec<u8>,
50    /// Ziel-Locators. Transport-Layer sendet an alle.
51    pub targets: Rc<Vec<Locator>>,
52}
53
54/// Grund, warum [`MessageBuilder::try_add_submessage`] ablehnt.
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum AddError {
57    /// Submessage passt nicht mehr ins MTU-Budget.
58    WouldExceedMtu {
59        /// Byte-Anzahl, die nicht mehr passt (inkl. Submessage-Header).
60        needed: usize,
61        /// Verbleibendes Budget.
62        remaining: usize,
63    },
64    /// Submessage-Body > u16::MAX (Wire-Feld `octetsToNextHeader`).
65    BodyTooLarge,
66}
67
68/// Submessage-Aggregator.
69///
70/// Wird bei `open` mit einer vor-allokierten Byte-Liste beginnend beim
71/// RTPS-Header initialisiert. Submessages werden per `try_add_submessage`
72/// angehaengt; bei Full muss Caller `finish()` + neuen Builder.
73#[derive(Debug)]
74pub struct MessageBuilder {
75    bytes: Vec<u8>,
76    targets: Rc<Vec<Locator>>,
77    mtu: usize,
78    submsg_count: usize,
79}
80
81impl MessageBuilder {
82    /// Oeffnet einen neuen Builder mit gegebenem RTPS-Header,
83    /// Zielen und MTU-Budget.
84    ///
85    /// Panics: wenn `mtu` kleiner als der RTPS-Header (20 Byte).
86    #[must_use]
87    pub fn open(header: RtpsHeader, targets: Rc<Vec<Locator>>, mtu: usize) -> Self {
88        assert!(
89            mtu >= 20,
90            "MTU must accommodate at least the 20-byte RTPS header"
91        );
92        let mut bytes = Vec::with_capacity(mtu);
93        bytes.extend_from_slice(&header.to_bytes());
94        Self {
95            bytes,
96            targets,
97            mtu,
98            submsg_count: 0,
99        }
100    }
101
102    /// Anzahl bisher eingefuegter Submessages.
103    #[must_use]
104    pub fn submsg_count(&self) -> usize {
105        self.submsg_count
106    }
107
108    /// True wenn der Builder nur den RTPS-Header enthaelt.
109    #[must_use]
110    pub fn is_empty(&self) -> bool {
111        self.submsg_count == 0
112    }
113
114    /// Aktuelle Gesamt-Byte-Zahl (Header + bereits angehaengte
115    /// Submessages).
116    #[must_use]
117    pub fn len(&self) -> usize {
118        self.bytes.len()
119    }
120
121    /// Verbleibendes Budget in Bytes.
122    #[must_use]
123    pub fn remaining(&self) -> usize {
124        self.mtu.saturating_sub(self.bytes.len())
125    }
126
127    /// Versucht, eine Submessage anzuhaengen. Liefert
128    /// [`AddError::WouldExceedMtu`], wenn sie nicht mehr reinpasst —
129    /// dann ist `finish()` + neuer Builder faellig.
130    ///
131    /// `flags` enthaelt nur die **submessage-spezifischen** Flags
132    /// (F, L, Q, H, K, N etc.). Das E-Bit (Little-Endian) setzt der
133    /// Builder selbst, konsistent fuer das ganze Datagramm.
134    ///
135    /// # Errors
136    /// - [`AddError::WouldExceedMtu`] bei Size-Overflow.
137    /// - [`AddError::BodyTooLarge`] wenn `body.len() > u16::MAX`.
138    pub fn try_add_submessage(
139        &mut self,
140        id: SubmessageId,
141        flags: u8,
142        body: &[u8],
143    ) -> Result<(), AddError> {
144        let body_len = u16::try_from(body.len()).map_err(|_| AddError::BodyTooLarge)?;
145        let needed = SubmessageHeader::WIRE_SIZE + body.len();
146        if self.bytes.len() + needed > self.mtu {
147            return Err(AddError::WouldExceedMtu {
148                needed,
149                remaining: self.remaining(),
150            });
151        }
152        let sh = SubmessageHeader {
153            submessage_id: id,
154            flags: flags | FLAG_E_LITTLE_ENDIAN,
155            octets_to_next_header: body_len,
156        };
157        self.bytes.extend_from_slice(&sh.to_bytes());
158        self.bytes.extend_from_slice(body);
159        self.submsg_count += 1;
160        Ok(())
161    }
162
163    /// Wandelt in ein fertiges [`OutboundDatagram`] um.
164    ///
165    /// Liefert `None` bei leerem Builder (nur RTPS-Header ohne
166    /// Submessages) — das erlaubt Aufrufern, unbenutzte Builder
167    /// einfach zu verwerfen, ohne vorher `is_empty()` pruefen zu
168    /// muessen.
169    #[must_use]
170    pub fn finish(self) -> Option<OutboundDatagram> {
171        if self.submsg_count == 0 {
172            return None;
173        }
174        Some(OutboundDatagram {
175            bytes: self.bytes,
176            targets: self.targets,
177        })
178    }
179}
180
181#[cfg(test)]
182#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
183mod tests {
184    use super::*;
185    use crate::datagram::{ParsedSubmessage, decode_datagram};
186    use crate::submessages::{DataSubmessage, HeartbeatSubmessage};
187    use crate::wire_types::{EntityId, GuidPrefix, Locator, SequenceNumber, VendorId};
188
189    fn sample_header() -> RtpsHeader {
190        RtpsHeader::new(VendorId::ZERODDS, GuidPrefix::from_bytes([1; 12]))
191    }
192
193    fn sample_data(sn: i64, payload_len: usize) -> DataSubmessage {
194        DataSubmessage {
195            extra_flags: 0,
196            reader_id: EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
197            writer_id: EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
198            writer_sn: SequenceNumber(sn),
199            inline_qos: None,
200            key_flag: false,
201            non_standard_flag: false,
202            serialized_payload: alloc::sync::Arc::from(alloc::vec![0xAB; payload_len]),
203        }
204    }
205
206    fn targets() -> Rc<Vec<Locator>> {
207        Rc::new(alloc::vec![Locator::udp_v4([127, 0, 0, 1], 7400)])
208    }
209
210    #[test]
211    fn fresh_builder_contains_only_rtps_header() {
212        let b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
213        assert!(b.is_empty());
214        assert_eq!(b.len(), 20, "only RTPS header");
215        assert_eq!(b.submsg_count(), 0);
216        assert_eq!(b.remaining(), DEFAULT_MTU - 20);
217    }
218
219    #[test]
220    fn single_data_submessage_fits_and_decodes() {
221        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
222        let (body, flags) = sample_data(1, 10).write_body(true);
223        b.try_add_submessage(SubmessageId::Data, flags, &body)
224            .unwrap();
225        let dg = b.finish().unwrap();
226        assert_eq!(dg.targets.len(), 1);
227        let parsed = decode_datagram(&dg.bytes).unwrap();
228        assert_eq!(parsed.submessages.len(), 1);
229        assert!(matches!(&parsed.submessages[0], ParsedSubmessage::Data(_)));
230    }
231
232    #[test]
233    fn four_small_datas_aggregate_into_one_datagram() {
234        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
235        for sn in 1..=4i64 {
236            let (body, flags) = sample_data(sn, 10).write_body(true);
237            b.try_add_submessage(SubmessageId::Data, flags, &body)
238                .unwrap();
239        }
240        let dg = b.finish().unwrap();
241        let parsed = decode_datagram(&dg.bytes).unwrap();
242        // 4 DATA-Submessages in einem Datagramm
243        let data_count = parsed
244            .submessages
245            .iter()
246            .filter(|s| matches!(s, ParsedSubmessage::Data(_)))
247            .count();
248        assert_eq!(data_count, 4);
249    }
250
251    #[test]
252    fn overflow_rejects_with_would_exceed_mtu() {
253        let mtu = 100; // sehr klein
254        let mut b = MessageBuilder::open(sample_header(), targets(), mtu);
255        // 1 DATA mit 50-Byte-Payload passt (20 hdr + 4 sub_hdr + 20 body + 50 payload = 94)
256        let (body, flags) = sample_data(1, 50).write_body(true);
257        b.try_add_submessage(SubmessageId::Data, flags, &body)
258            .unwrap();
259        // 2. DATA wuerde sprengen
260        let (body2, flags2) = sample_data(2, 50).write_body(true);
261        let res = b.try_add_submessage(SubmessageId::Data, flags2, &body2);
262        assert!(matches!(res, Err(AddError::WouldExceedMtu { .. })));
263        assert_eq!(b.submsg_count(), 1, "first add must still be counted");
264    }
265
266    #[test]
267    fn overflow_allows_caller_to_open_new_builder() {
268        let mtu = 100;
269        let (body, flags) = sample_data(1, 50).write_body(true);
270        let mut out: Vec<OutboundDatagram> = Vec::new();
271        let mut b = MessageBuilder::open(sample_header(), targets(), mtu);
272
273        for sn in 1..=3i64 {
274            let (body_n, flags_n) = sample_data(sn, 50).write_body(true);
275            if b.try_add_submessage(SubmessageId::Data, flags_n, &body_n)
276                .is_err()
277            {
278                out.push(b.finish().unwrap());
279                b = MessageBuilder::open(sample_header(), targets(), mtu);
280                b.try_add_submessage(SubmessageId::Data, flags_n, &body_n)
281                    .unwrap();
282            }
283        }
284        if !b.is_empty() {
285            out.push(b.finish().unwrap());
286        }
287        let _ = flags;
288        let _ = body;
289        // 3 DATAs, je 1 pro Datagramm (weil MTU 100 nur 1 passt)
290        assert_eq!(out.len(), 3);
291    }
292
293    #[test]
294    fn finish_on_empty_builder_returns_none() {
295        let b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
296        assert!(b.finish().is_none());
297    }
298
299    #[test]
300    fn piggyback_heartbeat_after_data_aggregates() {
301        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
302        let (body, flags) = sample_data(1, 10).write_body(true);
303        b.try_add_submessage(SubmessageId::Data, flags, &body)
304            .unwrap();
305        let hb = HeartbeatSubmessage {
306            reader_id: EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
307            writer_id: EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
308            first_sn: SequenceNumber(1),
309            last_sn: SequenceNumber(1),
310            count: 1,
311            final_flag: true,
312            liveliness_flag: false,
313            group_info: None,
314        };
315        let (hb_body, hb_flags) = hb.write_body(true);
316        b.try_add_submessage(SubmessageId::Heartbeat, hb_flags, &hb_body)
317            .unwrap();
318        let dg = b.finish().unwrap();
319        let parsed = decode_datagram(&dg.bytes).unwrap();
320        assert_eq!(parsed.submessages.len(), 2);
321        assert!(matches!(&parsed.submessages[0], ParsedSubmessage::Data(_)));
322        assert!(matches!(
323            &parsed.submessages[1],
324            ParsedSubmessage::Heartbeat(h) if h.final_flag
325        ));
326    }
327
328    #[test]
329    fn builder_propagates_little_endian_flag_e() {
330        // Wir schreiben eine DATA mit Body-LE. Der Builder soll
331        // automatisch das E-Bit im Submessage-Header setzen.
332        let mut b = MessageBuilder::open(sample_header(), targets(), DEFAULT_MTU);
333        let (body, _flags_from_write) = sample_data(1, 10).write_body(true);
334        // Caller uebergibt flags ohne E-Bit; Builder muss es setzen.
335        b.try_add_submessage(SubmessageId::Data, 0, &body).unwrap();
336        let dg = b.finish().unwrap();
337        // Submessage-Header-Byte 1 (Flags) muss E-Bit gesetzt haben
338        let sub_header_flags = dg.bytes[21]; // 20 bytes RTPS header + 1 byte id
339        assert_eq!(
340            sub_header_flags & FLAG_E_LITTLE_ENDIAN,
341            FLAG_E_LITTLE_ENDIAN
342        );
343    }
344
345    #[test]
346    #[should_panic(expected = "MTU must accommodate")]
347    fn open_panics_on_mtu_below_header() {
348        let _ = MessageBuilder::open(sample_header(), targets(), 10);
349    }
350
351    #[test]
352    fn body_too_large_rejected() {
353        let mut b = MessageBuilder::open(sample_header(), targets(), 100_000);
354        let oversize = alloc::vec![0u8; u16::MAX as usize + 1];
355        let res = b.try_add_submessage(SubmessageId::Data, 0, &oversize);
356        assert!(matches!(res, Err(AddError::BodyTooLarge)));
357    }
358}