use alloc::sync::Arc;
use alloc::vec::Vec;
use core::pin::Pin;
use core::task::{Context, Poll};
use core::time::Duration;
use futures_core::Stream;
use zerodds_dcps::{DataReader, DataReaderQos, DdsType, Result};
pub struct AsyncDataReader<T: DdsType + Send + Sync + 'static> {
inner: Arc<DataReader<T>>,
}
impl<T: DdsType + Send + Sync + 'static> Clone for AsyncDataReader<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T: DdsType + Send + Sync + 'static> AsyncDataReader<T> {
pub(crate) fn from_sync(inner: DataReader<T>) -> Self {
Self {
inner: Arc::new(inner),
}
}
#[must_use]
pub fn take_stream(&self) -> SampleStream<T> {
SampleStream {
reader: Arc::clone(&self.inner),
buffered: Vec::new(),
poll_interval: Duration::from_millis(5),
sleep_until: None,
}
}
pub async fn take(&self, timeout: Duration) -> Result<Vec<T>> {
let deadline = std::time::Instant::now() + timeout;
loop {
let samples = self.inner.take()?;
if !samples.is_empty() {
return Ok(samples);
}
if std::time::Instant::now() >= deadline {
return Ok(Vec::new());
}
crate::yield_for(Duration::from_millis(10)).await;
}
}
pub async fn wait_for_matched_publication(
&self,
min_count: usize,
timeout: Duration,
) -> Result<()> {
let deadline = std::time::Instant::now() + timeout;
loop {
if self.inner.matched_publication_count() >= min_count {
return Ok(());
}
if std::time::Instant::now() >= deadline {
return Err(zerodds_dcps::DdsError::Timeout);
}
crate::yield_for(Duration::from_millis(10)).await;
}
}
#[must_use]
pub fn matched_publication_count(&self) -> usize {
self.inner.matched_publication_count()
}
#[must_use]
pub fn as_sync(&self) -> &DataReader<T> {
&self.inner
}
#[must_use]
pub fn qos(&self) -> DataReaderQos {
self.inner.qos()
}
#[must_use]
pub fn data_available_stream(&self) -> DataAvailableStream<T> {
DataAvailableStream {
reader: Arc::clone(&self.inner),
poll_interval: Duration::from_millis(10),
sleep_until: None,
last_seen_count: 0,
}
}
#[must_use]
pub fn publication_matched_stream(&self) -> PublicationMatchedStream<T> {
PublicationMatchedStream {
reader: Arc::clone(&self.inner),
poll_interval: Duration::from_millis(20),
sleep_until: None,
last_count: usize::MAX,
}
}
}
pub struct SampleStream<T: DdsType + Send + Sync + 'static> {
reader: Arc<DataReader<T>>,
buffered: Vec<T>,
poll_interval: Duration,
sleep_until: Option<std::time::Instant>,
}
impl<T: DdsType + Send + Sync + 'static> Unpin for SampleStream<T> {}
pub struct DataAvailableStream<T: DdsType + Send + Sync + 'static> {
reader: Arc<DataReader<T>>,
poll_interval: Duration,
sleep_until: Option<std::time::Instant>,
last_seen_count: usize,
}
impl<T: DdsType + Send + Sync + 'static> Unpin for DataAvailableStream<T> {}
impl<T: DdsType + Send + Sync + 'static> Stream for DataAvailableStream<T> {
type Item = ();
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
let this = self.get_mut();
let cur_count = match this.reader.read() {
Ok(samples) => samples.len(),
Err(_) => {
return Poll::Ready(None);
}
};
if cur_count > this.last_seen_count {
this.last_seen_count = cur_count;
return Poll::Ready(Some(()));
}
if let Some((rt, eid)) = this.reader.runtime_handle() {
rt.register_user_reader_waker(eid, Some(cx.waker().clone()));
return Poll::Pending;
}
if let Some(deadline) = this.sleep_until {
if std::time::Instant::now() < deadline {
schedule_wake(cx, deadline);
return Poll::Pending;
}
this.sleep_until = None;
}
this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
schedule_wake_in(cx, this.poll_interval);
Poll::Pending
}
}
pub struct PublicationMatchedStream<T: DdsType + Send + Sync + 'static> {
reader: Arc<DataReader<T>>,
poll_interval: Duration,
sleep_until: Option<std::time::Instant>,
last_count: usize,
}
impl<T: DdsType + Send + Sync + 'static> Unpin for PublicationMatchedStream<T> {}
impl<T: DdsType + Send + Sync + 'static> Stream for PublicationMatchedStream<T> {
type Item = zerodds_dcps::status::SubscriptionMatchedStatus;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(deadline) = this.sleep_until {
if std::time::Instant::now() < deadline {
schedule_wake(cx, deadline);
return Poll::Pending;
}
this.sleep_until = None;
}
let now_count = this.reader.matched_publication_count();
if now_count != this.last_count {
let prev_known = if this.last_count == usize::MAX {
0
} else {
this.last_count
};
this.last_count = now_count;
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let now_i = now_count as i32;
#[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
let prev_i = prev_known as i32;
let delta = now_i - prev_i;
let status = zerodds_dcps::status::SubscriptionMatchedStatus {
total_count: now_i.max(prev_i),
total_count_change: delta.max(0),
current_count: now_i,
current_count_change: delta,
last_publication_handle: zerodds_dcps::HANDLE_NIL,
};
Poll::Ready(Some(status))
} else {
this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
schedule_wake_in(cx, this.poll_interval);
Poll::Pending
}
}
}
fn schedule_wake(cx: &mut Context<'_>, deadline: std::time::Instant) {
let waker = cx.waker().clone();
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
std::thread::spawn(move || {
std::thread::sleep(remaining);
waker.wake();
});
}
fn schedule_wake_in(cx: &mut Context<'_>, interval: Duration) {
let waker = cx.waker().clone();
std::thread::spawn(move || {
std::thread::sleep(interval);
waker.wake();
});
}
impl<T: DdsType + Send + Sync + 'static> Stream for SampleStream<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let this = self.get_mut();
if !this.buffered.is_empty() {
return Poll::Ready(Some(this.buffered.remove(0)));
}
if let Some(deadline) = this.sleep_until {
if std::time::Instant::now() < deadline {
let waker = cx.waker().clone();
let remaining = deadline.saturating_duration_since(std::time::Instant::now());
std::thread::spawn(move || {
std::thread::sleep(remaining);
waker.wake();
});
return Poll::Pending;
}
this.sleep_until = None;
}
match this.reader.take() {
Ok(mut samples) if !samples.is_empty() => {
let first = samples.remove(0);
this.buffered = samples;
Poll::Ready(Some(first))
}
Ok(_) => {
if let Some((rt, eid)) = this.reader.runtime_handle() {
rt.register_user_reader_waker(eid, Some(cx.waker().clone()));
return Poll::Pending;
}
this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
let waker = cx.waker().clone();
let interval = this.poll_interval;
std::thread::spawn(move || {
std::thread::sleep(interval);
waker.wake();
});
Poll::Pending
}
Err(_) => Poll::Ready(None),
}
}
}