use crate::subscriber::Subscriber;
use crate::types::TopicId;
use rustecal_core::types::DataTypeInfo;
use rustecal_sys::{eCAL_SDataTypeInformation, eCAL_SReceiveCallbackData, eCAL_STopicId};
use std::{
ffi::{CStr, c_void},
marker::PhantomData,
slice,
};
/// Contract a message type must fulfil to be received through a
/// `TypedSubscriber`.
///
/// The lifetime `'a` is the lifetime of the byte buffer handed to
/// [`SubscriberMessage::from_bytes`], which allows implementations to borrow
/// from the payload instead of copying it.
pub trait SubscriberMessage<'a>: Sized {
/// Returns the data type description used when the underlying subscriber is
/// created.
fn datatype() -> DataTypeInfo;
/// Attempts to build a message from the raw payload `bytes`.
///
/// `data_type_info` carries the encoding, type name and descriptor that
/// accompanied the sample. Returning `None` causes the sample to be skipped:
/// the user callback is not invoked for it.
fn from_bytes(bytes: &'a [u8], data_type_info: &DataTypeInfo) -> Option<Self>;
}
/// A decoded sample together with the metadata that accompanied it, as
/// delivered to the user callback of a `TypedSubscriber`.
pub struct Received<T> {
/// The message produced by `SubscriberMessage::from_bytes`.
pub payload: T,
/// Name of the topic the sample arrived on (lossy UTF-8 conversion of the
/// C string reported by eCAL).
pub topic_name: String,
/// Encoding string reported in the sample's data type information.
pub encoding: String,
/// Type name reported in the sample's data type information.
pub type_name: String,
/// The `send_timestamp` value of the receive callback data.
/// NOTE(review): unit is not visible here — presumably microseconds; confirm
/// against the eCAL documentation.
pub timestamp: i64,
/// The `send_clock` value of the receive callback data.
pub clock: i64,
}
/// Heap-allocated holder for the user's receive callback.
///
/// A raw pointer to this wrapper is handed to eCAL as `user_data` and cast
/// back inside `trampoline` to invoke the closure.
struct CallbackWrapper<'buf, T: SubscriberMessage<'buf>> {
// Type-erased user closure; `Send + Sync` because it is invoked from the
// C callback rather than from the thread that registered it
// (NOTE(review): the calling thread is decided by eCAL — confirm).
callback: Box<dyn Fn(Received<T>) + Send + Sync + 'static>,
// Ties the wrapper to the `'buf` lifetime and `T` without storing either.
_phantom: PhantomData<&'buf T>,
}
impl<'buf, T: SubscriberMessage<'buf>> CallbackWrapper<'buf, T> {
    /// Type-erases the closure `f` and stores it in a new wrapper.
    fn new<F>(f: F) -> Self
    where
        F: Fn(Received<T>) + Send + Sync + 'static,
    {
        let callback: Box<dyn Fn(Received<T>) + Send + Sync + 'static> = Box::new(f);
        Self {
            callback,
            _phantom: PhantomData,
        }
    }

    /// Forwards one received sample to the stored closure.
    fn call(&self, received: Received<T>) {
        let handler = &*self.callback;
        handler(received);
    }
}
/// A subscriber that decodes incoming samples into `T` before handing them
/// to a user callback.
pub struct TypedSubscriber<'buf, T: SubscriberMessage<'buf>> {
// The untyped subscriber this wrapper delegates to.
subscriber: Subscriber,
// Raw pointer obtained from `Box::into_raw`; owns the current callback
// wrapper and is released again with `Box::from_raw` (on replacement and
// on drop).
user_data: *mut CallbackWrapper<'buf, T>,
// Ties the subscriber to the `'buf` lifetime and `T` without storing either.
_phantom: PhantomData<&'buf T>,
}
impl<'buf, T: SubscriberMessage<'buf>> TypedSubscriber<'buf, T> {
pub fn new(topic_name: &str) -> Result<Self, String> {
let datatype = T::datatype();
let boxed = Box::new(CallbackWrapper::new(|_| {}));
let user_data = Box::into_raw(boxed);
let subscriber = Subscriber::new(topic_name, datatype, trampoline::<'buf, T>)?;
Ok(Self {
subscriber,
user_data,
_phantom: PhantomData,
})
}
pub fn set_callback<F>(&mut self, callback: F)
where
F: Fn(Received<T>) + Send + Sync + 'static,
{
unsafe {
let _ = Box::from_raw(self.user_data);
}
let boxed = Box::new(CallbackWrapper::new(callback));
self.user_data = Box::into_raw(boxed);
unsafe {
rustecal_sys::eCAL_Subscriber_SetReceiveCallback(
self.subscriber.raw_handle(),
Some(trampoline::<'buf, T>),
self.user_data as *mut _,
);
}
}
pub fn get_publisher_count(&self) -> usize {
self.subscriber.get_publisher_count()
}
pub fn get_topic_name(&self) -> Option<String> {
self.subscriber.get_topic_name()
}
pub fn get_topic_id(&self) -> Option<TopicId> {
self.subscriber.get_topic_id()
}
pub fn get_data_type_information(&self) -> Option<DataTypeInfo> {
self.subscriber.get_data_type_information()
}
}
impl<'buf, T: SubscriberMessage<'buf>> Drop for TypedSubscriber<'buf, T> {
    /// Unregisters the receive callback and then releases the callback
    /// wrapper, in that order.
    fn drop(&mut self) {
        let wrapper_ptr = self.user_data;
        // SAFETY: `wrapper_ptr` always originates from `Box::into_raw` (it is
        // set in `new` and `set_callback`), and the callback is removed from
        // the subscriber before the wrapper is reclaimed.
        unsafe {
            rustecal_sys::eCAL_Subscriber_RemoveReceiveCallback(self.subscriber.raw_handle());
            drop(Box::from_raw(wrapper_ptr));
        }
    }
}
/// C-ABI receive callback shared by all `TypedSubscriber<T>` instances.
///
/// Converts the raw pointers supplied by eCAL into Rust values, decodes the
/// payload with `T::from_bytes` and, on success, forwards a [`Received<T>`]
/// to the `CallbackWrapper` that `user_data` points to.
///
/// The sample is silently ignored when any of the four pointer arguments is
/// null or when `T::from_bytes` returns `None`. Null string pointers inside
/// the supplied structures are treated as empty strings, and a null or
/// zero-length payload buffer is treated as an empty slice.
extern "C" fn trampoline<'buf, T: SubscriberMessage<'buf> + 'buf>(
    topic_id: *const eCAL_STopicId,
    data_type_info: *const eCAL_SDataTypeInformation,
    data: *const eCAL_SReceiveCallbackData,
    user_data: *mut c_void,
) {
    // Copies a NUL-terminated C string into an owned `String` (lossy UTF-8);
    // a null pointer yields an empty string instead of undefined behaviour.
    //
    // Caller contract: a non-null `ptr` must point to a valid NUL-terminated
    // string that stays alive for the duration of the call.
    unsafe fn owned_c_string(ptr: *const std::ffi::c_char) -> String {
        if ptr.is_null() {
            return String::new();
        }
        // SAFETY: `ptr` is non-null and valid per the caller contract above.
        unsafe { CStr::from_ptr(ptr) }
            .to_string_lossy()
            .into_owned()
    }

    // Every pointer is dereferenced below, so all of them must be checked.
    if topic_id.is_null() || data_type_info.is_null() || data.is_null() || user_data.is_null() {
        return;
    }

    // SAFETY: all four pointers were checked for null above. The code relies
    // on eCAL passing pointers to valid structures that stay alive for the
    // duration of this call, and on `user_data` being the `CallbackWrapper`
    // pointer registered by `TypedSubscriber::set_callback`.
    unsafe {
        let rd = &*data;
        let info = &*data_type_info;

        // `slice::from_raw_parts` requires a non-null pointer even for a
        // zero-length slice, so an absent buffer is mapped to `&[]`.
        let payload: &'buf [u8] = if rd.buffer.is_null() || rd.buffer_size == 0 {
            &[]
        } else {
            slice::from_raw_parts(rd.buffer as *const u8, rd.buffer_size)
        };

        let encoding = owned_c_string(info.encoding);
        let type_name = owned_c_string(info.name);
        let descriptor = if info.descriptor.is_null() || info.descriptor_length == 0 {
            Vec::new()
        } else {
            slice::from_raw_parts(info.descriptor as *const u8, info.descriptor_length).to_vec()
        };

        let dt_info = DataTypeInfo {
            encoding: encoding.clone(),
            type_name: type_name.clone(),
            descriptor,
        };

        // Samples that cannot be decoded are dropped without notifying the
        // user callback.
        let Some(decoded) = T::from_bytes(payload, &dt_info) else {
            return;
        };

        let cb_wrapper = &*(user_data as *const CallbackWrapper<'buf, T>);
        let topic_name = owned_c_string((*topic_id).topic_name);

        // `encoding` and `type_name` are moved here; `dt_info` keeps its own
        // copies, so no further cloning is needed.
        cb_wrapper.call(Received {
            payload: decoded,
            topic_name,
            encoding,
            type_name,
            timestamp: rd.send_timestamp,
            clock: rd.send_clock,
        });
    }
}