zerodds_dcps_async/
reader.rs1use alloc::sync::Arc;
6use alloc::vec::Vec;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use core::time::Duration;
10
11use futures_core::Stream;
12use zerodds_dcps::{DataReader, DataReaderQos, DdsType, Result};
13
14pub struct AsyncDataReader<T: DdsType + Send + Sync + 'static> {
16 inner: Arc<DataReader<T>>,
17}
18
19impl<T: DdsType + Send + Sync + 'static> Clone for AsyncDataReader<T> {
20 fn clone(&self) -> Self {
21 Self {
22 inner: Arc::clone(&self.inner),
23 }
24 }
25}
26
27impl<T: DdsType + Send + Sync + 'static> AsyncDataReader<T> {
28 pub(crate) fn from_sync(inner: DataReader<T>) -> Self {
29 Self {
30 inner: Arc::new(inner),
31 }
32 }
33
34 #[must_use]
42 pub fn take_stream(&self) -> SampleStream<T> {
43 SampleStream {
44 reader: Arc::clone(&self.inner),
45 buffered: Vec::new(),
46 poll_interval: Duration::from_millis(5),
47 sleep_until: None,
48 }
49 }
50
51 pub async fn take(&self, timeout: Duration) -> Result<Vec<T>> {
58 let deadline = std::time::Instant::now() + timeout;
59 loop {
60 let samples = self.inner.take()?;
61 if !samples.is_empty() {
62 return Ok(samples);
63 }
64 if std::time::Instant::now() >= deadline {
65 return Ok(Vec::new());
66 }
67 crate::yield_for(Duration::from_millis(10)).await;
68 }
69 }
70
71 pub async fn wait_for_matched_publication(
76 &self,
77 min_count: usize,
78 timeout: Duration,
79 ) -> Result<()> {
80 let deadline = std::time::Instant::now() + timeout;
81 loop {
82 if self.inner.matched_publication_count() >= min_count {
83 return Ok(());
84 }
85 if std::time::Instant::now() >= deadline {
86 return Err(zerodds_dcps::DdsError::Timeout);
87 }
88 crate::yield_for(Duration::from_millis(10)).await;
89 }
90 }
91
92 #[must_use]
94 pub fn matched_publication_count(&self) -> usize {
95 self.inner.matched_publication_count()
96 }
97
98 #[must_use]
100 pub fn as_sync(&self) -> &DataReader<T> {
101 &self.inner
102 }
103
104 #[must_use]
106 pub fn qos(&self) -> DataReaderQos {
107 self.inner.qos()
108 }
109
110 #[must_use]
121 pub fn data_available_stream(&self) -> DataAvailableStream<T> {
122 DataAvailableStream {
123 reader: Arc::clone(&self.inner),
124 poll_interval: Duration::from_millis(10),
125 sleep_until: None,
126 last_seen_count: 0,
127 }
128 }
129
130 #[must_use]
139 pub fn publication_matched_stream(&self) -> PublicationMatchedStream<T> {
140 PublicationMatchedStream {
141 reader: Arc::clone(&self.inner),
142 poll_interval: Duration::from_millis(20),
143 sleep_until: None,
144 last_count: usize::MAX,
145 }
146 }
147}
148
149pub struct SampleStream<T: DdsType + Send + Sync + 'static> {
151 reader: Arc<DataReader<T>>,
152 buffered: Vec<T>,
153 poll_interval: Duration,
154 sleep_until: Option<std::time::Instant>,
155}
156
157impl<T: DdsType + Send + Sync + 'static> Unpin for SampleStream<T> {}
158
159pub struct DataAvailableStream<T: DdsType + Send + Sync + 'static> {
163 reader: Arc<DataReader<T>>,
164 poll_interval: Duration,
165 sleep_until: Option<std::time::Instant>,
166 last_seen_count: usize,
170}
171
172impl<T: DdsType + Send + Sync + 'static> Unpin for DataAvailableStream<T> {}
173
174impl<T: DdsType + Send + Sync + 'static> Stream for DataAvailableStream<T> {
175 type Item = ();
176
177 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
178 let this = self.get_mut();
179
180 let cur_count = match this.reader.read() {
185 Ok(samples) => samples.len(),
186 Err(_) => {
187 return Poll::Ready(None);
189 }
190 };
191 if cur_count > this.last_seen_count {
192 this.last_seen_count = cur_count;
193 return Poll::Ready(Some(()));
194 }
195
196 if let Some((rt, eid)) = this.reader.runtime_handle() {
199 rt.register_user_reader_waker(eid, Some(cx.waker().clone()));
200 return Poll::Pending;
201 }
202
203 if let Some(deadline) = this.sleep_until {
205 if std::time::Instant::now() < deadline {
206 schedule_wake(cx, deadline);
207 return Poll::Pending;
208 }
209 this.sleep_until = None;
210 }
211 this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
212 schedule_wake_in(cx, this.poll_interval);
213 Poll::Pending
214 }
215}
216
217pub struct PublicationMatchedStream<T: DdsType + Send + Sync + 'static> {
219 reader: Arc<DataReader<T>>,
220 poll_interval: Duration,
221 sleep_until: Option<std::time::Instant>,
222 last_count: usize,
223}
224
225impl<T: DdsType + Send + Sync + 'static> Unpin for PublicationMatchedStream<T> {}
226
227impl<T: DdsType + Send + Sync + 'static> Stream for PublicationMatchedStream<T> {
228 type Item = zerodds_dcps::status::SubscriptionMatchedStatus;
229
230 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
231 let this = self.get_mut();
232
233 if let Some(deadline) = this.sleep_until {
234 if std::time::Instant::now() < deadline {
235 schedule_wake(cx, deadline);
236 return Poll::Pending;
237 }
238 this.sleep_until = None;
239 }
240
241 let now_count = this.reader.matched_publication_count();
242 if now_count != this.last_count {
243 let prev_known = if this.last_count == usize::MAX {
246 0
247 } else {
248 this.last_count
249 };
250 this.last_count = now_count;
251 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
252 let now_i = now_count as i32;
253 #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
254 let prev_i = prev_known as i32;
255 let delta = now_i - prev_i;
256 let status = zerodds_dcps::status::SubscriptionMatchedStatus {
257 total_count: now_i.max(prev_i),
258 total_count_change: delta.max(0),
259 current_count: now_i,
260 current_count_change: delta,
261 last_publication_handle: zerodds_dcps::HANDLE_NIL,
262 };
263 Poll::Ready(Some(status))
264 } else {
265 this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
266 schedule_wake_in(cx, this.poll_interval);
267 Poll::Pending
268 }
269 }
270}
271
272fn schedule_wake(cx: &mut Context<'_>, deadline: std::time::Instant) {
273 let waker = cx.waker().clone();
274 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
275 std::thread::spawn(move || {
276 std::thread::sleep(remaining);
277 waker.wake();
278 });
279}
280
281fn schedule_wake_in(cx: &mut Context<'_>, interval: Duration) {
282 let waker = cx.waker().clone();
283 std::thread::spawn(move || {
284 std::thread::sleep(interval);
285 waker.wake();
286 });
287}
288
289impl<T: DdsType + Send + Sync + 'static> Stream for SampleStream<T> {
290 type Item = T;
291
292 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
293 let this = self.get_mut();
294
295 if !this.buffered.is_empty() {
297 return Poll::Ready(Some(this.buffered.remove(0)));
298 }
299
300 if let Some(deadline) = this.sleep_until {
302 if std::time::Instant::now() < deadline {
303 let waker = cx.waker().clone();
305 let remaining = deadline.saturating_duration_since(std::time::Instant::now());
306 std::thread::spawn(move || {
307 std::thread::sleep(remaining);
308 waker.wake();
309 });
310 return Poll::Pending;
311 }
312 this.sleep_until = None;
313 }
314
315 match this.reader.take() {
317 Ok(mut samples) if !samples.is_empty() => {
318 let first = samples.remove(0);
319 this.buffered = samples;
320 Poll::Ready(Some(first))
321 }
322 Ok(_) => {
323 if let Some((rt, eid)) = this.reader.runtime_handle() {
327 rt.register_user_reader_waker(eid, Some(cx.waker().clone()));
328 return Poll::Pending;
329 }
330 this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
333 let waker = cx.waker().clone();
334 let interval = this.poll_interval;
335 std::thread::spawn(move || {
336 std::thread::sleep(interval);
337 waker.wake();
338 });
339 Poll::Pending
340 }
341 Err(_) => Poll::Ready(None),
342 }
343 }
344}