use crate::{
entity::DdsEntity,
error::{check, check_entity},
serialization::CdrSerializer,
write_arena::WriteArena,
xtypes::MatchedEndpoint,
DdsResult, DdsType, Listener, Qos,
};
use cyclonedds_rust_sys::*;
use std::ffi::c_void;
use std::marker::PhantomData;
pub struct DataWriter<T: DdsType> {
entity: dds_entity_t,
_marker: PhantomData<T>,
}
impl<T: DdsType> DataWriter<T> {
fn with_native_ptr<R>(
&self,
data: &T,
f: impl FnOnce(*const c_void) -> DdsResult<R>,
) -> DdsResult<R> {
let mut arena = WriteArena::new();
let ptr = data.write_to_native(&mut arena)?;
f(ptr)
}
pub fn new(publisher: dds_entity_t, topic: dds_entity_t) -> DdsResult<Self> {
Self::with_qos_and_listener(publisher, topic, None, None)
}
pub fn with_qos(
publisher: dds_entity_t,
topic: dds_entity_t,
qos: Option<&Qos>,
) -> DdsResult<Self> {
Self::with_qos_and_listener(publisher, topic, qos, None)
}
pub fn with_listener(
publisher: dds_entity_t,
topic: dds_entity_t,
listener: &Listener,
) -> DdsResult<Self> {
Self::with_qos_and_listener(publisher, topic, None, Some(listener))
}
pub fn with_qos_and_listener(
publisher: dds_entity_t,
topic: dds_entity_t,
qos: Option<&Qos>,
listener: Option<&Listener>,
) -> DdsResult<Self> {
unsafe {
let q = qos.map_or(std::ptr::null(), |q| q.as_ptr());
let l = listener.map_or(std::ptr::null_mut(), |l| l.as_ptr());
let handle = dds_create_writer(publisher, topic, q, l);
check_entity(handle)?;
Ok(DataWriter {
entity: handle,
_marker: PhantomData,
})
}
}
pub fn write(&self, data: &T) -> DdsResult<()> {
self.with_native_ptr(data, |ptr| unsafe { check(dds_write(self.entity, ptr)) })
}
pub fn write_dispose(&self, data: &T) -> DdsResult<()> {
self.with_native_ptr(data, |ptr| unsafe {
check(dds_writedispose(self.entity, ptr))
})
}
pub fn register_instance(&self, data: &T) -> DdsResult<dds_instance_handle_t> {
self.with_native_ptr(data, |ptr| unsafe {
let mut handle: dds_instance_handle_t = 0;
check(dds_register_instance(self.entity, &mut handle, ptr))?;
Ok(handle)
})
}
pub fn unregister_instance(&self, data: &T) -> DdsResult<()> {
self.with_native_ptr(data, |ptr| unsafe {
check(dds_unregister_instance(self.entity, ptr))
})
}
pub fn unregister_instance_handle(&self, handle: dds_instance_handle_t) -> DdsResult<()> {
unsafe { check(dds_unregister_instance_ih(self.entity, handle)) }
}
pub fn dispose(&self, data: &T) -> DdsResult<()> {
self.with_native_ptr(data, |ptr| unsafe { check(dds_dispose(self.entity, ptr)) })
}
pub fn dispose_instance_handle(&self, handle: dds_instance_handle_t) -> DdsResult<()> {
unsafe { check(dds_dispose_ih(self.entity, handle)) }
}
pub fn lookup_instance(&self, data: &T) -> dds_instance_handle_t {
self.with_native_ptr(data, |ptr| unsafe {
Ok(dds_lookup_instance(self.entity, ptr))
})
.unwrap_or(0)
}
pub fn write_ts(&self, data: &T, timestamp: dds_time_t) -> DdsResult<()> {
self.with_native_ptr(data, |ptr| unsafe {
check(dds_write_ts(self.entity, ptr, timestamp))
})
}
pub fn write_dispose_ts(&self, data: &T, timestamp: dds_time_t) -> DdsResult<()> {
self.with_native_ptr(data, |ptr| unsafe {
check(dds_writedispose_ts(self.entity, ptr, timestamp))
})
}
pub fn write_flush(&self) -> DdsResult<()> {
unsafe { check(dds_write_flush(self.entity)) }
}
pub fn wait_for_acks(&self, timeout: dds_duration_t) -> DdsResult<()> {
unsafe { check(dds_wait_for_acks(self.entity, timeout)) }
}
pub fn assert_liveliness(&self) -> DdsResult<()> {
unsafe { check(dds_assert_liveliness(self.entity)) }
}
pub fn unregister_instance_ts(&self, data: &T, timestamp: dds_time_t) -> DdsResult<()> {
self.with_native_ptr(data, |ptr| unsafe {
check(dds_unregister_instance_ts(self.entity, ptr, timestamp))
})
}
pub fn unregister_instance_handle_ts(
&self,
handle: dds_instance_handle_t,
timestamp: dds_time_t,
) -> DdsResult<()> {
unsafe {
check(dds_unregister_instance_ih_ts(
self.entity,
handle,
timestamp,
))
}
}
pub fn dispose_ts(&self, data: &T, timestamp: dds_time_t) -> DdsResult<()> {
self.with_native_ptr(data, |ptr| unsafe {
check(dds_dispose_ts(self.entity, ptr, timestamp))
})
}
pub fn dispose_instance_handle_ts(
&self,
handle: dds_instance_handle_t,
timestamp: dds_time_t,
) -> DdsResult<()> {
unsafe { check(dds_dispose_ih_ts(self.entity, handle, timestamp)) }
}
pub fn write_cdr(&self, data: &[u8]) -> DdsResult<()> {
let sample = crate::serialization::CdrDeserializer::<T>::deserialize(
data,
crate::serialization::CdrEncoding::Xcdr2,
)
.or_else(|_| {
crate::serialization::CdrDeserializer::<T>::deserialize(
data,
crate::serialization::CdrEncoding::Xcdr1,
)
})?;
self.write(&sample)
}
pub fn request_loan(&self) -> DdsResult<WriteLoan<T>> {
unsafe {
let mut sample_ptr: *mut c_void = std::ptr::null_mut();
check(dds_request_loan(self.entity, &mut sample_ptr))?;
if sample_ptr.is_null() {
return Err(crate::DdsError::OutOfResources);
}
std::ptr::write_bytes(sample_ptr as *mut u8, 0, std::mem::size_of::<T>());
Ok(WriteLoan {
sample: sample_ptr as *mut T,
writer: self.entity,
written: false,
_marker: PhantomData,
})
}
}
pub fn matched_subscriptions(&self) -> DdsResult<Vec<dds_instance_handle_t>> {
unsafe {
let count = dds_get_matched_subscriptions(self.entity, std::ptr::null_mut(), 0);
if count < 0 {
return Err(crate::DdsError::from(count));
}
let count = count as usize;
if count == 0 {
return Ok(Vec::new());
}
let mut handles = vec![0; count];
let actual =
dds_get_matched_subscriptions(self.entity, handles.as_mut_ptr(), handles.len());
if actual < 0 {
return Err(crate::DdsError::from(actual));
}
handles.truncate(actual as usize);
Ok(handles)
}
}
pub fn matched_subscription_endpoints(&self) -> DdsResult<Vec<MatchedEndpoint>> {
let handles = self.matched_subscriptions()?;
handles
.into_iter()
.map(|handle| MatchedEndpoint::from_subscription(self.entity, handle))
.collect()
}
pub fn get_matched_subscription_data(
&self,
handle: dds_instance_handle_t,
) -> DdsResult<MatchedEndpoint> {
MatchedEndpoint::from_subscription(self.entity, handle)
}
pub fn serialize(&self, data: &T) -> DdsResult<Vec<u8>> {
CdrSerializer::serialize(data, crate::serialization::CdrEncoding::Xcdr1)
}
pub fn serialize_with_encoding(
&self,
data: &T,
encoding: crate::serialization::CdrEncoding,
) -> DdsResult<Vec<u8>> {
CdrSerializer::serialize(data, encoding)
}
}
impl<T: DdsType> DdsEntity for DataWriter<T> {
fn entity(&self) -> dds_entity_t {
self.entity
}
}
impl<T: DdsType> Drop for DataWriter<T> {
fn drop(&mut self) {
unsafe {
dds_delete(self.entity);
}
}
}
pub struct WriteLoan<T: DdsType> {
sample: *mut T,
writer: dds_entity_t,
written: bool,
_marker: PhantomData<T>,
}
impl<T: DdsType> WriteLoan<T> {
pub fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.sample }
}
pub fn write(mut loan: Self) -> DdsResult<()> {
loan.written = true;
unsafe {
let ret = dds_write(loan.writer, loan.sample as *const c_void);
check(ret)
}
}
}
impl<T: DdsType> Drop for WriteLoan<T> {
fn drop(&mut self) {
if !self.written && !self.sample.is_null() {
unsafe {
let mut ptr = self.sample as *mut c_void;
let _ = dds_return_loan(self.writer, &mut ptr, 1);
}
}
}
}