#![allow(unsafe_code)]
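//! Flatdata fast path: bridges `DataWriter`/`DataReader` to a slot-based
//! [`SlotBackend`] so fixed-layout (`FlatStruct`) samples can be exchanged
//! through slots, alongside the regular write/read path.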
use alloc::sync::Arc;
use core::marker::PhantomData;
use core::sync::atomic::{AtomicU32, Ordering};
use zerodds_flatdata::{FlatStruct, SlotBackend, SlotError, SlotHandle};
use crate::dds_type::DdsType;
use crate::error::{DdsError, Result};
use crate::publisher::DataWriter;
use crate::subscriber::DataReader;
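/// Maps slot-layer errors onto the closest DDS error codes.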
fn slot_to_dds(e: SlotError) -> DdsError {
match e {
SlotError::NoFreeSlot => DdsError::OutOfResources {
what: "flatdata: no free slot",
},
SlotError::OutOfBounds => DdsError::BadParameter {
what: "flatdata: slot index out of bounds",
},
SlotError::SampleTooLarge { .. } => DdsError::OutOfResources {
what: "flatdata: sample too large",
},
SlotError::LockPoisoned => DdsError::PreconditionNotMet {
reason: "flatdata: slot lock poisoned",
},
}
}
impl<T: DdsType + FlatStruct + Send + Sync + 'static> DataWriter<T> {
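    /// Attaches (or, with `None`, detaches) a flatdata backend for this writer.
    ///
    /// `active_readers_mask` is the bitmask of reader indices each reserved
    /// slot is published to. A poisoned backend mutex is silently ignored,
    /// leaving the previous backend in place.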
pub fn set_flat_backend(
&self,
backend: Option<Arc<dyn SlotBackend>>,
active_readers_mask: u32,
) {
if let Ok(mut slot) = self.flat_backend.lock() {
*slot = backend.map(|b| (b, active_readers_mask));
}
}
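    /// Writes `sample` through the flatdata fast path when a backend is set.
    ///
    /// The sample's raw bytes are committed into a reserved slot (the slot is
    /// discarded again if the commit fails), and the sample is then also
    /// published through the regular `write` path. Without a backend this is
    /// just a plain `write`.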
pub fn write_flat(&self, sample: &T) -> Result<()> {
let backend_snapshot = self
.flat_backend
.lock()
.map_err(|_| DdsError::PreconditionNotMet {
reason: "flatdata: backend mutex poisoned",
})?
.clone();
if let Some((backend, mask)) = backend_snapshot {
let bytes = sample.as_bytes();
let handle = backend.reserve_slot(mask).map_err(slot_to_dds)?;
if let Err(e) = backend.commit_slot(handle, bytes) {
let _ = backend.discard_slot(handle);
return Err(slot_to_dds(e));
}
self.write(sample)
} else {
self.write(sample)
}
}
}
impl<T: DdsType + FlatStruct + Send + Sync + 'static> DataReader<T> {
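    /// Attaches (or, with `None`, detaches) a flatdata backend for this reader.
    ///
    /// `reader_index` selects this reader's bit in the slot reader masks; the
    /// last-seen sequence number is reset to the `u32::MAX` "nothing seen yet"
    /// sentinel.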
pub fn set_flat_backend(&self, backend: Option<Arc<dyn SlotBackend>>, reader_index: u8) {
if let Ok(mut slot) = self.flat_backend.lock() {
*slot = backend.map(|b| (b, reader_index, AtomicU32::new(u32::MAX)));
}
}
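    /// Reads the newest unseen sample from the flatdata backend, if any.
    ///
    /// Returns `Ok(None)` when no backend is attached or no new sample is
    /// available, and fails fast on a `TYPE_HASH` mismatch between the backend
    /// and `T` (schema drift).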
pub fn read_flat(&self) -> Result<Option<T>> {
let mut slot = self
.flat_backend
.lock()
.map_err(|_| DdsError::PreconditionNotMet {
reason: "flatdata: backend mutex poisoned",
})?;
let Some((backend, reader_index, last_sn)) = slot.as_mut() else {
return Ok(None);
};
if let Some(backend_hash) = backend.type_hash() {
if backend_hash != T::TYPE_HASH {
return Err(DdsError::PreconditionNotMet {
reason: "flatdata: TYPE_HASH mismatch (schema drift)",
});
}
}
scan_slots::<T>(backend.as_ref(), *reader_index, last_sn)
}
}
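/// Scans all slots of `backend` and returns the newest sample this reader has
/// not yet seen.
///
/// Empty, undersized, and already-read slots are skipped; every other visited
/// slot is marked read for `reader_index`. Of the remaining candidates, the
/// one with the highest sequence number strictly greater than `last_sn` wins
/// (`u32::MAX` in `last_sn` means "nothing seen yet"). When a sample is
/// returned, `last_sn` is advanced to its sequence number.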
fn scan_slots<T: FlatStruct>(
backend: &dyn SlotBackend,
reader_index: u8,
    last_sn: &AtomicU32,
) -> Result<Option<T>> {
let count = backend.slot_count().map_err(slot_to_dds)?;
    // reader_index selects a bit in the u32 reader mask, so it must be < 32.
    debug_assert!(reader_index < 32, "reader_index must be < 32");
    let last_seen = last_sn.load(Ordering::Relaxed);
    let mut best: Option<(u32, T)> = None;
for idx in 0..count {
let handle = SlotHandle {
segment_id: 0,
slot_index: idx as u32,
};
let (header, bytes) = backend.read_slot(handle).map_err(slot_to_dds)?;
if header.sample_size == 0 {
continue;
}
if (header.reader_mask & (1u32 << reader_index)) != 0 {
continue;
}
        if bytes.len() < T::WIRE_SIZE as usize {
continue;
}
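        // SAFETY: the length check above guarantees at least `WIRE_SIZE`
        // bytes, which we assume is the precondition `from_bytes_unchecked`
        // relies on for a fixed-layout `FlatStruct`.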
let sample = unsafe { T::from_bytes_unchecked(&bytes) };
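        // Keep the newest unseen sample: strictly newer than the last
        // delivered sequence number and newer than the best candidate so far.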
let unseen = last_seen == u32::MAX || header.sequence_number > last_seen;
let beats_current = best
.as_ref()
.is_none_or(|(b_sn, _)| header.sequence_number > *b_sn);
if unseen && beats_current {
best = Some((header.sequence_number, sample));
}
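        // Mark every visited slot as read for this reader, including losing
        // candidates, so the next scan does not revisit them.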
backend
.mark_read(handle, reader_index)
.map_err(slot_to_dds)?;
}
if let Some((sn, _)) = best.as_ref() {
        last_sn.store(*sn, Ordering::Relaxed);
}
Ok(best.map(|(_, t)| t))
}
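/// Convenience wrapper pairing a `DataWriter` with a fixed [`SlotBackend`],
/// so callers do not have to attach the backend through `set_flat_backend`.
///
/// A minimal usage sketch (the `writer`, `backend`, and `sample` values are
/// assumed to come from application setup code):
///
/// ```ignore
/// let flat = FlatWriterExt::new(writer, backend, 0b1);
/// flat.write_flat(&sample)?;
/// ```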
pub struct FlatWriterExt<T: DdsType + FlatStruct + Send + Sync + 'static> {
writer: Arc<DataWriter<T>>,
backend: Arc<dyn SlotBackend>,
active_readers_mask: u32,
_t: PhantomData<fn() -> T>,
}
impl<T: DdsType + FlatStruct + Send + Sync + 'static> FlatWriterExt<T> {
#[must_use]
pub fn new(
writer: Arc<DataWriter<T>>,
backend: Arc<dyn SlotBackend>,
active_readers_mask: u32,
) -> Self {
Self {
writer,
backend,
active_readers_mask,
_t: PhantomData,
}
}
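    /// Same fast path as [`DataWriter::write_flat`], but always using the
    /// wrapped backend: reserve a slot, commit the sample bytes (discarding
    /// the slot on failure), then publish through the regular `write` path.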
pub fn write_flat(&self, sample: &T) -> Result<()> {
let bytes = sample.as_bytes();
let handle = self
.backend
.reserve_slot(self.active_readers_mask)
.map_err(slot_to_dds)?;
if let Err(e) = self.backend.commit_slot(handle, bytes) {
let _ = self.backend.discard_slot(handle);
return Err(slot_to_dds(e));
}
self.writer.write(sample)
}
#[must_use]
pub fn writer(&self) -> &DataWriter<T> {
&self.writer
}
}
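/// Convenience wrapper pairing a `DataReader` with a fixed [`SlotBackend`]
/// and its own last-seen sequence number.
///
/// A minimal usage sketch (the `reader` and `backend` values are assumed to
/// come from application setup code, and `reader_index` is assumed to
/// correspond to a bit in the writer's `active_readers_mask`):
///
/// ```ignore
/// let flat = FlatReaderExt::new(reader, backend, 0);
/// if let Some(sample) = flat.read_flat()? {
///     // process the freshest unseen sample
/// }
/// ```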
pub struct FlatReaderExt<T: DdsType + FlatStruct + Send + Sync + 'static> {
reader: Arc<DataReader<T>>,
backend: Arc<dyn SlotBackend>,
reader_index: u8,
    last_sn: AtomicU32,
_t: PhantomData<fn() -> T>,
}
impl<T: DdsType + FlatStruct + Send + Sync + 'static> FlatReaderExt<T> {
#[must_use]
pub fn new(
reader: Arc<DataReader<T>>,
backend: Arc<dyn SlotBackend>,
reader_index: u8,
) -> Self {
Self {
reader,
backend,
reader_index,
            last_sn: AtomicU32::new(u32::MAX),
_t: PhantomData,
}
}
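    /// Reads the newest unseen sample from the wrapped backend, failing on a
    /// `TYPE_HASH` mismatch just like [`DataReader::read_flat`].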
pub fn read_flat(&self) -> Result<Option<T>> {
if let Some(backend_hash) = self.backend.type_hash() {
if backend_hash != T::TYPE_HASH {
return Err(DdsError::PreconditionNotMet {
reason: "flatdata: TYPE_HASH mismatch (schema drift)",
});
}
}
scan_slots::<T>(self.backend.as_ref(), self.reader_index, &self.last_sn)
}
#[must_use]
pub fn reader(&self) -> &DataReader<T> {
&self.reader
}
}