use crate::{
entity::DdsEntity,
error::{check, check_entity},
DdsError, DdsResult, DdsType, Topic,
};
use cyclonedds_rust_sys::*;
use std::ffi::c_void;
use std::marker::PhantomData;
use std::rc::Rc;
use crate::topic::{OP_RTS, OP_KOF};
struct FilterArg<T> {
type_id: std::any::TypeId,
filter: Box<dyn Fn(&T) -> bool + Send + Sync>,
}
unsafe extern "C" fn trampoline_filter_sample_arg<T: DdsType + 'static>(
sample: *const c_void,
arg: *mut c_void,
) -> bool {
if arg.is_null() || sample.is_null() {
return true; }
let fa: &FilterArg<T> = &*(arg as *const FilterArg<T>);
if fa.type_id != std::any::TypeId::of::<T>() {
return true;
}
let data = T::clone_out(sample as *const T);
(fa.filter)(&data)
}
struct CftDescriptorHolder {
_ops: Vec<u32>,
_typename: std::ffi::CString,
_key_names: Vec<std::ffi::CString>,
_keys: Vec<dds_key_descriptor>,
_meta: std::ffi::CString,
}
pub struct ContentFilteredTopic<T: DdsType> {
entity: dds_entity_t,
_filter_arg: Option<Box<FilterArg<T>>>,
_desc_holder: Rc<CftDescriptorHolder>,
_marker: PhantomData<T>,
}
impl<T: DdsType + 'static> ContentFilteredTopic<T> {
pub fn new<F>(topic: &Topic<T>, filter: F) -> DdsResult<Self>
where
F: Fn(&T) -> bool + Send + Sync + 'static,
{
let participant = unsafe { dds_get_participant(topic.entity()) };
check_entity(participant)?;
let (handle, desc_holder) =
Self::create_sibling_topic(participant, topic.entity())?;
let filter_arg: Box<FilterArg<T>> = Box::new(FilterArg {
type_id: std::any::TypeId::of::<T>(),
filter: Box::new(filter),
});
let arg_ptr = &*filter_arg as *const FilterArg<T> as *mut c_void;
unsafe {
let mut dds_filter: dds_topic_filter = std::mem::zeroed();
dds_filter.mode = dds_topic_filter_mode_DDS_TOPIC_FILTER_SAMPLE_ARG;
dds_filter.f.sample_arg = Some(trampoline_filter_sample_arg::<T>);
dds_filter.arg = arg_ptr;
let ret =
dds_set_topic_filter_extended(handle, &dds_filter as *const dds_topic_filter);
if ret < 0 {
dds_delete(handle);
return Err(DdsError::from(ret));
}
}
Ok(ContentFilteredTopic {
entity: handle,
_filter_arg: Some(filter_arg),
_desc_holder: desc_holder,
_marker: PhantomData,
})
}
pub fn set_filter<F>(&mut self, filter: F) -> DdsResult<()>
where
F: Fn(&T) -> bool + Send + Sync + 'static,
{
let filter_arg: Box<FilterArg<T>> = Box::new(FilterArg {
type_id: std::any::TypeId::of::<T>(),
filter: Box::new(filter),
});
let arg_ptr = &*filter_arg as *const FilterArg<T> as *mut c_void;
unsafe {
let mut dds_filter: dds_topic_filter = std::mem::zeroed();
dds_filter.mode = dds_topic_filter_mode_DDS_TOPIC_FILTER_SAMPLE_ARG;
dds_filter.f.sample_arg = Some(trampoline_filter_sample_arg::<T>);
dds_filter.arg = arg_ptr;
let ret =
dds_set_topic_filter_extended(self.entity(), &dds_filter as *const dds_topic_filter);
check(ret)?;
}
self._filter_arg = Some(filter_arg);
Ok(())
}
pub fn clear_filter(&mut self) -> DdsResult<()> {
unsafe {
let mut dds_filter: dds_topic_filter = std::mem::zeroed();
dds_filter.mode = dds_topic_filter_mode_DDS_TOPIC_FILTER_NONE;
let ret =
dds_set_topic_filter_extended(self.entity(), &dds_filter as *const dds_topic_filter);
check(ret)?;
}
self._filter_arg = None;
Ok(())
}
fn create_sibling_topic(
participant: dds_entity_t,
original: dds_entity_t,
) -> DdsResult<(dds_entity_t, Rc<CftDescriptorHolder>)> {
unsafe {
let type_name = std::ffi::CString::new(T::type_name())
.map_err(|_| DdsError::BadParameter("type name contains null".into()))?;
let topic_name_c = {
let mut buf = vec![0u8; 256];
let n = dds_get_name(original, buf.as_mut_ptr() as *mut i8, buf.len());
if n < 0 {
return Err(DdsError::from(n));
}
buf.truncate(n as usize);
let original_name = String::from_utf8_lossy(&buf);
let clone_name = format!("{}_cft_{}", original_name, original);
std::ffi::CString::new(clone_name)
.map_err(|_| DdsError::BadParameter("topic name contains null".into()))?
};
let mut ops = T::ops();
if ops.last().copied() != Some(OP_RTS) {
ops.push(OP_RTS);
}
let key_defs = T::keys();
let key_names: Vec<std::ffi::CString> = key_defs
.iter()
.map(|k| std::ffi::CString::new(k.name.as_str()).unwrap())
.collect();
let mut keys: Vec<dds_key_descriptor> = Vec::with_capacity(key_defs.len());
for (i, kd) in key_defs.iter().enumerate() {
let offset = ops.len() as u32;
ops.push(OP_KOF | (kd.ops_path.len() as u32));
ops.extend(kd.ops_path.iter().copied());
keys.push(dds_key_descriptor {
m_name: key_names[i].as_ptr(),
m_offset: offset,
m_idx: i as u32,
});
}
let post_key_ops = T::post_key_ops();
if !post_key_ops.is_empty() {
ops.extend(post_key_ops);
}
let meta = std::ffi::CString::new("").unwrap();
let descriptor = dds_topic_descriptor {
m_size: T::descriptor_size(),
m_align: T::descriptor_align(),
m_flagset: T::flagset(),
m_nkeys: T::key_count() as u32,
m_typename: type_name.as_ptr(),
m_keys: if keys.is_empty() {
std::ptr::null()
} else {
keys.as_ptr()
},
m_nops: ops.len() as u32,
m_ops: ops.as_ptr(),
m_meta: meta.as_ptr(),
type_information: std::mem::zeroed(),
type_mapping: std::mem::zeroed(),
restrict_data_representation: 0,
};
let handle = dds_create_topic(
participant,
&descriptor,
topic_name_c.as_ptr(),
std::ptr::null(),
std::ptr::null(),
);
check_entity(handle)?;
let holder = Rc::new(CftDescriptorHolder {
_ops: ops,
_typename: type_name,
_key_names: key_names,
_keys: keys,
_meta: meta,
});
Ok((handle, holder))
}
}
}
impl<T: DdsType> DdsEntity for ContentFilteredTopic<T> {
fn entity(&self) -> dds_entity_t {
self.entity
}
}
impl<T: DdsType> Drop for ContentFilteredTopic<T> {
fn drop(&mut self) {
unsafe {
let mut dds_filter: dds_topic_filter = std::mem::zeroed();
dds_filter.mode = dds_topic_filter_mode_DDS_TOPIC_FILTER_NONE;
dds_set_topic_filter_extended(self.entity, &dds_filter as *const dds_topic_filter);
dds_delete(self.entity);
}
}
}
pub trait TopicFilterExt<T: DdsType + 'static> {
fn set_filter<F: Fn(&T) -> bool + Send + Sync + 'static>(&self, filter: F) -> DdsResult<()>;
fn clear_filter(&self) -> DdsResult<()>;
}
impl<T: DdsType + 'static> TopicFilterExt<T> for Topic<T> {
fn set_filter<F: Fn(&T) -> bool + Send + Sync + 'static>(&self, filter: F) -> DdsResult<()> {
let old_arg = unsafe {
let mut old_fn: dds_topic_filter_arg_fn = None;
let mut old_arg: *mut c_void = std::ptr::null_mut();
let _ = dds_get_topic_filter_and_arg(self.entity(), &mut old_fn, &mut old_arg);
old_arg
};
let filter_arg: Box<FilterArg<T>> = Box::new(FilterArg {
type_id: std::any::TypeId::of::<T>(),
filter: Box::new(filter),
});
let arg_ptr = Box::into_raw(filter_arg) as *mut c_void;
unsafe {
let mut dds_filter: dds_topic_filter = std::mem::zeroed();
dds_filter.mode = dds_topic_filter_mode_DDS_TOPIC_FILTER_SAMPLE_ARG;
dds_filter.f.sample_arg = Some(trampoline_filter_sample_arg::<T>);
dds_filter.arg = arg_ptr;
let ret =
dds_set_topic_filter_extended(self.entity(), &dds_filter as *const dds_topic_filter);
if ret < 0 {
let _ = Box::from_raw(arg_ptr as *mut FilterArg<T>);
return Err(DdsError::from(ret));
}
}
if !old_arg.is_null() {
unsafe {
let _ = Box::from_raw(old_arg as *mut FilterArg<T>);
}
}
Ok(())
}
fn clear_filter(&self) -> DdsResult<()> {
unsafe {
let mut old_fn: dds_topic_filter_arg_fn = None;
let mut old_arg: *mut c_void = std::ptr::null_mut();
let _ = dds_get_topic_filter_and_arg(
self.entity(),
&mut old_fn,
&mut old_arg,
);
let mut dds_filter: dds_topic_filter = std::mem::zeroed();
dds_filter.mode = dds_topic_filter_mode_DDS_TOPIC_FILTER_NONE;
let ret =
dds_set_topic_filter_extended(self.entity(), &dds_filter as *const dds_topic_filter);
check(ret)?;
if !old_arg.is_null() {
let _ = Box::from_raw(old_arg as *mut FilterArg<T>);
}
}
Ok(())
}
}