Skip to main content

zerodds_xrce/
continuous_read.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
//! Continuous read mode (Spec §8.4.14).
//!
//! `READ_DATA` submessages may carry a `DataDeliveryControl` that turns the
//! single-shot read into a continuous stream: the agent keeps delivering
//! `DATA` submessages until one of the limits is reached:
//!
//! - `max_samples`: maximum total number of samples.
//! - `max_elapsed_time`: hard upper bound on the elapsed time.
//! - `max_bytes_per_second`: rate limit (token-bucket style).
//!
//! In addition, `min_pace_period` enforces a minimum pause between samples.
//!
//! This file models the reader-mode state that is consumed by an agent
//! process (out of scope here).
16
17extern crate alloc;
18use alloc::collections::VecDeque;
19use alloc::vec::Vec;
20use core::time::Duration;
21
22use crate::object_id::ObjectId;
23
/// Delivery control for a `READ_DATA` request (Spec §7.7.13).
///
/// Each field uses its zero value (`0` / [`Duration::ZERO`]) to mean
/// "no limit": `ReadStream::pull_pending_samples` only enforces a limit whose
/// value is non-zero. No remapping of `0` to a maximum value takes place in
/// this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DeliveryControl {
    /// Maximum number of samples to deliver in total (`0` = unlimited).
    pub max_samples: u16,
    /// Hard upper bound on the time since the stream was started
    /// ([`Duration::ZERO`] = unlimited).
    pub max_elapsed_time: Duration,
    /// Rate cap in bytes per second (`0` = unlimited).
    pub max_bytes_per_second: u32,
    /// Minimum pause between two samples ([`Duration::ZERO`] = no pacing).
    pub min_pace_period: Duration,
}
39
40impl Default for DeliveryControl {
41    fn default() -> Self {
42        Self {
43            max_samples: 0,
44            max_elapsed_time: Duration::MAX,
45            max_bytes_per_second: 0,
46            min_pace_period: Duration::ZERO,
47        }
48    }
49}
50
51impl DeliveryControl {
52    /// Single-Shot Read: ein Sample, sofort.
53    #[must_use]
54    pub fn single_shot() -> Self {
55        Self {
56            max_samples: 1,
57            max_elapsed_time: Duration::ZERO,
58            max_bytes_per_second: 0,
59            min_pace_period: Duration::ZERO,
60        }
61    }
62}
63
/// A sample produced by a `ReadStream` (intended to become the XCDR2 body of
/// a `DATA` submessage later on).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingSample {
    /// Application payload (XCDR2-encoded). Its length is what the rate
    /// limiter charges against the byte budget.
    pub bytes: Vec<u8>,
}
71
72/// Pro-`READ_DATA`-Request gehaltener Stream-State.
73#[derive(Debug, Clone)]
74pub struct ReadStream {
75    /// Subscriber-Object, dem der ReadStream zugeordnet ist.
76    pub subscriber_handle: ObjectId,
77    /// Topic-Object, das gelesen wird.
78    pub topic_handle: ObjectId,
79    /// Delivery-Control mit allen Limits.
80    pub delivery_control: DeliveryControl,
81
82    /// Start-Zeitpunkt (uptime-relativ).
83    started_at: Duration,
84    /// Letzter Tick.
85    last_tick: Duration,
86    /// Bisher gelieferte Samples.
87    samples_delivered: u32,
88    /// Token-Bucket: zum Tick verfuegbare Bytes.
89    bytes_credit: u64,
90    /// Wartende Samples, von der App-Schicht eingespielt.
91    queue: VecDeque<PendingSample>,
92    /// `true`, wenn der Stream finalized ist und nichts mehr liefert.
93    finalized: bool,
94}
95
96impl ReadStream {
97    /// Konstruktor.
98    #[must_use]
99    pub fn new(
100        subscriber_handle: ObjectId,
101        topic_handle: ObjectId,
102        delivery_control: DeliveryControl,
103        now: Duration,
104    ) -> Self {
105        Self {
106            subscriber_handle,
107            topic_handle,
108            delivery_control,
109            started_at: now,
110            last_tick: now,
111            samples_delivered: 0,
112            bytes_credit: 0,
113            queue: VecDeque::new(),
114            finalized: false,
115        }
116    }
117
118    /// `true`, wenn der Stream abgeschlossen ist.
119    #[must_use]
120    pub fn is_finalized(&self) -> bool {
121        self.finalized
122    }
123
124    /// Anzahl bereits gelieferter Samples.
125    #[must_use]
126    pub fn samples_delivered(&self) -> u32 {
127        self.samples_delivered
128    }
129
130    /// App-Schicht reicht ein neues Sample ein.
131    pub fn push_sample(&mut self, sample: PendingSample) {
132        if !self.finalized {
133            self.queue.push_back(sample);
134        }
135    }
136
137    /// Anzahl wartender Samples (noch nicht ausgeliefert).
138    #[must_use]
139    pub fn queued_count(&self) -> usize {
140        self.queue.len()
141    }
142
143    /// Pull-Tick: liefert die Samples, die jetzt rate-konform ausgegeben
144    /// werden duerfen. `now` ist uptime-relativ.
145    pub fn pull_pending_samples(&mut self, now: Duration) -> Vec<PendingSample> {
146        if self.finalized {
147            return Vec::new();
148        }
149        // Time-Cap: max_elapsed_time ueberschritten?
150        let elapsed = now.saturating_sub(self.started_at);
151        if elapsed >= self.delivery_control.max_elapsed_time
152            && self.delivery_control.max_elapsed_time > Duration::ZERO
153        {
154            self.finalized = true;
155            return Vec::new();
156        }
157
158        // Token-Bucket nachfuellen.
159        let dt = now.saturating_sub(self.last_tick);
160        if self.delivery_control.max_bytes_per_second > 0 {
161            let added = (u128::from(self.delivery_control.max_bytes_per_second)
162                * u128::from(dt.as_millis() as u64)
163                / 1000) as u64;
164            self.bytes_credit = self.bytes_credit.saturating_add(added);
165            // Cap auf 1s-Burst
166            let burst_cap = u64::from(self.delivery_control.max_bytes_per_second);
167            if self.bytes_credit > burst_cap {
168                self.bytes_credit = burst_cap;
169            }
170        }
171
172        // Pacing-Pause: noch nicht abgelaufen?
173        if self.delivery_control.min_pace_period > Duration::ZERO
174            && dt < self.delivery_control.min_pace_period
175            && self.samples_delivered > 0
176        {
177            return Vec::new();
178        }
179        self.last_tick = now;
180
181        let mut out = Vec::new();
182        while let Some(front) = self.queue.front() {
183            // max_samples-Cap?
184            if self.delivery_control.max_samples > 0
185                && self.samples_delivered >= u32::from(self.delivery_control.max_samples)
186            {
187                self.finalized = true;
188                break;
189            }
190            // Rate-Cap: passt das naechste Sample?
191            if self.delivery_control.max_bytes_per_second > 0 {
192                let need = front.bytes.len() as u64;
193                if self.bytes_credit < need {
194                    break;
195                }
196                self.bytes_credit -= need;
197            }
198            let Some(sample) = self.queue.pop_front() else {
199                break;
200            };
201            out.push(sample);
202            self.samples_delivered = self.samples_delivered.saturating_add(1);
203
204            // Single-Shot ist nach 1 Sample fertig.
205            if self.delivery_control.max_samples == 1 {
206                self.finalized = true;
207                break;
208            }
209            // Pacing nach jedem Sample, falls aktiv.
210            if self.delivery_control.min_pace_period > Duration::ZERO {
211                break;
212            }
213        }
214        // max_samples erreicht?
215        if self.delivery_control.max_samples > 0
216            && self.samples_delivered >= u32::from(self.delivery_control.max_samples)
217        {
218            self.finalized = true;
219        }
220        out
221    }
222
223    /// Stoppt den Stream sofort (z.B. bei DELETE des Subscribers).
224    pub fn stop(&mut self) {
225        self.finalized = true;
226    }
227}
228
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used, clippy::unwrap_used)]
    use super::*;
    use crate::object_kind::ObjectKind;

    /// Subscriber object id used by every test stream.
    fn s_id() -> ObjectId {
        ObjectId::new(0x10, ObjectKind::Subscriber).unwrap()
    }

    /// Topic object id used by every test stream.
    fn t_id() -> ObjectId {
        ObjectId::new(0x11, ObjectKind::Topic).unwrap()
    }

    /// Builds a stream that starts at uptime zero.
    fn stream(dc: DeliveryControl) -> ReadStream {
        ReadStream::new(s_id(), t_id(), dc, Duration::ZERO)
    }

    /// Wraps a payload into a `PendingSample`.
    fn sample(bytes: Vec<u8>) -> PendingSample {
        PendingSample { bytes }
    }

    #[test]
    fn single_shot_delivers_one_then_finalizes() {
        let mut rs = stream(DeliveryControl::single_shot());
        rs.push_sample(sample(alloc::vec![1, 2]));
        rs.push_sample(sample(alloc::vec![3, 4]));

        let first = rs.pull_pending_samples(Duration::from_millis(1));
        assert_eq!(first.len(), 1);
        assert!(rs.is_finalized());

        // A finalized stream delivers nothing more.
        let second = rs.pull_pending_samples(Duration::from_millis(2));
        assert!(second.is_empty());
    }

    #[test]
    fn max_samples_cap_enforced() {
        let mut rs = stream(DeliveryControl {
            max_samples: 3,
            ..Default::default()
        });
        for i in 0..10u8 {
            rs.push_sample(sample(alloc::vec![i]));
        }

        let out = rs.pull_pending_samples(Duration::from_millis(1));
        assert_eq!(out.len(), 3);
        assert!(rs.is_finalized());
    }

    #[test]
    fn rate_limit_partitions_samples_over_time() {
        let mut rs = stream(DeliveryControl {
            max_samples: 0,
            max_elapsed_time: Duration::MAX,
            max_bytes_per_second: 100, // 100 B/s
            min_pace_period: Duration::ZERO,
        });
        for _ in 0..5 {
            rs.push_sample(sample(alloc::vec![0u8; 50]));
        }

        // After 1 s: 100 B budget, 50 B samples -> 2 samples.
        let after_one_second = rs.pull_pending_samples(Duration::from_secs(1));
        assert_eq!(after_one_second.len(), 2);

        // After 2 s: another 100 B -> 2 more (4 in total).
        let after_two_seconds = rs.pull_pending_samples(Duration::from_secs(2));
        assert_eq!(after_two_seconds.len(), 2);
    }

    #[test]
    fn max_elapsed_time_finalizes() {
        let mut rs = stream(DeliveryControl {
            max_samples: 0,
            max_elapsed_time: Duration::from_secs(1),
            max_bytes_per_second: 0,
            min_pace_period: Duration::ZERO,
        });

        // 0.5 s in: still within the time cap.
        rs.push_sample(sample(alloc::vec![1]));
        let in_time = rs.pull_pending_samples(Duration::from_millis(500));
        assert_eq!(in_time.len(), 1);
        assert!(!rs.is_finalized());

        // 2 s in: finalized, no further sample.
        rs.push_sample(sample(alloc::vec![2]));
        let too_late = rs.pull_pending_samples(Duration::from_secs(2));
        assert!(too_late.is_empty());
        assert!(rs.is_finalized());
    }

    #[test]
    fn stop_finalizes_immediately() {
        let mut rs = stream(DeliveryControl::default());
        rs.push_sample(sample(alloc::vec![1]));
        rs.stop();

        let out = rs.pull_pending_samples(Duration::from_millis(1));
        assert!(out.is_empty());
        assert!(rs.is_finalized());
    }

    #[test]
    fn pacing_throttles_per_period() {
        let mut rs = stream(DeliveryControl {
            max_samples: 0,
            max_elapsed_time: Duration::MAX,
            max_bytes_per_second: 0,
            min_pace_period: Duration::from_millis(100),
        });
        for _ in 0..5 {
            rs.push_sample(sample(alloc::vec![1]));
        }

        // First tick: delivers one sample.
        let first = rs.pull_pending_samples(Duration::from_millis(1));
        assert_eq!(first.len(), 1);

        // At 50 ms the 100 ms pause has not passed yet -> no sample.
        let paused = rs.pull_pending_samples(Duration::from_millis(50));
        assert!(paused.is_empty());

        // At 200 ms the next sample is due.
        let next = rs.pull_pending_samples(Duration::from_millis(200));
        assert_eq!(next.len(), 1);
    }
}