zerodds-rtps
Writer/reader state machines, RTPS submessages, wire-format encoding.
Part of ZeroDDS. Safety class SAFE —
forbid(unsafe_code), no_std + alloc.
DDSI-RTPS 2.5 — fully spec-conformant (K3b audit completed 2026-04-28: 121 done / 0 partial / 0 open / 3 n/a).
Quick start (E2E with UDP)
use Ipv4Addr;
use BestEffortReader;
use BestEffortWriter;
use ;
use Transport;
use UdpTransport;
let prefix = from_bytes;
let writer_id = user_writer_with_key;
let reader_id = user_reader_with_key;
let writer_xport = bind_v4?;
let reader_xport = bind_v4?;
let dest = reader_xport.local_locator;
let mut writer = new;
let reader = new;
let datagram = writer.write?;
writer_xport.send?;
let received = reader_xport.recv?;
let samples = reader.recv_datagram?;
assert_eq!;
# Ok::
Modules
| Module | Purpose |
|---|---|
error |
WireError variants |
wire_types |
Guid, EntityId, SequenceNumber, Locator, ProtocolVersion, VendorId |
header |
RtpsHeader (20B) + RTPS_MAGIC |
submessage_header |
SubmessageHeader (4B) + SubmessageId enum |
submessages |
DATA, DATA_FRAG, HEARTBEAT, HEARTBEAT_FRAG, ACKNACK, NACK_FRAG, GAP, INFO_TS, INFO_SRC, INFO_DST, INFO_REPLY + SequenceNumberSet |
datagram |
encode/decode RTPS messages, ParsedSubmessage iteration |
writer |
BestEffortWriter (1:1, stateless) |
reader |
BestEffortReader (1:1, with wildcard match) |
history_cache |
Ordered CacheChange store (BTreeMap) + atomic stats + LockFreeReadHistoryCache |
reader_proxy / writer_proxy |
Per-endpoint state |
reliable_writer / reliable_reader |
Reliable state machines, tick-driven |
reliable_stateless_writer |
Stateless writer variant for SPDP |
fragment_assembler |
Reader-side reassembly with DoS caps |
participant_security_info |
PID 0x1005 (DDS-Security 1.2 §7.4.1.6) |
message_builder |
OutboundDatagram aggregation per send tick |
Reliable-Quickstart
use Duration;
use ;
let mut w = new;
let dgs = w.write?;
for dg in dgs
loop
Lock-Free Read-Path & Per-Slot Mutex
The history-cache module is built lock-free in three layers:
- Atomic stats —
HistoryCacheStatswithAtomicUsize/AtomicI64forlen/evicted/max_sn/min_sn. Monitoring threads poll viacache.stats() -> Arc<HistoryCacheStats>without taking the writer lock. - Per-endpoint mutex —
dcps::user_writers/user_readersuseRwLock<BTreeMap<EntityId, Arc<Mutex<Slot>>>>: a separate mutex per writer/reader instead of a global lock. - RCU snapshot —
LockFreeReadHistoryCachewith&selfmutations viazerodds_foundation::rcu::RcuCell(copy-on-write Arc swap). Reader snapshots live independently of the cache lock.
Wire-format conformance
DDSI-RTPS 2.5 §8.3 — fully implemented:
- RTPS header (§8.3.3): 20 byte, magic + version + VendorId + GuidPrefix
- Submessage header (§8.3.4): 4 byte, ID + flags + OctetsToNextHeader
- DATA / DATA_FRAG / GAP / HEARTBEAT / HEARTBEAT_FRAG / ACKNACK / NACK_FRAG / INFO_TS / INFO_SRC / INFO_DST / INFO_REPLY
- Cross-vendor wire compat byte-identical against Cyclone DDS,
FastDDS, RTI Connext, OpenSplice (see
docs/interop/).
Cross-vendor compatibility
- RTPS 2.1 with a 0x80 submessage (Cyclone/FastDDS legacy) —
HeaderExtension is parsed only from version 2.5 on, before that
treated as vendor-specific. Regression test
rtps_2_1_treats_0x80_as_vendor_specific_not_header_extension. fragments_in_submessage > 1(RTI bundling) — the decoder accepts it; the encoder emits 1 fragment per submessage.- HEARTBEAT_FRAG — decoder ready, encoder not active (regular HEARTBEATs are enough for the reader).
Tests
E2E tests in tests/reliable_e2e.rs cover in-order delivery with
0%/10%/30% simulated packet loss + 10-kB fragmentation.
Documentation
For a hot-path trace see Documentation Trail Station 02 → data-flow.