zerodds-rtps 1.0.0-rc.3

DDSI-RTPS 2.5 wire stack for ZeroDDS — submessages, writer/reader state machines, reliable + fragmentation, inline QoS, ParameterList, BuiltinTopicData. Pure-Rust no_std + alloc.
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 ZeroDDS Contributors
//! Best-Effort Stateless RTPS-Writer (W4).
//!
//! Covers only a simple 1:1 model with a single reader. Reliable
//! delivery — the AckNack loop, heartbeat timer, and history cache with
//! resend logic — follows in phase 1.
//!
//! The writer is a pure data structure without I/O — it produces
//! datagrams as `Vec<u8>`, and the caller forwards them to its
//! transport. This separation eases testing (no fakes needed).

extern crate alloc;
use alloc::sync::Arc;
use alloc::vec::Vec;

use crate::datagram::encode_data_datagram;
use crate::error::WireError;
use crate::header::RtpsHeader;
use crate::submessages::DataSubmessage;
use crate::wire_types::{EntityId, Guid, GuidPrefix, SequenceNumber, VendorId};

/// Stateless Best-Effort Writer.
///
/// Maintains only the next `SequenceNumber` to be assigned. Each
/// `write()` call increments the SN, builds a DATA submessage and
/// returns the finished datagram. The caller sends it via the transport.
#[derive(Debug, Clone)]
pub struct BestEffortWriter {
    /// GUID of this writer, built from the participant prefix and the
    /// writer's `EntityId`.
    guid: Guid,
    /// VendorId placed into the RTPS header of every produced datagram.
    vendor_id: VendorId,
    /// Raw value of the sequence number the next `write()` will use.
    next_sn: i64,
    /// EntityId of the single allowed reader endpoint.
    target_reader: EntityId,
}

impl BestEffortWriter {
    /// Constructs a writer.
    ///
    /// The writer's GUID is built from `participant_prefix` and
    /// `writer_id`; every DATA submessage is addressed to `target_reader`.
    /// `next_sn` starts at 1 (spec convention: first valid SN) and the
    /// vendor id defaults to `VendorId::ZERODDS`.
    #[must_use]
    pub fn new(
        participant_prefix: GuidPrefix,
        writer_id: EntityId,
        target_reader: EntityId,
    ) -> Self {
        Self {
            guid: Guid::new(participant_prefix, writer_id),
            vendor_id: VendorId::ZERODDS,
            next_sn: 1,
            target_reader,
        }
    }

    /// Sets the VendorId (default `VendorId::ZERODDS`) used in the RTPS
    /// header of all subsequently written datagrams.
    pub fn set_vendor_id(&mut self, vendor: VendorId) {
        self.vendor_id = vendor;
    }

    /// Returns the writer's own GUID.
    #[must_use]
    pub fn guid(&self) -> Guid {
        self.guid
    }

    /// SequenceNumber assigned on the next `write()`.
    #[must_use]
    pub fn next_sequence_number(&self) -> SequenceNumber {
        SequenceNumber(self.next_sn)
    }

    /// Encodes a DATA submessage with `payload` and returns the
    /// finished RTPS datagram.
    ///
    /// The SequenceNumber is advanced only after the datagram has been
    /// encoded successfully. A failed `write()` therefore leaves the
    /// writer unchanged and does not consume a sequence number.
    ///
    /// # Errors
    /// `WireError::ValueOutOfRange` if the body becomes larger than
    /// `u16::MAX` bytes, or if the sequence number counter cannot be
    /// advanced without overflowing `i64`.
    pub fn write(&mut self, payload: &[u8]) -> Result<Vec<u8>, WireError> {
        // Reserve the successor up front so an exhausted counter is
        // reported before any encoding work is done.
        let following_sn = self
            .next_sn
            .checked_add(1)
            .ok_or(WireError::ValueOutOfRange {
                message: "writer sequence number overflow",
            })?;
        let data = DataSubmessage {
            extra_flags: 0,
            reader_id: self.target_reader,
            writer_id: self.guid.entity_id,
            writer_sn: SequenceNumber(self.next_sn),
            inline_qos: None,
            key_flag: false,
            non_standard_flag: false,
            serialized_payload: Arc::from(payload),
        };
        let header = RtpsHeader::new(self.vendor_id, self.guid.prefix);
        let datagram = encode_data_datagram(header, &[data])?;
        // Commit the new SN only now: an encode failure above must not
        // burn a sequence number and leave a gap on the wire.
        self.next_sn = following_sn;
        Ok(datagram)
    }
}

#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
    use super::*;
    use crate::datagram::{ParsedSubmessage, decode_datagram};

    /// Fixture: a writer with a fixed prefix, writer key and reader key.
    fn writer() -> BestEffortWriter {
        let prefix = GuidPrefix::from_bytes([1; 12]);
        let writer_id = EntityId::user_writer_with_key([0x10, 0x20, 0x30]);
        let reader_id = EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]);
        BestEffortWriter::new(prefix, writer_id, reader_id)
    }

    /// A fresh writer reports SN 1 as the next one to hand out.
    #[test]
    fn writer_starts_at_sequence_number_one() {
        assert_eq!(writer().next_sequence_number(), SequenceNumber(1));
    }

    /// Every successful write moves the next SN forward by one.
    #[test]
    fn writer_increments_sequence_number_per_write() {
        let mut w = writer();
        for (payload, expected_next) in [(b"a", 2), (b"b", 3)] {
            w.write(payload).unwrap();
            assert_eq!(w.next_sequence_number(), SequenceNumber(expected_next));
        }
    }

    /// The produced bytes decode back into exactly one DATA submessage
    /// carrying the SN, payload and both entity keys.
    #[test]
    fn writer_produces_decodable_datagram() {
        let mut w = writer();
        let datagram = w.write(b"hello world").unwrap();
        let decoded = decode_datagram(&datagram).unwrap();
        assert_eq!(decoded.submessages.len(), 1);

        let first = &decoded.submessages[0];
        let ParsedSubmessage::Data(d) = first else {
            panic!("expected Data, got {first:?}");
        };
        assert_eq!(d.writer_sn, SequenceNumber(1));
        assert_eq!(d.serialized_payload.as_ref(), b"hello world");
        assert_eq!(d.writer_id.entity_key, [0x10, 0x20, 0x30]);
        assert_eq!(d.reader_id.entity_key, [0xA0, 0xB0, 0xC0]);
    }

    /// Without an override the header carries the ZeroDDS vendor id.
    #[test]
    fn writer_sets_header_with_zerodds_vendor() {
        let mut w = writer();
        let datagram = w.write(b"x").unwrap();
        let decoded = decode_datagram(&datagram).unwrap();
        assert_eq!(decoded.header.vendor_id, VendorId::ZERODDS);
    }

    /// `set_vendor_id` replaces the vendor id written into the header.
    #[test]
    fn writer_set_vendor_id_overrides_default() {
        let custom = VendorId([0xAB, 0xCD]);
        let mut w = writer();
        w.set_vendor_id(custom);
        let datagram = w.write(b"x").unwrap();
        let decoded = decode_datagram(&datagram).unwrap();
        assert_eq!(decoded.header.vendor_id, VendorId([0xAB, 0xCD]));
    }

    /// With the counter at `i64::MAX` a write is rejected as out of range.
    #[test]
    fn writer_sn_overflow_is_error() {
        let mut w = writer();
        w.next_sn = i64::MAX;
        let outcome = w.write(b"x");
        assert!(matches!(outcome, Err(WireError::ValueOutOfRange { .. })));
    }

    /// Three consecutive writes decode to SNs 1, 2 and 3 in order.
    #[test]
    fn writer_three_writes_have_increasing_sn_in_decoded() {
        use alloc::format;
        let mut w = writer();
        let sns: Vec<SequenceNumber> = (0..3)
            .filter_map(|i| {
                let datagram = w.write(format!("msg-{i}").as_bytes()).unwrap();
                let decoded = decode_datagram(&datagram).unwrap();
                match &decoded.submessages[0] {
                    ParsedSubmessage::Data(d) => Some(d.writer_sn),
                    _ => None,
                }
            })
            .collect();
        assert_eq!(
            sns,
            alloc::vec![SequenceNumber(1), SequenceNumber(2), SequenceNumber(3)]
        );
    }
}