extern crate alloc;

use alloc::sync::Arc;
use core::sync::atomic::{AtomicBool, AtomicI64, AtomicU32, AtomicU64, Ordering};

use crate::error::{DdsError, Result};
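
/// 64-bit DDS sequence number, as carried in coherent-set markers.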
pub type CoherentSequenceNumber = i64;
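
/// Marker attached to samples so readers can associate them with the
/// coherent set that begins at `set_first_sn`.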
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct CoherentSetMarker {
    pub set_first_sn: CoherentSequenceNumber,
}

impl CoherentSetMarker {
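    /// Encodes the marker as 8 big-endian bytes, `{high: i32, low: u32}`,
    /// the same split RTPS uses for `SequenceNumber_t`.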
    #[must_use]
    pub fn to_wire_bytes(&self) -> [u8; 8] {
        // Arithmetic shift keeps the sign in `high` for negative SNs.
        let high = (self.set_first_sn >> 32) as i32;
        let low = (self.set_first_sn & 0xFFFF_FFFF) as u32;
        let mut out = [0u8; 8];
        out[0..4].copy_from_slice(&high.to_be_bytes());
        out[4..8].copy_from_slice(&low.to_be_bytes());
        out
    }
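
    /// Decodes a marker produced by [`Self::to_wire_bytes`]; the pair
    /// round-trips every `i64`, including negative values.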
    #[must_use]
    pub fn from_wire_bytes(bytes: &[u8; 8]) -> Self {
        let high = i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
        let low = u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
        let sn = (i64::from(high) << 32) | i64::from(low);
        Self { set_first_sn: sn }
    }
}
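
/// Writer-side lifecycle state for one coherent set.
///
/// A minimal usage sketch (the surrounding publisher plumbing and the
/// `next_sn` value are assumed, not part of this module):
///
/// ```ignore
/// let scope = CoherentScope::new();
/// scope.begin(next_sn)?;      // first SN of the set
/// // ... write the samples that belong together ...
/// let marker = scope.end()?;  // marker to stamp on the wire
/// ```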
#[derive(Debug)]
pub struct CoherentScope {
    /// True while a coherent set is open.
    active: AtomicBool,
    /// First SN of the open set; `i64::MIN` is the "unset" sentinel so a
    /// reader can never mistake it for a real sequence number.
    set_first_sn: AtomicI64,
}

impl Default for CoherentScope {
    fn default() -> Self {
        Self {
            active: AtomicBool::new(false),
            set_first_sn: AtomicI64::new(i64::MIN),
        }
    }
}

impl CoherentScope {
    #[must_use]
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    #[must_use]
    pub fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }
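
    /// Returns the marker for the open set, or `None` when no set is
    /// active or its first SN has not been recorded yet.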
    #[must_use]
    pub fn current_marker(&self) -> Option<CoherentSetMarker> {
        if self.is_active() {
            let sn = self.set_first_sn.load(Ordering::Acquire);
            // Guard against the sentinel: `active` can be observed before
            // the first SN is stored by a racing `begin`.
            if sn != i64::MIN {
                return Some(CoherentSetMarker { set_first_sn: sn });
            }
        }
        None
    }
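
    /// Opens a coherent set whose first sample will carry `next_sn`.
    ///
    /// # Errors
    ///
    /// Returns [`DdsError::PreconditionNotMet`] if a set is already open.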
    pub fn begin(&self, next_sn: CoherentSequenceNumber) -> Result<()> {
        // Claim the scope with one atomic swap: a separate load-then-store
        // would let two concurrent `begin` calls both succeed.
        if self.active.swap(true, Ordering::AcqRel) {
            return Err(DdsError::PreconditionNotMet {
                reason: "coherent set already active",
            });
        }
        self.set_first_sn.store(next_sn, Ordering::Release);
        Ok(())
    }
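
    /// Closes the open set and returns its marker.
    ///
    /// # Errors
    ///
    /// Returns [`DdsError::PreconditionNotMet`] if no set is open.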
    pub fn end(&self) -> Result<CoherentSetMarker> {
        // `swap` makes deactivation race-free against a concurrent `end`.
        let was = self.active.swap(false, Ordering::AcqRel);
        if !was {
            return Err(DdsError::PreconditionNotMet {
                reason: "no coherent set active",
            });
        }
        // Reset to the sentinel and hand back the recorded first SN.
        let sn = self.set_first_sn.swap(i64::MIN, Ordering::AcqRel);
        Ok(CoherentSetMarker { set_first_sn: sn })
    }
}
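
/// Reader-side, reentrant `begin_access`/`end_access` scope: nested
/// `begin` calls share one snapshot generation, which only advances when
/// the outermost `begin` opens a fresh scope.
///
/// A minimal sketch of the intended use (the reader handles implied by
/// the name are assumed, not part of this module):
///
/// ```ignore
/// let scope = GroupAccessScope::new();
/// scope.begin();                       // outermost: new snapshot
/// let gen = scope.current_snapshot();  // identical for every reader
/// scope.end()?;
/// ```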
#[derive(Debug, Default)]
pub struct GroupAccessScope {
    /// Number of nested `begin` scopes currently open.
    open_count: AtomicU32,
    /// Generation counter, bumped once per outermost `begin`.
    snapshot_generation: AtomicU64,
}

impl GroupAccessScope {
    #[must_use]
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }

    #[must_use]
    pub fn is_active(&self) -> bool {
        self.open_count.load(Ordering::Acquire) > 0
    }

    #[must_use]
    pub fn current_snapshot(&self) -> u64 {
        self.snapshot_generation.load(Ordering::Acquire)
    }
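
    /// Opens (or nests into) the access scope; only the outermost `begin`
    /// bumps the snapshot generation.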
    pub fn begin(&self) {
        let prev = self.open_count.fetch_add(1, Ordering::AcqRel);
        if prev == 0 {
            // Outermost `begin`: start a new consistent snapshot.
            self.snapshot_generation.fetch_add(1, Ordering::AcqRel);
        }
    }
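
    /// Closes one nesting level.
    ///
    /// # Errors
    ///
    /// Returns [`DdsError::PreconditionNotMet`] on underflow, i.e. an
    /// `end` without a matching `begin`.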
    pub fn end(&self) -> Result<()> {
        // CAS loop so the count can never drop below zero, even with
        // concurrent `end` calls; a plain `fetch_sub` could underflow.
        loop {
            let cur = self.open_count.load(Ordering::Acquire);
            if cur == 0 {
                return Err(DdsError::PreconditionNotMet {
                    reason: "end_access without begin_access",
                });
            }
            if self
                .open_count
                .compare_exchange(cur, cur - 1, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return Ok(());
            }
        }
    }
}

#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn marker_wire_roundtrip() {
        let m = CoherentSetMarker {
            set_first_sn: 0x0123_4567_89AB_CDEF,
        };
        let bytes = m.to_wire_bytes();
        let back = CoherentSetMarker::from_wire_bytes(&bytes);
        assert_eq!(m, back);
    }

    #[test]
    fn marker_wire_zero() {
        let m = CoherentSetMarker { set_first_sn: 0 };
        assert_eq!(m.to_wire_bytes(), [0u8; 8]);
    }

    #[test]
    fn coherent_scope_starts_inactive() {
        let s = CoherentScope::new();
        assert!(!s.is_active());
        assert!(s.current_marker().is_none());
    }

    #[test]
    fn begin_end_lifecycle() {
        let s = CoherentScope::new();
        s.begin(42).unwrap();
        assert!(s.is_active());
        let m = s.current_marker().expect("active should have marker");
        assert_eq!(m.set_first_sn, 42);
        let end = s.end().unwrap();
        assert_eq!(end.set_first_sn, 42);
        assert!(!s.is_active());
    }

    #[test]
    fn double_begin_is_error() {
        let s = CoherentScope::new();
        s.begin(1).unwrap();
        let err = s.begin(2).unwrap_err();
        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
    }

    #[test]
    fn end_without_begin_is_error() {
        let s = CoherentScope::new();
        let err = s.end().unwrap_err();
        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
    }

    #[test]
    fn group_access_nesting() {
        let g = GroupAccessScope::new();
        assert!(!g.is_active());
        g.begin();
        assert!(g.is_active());
        g.begin();
        g.end().unwrap();
        assert!(g.is_active(), "still nested");
        g.end().unwrap();
        assert!(!g.is_active());
    }

    #[test]
    fn group_access_underflow_is_error() {
        let g = GroupAccessScope::new();
        let err = g.end().unwrap_err();
        assert!(matches!(err, DdsError::PreconditionNotMet { .. }));
    }

    #[test]
    fn snapshot_generation_starts_zero() {
        let g = GroupAccessScope::new();
        assert_eq!(g.current_snapshot(), 0);
    }

    #[test]
    fn snapshot_generation_increments_on_begin_from_zero() {
        let g = GroupAccessScope::new();
        g.begin();
        assert_eq!(g.current_snapshot(), 1);
        g.end().unwrap();
        assert_eq!(g.current_snapshot(), 1);
        g.begin();
        assert_eq!(g.current_snapshot(), 2);
    }

    #[test]
    fn snapshot_generation_stable_during_nested_begin() {
        let g = GroupAccessScope::new();
        g.begin();
        let g1 = g.current_snapshot();
        g.begin();
        let g2 = g.current_snapshot();
        assert_eq!(g1, g2, "nested begin must keep snapshot stable");
        g.end().unwrap();
        let g3 = g.current_snapshot();
        assert_eq!(g1, g3, "snapshot stays stable until last end");
        g.end().unwrap();
    }

    #[test]
    fn cross_topic_consistent_snapshot_via_clone() {
        let g = GroupAccessScope::new();
        let g_for_dr1 = Arc::clone(&g);
        let g_for_dr2 = Arc::clone(&g);
        g.begin();
        assert_eq!(g_for_dr1.current_snapshot(), g_for_dr2.current_snapshot());
        assert_eq!(g_for_dr1.current_snapshot(), 1);
        g.end().unwrap();
    }
}