use alloc::sync::Arc;
use core::time::Duration;
use zerodds_dcps::{DataWriter, DataWriterQos, DdsType, InstanceHandle, Result};
pub struct AsyncDataWriter<T: DdsType + Send + Sync + 'static> {
inner: Arc<DataWriter<T>>,
}
impl<T: DdsType + Send + Sync + 'static> Clone for AsyncDataWriter<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T: DdsType + Send + Sync + 'static> AsyncDataWriter<T> {
pub(crate) fn from_sync(inner: DataWriter<T>) -> Self {
Self {
inner: Arc::new(inner),
}
}
pub async fn write(&self, sample: &T) -> Result<()>
where
T: Clone,
{
let max_block = self.inner.qos().reliability.max_blocking_time;
let max_block_nanos = max_block.to_nanos();
let safety_cap = core::time::Duration::from_secs(1);
let deadline = if max_block_nanos == u128::MAX {
None
} else {
#[allow(clippy::cast_possible_truncation)]
let secs = (max_block_nanos / 1_000_000_000) as u64;
#[allow(clippy::cast_possible_truncation)]
let nanos = (max_block_nanos % 1_000_000_000) as u32;
Some(std::time::Instant::now() + core::time::Duration::new(secs, nanos))
};
let s = sample.clone();
loop {
match self.inner.write(&s) {
Ok(()) => return Ok(()),
Err(zerodds_dcps::DdsError::OutOfResources { .. }) => {
if let Some(d) = deadline {
if std::time::Instant::now() >= d {
return Err(zerodds_dcps::DdsError::Timeout);
}
}
crate::yield_for(core::time::Duration::from_millis(2)).await;
}
Err(other) => return Err(other),
}
if deadline.is_none() {
let _ = safety_cap;
}
}
}
pub async fn register_instance(&self, sample: &T) -> Result<InstanceHandle> {
self.inner.register_instance(sample)
}
pub async fn dispose(&self, sample: &T, handle: InstanceHandle) -> Result<()> {
self.inner.dispose(sample, handle)
}
pub async fn unregister_instance(&self, sample: &T, handle: InstanceHandle) -> Result<()> {
self.inner.unregister_instance(sample, handle)
}
pub async fn wait_for_matched_subscription(
&self,
min_count: usize,
timeout: Duration,
) -> Result<()> {
let deadline = std::time::Instant::now() + timeout;
loop {
if self.inner.matched_subscription_count() >= min_count {
return Ok(());
}
if std::time::Instant::now() >= deadline {
return Err(zerodds_dcps::DdsError::Timeout);
}
crate::yield_for(Duration::from_millis(10)).await;
}
}
#[must_use]
pub fn matched_subscription_count(&self) -> usize {
self.inner.matched_subscription_count()
}
#[must_use]
pub fn as_sync(&self) -> &DataWriter<T> {
&self.inner
}
#[must_use]
pub fn qos(&self) -> DataWriterQos {
self.inner.qos()
}
}