use std::ffi::c_void;
use zenrc_dds::{
CdrSample, LoanedSample, RawMessageBridge, Sample, SampleInfo, dds_entity_t, dds_sample_info_t, ddsi_serdata
};
use super::error::{DdsError, Result};
/// Allocates the three parallel buffers handed to the non-loaning DDS read/take/peek calls.
///
/// Returns, in order:
/// * `max` zero-initialised `T::CStruct` values that own the sample storage,
/// * `max` untyped pointers, the i-th pointing at the i-th element of the storage vector,
/// * `max` zero-initialised `dds_sample_info_t` entries.
///
/// The pointers stay valid only while the returned storage vector is neither dropped nor
/// resized; moving the `Vec` itself does not move its heap buffer.
pub(crate) fn allocate_sample_buffers<T: RawMessageBridge>(
    max: usize,
) -> (Vec<T::CStruct>, Vec<*mut c_void>, Vec<dds_sample_info_t>) {
    // Fill the storage completely before taking any pointers so it never reallocates afterwards.
    let mut storage: Vec<T::CStruct> = Vec::with_capacity(max);
    // SAFETY: relies on the all-zero bit pattern being a valid `T::CStruct`
    // (presumably a plain C struct from the bindings — confirm for every bridge impl).
    storage.resize_with(max, || unsafe { std::mem::zeroed() });

    let mut slot_ptrs: Vec<*mut c_void> = Vec::with_capacity(max);
    for slot in storage.iter_mut() {
        slot_ptrs.push((slot as *mut T::CStruct).cast::<c_void>());
    }

    let mut info_slots: Vec<dds_sample_info_t> = Vec::with_capacity(max);
    // SAFETY: relies on the all-zero bit pattern being a valid `dds_sample_info_t`
    // (presumably a plain C struct from the bindings — confirm).
    info_slots.resize_with(max, || unsafe { std::mem::zeroed() });

    (storage, slot_ptrs, info_slots)
}
pub(crate) fn collect_samples<T: RawMessageBridge>(
n: i32,
raw_samples: Vec<T::CStruct>,
infos: Vec<dds_sample_info_t>,
) -> Result<Vec<Sample<T>>> {
let n = n as usize;
let mut result = Vec::with_capacity(n);
for (raw, raw_info) in raw_samples.into_iter().zip(infos.into_iter()).take(n) {
if raw_info.valid_data {
result.push(Sample {
inner: T::from_raw(raw),
info: SampleInfo::from(raw_info),
});
} else {
let _ = T::from_raw(raw);
}
}
Ok(result)
}
/// Takes up to `max` samples from `reader`, using `DDS_ANY_STATE` as the state mask.
///
/// Thin wrapper around [`take_with_mask`]; errors are whatever that function returns.
pub(crate) fn take<T: RawMessageBridge>(
    reader: dds_entity_t,
    max: usize,
) -> Result<Vec<Sample<T>>> {
    let any_state = zenrc_dds::DDS_ANY_STATE;
    take_with_mask::<T>(reader, max, any_state)
}
/// Takes at most one sample from `reader`.
///
/// Returns `Ok(None)` when [`take`] yields an empty vector, otherwise the first sample.
pub(crate) fn take_one<T: RawMessageBridge>(
    reader: dds_entity_t,
) -> Result<Option<Sample<T>>> {
    take::<T>(reader, 1).map(|samples| samples.into_iter().next())
}
/// Takes up to `max` samples from `reader`, restricted by the state `mask`.
///
/// Delegates to `read_or_take` with its `take` flag set, which selects `dds_take_mask`.
pub(crate) fn take_with_mask<T: RawMessageBridge>(
    reader: dds_entity_t,
    max: usize,
    mask: u32,
) -> Result<Vec<Sample<T>>> {
    let use_take = true;
    read_or_take::<T>(reader, max, mask, use_take)
}
/// Reads up to `max` samples from `reader`, using `DDS_ANY_STATE` as the state mask.
///
/// Thin wrapper around [`read_with_mask`]; errors are whatever that function returns.
pub(crate) fn read<T: RawMessageBridge>(
    reader: dds_entity_t,
    max: usize,
) -> Result<Vec<Sample<T>>> {
    let any_state = zenrc_dds::DDS_ANY_STATE;
    read_with_mask::<T>(reader, max, any_state)
}
/// Reads at most one sample from `reader`.
///
/// Returns `Ok(None)` when [`read`] yields an empty vector, otherwise the first sample.
pub(crate) fn read_one<T: RawMessageBridge>(
    reader: dds_entity_t,
) -> Result<Option<Sample<T>>> {
    read::<T>(reader, 1).map(|samples| samples.into_iter().next())
}
/// Reads up to `max` samples from `reader` through `dds_read_wl`, wrapping each returned
/// pointer in a [`LoanedSample`].
///
/// Returns an empty vector without calling into DDS when `max` is zero.
///
/// # Errors
///
/// Returns [`DdsError::RetCode`] when `dds_read_wl` reports a negative return code.
pub(crate) fn read_wl<T: RawMessageBridge>(
    reader: dds_entity_t,
    max: usize,
) -> Result<Vec<LoanedSample<T>>> {
    if max == 0 {
        return Ok(Vec::new());
    }

    // The pointer slots start out null; presumably that is what asks DDS to hand out its own
    // buffers — confirm against the Cyclone DDS documentation.
    let mut sample_ptrs: Vec<*mut c_void> = vec![std::ptr::null_mut(); max];
    // SAFETY: relies on the all-zero bit pattern being a valid `dds_sample_info_t`.
    let mut sample_infos: Vec<dds_sample_info_t> = vec![unsafe { std::mem::zeroed() }; max];

    // SAFETY: both buffers hold `max` elements and outlive the call.
    let returned = unsafe {
        zenrc_dds::dds_read_wl(
            reader,
            sample_ptrs.as_mut_ptr(),
            sample_infos.as_mut_ptr(),
            max as u32,
        )
    };
    if returned < 0 {
        return Err(DdsError::RetCode(returned, "dds_read_wl failed".to_string()));
    }

    let count = returned as usize;
    let mut loans: Vec<LoanedSample<T>> = Vec::with_capacity(count.min(max));
    for (sample_ptr, raw_info) in sample_ptrs.into_iter().zip(sample_infos).take(count) {
        loans.push(LoanedSample::new(reader, sample_ptr, SampleInfo::from(raw_info)));
    }
    Ok(loans)
}
/// Reads at most one loaned sample from `reader`.
///
/// Returns `Ok(None)` when [`read_wl`] yields an empty vector, otherwise the first sample.
pub(crate) fn read_one_wl<T: RawMessageBridge>(
    reader: dds_entity_t,
) -> Result<Option<LoanedSample<T>>> {
    let mut loans = read_wl::<T>(reader, 1)?.into_iter();
    Ok(loans.next())
}
/// Reads up to `max` samples from `reader`, restricted by the state `mask`.
///
/// Delegates to `read_or_take` with its `take` flag cleared, which selects `dds_read_mask`.
pub(crate) fn read_with_mask<T: RawMessageBridge>(
    reader: dds_entity_t,
    max: usize,
    mask: u32,
) -> Result<Vec<Sample<T>>> {
    let use_take = false;
    read_or_take::<T>(reader, max, mask, use_take)
}
pub(crate) fn peek<T: RawMessageBridge>(
reader: dds_entity_t,
max: usize,
) -> Result<Vec<Sample<T>>> {
if max == 0 {
return Ok(Vec::new());
}
let (raw_samples, mut ptrs, mut infos) = allocate_sample_buffers::<T>(max);
let n = unsafe {
zenrc_dds::dds_peek(reader, ptrs.as_mut_ptr(), infos.as_mut_ptr(), max, max as u32)
};
collect_samples(n, raw_samples, infos)
}
/// Takes up to `max` samples from `reader` through `dds_take_wl`, wrapping each returned
/// pointer in a [`LoanedSample`].
///
/// Returns an empty vector without calling into DDS when `max` is zero.
///
/// # Errors
///
/// Returns [`DdsError::RetCode`] when `dds_take_wl` reports a negative return code.
pub(crate) fn take_wl<T: RawMessageBridge>(
    reader: dds_entity_t,
    max: usize,
) -> Result<Vec<LoanedSample<T>>> {
    if max == 0 {
        return Ok(Vec::new());
    }

    // The pointer slots start out null; presumably that is what asks DDS to hand out its own
    // buffers — confirm against the Cyclone DDS documentation.
    let mut sample_ptrs: Vec<*mut c_void> = vec![std::ptr::null_mut(); max];
    // SAFETY: relies on the all-zero bit pattern being a valid `dds_sample_info_t`.
    let mut sample_infos: Vec<dds_sample_info_t> = vec![unsafe { std::mem::zeroed() }; max];

    // SAFETY: both buffers hold `max` elements and outlive the call.
    let returned = unsafe {
        zenrc_dds::dds_take_wl(
            reader,
            sample_ptrs.as_mut_ptr(),
            sample_infos.as_mut_ptr(),
            max as u32,
        )
    };
    if returned < 0 {
        return Err(DdsError::RetCode(returned, "dds_take_wl failed".to_string()));
    }

    let count = returned as usize;
    let mut loans: Vec<LoanedSample<T>> = Vec::with_capacity(count.min(max));
    for (sample_ptr, raw_info) in sample_ptrs.into_iter().zip(sample_infos).take(count) {
        loans.push(LoanedSample::new(reader, sample_ptr, SampleInfo::from(raw_info)));
    }
    Ok(loans)
}
/// Takes at most one loaned sample from `reader`.
///
/// Returns `Ok(None)` when [`take_wl`] yields an empty vector, otherwise the first sample.
pub(crate) fn take_one_wl<T: RawMessageBridge>(
    reader: dds_entity_t,
) -> Result<Option<LoanedSample<T>>> {
    let mut loans = take_wl::<T>(reader, 1)?.into_iter();
    Ok(loans.next())
}
fn read_or_take<T: RawMessageBridge>(
reader: dds_entity_t,
max: usize,
mask: u32,
take: bool,
) -> Result<Vec<Sample<T>>> {
if max == 0 {
return Ok(Vec::new());
}
let (raw_samples, mut ptrs, mut infos) = allocate_sample_buffers::<T>(max);
let n = unsafe {
if take {
zenrc_dds::dds_take_mask(reader, ptrs.as_mut_ptr(), infos.as_mut_ptr(), max, max as u32, mask)
} else {
zenrc_dds::dds_read_mask(reader, ptrs.as_mut_ptr(), infos.as_mut_ptr(), max, max as u32, mask)
}
};
collect_samples(n, raw_samples, infos)
}
/// Allocates the two parallel buffers handed to the CDR peek/read/take calls: `max` null
/// serdata pointers and `max` zero-initialised `dds_sample_info_t` entries.
fn allocate_cdr_buffers(max: usize) -> (Vec<*mut ddsi_serdata>, Vec<dds_sample_info_t>) {
    let mut serdata_slots: Vec<*mut ddsi_serdata> = Vec::with_capacity(max);
    serdata_slots.resize(max, std::ptr::null_mut());

    let mut info_slots: Vec<dds_sample_info_t> = Vec::with_capacity(max);
    // SAFETY: relies on the all-zero bit pattern being a valid `dds_sample_info_t`
    // (presumably a plain C struct from the bindings — confirm).
    info_slots.resize_with(max, || unsafe { std::mem::zeroed() });

    (serdata_slots, info_slots)
}
/// Wraps the first `n` serdata pointers of a CDR peek/read/take result in [`CdrSample`]s.
///
/// * `n` — the value returned by the DDS call: a sample count, or a negative return code.
/// * `raw_cdrs` — the serdata pointers the DDS call filled in.
/// * `infos` — the sample-info entries, parallel to `raw_cdrs`.
///
/// Entries whose `valid_data` flag is clear or whose pointer is null are left out.
///
/// # Errors
///
/// Returns [`DdsError::RetCode`] carrying `n` when `n` is negative.
fn collect_cdr_samples(
    n: i32,
    raw_cdrs: Vec<*mut ddsi_serdata>,
    infos: Vec<dds_sample_info_t>,
) -> Result<Vec<CdrSample>> {
    if n < 0 {
        return Err(DdsError::RetCode(n, "collect_cdr_samples failed".to_string()));
    }

    let count = n as usize;
    let mut samples = Vec::with_capacity(count);
    // NOTE(review): a non-null pointer whose info has `valid_data == false` is discarded here
    // without being handed to anything that could release it — confirm whether DDS expects
    // the caller to unref such serdata.
    samples.extend(
        raw_cdrs
            .into_iter()
            .zip(infos)
            .take(count)
            .filter(|(serdata, info)| info.valid_data && !serdata.is_null())
            .map(|(serdata, info)| CdrSample::new(serdata, SampleInfo::from(info))),
    );
    Ok(samples)
}
/// Peeks at up to `max` serialized samples from `reader` through `dds_peekcdr`, using
/// `DDS_ANY_STATE` as the state mask.
///
/// Returns an empty vector without calling into DDS when `max` is zero. Errors come from
/// `collect_cdr_samples`, which rejects a negative return code.
pub(crate) fn peek_cdr(reader: dds_entity_t, max: usize) -> Result<Vec<CdrSample>> {
    if max > 0 {
        let (mut serdata_slots, mut info_slots) = allocate_cdr_buffers(max);
        // SAFETY: both buffers hold `max` elements and outlive the call.
        let returned = unsafe {
            zenrc_dds::dds_peekcdr(
                reader,
                serdata_slots.as_mut_ptr(),
                max as u32,
                info_slots.as_mut_ptr(),
                zenrc_dds::DDS_ANY_STATE,
            )
        };
        collect_cdr_samples(returned, serdata_slots, info_slots)
    } else {
        Ok(Vec::new())
    }
}
/// Reads up to `max` serialized samples from `reader` through `dds_readcdr`, using
/// `DDS_ANY_STATE` as the state mask.
///
/// Returns an empty vector without calling into DDS when `max` is zero. Errors come from
/// `collect_cdr_samples`, which rejects a negative return code.
pub(crate) fn read_cdr(reader: dds_entity_t, max: usize) -> Result<Vec<CdrSample>> {
    if max > 0 {
        let (mut serdata_slots, mut info_slots) = allocate_cdr_buffers(max);
        // SAFETY: both buffers hold `max` elements and outlive the call.
        let returned = unsafe {
            zenrc_dds::dds_readcdr(
                reader,
                serdata_slots.as_mut_ptr(),
                max as u32,
                info_slots.as_mut_ptr(),
                zenrc_dds::DDS_ANY_STATE,
            )
        };
        collect_cdr_samples(returned, serdata_slots, info_slots)
    } else {
        Ok(Vec::new())
    }
}
/// Takes up to `max` serialized samples from `reader` through `dds_takecdr`, using
/// `DDS_ANY_STATE` as the state mask.
///
/// Returns an empty vector without calling into DDS when `max` is zero. Errors come from
/// `collect_cdr_samples`, which rejects a negative return code.
pub(crate) fn take_cdr(reader: dds_entity_t, max: usize) -> Result<Vec<CdrSample>> {
    if max > 0 {
        let (mut serdata_slots, mut info_slots) = allocate_cdr_buffers(max);
        // SAFETY: both buffers hold `max` elements and outlive the call.
        let returned = unsafe {
            zenrc_dds::dds_takecdr(
                reader,
                serdata_slots.as_mut_ptr(),
                max as u32,
                info_slots.as_mut_ptr(),
                zenrc_dds::DDS_ANY_STATE,
            )
        };
        collect_cdr_samples(returned, serdata_slots, info_slots)
    } else {
        Ok(Vec::new())
    }
}
/// Writes the serialized sample `cdr` on `writer` through `dds_writecdr`.
///
/// `cdr` is taken by value and dropped when this function returns.
///
/// # Errors
///
/// Returns [`DdsError::RetCode`] when `dds_writecdr` returns anything other than zero.
pub fn write_cdr(writer: dds_entity_t, mut cdr: CdrSample) -> Result<()> {
    // NOTE(review): `cdr` is dropped after the call — confirm that `CdrSample`'s drop
    // behaviour matches how `dds_writecdr` treats the serdata reference it is given.
    let status = unsafe { zenrc_dds::dds_writecdr(writer, cdr.as_ptr()) };
    match status {
        0 => Ok(()),
        code => Err(DdsError::RetCode(code, "dds_writecdr failed".to_string())),
    }
}
/// Forwards the serialized sample `cdr` on `writer` through `dds_forwardcdr`.
///
/// `cdr` is taken by value and dropped when this function returns.
///
/// # Errors
///
/// Returns [`DdsError::RetCode`] when `dds_forwardcdr` returns anything other than zero.
pub fn forward_cdr(writer: dds_entity_t, mut cdr: CdrSample) -> Result<()> {
    // NOTE(review): `cdr` is dropped after the call — confirm that `CdrSample`'s drop
    // behaviour matches how `dds_forwardcdr` treats the serdata reference it is given.
    let status = unsafe { zenrc_dds::dds_forwardcdr(writer, cdr.as_ptr()) };
    match status {
        0 => Ok(()),
        code => Err(DdsError::RetCode(code, "dds_forwardcdr failed".to_string())),
    }
}