use crate::{DdsError, DdsResult, DdsType};
use cyclonedds_rust_sys::*;
use std::ffi::c_void;
use std::marker::PhantomData;
/// CDR wire-encoding flavour selected by callers of the serializer and
/// deserializer in this module.
///
/// The variant is translated into the numeric XCDR version that is handed to
/// the native stream initialisers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CdrEncoding {
    /// XCDR version 1; this is the default encoding.
    #[default]
    Xcdr1,
    /// XCDR version 2.
    Xcdr2,
}

impl CdrEncoding {
    /// Returns the numeric XCDR version (`1` or `2`) for this encoding.
    fn as_xcdr_version(self) -> u32 {
        let version: u32 = match self {
            CdrEncoding::Xcdr2 => 2,
            CdrEncoding::Xcdr1 => 1,
        };
        version
    }
}
/// A raw byte payload paired with the native sample-info record that
/// accompanies it.
///
/// NOTE(review): nothing in this part of the file constructs or consumes this
/// type; `data` is presumably the CDR-encoded sample — confirm against callers.
pub struct CdrSample {
/// Raw payload bytes.
pub data: Vec<u8>,
/// Native `dds_sample_info_t` record associated with `data`.
pub info: dds_sample_info_t,
}
/// Entry point for turning a `T` into CDR bytes.
///
/// The struct stores no data: its only field is a `PhantomData` marker that
/// ties the type to `'a` and `T`. The operations defined for it in this file
/// are associated functions that take the sample as an argument.
pub struct CdrSerializer<'a, T: DdsType> {
// Zero-sized marker; makes the otherwise unused `'a` and `T` parameters legal.
_marker: PhantomData<&'a T>,
}
impl<'a, T: DdsType> CdrSerializer<'a, T> {
pub fn serialize(sample: &T, encoding: CdrEncoding) -> DdsResult<Vec<u8>> {
let desc = CdrStreamDesc::new::<T>()?;
unsafe {
let mut os: dds_ostream_t = std::mem::zeroed();
dds_ostream_init(
&mut os,
&dds_cdrstream_default_allocator,
0,
encoding.as_xcdr_version(),
);
let mut arena = crate::write_arena::WriteArena::new();
let data_ptr = sample.write_to_native(&mut arena)?;
let ok = dds_stream_write_sample(
&mut os,
&dds_cdrstream_default_allocator,
data_ptr,
desc.as_ptr(),
);
let bytes = if ok {
let len = os.m_index as usize;
let mut buf = vec![0u8; len];
std::ptr::copy_nonoverlapping(os.m_buffer, buf.as_mut_ptr(), len);
Ok(buf)
} else {
Err(DdsError::Unsupported("CDR serialization failed".into()))
};
dds_ostream_fini(&mut os, &dds_cdrstream_default_allocator);
bytes
}
}
pub fn serialize_key(sample: &T, encoding: CdrEncoding) -> DdsResult<Vec<u8>> {
let desc = CdrStreamDesc::new::<T>()?;
unsafe {
let mut os: dds_ostream_t = std::mem::zeroed();
dds_ostream_init(
&mut os,
&dds_cdrstream_default_allocator,
0,
encoding.as_xcdr_version(),
);
let mut arena = crate::write_arena::WriteArena::new();
let data_ptr = sample.write_to_native(&mut arena)?;
dds_stream_write_key(
&mut os,
cyclonedds_rust_sys::dds_cdr_key_serialization_kind_DDS_CDR_KEY_SERIALIZATION_SAMPLE,
&dds_cdrstream_default_allocator,
data_ptr as *const _,
desc.as_ptr(),
);
let len = os.m_index as usize;
let mut buf = vec![0u8; len];
std::ptr::copy_nonoverlapping(os.m_buffer, buf.as_mut_ptr(), len);
dds_ostream_fini(&mut os, &dds_cdrstream_default_allocator);
Ok(buf)
}
}
pub fn serialize_to_buffer(
sample: &T,
buf: &mut [u8],
encoding: CdrEncoding,
) -> DdsResult<usize> {
let desc = CdrStreamDesc::new::<T>()?;
unsafe {
let mut os: dds_ostream_t = std::mem::zeroed();
dds_ostream_init(
&mut os,
&dds_cdrstream_default_allocator,
0,
encoding.as_xcdr_version(),
);
let mut arena = crate::write_arena::WriteArena::new();
let data_ptr = sample.write_to_native(&mut arena)?;
let ok = dds_stream_write_sample(
&mut os,
&dds_cdrstream_default_allocator,
data_ptr,
desc.as_ptr(),
);
if !ok {
dds_ostream_fini(&mut os, &dds_cdrstream_default_allocator);
return Err(DdsError::Unsupported("CDR serialization failed".into()));
}
let len = os.m_index as usize;
if len > buf.len() {
dds_ostream_fini(&mut os, &dds_cdrstream_default_allocator);
return Err(DdsError::BadParameter(format!(
"buffer too small: needed {} bytes, got {}",
len,
buf.len()
)));
}
std::ptr::copy_nonoverlapping(os.m_buffer, buf.as_mut_ptr(), len);
dds_ostream_fini(&mut os, &dds_cdrstream_default_allocator);
Ok(len)
}
}
pub fn serialize_key_to_buffer(
sample: &T,
buf: &mut [u8],
encoding: CdrEncoding,
) -> DdsResult<usize> {
let desc = CdrStreamDesc::new::<T>()?;
unsafe {
let mut os: dds_ostream_t = std::mem::zeroed();
dds_ostream_init(
&mut os,
&dds_cdrstream_default_allocator,
0,
encoding.as_xcdr_version(),
);
let mut arena = crate::write_arena::WriteArena::new();
let data_ptr = sample.write_to_native(&mut arena)?;
dds_stream_write_key(
&mut os,
cyclonedds_rust_sys::dds_cdr_key_serialization_kind_DDS_CDR_KEY_SERIALIZATION_SAMPLE,
&dds_cdrstream_default_allocator,
data_ptr as *const _,
desc.as_ptr(),
);
let len = os.m_index as usize;
if len > buf.len() {
dds_ostream_fini(&mut os, &dds_cdrstream_default_allocator);
return Err(DdsError::BadParameter(format!(
"buffer too small: needed {} bytes, got {}",
len,
buf.len()
)));
}
std::ptr::copy_nonoverlapping(os.m_buffer, buf.as_mut_ptr(), len);
dds_ostream_fini(&mut os, &dds_cdrstream_default_allocator);
Ok(len)
}
}
}
/// Entry point for turning CDR bytes back into a `T`.
///
/// The struct stores no data: its only field is a `PhantomData` marker for
/// `T`. The operations defined for it in this file are associated functions.
pub struct CdrDeserializer<T: DdsType> {
// Zero-sized marker; makes the otherwise unused `T` parameter legal.
_marker: PhantomData<T>,
}
impl<T: DdsType> CdrDeserializer<T> {
    /// Decodes a full sample from `data` using `dds_stream_read_sample` and
    /// converts the native representation to `T` with `T::clone_out`.
    ///
    /// NOTE(review): `data` is passed to the native reader without any prior
    /// validation or normalisation in this function — confirm that callers
    /// only pass well-formed, trusted CDR data.
    ///
    /// # Errors
    ///
    /// Propagates errors from building the stream descriptor. Returns
    /// [`DdsError::BadParameter`] when `data` is longer than `u32::MAX` bytes
    /// or the type's size/alignment do not form a valid layout, and
    /// [`DdsError::OutOfMemory`] when the scratch buffer cannot be allocated.
    pub fn deserialize(data: &[u8], encoding: CdrEncoding) -> DdsResult<T> {
        Self::decode(data, encoding, false)
    }

    /// Decodes a key from `data` using `dds_stream_read_key` into a zeroed
    /// native buffer and converts that buffer to `T` with `T::clone_out`.
    ///
    /// NOTE(review): only the key reader runs on the zeroed buffer, so non-key
    /// fields presumably keep their zeroed native value — confirm. The same
    /// caveat about unvalidated input as for [`Self::deserialize`] applies.
    ///
    /// # Errors
    ///
    /// Same as [`Self::deserialize`].
    pub fn deserialize_key(data: &[u8], encoding: CdrEncoding) -> DdsResult<T> {
        Self::decode(data, encoding, true)
    }

    /// Shared driver: reads either a full sample or only the key (when
    /// `key_only` is set) from `data` into a temporary zeroed buffer, clones
    /// the result out as a `T`, and releases all temporary native state.
    fn decode(data: &[u8], encoding: CdrEncoding, key_only: bool) -> DdsResult<T> {
        let desc = CdrStreamDesc::new::<T>()?;

        // The native stream takes a 32-bit length; refuse oversized input
        // instead of silently truncating the length with an `as` cast.
        let input_len =
            <u32 as std::convert::TryFrom<usize>>::try_from(data.len()).map_err(|_| {
                DdsError::BadParameter(format!(
                    "CDR input too large: {} bytes does not fit the stream's u32 length",
                    data.len()
                ))
            })?;

        // `alloc_zeroed` must never be called with a zero-size layout, so the
        // size is clamped to at least one byte (as the alignment already was).
        let size = std::cmp::max(T::descriptor_size() as usize, 1);
        let align = std::cmp::max(T::descriptor_align() as usize, 1);
        let layout = std::alloc::Layout::from_size_align(size, align)
            .map_err(|_| DdsError::BadParameter("invalid type layout".into()))?;

        // SAFETY: `layout` has non-zero size; `buf` is checked for null before
        // use and deallocated with the same layout; `data` and `desc` outlive
        // the input stream and every native call that references them.
        unsafe {
            let buf = std::alloc::alloc_zeroed(layout);
            if buf.is_null() {
                return Err(DdsError::OutOfMemory);
            }

            let mut is: dds_istream_t = std::mem::zeroed();
            dds_istream_init(
                &mut is,
                input_len,
                data.as_ptr() as *const c_void,
                encoding.as_xcdr_version(),
            );

            if key_only {
                dds_stream_read_key(
                    &mut is,
                    buf as *mut ::std::ffi::c_char,
                    &dds_cdrstream_default_allocator,
                    desc.as_ptr(),
                );
            } else {
                dds_stream_read_sample(
                    &mut is,
                    buf as *mut c_void,
                    &dds_cdrstream_default_allocator,
                    desc.as_ptr(),
                );
            }

            // Copy the value out first, then let the native side free whatever
            // it attached to the scratch sample, then free the scratch buffer.
            let result = T::clone_out(buf as *const T);
            dds_stream_free_sample(
                buf as *mut c_void,
                &dds_cdrstream_default_allocator,
                desc.ops_ptr(),
            );
            std::alloc::dealloc(buf, layout);
            dds_istream_fini(&mut is);
            Ok(result)
        }
    }
}
/// Owns a native `dds_cdrstream_desc` together with the Rust-side buffers
/// whose pointers were passed to `dds_cdrstream_desc_init` when it was built.
///
/// The native descriptor is finalised in this type's `Drop` implementation.
struct CdrStreamDesc {
// Native descriptor; initialised in `new` and finalised on drop.
desc: dds_cdrstream_desc,
// Op-code program handed to the initialiser. Kept alive alongside `desc`;
// NOTE(review): whether the native side still references it after init is
// not visible here — confirm before removing.
_ops: Vec<u32>,
// Owned C strings backing the `m_name` pointers stored in `_keys`.
_key_names: Vec<std::ffi::CString>,
// Key descriptors handed to the initialiser; same lifetime caveat as `_ops`.
_keys: Vec<dds_key_descriptor>,
}
impl CdrStreamDesc {
fn new<T: DdsType>() -> DdsResult<Self> {
let mut ops = T::ops();
if ops.last().copied() != Some(crate::topic::OP_RTS) {
ops.push(crate::topic::OP_RTS);
}
let key_defs = T::keys();
let key_names: Vec<std::ffi::CString> = key_defs
.iter()
.map(|k| std::ffi::CString::new(k.name.as_str()).unwrap())
.collect();
let mut keys: Vec<dds_key_descriptor> = Vec::with_capacity(key_defs.len());
for (i, kd) in key_defs.iter().enumerate() {
let offset = ops.len() as u32;
ops.push(crate::topic::OP_KOF | (kd.ops_path.len() as u32));
ops.extend(kd.ops_path.iter().copied());
keys.push(dds_key_descriptor {
m_name: key_names[i].as_ptr(),
m_offset: offset,
m_idx: i as u32,
});
}
let post_key_ops = T::post_key_ops();
if !post_key_ops.is_empty() {
ops.extend(post_key_ops);
}
unsafe {
let mut desc: dds_cdrstream_desc = std::mem::zeroed();
dds_cdrstream_desc_init(
&mut desc,
&dds_cdrstream_default_allocator,
T::descriptor_size(),
T::descriptor_align(),
T::flagset(),
ops.as_ptr(),
if keys.is_empty() {
std::ptr::null()
} else {
keys.as_ptr()
},
keys.len() as u32,
);
Ok(Self {
desc,
_ops: ops,
_key_names: key_names,
_keys: keys,
})
}
}
fn as_ptr(&self) -> *const dds_cdrstream_desc {
&self.desc
}
fn ops_ptr(&self) -> *const u32 {
self.desc.ops.ops as *const u32
}
}
/// Finalises the native descriptor when the wrapper is dropped.
impl Drop for CdrStreamDesc {
fn drop(&mut self) {
// SAFETY: as far as this file shows, `self.desc` is only produced by
// `CdrStreamDesc::new`, which initialises it with `dds_cdrstream_desc_init`
// using the same default allocator that is passed here.
unsafe {
dds_cdrstream_desc_fini(&mut self.desc, &dds_cdrstream_default_allocator);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal two-field fixture type implementing `DdsType`.
    #[derive(Debug, PartialEq, Clone)]
    #[allow(dead_code)]
    struct Point {
        x: i32,
        y: i32,
    }

    impl DdsType for Point {
        fn type_name() -> &'static str {
            "test::Point"
        }

        /// One 4-byte `adr` entry per field, at offsets 0 and 4.
        fn ops() -> Vec<u32> {
            use crate::topic::*;
            let mut program = Vec::new();
            for field_offset in [0, 4] {
                program.extend(adr(TYPE_4BY, field_offset));
            }
            program
        }

        fn descriptor_size() -> u32 {
            8
        }

        fn descriptor_align() -> u32 {
            4
        }
    }

    /// The default encoding is XCDR1, and each variant maps to its version
    /// number.
    #[test]
    fn cdr_encoding_default_is_xcdr1() {
        assert_eq!(CdrEncoding::default(), CdrEncoding::Xcdr1);
        for (encoding, version) in [(CdrEncoding::Xcdr1, 1), (CdrEncoding::Xcdr2, 2)] {
            assert_eq!(encoding.as_xcdr_version(), version);
        }
    }
}