Skip to main content

zerodds_rtps/
writer.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Best-Effort Stateless RTPS-Writer (W4).
4//!
5//! ein einziges 1:1-Modell mit einem Reader. Reliable-
6//! AckNack-Loop, Heartbeat-Timer, History-Cache mit Resend-Logik
7//! folgen mit Phase 1.
8//!
9//! Der Writer ist eine reine Datenstruktur ohne I/O — er produziert
10//! Datagramme als `Vec<u8>`, der Caller leitet sie an seinen Transport
11//! weiter. Diese Trennung erleichtert Testing (no fakes needed).
12
13extern crate alloc;
14use alloc::sync::Arc;
15use alloc::vec::Vec;
16
17use crate::datagram::encode_data_datagram;
18use crate::error::WireError;
19use crate::header::RtpsHeader;
20use crate::submessages::DataSubmessage;
21use crate::wire_types::{EntityId, Guid, GuidPrefix, SequenceNumber, VendorId};
22
23/// Stateless Best-Effort Writer.
24///
25/// Pflegt nur den naechsten zu vergebenden `SequenceNumber`. Jeder
26/// `write()`-Call inkrementiert die SN, baut eine DATA-Submessage und
27/// liefert das fertige Datagram. Der Caller sendet das via Transport.
28#[derive(Debug, Clone)]
29pub struct BestEffortWriter {
30    guid: Guid,
31    vendor_id: VendorId,
32    next_sn: i64,
33    /// EntityId des einen erlaubten Reader-Endpoints.
34    target_reader: EntityId,
35}
36
37impl BestEffortWriter {
38    /// Konstruiert einen Writer.
39    ///
40    /// `next_sn` startet bei 1 (Spec-Konvention: erste valid SN).
41    #[must_use]
42    pub fn new(
43        participant_prefix: GuidPrefix,
44        writer_id: EntityId,
45        target_reader: EntityId,
46    ) -> Self {
47        Self {
48            guid: Guid::new(participant_prefix, writer_id),
49            vendor_id: VendorId::ZERODDS,
50            next_sn: 1,
51            target_reader,
52        }
53    }
54
55    /// Setzt die VendorId (Default `VendorId::ZERODDS`).
56    pub fn set_vendor_id(&mut self, vendor: VendorId) {
57        self.vendor_id = vendor;
58    }
59
60    /// Eigene GUID.
61    #[must_use]
62    pub fn guid(&self) -> Guid {
63        self.guid
64    }
65
66    /// SequenceNumber, die beim naechsten `write()` vergeben wird.
67    #[must_use]
68    pub fn next_sequence_number(&self) -> SequenceNumber {
69        SequenceNumber(self.next_sn)
70    }
71
72    /// Encoded eine DATA-Submessage mit `payload` und liefert das
73    /// fertige RTPS-Datagram. SequenceNumber wird vor Return
74    /// inkrementiert.
75    ///
76    /// # Errors
77    /// `WireError::ValueOutOfRange`, wenn der Body groesser als
78    /// `u16::MAX` Bytes wird.
79    pub fn write(&mut self, payload: &[u8]) -> Result<Vec<u8>, WireError> {
80        let sn = SequenceNumber(self.next_sn);
81        self.next_sn = self
82            .next_sn
83            .checked_add(1)
84            .ok_or(WireError::ValueOutOfRange {
85                message: "writer sequence number overflow",
86            })?;
87        let data = DataSubmessage {
88            extra_flags: 0,
89            reader_id: self.target_reader,
90            writer_id: self.guid.entity_id,
91            writer_sn: sn,
92            inline_qos: None,
93            key_flag: false,
94            non_standard_flag: false,
95            serialized_payload: Arc::from(payload),
96        };
97        let header = RtpsHeader::new(self.vendor_id, self.guid.prefix);
98        encode_data_datagram(header, &[data])
99    }
100}
101
102#[cfg(test)]
103mod tests {
104    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
105    use super::*;
106    use crate::datagram::{ParsedSubmessage, decode_datagram};
107
108    fn writer() -> BestEffortWriter {
109        BestEffortWriter::new(
110            GuidPrefix::from_bytes([1; 12]),
111            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
112            EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]),
113        )
114    }
115
116    #[test]
117    fn writer_starts_at_sequence_number_one() {
118        let w = writer();
119        assert_eq!(w.next_sequence_number(), SequenceNumber(1));
120    }
121
122    #[test]
123    fn writer_increments_sequence_number_per_write() {
124        let mut w = writer();
125        w.write(b"a").unwrap();
126        assert_eq!(w.next_sequence_number(), SequenceNumber(2));
127        w.write(b"b").unwrap();
128        assert_eq!(w.next_sequence_number(), SequenceNumber(3));
129    }
130
131    #[test]
132    fn writer_produces_decodable_datagram() {
133        let mut w = writer();
134        let bytes = w.write(b"hello world").unwrap();
135        let parsed = decode_datagram(&bytes).unwrap();
136        assert_eq!(parsed.submessages.len(), 1);
137        match &parsed.submessages[0] {
138            ParsedSubmessage::Data(d) => {
139                assert_eq!(d.writer_sn, SequenceNumber(1));
140                assert_eq!(d.serialized_payload.as_ref(), b"hello world");
141                assert_eq!(d.writer_id.entity_key, [0x10, 0x20, 0x30]);
142                assert_eq!(d.reader_id.entity_key, [0xA0, 0xB0, 0xC0]);
143            }
144            other => panic!("expected Data, got {other:?}"),
145        }
146    }
147
148    #[test]
149    fn writer_sets_header_with_zerodds_vendor() {
150        let mut w = writer();
151        let bytes = w.write(b"x").unwrap();
152        let parsed = decode_datagram(&bytes).unwrap();
153        assert_eq!(parsed.header.vendor_id, VendorId::ZERODDS);
154    }
155
156    #[test]
157    fn writer_set_vendor_id_overrides_default() {
158        let mut w = writer();
159        w.set_vendor_id(VendorId([0xAB, 0xCD]));
160        let bytes = w.write(b"x").unwrap();
161        let parsed = decode_datagram(&bytes).unwrap();
162        assert_eq!(parsed.header.vendor_id, VendorId([0xAB, 0xCD]));
163    }
164
165    #[test]
166    fn writer_sn_overflow_is_error() {
167        let mut w = writer();
168        w.next_sn = i64::MAX;
169        let res = w.write(b"x");
170        assert!(matches!(res, Err(WireError::ValueOutOfRange { .. })));
171    }
172
173    #[test]
174    fn writer_three_writes_have_increasing_sn_in_decoded() {
175        use alloc::format;
176        let mut w = writer();
177        let mut sns = Vec::new();
178        for i in 0..3 {
179            let bytes = w.write(format!("msg-{i}").as_bytes()).unwrap();
180            let parsed = decode_datagram(&bytes).unwrap();
181            if let ParsedSubmessage::Data(d) = &parsed.submessages[0] {
182                sns.push(d.writer_sn);
183            }
184        }
185        assert_eq!(
186            sns,
187            alloc::vec![SequenceNumber(1), SequenceNumber(2), SequenceNumber(3)]
188        );
189    }
190}