1extern crate alloc;
18use alloc::collections::VecDeque;
19use alloc::vec::Vec;
20use core::time::Duration;
21
22use crate::object_id::ObjectId;
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
29pub struct DeliveryControl {
30 pub max_samples: u16,
32 pub max_elapsed_time: Duration,
34 pub max_bytes_per_second: u32,
36 pub min_pace_period: Duration,
38}
39
40impl Default for DeliveryControl {
41 fn default() -> Self {
42 Self {
43 max_samples: 0,
44 max_elapsed_time: Duration::MAX,
45 max_bytes_per_second: 0,
46 min_pace_period: Duration::ZERO,
47 }
48 }
49}
50
51impl DeliveryControl {
52 #[must_use]
54 pub fn single_shot() -> Self {
55 Self {
56 max_samples: 1,
57 max_elapsed_time: Duration::ZERO,
58 max_bytes_per_second: 0,
59 min_pace_period: Duration::ZERO,
60 }
61 }
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
67pub struct PendingSample {
68 pub bytes: Vec<u8>,
70}
71
72#[derive(Debug, Clone)]
74pub struct ReadStream {
75 pub subscriber_handle: ObjectId,
77 pub topic_handle: ObjectId,
79 pub delivery_control: DeliveryControl,
81
82 started_at: Duration,
84 last_tick: Duration,
86 samples_delivered: u32,
88 bytes_credit: u64,
90 queue: VecDeque<PendingSample>,
92 finalized: bool,
94}
95
96impl ReadStream {
97 #[must_use]
99 pub fn new(
100 subscriber_handle: ObjectId,
101 topic_handle: ObjectId,
102 delivery_control: DeliveryControl,
103 now: Duration,
104 ) -> Self {
105 Self {
106 subscriber_handle,
107 topic_handle,
108 delivery_control,
109 started_at: now,
110 last_tick: now,
111 samples_delivered: 0,
112 bytes_credit: 0,
113 queue: VecDeque::new(),
114 finalized: false,
115 }
116 }
117
118 #[must_use]
120 pub fn is_finalized(&self) -> bool {
121 self.finalized
122 }
123
124 #[must_use]
126 pub fn samples_delivered(&self) -> u32 {
127 self.samples_delivered
128 }
129
130 pub fn push_sample(&mut self, sample: PendingSample) {
132 if !self.finalized {
133 self.queue.push_back(sample);
134 }
135 }
136
137 #[must_use]
139 pub fn queued_count(&self) -> usize {
140 self.queue.len()
141 }
142
143 pub fn pull_pending_samples(&mut self, now: Duration) -> Vec<PendingSample> {
146 if self.finalized {
147 return Vec::new();
148 }
149 let elapsed = now.saturating_sub(self.started_at);
151 if elapsed >= self.delivery_control.max_elapsed_time
152 && self.delivery_control.max_elapsed_time > Duration::ZERO
153 {
154 self.finalized = true;
155 return Vec::new();
156 }
157
158 let dt = now.saturating_sub(self.last_tick);
160 if self.delivery_control.max_bytes_per_second > 0 {
161 let added = (u128::from(self.delivery_control.max_bytes_per_second)
162 * u128::from(dt.as_millis() as u64)
163 / 1000) as u64;
164 self.bytes_credit = self.bytes_credit.saturating_add(added);
165 let burst_cap = u64::from(self.delivery_control.max_bytes_per_second);
167 if self.bytes_credit > burst_cap {
168 self.bytes_credit = burst_cap;
169 }
170 }
171
172 if self.delivery_control.min_pace_period > Duration::ZERO
174 && dt < self.delivery_control.min_pace_period
175 && self.samples_delivered > 0
176 {
177 return Vec::new();
178 }
179 self.last_tick = now;
180
181 let mut out = Vec::new();
182 while let Some(front) = self.queue.front() {
183 if self.delivery_control.max_samples > 0
185 && self.samples_delivered >= u32::from(self.delivery_control.max_samples)
186 {
187 self.finalized = true;
188 break;
189 }
190 if self.delivery_control.max_bytes_per_second > 0 {
192 let need = front.bytes.len() as u64;
193 if self.bytes_credit < need {
194 break;
195 }
196 self.bytes_credit -= need;
197 }
198 let Some(sample) = self.queue.pop_front() else {
199 break;
200 };
201 out.push(sample);
202 self.samples_delivered = self.samples_delivered.saturating_add(1);
203
204 if self.delivery_control.max_samples == 1 {
206 self.finalized = true;
207 break;
208 }
209 if self.delivery_control.min_pace_period > Duration::ZERO {
211 break;
212 }
213 }
214 if self.delivery_control.max_samples > 0
216 && self.samples_delivered >= u32::from(self.delivery_control.max_samples)
217 {
218 self.finalized = true;
219 }
220 out
221 }
222
223 pub fn stop(&mut self) {
225 self.finalized = true;
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 #![allow(clippy::expect_used, clippy::unwrap_used)]
232 use super::*;
233 use crate::object_kind::ObjectKind;
234
235 fn s_id() -> ObjectId {
236 ObjectId::new(0x10, ObjectKind::Subscriber).unwrap()
237 }
238 fn t_id() -> ObjectId {
239 ObjectId::new(0x11, ObjectKind::Topic).unwrap()
240 }
241
242 #[test]
243 fn single_shot_delivers_one_then_finalizes() {
244 let mut rs = ReadStream::new(
245 s_id(),
246 t_id(),
247 DeliveryControl::single_shot(),
248 Duration::ZERO,
249 );
250 rs.push_sample(PendingSample {
251 bytes: alloc::vec![1, 2],
252 });
253 rs.push_sample(PendingSample {
254 bytes: alloc::vec![3, 4],
255 });
256 let out = rs.pull_pending_samples(Duration::from_millis(1));
257 assert_eq!(out.len(), 1);
258 assert!(rs.is_finalized());
259 let out = rs.pull_pending_samples(Duration::from_millis(2));
261 assert!(out.is_empty());
262 }
263
264 #[test]
265 fn max_samples_cap_enforced() {
266 let dc = DeliveryControl {
267 max_samples: 3,
268 ..Default::default()
269 };
270 let mut rs = ReadStream::new(s_id(), t_id(), dc, Duration::ZERO);
271 for i in 0..10 {
272 rs.push_sample(PendingSample {
273 bytes: alloc::vec![i as u8],
274 });
275 }
276 let out = rs.pull_pending_samples(Duration::from_millis(1));
277 assert_eq!(out.len(), 3);
278 assert!(rs.is_finalized());
279 }
280
281 #[test]
282 fn rate_limit_partitions_samples_over_time() {
283 let dc = DeliveryControl {
284 max_samples: 0,
285 max_elapsed_time: Duration::MAX,
286 max_bytes_per_second: 100, min_pace_period: Duration::ZERO,
288 };
289 let mut rs = ReadStream::new(s_id(), t_id(), dc, Duration::ZERO);
290 for _ in 0..5 {
291 rs.push_sample(PendingSample {
292 bytes: alloc::vec![0u8; 50],
293 });
294 }
295 let out = rs.pull_pending_samples(Duration::from_secs(1));
297 assert_eq!(out.len(), 2);
298 let out = rs.pull_pending_samples(Duration::from_secs(2));
300 assert_eq!(out.len(), 2);
301 }
302
303 #[test]
304 fn max_elapsed_time_finalizes() {
305 let dc = DeliveryControl {
306 max_samples: 0,
307 max_elapsed_time: Duration::from_secs(1),
308 max_bytes_per_second: 0,
309 min_pace_period: Duration::ZERO,
310 };
311 let mut rs = ReadStream::new(s_id(), t_id(), dc, Duration::ZERO);
312 rs.push_sample(PendingSample {
313 bytes: alloc::vec![1],
314 });
315 let out = rs.pull_pending_samples(Duration::from_millis(500));
317 assert_eq!(out.len(), 1);
318 assert!(!rs.is_finalized());
319 rs.push_sample(PendingSample {
321 bytes: alloc::vec![2],
322 });
323 let out = rs.pull_pending_samples(Duration::from_secs(2));
324 assert!(out.is_empty());
325 assert!(rs.is_finalized());
326 }
327
328 #[test]
329 fn stop_finalizes_immediately() {
330 let mut rs = ReadStream::new(s_id(), t_id(), DeliveryControl::default(), Duration::ZERO);
331 rs.push_sample(PendingSample {
332 bytes: alloc::vec![1],
333 });
334 rs.stop();
335 let out = rs.pull_pending_samples(Duration::from_millis(1));
336 assert!(out.is_empty());
337 assert!(rs.is_finalized());
338 }
339
340 #[test]
341 fn pacing_throttles_per_period() {
342 let dc = DeliveryControl {
343 max_samples: 0,
344 max_elapsed_time: Duration::MAX,
345 max_bytes_per_second: 0,
346 min_pace_period: Duration::from_millis(100),
347 };
348 let mut rs = ReadStream::new(s_id(), t_id(), dc, Duration::ZERO);
349 for _ in 0..5 {
350 rs.push_sample(PendingSample {
351 bytes: alloc::vec![1],
352 });
353 }
354 let out = rs.pull_pending_samples(Duration::from_millis(1));
356 assert_eq!(out.len(), 1);
357 let out = rs.pull_pending_samples(Duration::from_millis(50));
359 assert!(out.is_empty());
360 let out = rs.pull_pending_samples(Duration::from_millis(200));
362 assert_eq!(out.len(), 1);
363 }
364}