Skip to main content

zerodds_dcps/
coherent_set.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! Coherent-Sets + Group-Access (DDS DCPS 1.4 §2.2.2.4.1.8-11,
4//! §2.2.2.5.2.8-11, §2.2.2.5.3.32; DDSI-RTPS 2.5 §9.6.4.2/3/4).
5//!
6//! Wenn `Presentation.coherent_access = true`, kann ein Publisher
7//! eine Sequenz von write()-Aufrufen in einem **Coherent-Set**
8//! gruppieren — der Subscriber sieht entweder alle Samples oder
9//! keines, niemals einen Teil. Der Wire-Mechanismus ist die
10//! Inline-QoS-PID `PID_COHERENT_SET` (sequence_number des ersten
11//! Sample im Set).
12//!
13//! .9 liefert:
14//! - [`CoherentScope`] — der State-Machine-Tracker im Publisher /
15//!   Subscriber.
16//! - [`CoherentSetMarker`] — die Wire-Repraesentation, die in der
17//!   Inline-QoS landet.
18//! - Hilfsfunktionen fuer begin/end_coherent_changes auf Publisher-
19//!   und begin/end_access auf Subscriber-Seite.
20//!
21//! Scope: API-Surface + State-Tracking. Tatsaechliche
22//! Inline-QoS-PID-Wiring im DCPS-write/take-Pfad folgt im Wire-Up
23//! (deps auf KeyHash-Inline-QoS-Wiring).
24
25extern crate alloc;
26
27use alloc::sync::Arc;
28use core::sync::atomic::{AtomicBool, AtomicI64, Ordering};
29
30use crate::error::{DdsError, Result};
31
32/// 8-byte SequenceNumber-Aequivalent (DDSI-RTPS Wire-Format).
33pub type CoherentSequenceNumber = i64;
34
35/// Wire-Repraesentation eines `PID_COHERENT_SET`-Eintrags. Wird vom
36/// DataWriter in die Inline-QoS einer DATA-Submessage geschrieben.
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
38pub struct CoherentSetMarker {
39    /// SequenceNumber des **ersten** Sample im Coherent-Set.
40    pub set_first_sn: CoherentSequenceNumber,
41}
42
43impl CoherentSetMarker {
44    /// 8-byte Wire-Repraesentation (BE — entspricht
45    /// `SequenceNumber::to_bytes_be`).
46    #[must_use]
47    pub fn to_wire_bytes(&self) -> [u8; 8] {
48        // RTPS SequenceNumber-Format: high 32 bits + low 32 bits, beides BE.
49        let high = (self.set_first_sn >> 32) as i32;
50        let low = (self.set_first_sn & 0xFFFF_FFFF) as u32;
51        let mut out = [0u8; 8];
52        out[0..4].copy_from_slice(&high.to_be_bytes());
53        out[4..8].copy_from_slice(&low.to_be_bytes());
54        out
55    }
56
57    /// Decode aus 8 byte BE.
58    #[must_use]
59    pub fn from_wire_bytes(bytes: &[u8; 8]) -> Self {
60        let high = i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
61        let low = u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
62        let sn = (i64::from(high) << 32) | i64::from(low);
63        Self { set_first_sn: sn }
64    }
65}
66
67/// State-Machine-Tracker fuer Coherent-Set auf Publisher-Seite.
68///
69/// Lifecycle:
70/// 1. `begin_coherent_changes` setzt `active=true` und merkt sich
71///    die naechste Sequence-Number als `set_first_sn`.
72/// 2. Jede `write()` waehrend des aktiven Sets traegt diesen Marker.
73/// 3. `end_coherent_changes` setzt `active=false`. Das naechste write
74///    *ohne* Marker oder mit neuem Marker signalisiert dem Reader
75///    das Set-Ende.
76#[derive(Debug)]
77pub struct CoherentScope {
78    /// Aktuell offen?
79    active: AtomicBool,
80    /// Sequence-Number des ersten Sample im aktuellen Set
81    /// (Sentinel `i64::MIN` = noch keiner gesetzt).
82    set_first_sn: AtomicI64,
83}
84
85impl Default for CoherentScope {
86    fn default() -> Self {
87        Self {
88            active: AtomicBool::new(false),
89            set_first_sn: AtomicI64::new(i64::MIN),
90        }
91    }
92}
93
94impl CoherentScope {
95    /// Neuer leerer Scope (in Arc, weil Caller den meist shared
96    /// zwischen Publisher-Inner + write-Pfad halten muessen).
97    #[must_use]
98    pub fn new() -> Arc<Self> {
99        Arc::new(Self::default())
100    }
101
102    /// True wenn der Scope aktiv ist.
103    #[must_use]
104    pub fn is_active(&self) -> bool {
105        self.active.load(Ordering::Acquire)
106    }
107
108    /// Liefert den aktuellen Marker, falls Scope aktiv ist.
109    #[must_use]
110    pub fn current_marker(&self) -> Option<CoherentSetMarker> {
111        if self.is_active() {
112            let sn = self.set_first_sn.load(Ordering::Acquire);
113            if sn != i64::MIN {
114                return Some(CoherentSetMarker { set_first_sn: sn });
115            }
116        }
117        None
118    }
119
120    /// Beginnt einen neuen Coherent-Set mit der gegebenen
121    /// `next_sn` als Set-First-SN. Spec: §2.2.2.4.1.8
122    /// `Publisher::begin_coherent_changes`.
123    ///
124    /// # Errors
125    /// `PreconditionNotMet` wenn ein Set bereits aktiv ist
126    /// (Spec: nicht verschachtelbar aktuell).
127    pub fn begin(&self, next_sn: CoherentSequenceNumber) -> Result<()> {
128        if self.active.load(Ordering::Acquire) {
129            return Err(DdsError::PreconditionNotMet {
130                reason: "coherent set already active",
131            });
132        }
133        self.set_first_sn.store(next_sn, Ordering::Release);
134        self.active.store(true, Ordering::Release);
135        Ok(())
136    }
137
138    /// Beendet den aktiven Coherent-Set. Spec: §2.2.2.4.1.9
139    /// `Publisher::end_coherent_changes`.
140    ///
141    /// # Errors
142    /// `PreconditionNotMet` wenn kein Set aktiv ist.
143    pub fn end(&self) -> Result<CoherentSetMarker> {
144        let was = self.active.swap(false, Ordering::AcqRel);
145        if !was {
146            return Err(DdsError::PreconditionNotMet {
147                reason: "no coherent set active",
148            });
149        }
150        let sn = self.set_first_sn.swap(i64::MIN, Ordering::AcqRel);
151        Ok(CoherentSetMarker { set_first_sn: sn })
152    }
153}
154
155/// Subscriber-seitiger Group-Access (Spec §2.2.2.5.2.8/9).
156///
157/// Wenn `Presentation.access_scope = GROUP` und `coherent_access`
158/// aktiv ist, MUSS der Subscriber zwischen `begin_access` und
159/// `end_access` die Atomic-Sicht ueber alle DataReader im Subscriber
160/// gewaehren.
161///
162/// Group-Access-Scope mit Snapshot-Atomicity: jedes `begin_access`
163/// inkrementiert den `snapshot_generation`-Counter; alle DataReader
164/// im Subscriber, die zwischen begin und end gelesen werden, sehen
165/// die SELBE Generation und daher Spec-konform einen atomic Cut
166/// ueber alle Topics.
167#[derive(Debug, Default)]
168pub struct GroupAccessScope {
169    /// Counter der noch offenen begin_access-Aufrufe (Spec erlaubt
170    /// rekursives Verschachteln).
171    open_count: core::sync::atomic::AtomicU32,
172    /// Snapshot-Generation: wird beim ersten `begin()` (cur=0→1)
173    /// inkrementiert und stays-stable bis das letzte `end()` schließt.
174    /// DataReader-Read-Sites koennen `current_snapshot()` lesen, um
175    /// einen konsistenten Cut zu definieren.
176    snapshot_generation: core::sync::atomic::AtomicU64,
177}
178
179impl GroupAccessScope {
180    /// Neuer leerer Scope.
181    #[must_use]
182    pub fn new() -> Arc<Self> {
183        Arc::new(Self::default())
184    }
185
186    /// True wenn aktuell mindestens ein begin_access offen ist.
187    #[must_use]
188    pub fn is_active(&self) -> bool {
189        self.open_count.load(Ordering::Acquire) > 0
190    }
191
192    /// Aktuelle Snapshot-Generation (siehe Struct-Doku). 0 bedeutet
193    /// "kein Snapshot je geoeffnet".
194    #[must_use]
195    pub fn current_snapshot(&self) -> u64 {
196        self.snapshot_generation.load(Ordering::Acquire)
197    }
198
199    /// `Subscriber::begin_access` (Spec §2.2.2.5.2.8). Idempotent
200    /// nestable — jeder Aufruf erhoeht den Counter, jedes
201    /// `end_access` erniedrigt. Beim Uebergang von 0→1 wird zusaetzlich
202    /// die Snapshot-Generation inkrementiert (atomic Cut-Begin).
203    pub fn begin(&self) {
204        let prev = self.open_count.fetch_add(1, Ordering::AcqRel);
205        if prev == 0 {
206            self.snapshot_generation.fetch_add(1, Ordering::AcqRel);
207        }
208    }
209
210    /// `Subscriber::end_access` (Spec §2.2.2.5.2.9).
211    ///
212    /// # Errors
213    /// `PreconditionNotMet` wenn kein begin_access offen ist (Counter
214    /// unterläuft).
215    pub fn end(&self) -> Result<()> {
216        // Sicherer Decrement: read+CAS-loop um Underflow zu erkennen.
217        loop {
218            let cur = self.open_count.load(Ordering::Acquire);
219            if cur == 0 {
220                return Err(DdsError::PreconditionNotMet {
221                    reason: "end_access without begin_access",
222                });
223            }
224            if self
225                .open_count
226                .compare_exchange(cur, cur - 1, Ordering::AcqRel, Ordering::Acquire)
227                .is_ok()
228            {
229                return Ok(());
230            }
231        }
232    }
233}
234
235#[cfg(test)]
236#[allow(clippy::expect_used, clippy::unwrap_used)]
237mod tests {
238    use super::*;
239
240    #[test]
241    fn marker_wire_roundtrip() {
242        let m = CoherentSetMarker {
243            set_first_sn: 0x0123_4567_89AB_CDEF,
244        };
245        let bytes = m.to_wire_bytes();
246        let back = CoherentSetMarker::from_wire_bytes(&bytes);
247        assert_eq!(m, back);
248    }
249
250    #[test]
251    fn marker_wire_zero() {
252        let m = CoherentSetMarker { set_first_sn: 0 };
253        assert_eq!(m.to_wire_bytes(), [0u8; 8]);
254    }
255
256    #[test]
257    fn coherent_scope_starts_inactive() {
258        let s = CoherentScope::new();
259        assert!(!s.is_active());
260        assert!(s.current_marker().is_none());
261    }
262
263    #[test]
264    fn begin_end_lifecycle() {
265        let s = CoherentScope::new();
266        s.begin(42).unwrap();
267        assert!(s.is_active());
268        let m = s.current_marker().expect("active should have marker");
269        assert_eq!(m.set_first_sn, 42);
270        let end = s.end().unwrap();
271        assert_eq!(end.set_first_sn, 42);
272        assert!(!s.is_active());
273    }
274
275    #[test]
276    fn double_begin_is_error() {
277        let s = CoherentScope::new();
278        s.begin(1).unwrap();
279        let err = s.begin(2).unwrap_err();
280        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
281    }
282
283    #[test]
284    fn end_without_begin_is_error() {
285        let s = CoherentScope::new();
286        let err = s.end().unwrap_err();
287        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
288    }
289
290    #[test]
291    fn group_access_nesting() {
292        let g = GroupAccessScope::new();
293        assert!(!g.is_active());
294        g.begin();
295        assert!(g.is_active());
296        g.begin();
297        g.end().unwrap();
298        assert!(g.is_active(), "still nested");
299        g.end().unwrap();
300        assert!(!g.is_active());
301    }
302
303    #[test]
304    fn group_access_underflow_is_error() {
305        let g = GroupAccessScope::new();
306        let err = g.end().unwrap_err();
307        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
308    }
309
310    // ---- §2.2.3.6 GROUP-coherent_access Snapshot-Generation ----
311
312    #[test]
313    fn snapshot_generation_starts_zero() {
314        let g = GroupAccessScope::new();
315        assert_eq!(g.current_snapshot(), 0);
316    }
317
318    #[test]
319    fn snapshot_generation_increments_on_begin_from_zero() {
320        let g = GroupAccessScope::new();
321        g.begin();
322        assert_eq!(g.current_snapshot(), 1);
323        g.end().unwrap();
324        // Generation bleibt nach end() — jedes neue begin gibt eine
325        // neue Generation. Das ist die "atomic cut"-Identitaet.
326        assert_eq!(g.current_snapshot(), 1);
327        g.begin();
328        assert_eq!(g.current_snapshot(), 2);
329    }
330
331    #[test]
332    fn snapshot_generation_stable_during_nested_begin() {
333        // Innerhalb verschachtelter begin/end soll die Generation
334        // konstant bleiben — wir sehen denselben Cut.
335        let g = GroupAccessScope::new();
336        g.begin();
337        let g1 = g.current_snapshot();
338        g.begin();
339        let g2 = g.current_snapshot();
340        assert_eq!(g1, g2, "nested begin must keep snapshot stable");
341        g.end().unwrap();
342        let g3 = g.current_snapshot();
343        assert_eq!(g1, g3, "snapshot stays stable until last end");
344        g.end().unwrap();
345    }
346
347    #[test]
348    fn cross_topic_consistent_snapshot_via_clone() {
349        // Multi-Reader-Coherent-Set: alle DR koennen via cloning
350        // dieselbe Scope sehen → identische snapshot_generation.
351        let g = GroupAccessScope::new();
352        let g_for_dr1 = Arc::clone(&g);
353        let g_for_dr2 = Arc::clone(&g);
354        g.begin();
355        // DR1 + DR2 sehen denselben Cut.
356        assert_eq!(g_for_dr1.current_snapshot(), g_for_dr2.current_snapshot());
357        assert_eq!(g_for_dr1.current_snapshot(), 1);
358        g.end().unwrap();
359    }
360}