// zerodds_dcps/coherent_set.rs

extern crate alloc;

use alloc::sync::Arc;
use core::sync::atomic::{AtomicBool, AtomicI64, Ordering};

use crate::error::{DdsError, Result};
/// Sequence-number type used to delimit coherent sets.
pub type CoherentSequenceNumber = i64;

/// Identifies a coherent set by the sequence number of its first sample.
///
/// The wire form is the RTPS `SequenceNumber` layout: a big-endian `i32`
/// high word followed by a big-endian `u32` low word — which is exactly
/// the big-endian byte encoding of the full `i64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CoherentSetMarker {
    /// Sequence number of the first sample in the set.
    pub set_first_sn: CoherentSequenceNumber,
}

impl CoherentSetMarker {
    /// Encodes the marker as 8 wire bytes.
    ///
    /// `high(i32)` big-endian followed by `low(u32)` big-endian is
    /// byte-identical to the big-endian encoding of the whole `i64`,
    /// so the conversion collapses to a single call.
    #[must_use]
    pub fn to_wire_bytes(&self) -> [u8; 8] {
        self.set_first_sn.to_be_bytes()
    }

    /// Decodes a marker from 8 wire bytes produced by [`Self::to_wire_bytes`].
    #[must_use]
    pub fn from_wire_bytes(bytes: &[u8; 8]) -> Self {
        Self {
            set_first_sn: i64::from_be_bytes(*bytes),
        }
    }
}
66
/// Lock-free state for one open/closed coherent-set scope.
///
/// `i64::MIN` in `set_first_sn` is the sentinel for "no sequence number
/// recorded"; see `Default` and `CoherentScope::end`.
#[derive(Debug)]
pub struct CoherentScope {
    // True between a successful `begin` and the matching `end`.
    active: AtomicBool,
    // First sequence number of the open set; `i64::MIN` when unset.
    set_first_sn: AtomicI64,
}
84
85impl Default for CoherentScope {
86 fn default() -> Self {
87 Self {
88 active: AtomicBool::new(false),
89 set_first_sn: AtomicI64::new(i64::MIN),
90 }
91 }
92}
93
94impl CoherentScope {
95 #[must_use]
98 pub fn new() -> Arc<Self> {
99 Arc::new(Self::default())
100 }
101
102 #[must_use]
104 pub fn is_active(&self) -> bool {
105 self.active.load(Ordering::Acquire)
106 }
107
108 #[must_use]
110 pub fn current_marker(&self) -> Option<CoherentSetMarker> {
111 if self.is_active() {
112 let sn = self.set_first_sn.load(Ordering::Acquire);
113 if sn != i64::MIN {
114 return Some(CoherentSetMarker { set_first_sn: sn });
115 }
116 }
117 None
118 }
119
120 pub fn begin(&self, next_sn: CoherentSequenceNumber) -> Result<()> {
128 if self.active.load(Ordering::Acquire) {
129 return Err(DdsError::PreconditionNotMet {
130 reason: "coherent set already active",
131 });
132 }
133 self.set_first_sn.store(next_sn, Ordering::Release);
134 self.active.store(true, Ordering::Release);
135 Ok(())
136 }
137
138 pub fn end(&self) -> Result<CoherentSetMarker> {
144 let was = self.active.swap(false, Ordering::AcqRel);
145 if !was {
146 return Err(DdsError::PreconditionNotMet {
147 reason: "no coherent set active",
148 });
149 }
150 let sn = self.set_first_sn.swap(i64::MIN, Ordering::AcqRel);
151 Ok(CoherentSetMarker { set_first_sn: sn })
152 }
153}
154
/// Nesting-counted begin/end access scope shared across a group.
///
/// `open_count` tracks how many `begin` calls are outstanding;
/// `snapshot_generation` is bumped only on the closed -> open transition,
/// so every holder of the scope observes the same generation for the
/// duration of one access window (see the tests below).
#[derive(Debug, Default)]
pub struct GroupAccessScope {
    // Number of nested `begin` calls currently outstanding.
    open_count: core::sync::atomic::AtomicU32,
    // Incremented on each outermost `begin`; stable while nested.
    snapshot_generation: core::sync::atomic::AtomicU64,
}
178
179impl GroupAccessScope {
180 #[must_use]
182 pub fn new() -> Arc<Self> {
183 Arc::new(Self::default())
184 }
185
186 #[must_use]
188 pub fn is_active(&self) -> bool {
189 self.open_count.load(Ordering::Acquire) > 0
190 }
191
192 #[must_use]
195 pub fn current_snapshot(&self) -> u64 {
196 self.snapshot_generation.load(Ordering::Acquire)
197 }
198
199 pub fn begin(&self) {
204 let prev = self.open_count.fetch_add(1, Ordering::AcqRel);
205 if prev == 0 {
206 self.snapshot_generation.fetch_add(1, Ordering::AcqRel);
207 }
208 }
209
210 pub fn end(&self) -> Result<()> {
216 loop {
218 let cur = self.open_count.load(Ordering::Acquire);
219 if cur == 0 {
220 return Err(DdsError::PreconditionNotMet {
221 reason: "end_access without begin_access",
222 });
223 }
224 if self
225 .open_count
226 .compare_exchange(cur, cur - 1, Ordering::AcqRel, Ordering::Acquire)
227 .is_ok()
228 {
229 return Ok(());
230 }
231 }
232 }
233}
234
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn marker_wire_roundtrip() {
        let m = CoherentSetMarker {
            set_first_sn: 0x0123_4567_89AB_CDEF,
        };
        let bytes = m.to_wire_bytes();
        let back = CoherentSetMarker::from_wire_bytes(&bytes);
        assert_eq!(m, back);
    }

    #[test]
    fn marker_wire_zero() {
        let m = CoherentSetMarker { set_first_sn: 0 };
        assert_eq!(m.to_wire_bytes(), [0u8; 8]);
    }

    #[test]
    fn marker_wire_negative_roundtrip() {
        // Sequence numbers are signed; the sign-extended high word is the
        // only nontrivial branch of the split/join and must survive it.
        let m = CoherentSetMarker { set_first_sn: -2 };
        assert_eq!(
            m.to_wire_bytes(),
            [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFE]
        );
        assert_eq!(CoherentSetMarker::from_wire_bytes(&m.to_wire_bytes()), m);
    }

    #[test]
    fn coherent_scope_starts_inactive() {
        let s = CoherentScope::new();
        assert!(!s.is_active());
        assert!(s.current_marker().is_none());
    }

    #[test]
    fn begin_end_lifecycle() {
        let s = CoherentScope::new();
        s.begin(42).unwrap();
        assert!(s.is_active());
        let m = s.current_marker().expect("active should have marker");
        assert_eq!(m.set_first_sn, 42);
        let end = s.end().unwrap();
        assert_eq!(end.set_first_sn, 42);
        assert!(!s.is_active());
    }

    #[test]
    fn double_begin_is_error() {
        let s = CoherentScope::new();
        s.begin(1).unwrap();
        let err = s.begin(2).unwrap_err();
        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
    }

    #[test]
    fn end_without_begin_is_error() {
        let s = CoherentScope::new();
        let err = s.end().unwrap_err();
        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
    }

    #[test]
    fn group_access_nesting() {
        let g = GroupAccessScope::new();
        assert!(!g.is_active());
        g.begin();
        assert!(g.is_active());
        g.begin();
        g.end().unwrap();
        assert!(g.is_active(), "still nested");
        g.end().unwrap();
        assert!(!g.is_active());
    }

    #[test]
    fn group_access_underflow_is_error() {
        let g = GroupAccessScope::new();
        let err = g.end().unwrap_err();
        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
    }

    #[test]
    fn snapshot_generation_starts_zero() {
        let g = GroupAccessScope::new();
        assert_eq!(g.current_snapshot(), 0);
    }

    #[test]
    fn snapshot_generation_increments_on_begin_from_zero() {
        let g = GroupAccessScope::new();
        g.begin();
        assert_eq!(g.current_snapshot(), 1);
        g.end().unwrap();
        assert_eq!(g.current_snapshot(), 1);
        g.begin();
        assert_eq!(g.current_snapshot(), 2);
    }

    #[test]
    fn snapshot_generation_stable_during_nested_begin() {
        let g = GroupAccessScope::new();
        g.begin();
        let g1 = g.current_snapshot();
        g.begin();
        let g2 = g.current_snapshot();
        assert_eq!(g1, g2, "nested begin must keep snapshot stable");
        g.end().unwrap();
        let g3 = g.current_snapshot();
        assert_eq!(g1, g3, "snapshot stays stable until last end");
        g.end().unwrap();
    }

    #[test]
    fn cross_topic_consistent_snapshot_via_clone() {
        let g = GroupAccessScope::new();
        let g_for_dr1 = Arc::clone(&g);
        let g_for_dr2 = Arc::clone(&g);
        g.begin();
        assert_eq!(g_for_dr1.current_snapshot(), g_for_dr2.current_snapshot());
        assert_eq!(g_for_dr1.current_snapshot(), 1);
        g.end().unwrap();
    }
}