Skip to main content

zerodds_dcps_async/
reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3//! AsyncDataReader + SampleStream.
4
5use alloc::sync::Arc;
6use alloc::vec::Vec;
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use core::time::Duration;
10
11use futures_core::Stream;
12use zerodds_dcps::{DataReader, DataReaderQos, DdsType, Result};
13
14/// Async-Wrapper um `DataReader<T>`.
15pub struct AsyncDataReader<T: DdsType + Send + Sync + 'static> {
16    inner: Arc<DataReader<T>>,
17}
18
19impl<T: DdsType + Send + Sync + 'static> Clone for AsyncDataReader<T> {
20    fn clone(&self) -> Self {
21        Self {
22            inner: Arc::clone(&self.inner),
23        }
24    }
25}
26
27impl<T: DdsType + Send + Sync + 'static> AsyncDataReader<T> {
28    pub(crate) fn from_sync(inner: DataReader<T>) -> Self {
29        Self {
30            inner: Arc::new(inner),
31        }
32    }
33
34    /// Spec §2.2.1 `take_stream` — `Stream<Item = T>`.
35    ///
36    /// Live-Mode (Reader hat `runtime_handle()`): der Stream
37    /// registriert sich mit `register_user_reader_waker` an der
38    /// Runtime; `cx.waker()` wird beim naechsten Sample-Zufluss
39    /// gefeuert (kein Polling). Offline-Mode: detached-Thread-Sleep
40    /// als Polling-Fallback (Spec §3.3).
41    #[must_use]
42    pub fn take_stream(&self) -> SampleStream<T> {
43        SampleStream {
44            reader: Arc::clone(&self.inner),
45            buffered: Vec::new(),
46            poll_interval: Duration::from_millis(5),
47            sleep_until: None,
48        }
49    }
50
51    /// Spec §2.2.2 take(timeout). Resolves Ok mit `Vec<T>` wenn Samples
52    /// verfuegbar sind oder Timeout abgelaufen ist (dann leerer Vec
53    /// statt Err — analog `take()` sync).
54    ///
55    /// # Errors
56    /// Wire-/Decode-Fehler wie sync.
57    pub async fn take(&self, timeout: Duration) -> Result<Vec<T>> {
58        let deadline = std::time::Instant::now() + timeout;
59        loop {
60            let samples = self.inner.take()?;
61            if !samples.is_empty() {
62                return Ok(samples);
63            }
64            if std::time::Instant::now() >= deadline {
65                return Ok(Vec::new());
66            }
67            crate::yield_for(Duration::from_millis(10)).await;
68        }
69    }
70
71    /// Spec §2.2.3 wait_for_matched_publication.
72    ///
73    /// # Errors
74    /// `Timeout` wenn `min_count` nicht erreicht.
75    pub async fn wait_for_matched_publication(
76        &self,
77        min_count: usize,
78        timeout: Duration,
79    ) -> Result<()> {
80        let deadline = std::time::Instant::now() + timeout;
81        loop {
82            if self.inner.matched_publication_count() >= min_count {
83                return Ok(());
84            }
85            if std::time::Instant::now() >= deadline {
86                return Err(zerodds_dcps::DdsError::Timeout);
87            }
88            crate::yield_for(Duration::from_millis(10)).await;
89        }
90    }
91
92    /// Spec §2.2.4 matched_publication_count (synchron).
93    #[must_use]
94    pub fn matched_publication_count(&self) -> usize {
95        self.inner.matched_publication_count()
96    }
97
98    /// Liefert die zugrundeliegende sync-Variante.
99    #[must_use]
100    pub fn as_sync(&self) -> &DataReader<T> {
101        &self.inner
102    }
103
104    /// Liefert die DataReaderQos.
105    #[must_use]
106    pub fn qos(&self) -> DataReaderQos {
107        self.inner.qos()
108    }
109
110    /// Spec §6.1 `data_available_stream` — `Stream<Item = ()>`.
111    /// Emittiert pro Sample-Zufluss ein `()`-Event; Caller ruft danach
112    /// `take()` / iteriert ueber `take_stream()` um die Samples zu
113    /// holen. Konsumiert keine Samples (im Gegensatz zu
114    /// [`Self::take_stream`]).
115    ///
116    /// Live-Mode: registriert sich am Reader-Slot der Runtime
117    /// (`register_user_reader_waker`) — Wake erfolgt beim
118    /// `sample_tx.send` durch die Runtime, kein Polling. Offline-
119    /// Mode: detached-Thread-Sleep als Polling-Fallback.
120    #[must_use]
121    pub fn data_available_stream(&self) -> DataAvailableStream<T> {
122        DataAvailableStream {
123            reader: Arc::clone(&self.inner),
124            poll_interval: Duration::from_millis(10),
125            sleep_until: None,
126            last_seen_count: 0,
127        }
128    }
129
130    /// Spec §6.2 `publication_matched_stream` —
131    /// `Stream<Item = SubscriptionMatchedStatus>`. Emittiert den
132    /// vollen Reader-side Match-Status (DDS 1.4 §2.2.4.1
133    /// SUBSCRIPTION_MATCHED) jedes Mal wenn sich der Count an
134    /// matched Publications (Writers) aendert. Felder:
135    /// `total_count` (cumulative), `total_count_change` (delta),
136    /// `current_count` (currently matched), `current_count_change`
137    /// (signed delta), `last_publication_handle`.
138    #[must_use]
139    pub fn publication_matched_stream(&self) -> PublicationMatchedStream<T> {
140        PublicationMatchedStream {
141            reader: Arc::clone(&self.inner),
142            poll_interval: Duration::from_millis(20),
143            sleep_until: None,
144            last_count: usize::MAX,
145        }
146    }
147}
148
149/// Stream uber Reader-Samples. Endet wenn der Reader gedroppt wird.
150pub struct SampleStream<T: DdsType + Send + Sync + 'static> {
151    reader: Arc<DataReader<T>>,
152    buffered: Vec<T>,
153    poll_interval: Duration,
154    sleep_until: Option<std::time::Instant>,
155}
156
157impl<T: DdsType + Send + Sync + 'static> Unpin for SampleStream<T> {}
158
159/// Stream der "data available"-Events. Yieldet `()` jedes Mal wenn
160/// neue Samples im Reader sind. Konsumiert keine Samples — Caller
161/// muss `take()` oder `take_stream` separat aufrufen.
162pub struct DataAvailableStream<T: DdsType + Send + Sync + 'static> {
163    reader: Arc<DataReader<T>>,
164    poll_interval: Duration,
165    sleep_until: Option<std::time::Instant>,
166    /// Sample-Anzahl bei der letzten Emission. Steigender Wert =
167    /// neue Samples → emit `()`. Lese-Quelle ist der nicht-
168    /// konsumierende `read()`-Pfad.
169    last_seen_count: usize,
170}
171
172impl<T: DdsType + Send + Sync + 'static> Unpin for DataAvailableStream<T> {}
173
174impl<T: DdsType + Send + Sync + 'static> Stream for DataAvailableStream<T> {
175    type Item = ();
176
177    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
178        let this = self.get_mut();
179
180        // Pruefen ob neue Samples eingetroffen sind — `read()` ist
181        // non-consuming (DDS 1.4 §2.2.2.5.3.5), die Samples bleiben
182        // im Reader-Cache und koennen vom Caller danach via `take()`
183        // konsumiert werden.
184        let cur_count = match this.reader.read() {
185            Ok(samples) => samples.len(),
186            Err(_) => {
187                // Reader-State korrupt → Stream-Ende.
188                return Poll::Ready(None);
189            }
190        };
191        if cur_count > this.last_seen_count {
192            this.last_seen_count = cur_count;
193            return Poll::Ready(Some(()));
194        }
195
196        // Kein neuer Sample. Live-Mode: nativer Reader-Slot-Waker;
197        // bei jedem `sample_tx.send` weckt die Runtime uns auf.
198        if let Some((rt, eid)) = this.reader.runtime_handle() {
199            rt.register_user_reader_waker(eid, Some(cx.waker().clone()));
200            return Poll::Pending;
201        }
202
203        // Offline-Mode: Polling-Fallback ueber detached-Thread-Sleep.
204        if let Some(deadline) = this.sleep_until {
205            if std::time::Instant::now() < deadline {
206                schedule_wake(cx, deadline);
207                return Poll::Pending;
208            }
209            this.sleep_until = None;
210        }
211        this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
212        schedule_wake_in(cx, this.poll_interval);
213        Poll::Pending
214    }
215}
216
217/// Stream der publication-matched-Counts. Yieldet jeden neuen Count.
218pub struct PublicationMatchedStream<T: DdsType + Send + Sync + 'static> {
219    reader: Arc<DataReader<T>>,
220    poll_interval: Duration,
221    sleep_until: Option<std::time::Instant>,
222    last_count: usize,
223}
224
225impl<T: DdsType + Send + Sync + 'static> Unpin for PublicationMatchedStream<T> {}
226
227impl<T: DdsType + Send + Sync + 'static> Stream for PublicationMatchedStream<T> {
228    type Item = zerodds_dcps::status::SubscriptionMatchedStatus;
229
230    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
231        let this = self.get_mut();
232
233        if let Some(deadline) = this.sleep_until {
234            if std::time::Instant::now() < deadline {
235                schedule_wake(cx, deadline);
236                return Poll::Pending;
237            }
238            this.sleep_until = None;
239        }
240
241        let now_count = this.reader.matched_publication_count();
242        if now_count != this.last_count {
243            // Delta-Berechnung: bei initialem Aufruf
244            // (last_count == usize::MAX) ist der Delta == now_count.
245            let prev_known = if this.last_count == usize::MAX {
246                0
247            } else {
248                this.last_count
249            };
250            this.last_count = now_count;
251            #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
252            let now_i = now_count as i32;
253            #[allow(clippy::cast_possible_truncation, clippy::cast_possible_wrap)]
254            let prev_i = prev_known as i32;
255            let delta = now_i - prev_i;
256            let status = zerodds_dcps::status::SubscriptionMatchedStatus {
257                total_count: now_i.max(prev_i),
258                total_count_change: delta.max(0),
259                current_count: now_i,
260                current_count_change: delta,
261                last_publication_handle: zerodds_dcps::HANDLE_NIL,
262            };
263            Poll::Ready(Some(status))
264        } else {
265            this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
266            schedule_wake_in(cx, this.poll_interval);
267            Poll::Pending
268        }
269    }
270}
271
272fn schedule_wake(cx: &mut Context<'_>, deadline: std::time::Instant) {
273    let waker = cx.waker().clone();
274    let remaining = deadline.saturating_duration_since(std::time::Instant::now());
275    std::thread::spawn(move || {
276        std::thread::sleep(remaining);
277        waker.wake();
278    });
279}
280
281fn schedule_wake_in(cx: &mut Context<'_>, interval: Duration) {
282    let waker = cx.waker().clone();
283    std::thread::spawn(move || {
284        std::thread::sleep(interval);
285        waker.wake();
286    });
287}
288
289impl<T: DdsType + Send + Sync + 'static> Stream for SampleStream<T> {
290    type Item = T;
291
292    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
293        let this = self.get_mut();
294
295        // Pufferierte Samples zuerst.
296        if !this.buffered.is_empty() {
297            return Poll::Ready(Some(this.buffered.remove(0)));
298        }
299
300        // Sleep-Pfad: wenn wir mid-pause sind, warten.
301        if let Some(deadline) = this.sleep_until {
302            if std::time::Instant::now() < deadline {
303                // Re-schedule — Caller-Runtime weckt den Future.
304                let waker = cx.waker().clone();
305                let remaining = deadline.saturating_duration_since(std::time::Instant::now());
306                std::thread::spawn(move || {
307                    std::thread::sleep(remaining);
308                    waker.wake();
309                });
310                return Poll::Pending;
311            }
312            this.sleep_until = None;
313        }
314
315        // Take aus dem Reader.
316        match this.reader.take() {
317            Ok(mut samples) if !samples.is_empty() => {
318                let first = samples.remove(0);
319                this.buffered = samples;
320                Poll::Ready(Some(first))
321            }
322            Ok(_) => {
323                // Kein Sample — Spec §3.3: Waker beim Reader-Slot
324                // registrieren. Bei `sample_tx.send` weckt die
325                // Runtime uns nativ. Live-Mode-Pfad.
326                if let Some((rt, eid)) = this.reader.runtime_handle() {
327                    rt.register_user_reader_waker(eid, Some(cx.waker().clone()));
328                    return Poll::Pending;
329                }
330                // Offline-Mode: detached-thread-Sleep als Polling-
331                // Fallback (kein Reader-Slot, kein Sample-Zufluss).
332                this.sleep_until = Some(std::time::Instant::now() + this.poll_interval);
333                let waker = cx.waker().clone();
334                let interval = this.poll_interval;
335                std::thread::spawn(move || {
336                    std::thread::sleep(interval);
337                    waker.wake();
338                });
339                Poll::Pending
340            }
341            Err(_) => Poll::Ready(None),
342        }
343    }
344}