use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use zenrc_dds::{
CdrSample, LoanedSample, RawMessageBridge, Sample, dds_entity_t, dds_instance_handle_t
};
use super::common::{
peek_cdr,
read_cdr,
read_one_wl,
read_wl,
take_cdr,
take_one,
take_one_wl,
take_wl,
};
use super::error::{DdsError, Result, check_entity, check_ret};
use super::topic::Topic;
pub struct Subscription<T: RawMessageBridge> {
reader: dds_entity_t,
topic: Topic<T>,
_marker: PhantomData<T>,
notify: Option<Arc<tokio::sync::Notify>>,
}
impl<T: RawMessageBridge> Subscription<T> {
pub(crate) fn new(reader: dds_entity_t, topic: Topic<T>) -> Self {
Self {
reader,
topic,
_marker: PhantomData,
notify: None,
}
}
pub(crate) fn with_context(
&mut self,
context: &super::context::DdsContext,
) {
let notify = Some(context.attach(self.reader));
self.notify = notify;
}
pub fn subscription_matched_status(
&self,
) -> Result<zenrc_dds::dds_subscription_matched_status_t> {
let mut status = unsafe { std::mem::zeroed() };
check_ret(unsafe {
zenrc_dds::dds_get_subscription_matched_status(self.reader, &mut status)
})?;
Ok(status)
}
pub fn sample_lost_status(&self) -> Result<zenrc_dds::dds_sample_lost_status_t> {
let mut status = unsafe { std::mem::zeroed() };
check_ret(unsafe { zenrc_dds::dds_get_sample_lost_status(self.reader, &mut status) })?;
Ok(status)
}
pub fn matched_publications(&self) -> Result<Vec<dds_instance_handle_t>> {
const MAX: usize = 64;
let mut handles = vec![0u64; MAX];
let ret = unsafe {
zenrc_dds::dds_get_matched_publications(self.reader, handles.as_mut_ptr(), MAX)
};
let n = check_entity(ret)? as usize;
handles.truncate(n);
Ok(handles)
}
pub fn wait_for_historical_data(&self, max_wait: std::time::Duration) -> Result<()> {
check_ret(unsafe {
zenrc_dds::dds_reader_wait_for_historical_data(
self.reader,
super::qos::duration_to_nanos(max_wait),
)
})
}
pub fn entity(&self) -> dds_entity_t {
self.reader
}
pub fn topic_entity(&self) -> dds_entity_t {
self.topic.entity
}
pub fn read_wl(&self, max: usize) -> Result<Vec<LoanedSample<T>>> {
read_wl(self.reader, max)
}
pub fn read_one_wl(&self) -> Result<Option<LoanedSample<T>>> {
read_one_wl(self.reader)
}
pub fn take_wl(&self, max: usize) -> Result<Vec<LoanedSample<T>>> {
take_wl(self.reader, max)
}
pub fn take_one_wl(&self) -> Result<Option<LoanedSample<T>>> {
take_one_wl(self.reader)
}
pub fn take_cdr(&self, max: usize) -> Result<Vec<CdrSample>> {
take_cdr(self.reader, max)
}
pub fn read_cdr(&self, max: usize) -> Result<Vec<CdrSample>> {
read_cdr(self.reader, max)
}
pub fn peek_cdr(&self, max: usize) -> Result<Vec<CdrSample>> {
peek_cdr(self.reader, max)
}
}
impl<T: RawMessageBridge + Send + 'static> Subscription<T> {
pub fn set_event<F>(&self, handler: F) -> Result<tokio::task::JoinHandle<()>>
where
F: Fn(Sample<T>) + Send + Sync + 'static,
{
let notify = match self.notify.clone() {
Some(n) => n,
None => {
return Err(DdsError::NullPtr(
"订阅未附加到任何 DdsContext,无法设置事件回调".into(),
));
}
};
let reader = self.reader;
let handler = Arc::new(handler);
Ok(tokio::task::spawn(async move {
loop {
notify.notified().await;
loop {
match take_one::<T>(reader) {
Ok(Some(sample)) => (handler)(sample),
Ok(None) => break,
Err(_) => break,
}
}
}
}))
}
}
impl<T: RawMessageBridge> Drop for Subscription<T> {
fn drop(&mut self) {
unsafe { zenrc_dds::dds_delete(self.reader) };
}
}
unsafe impl<T: RawMessageBridge> Send for Subscription<T> {}
unsafe impl<T: RawMessageBridge> Sync for Subscription<T> {}