extern crate alloc;
use alloc::collections::VecDeque;
use alloc::vec::Vec;
use core::time::Duration;
use crate::object_id::ObjectId;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct DeliveryControl {
pub max_samples: u16,
pub max_elapsed_time: Duration,
pub max_bytes_per_second: u32,
pub min_pace_period: Duration,
}
impl Default for DeliveryControl {
fn default() -> Self {
Self {
max_samples: 0,
max_elapsed_time: Duration::MAX,
max_bytes_per_second: 0,
min_pace_period: Duration::ZERO,
}
}
}
impl DeliveryControl {
#[must_use]
pub fn single_shot() -> Self {
Self {
max_samples: 1,
max_elapsed_time: Duration::ZERO,
max_bytes_per_second: 0,
min_pace_period: Duration::ZERO,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingSample {
pub bytes: Vec<u8>,
}
#[derive(Debug, Clone)]
pub struct ReadStream {
pub subscriber_handle: ObjectId,
pub topic_handle: ObjectId,
pub delivery_control: DeliveryControl,
started_at: Duration,
last_tick: Duration,
samples_delivered: u32,
bytes_credit: u64,
queue: VecDeque<PendingSample>,
finalized: bool,
}
impl ReadStream {
#[must_use]
pub fn new(
subscriber_handle: ObjectId,
topic_handle: ObjectId,
delivery_control: DeliveryControl,
now: Duration,
) -> Self {
Self {
subscriber_handle,
topic_handle,
delivery_control,
started_at: now,
last_tick: now,
samples_delivered: 0,
bytes_credit: 0,
queue: VecDeque::new(),
finalized: false,
}
}
#[must_use]
pub fn is_finalized(&self) -> bool {
self.finalized
}
#[must_use]
pub fn samples_delivered(&self) -> u32 {
self.samples_delivered
}
pub fn push_sample(&mut self, sample: PendingSample) {
if !self.finalized {
self.queue.push_back(sample);
}
}
#[must_use]
pub fn queued_count(&self) -> usize {
self.queue.len()
}
pub fn pull_pending_samples(&mut self, now: Duration) -> Vec<PendingSample> {
if self.finalized {
return Vec::new();
}
let elapsed = now.saturating_sub(self.started_at);
if elapsed >= self.delivery_control.max_elapsed_time
&& self.delivery_control.max_elapsed_time > Duration::ZERO
{
self.finalized = true;
return Vec::new();
}
let dt = now.saturating_sub(self.last_tick);
if self.delivery_control.max_bytes_per_second > 0 {
let added = (u128::from(self.delivery_control.max_bytes_per_second)
* u128::from(dt.as_millis() as u64)
/ 1000) as u64;
self.bytes_credit = self.bytes_credit.saturating_add(added);
let burst_cap = u64::from(self.delivery_control.max_bytes_per_second);
if self.bytes_credit > burst_cap {
self.bytes_credit = burst_cap;
}
}
if self.delivery_control.min_pace_period > Duration::ZERO
&& dt < self.delivery_control.min_pace_period
&& self.samples_delivered > 0
{
return Vec::new();
}
self.last_tick = now;
let mut out = Vec::new();
while let Some(front) = self.queue.front() {
if self.delivery_control.max_samples > 0
&& self.samples_delivered >= u32::from(self.delivery_control.max_samples)
{
self.finalized = true;
break;
}
if self.delivery_control.max_bytes_per_second > 0 {
let need = front.bytes.len() as u64;
if self.bytes_credit < need {
break;
}
self.bytes_credit -= need;
}
let Some(sample) = self.queue.pop_front() else {
break;
};
out.push(sample);
self.samples_delivered = self.samples_delivered.saturating_add(1);
if self.delivery_control.max_samples == 1 {
self.finalized = true;
break;
}
if self.delivery_control.min_pace_period > Duration::ZERO {
break;
}
}
if self.delivery_control.max_samples > 0
&& self.samples_delivered >= u32::from(self.delivery_control.max_samples)
{
self.finalized = true;
}
out
}
pub fn stop(&mut self) {
self.finalized = true;
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::expect_used, clippy::unwrap_used)]
use super::*;
use crate::object_kind::ObjectKind;
fn s_id() -> ObjectId {
ObjectId::new(0x10, ObjectKind::Subscriber).unwrap()
}
fn t_id() -> ObjectId {
ObjectId::new(0x11, ObjectKind::Topic).unwrap()
}
#[test]
fn single_shot_delivers_one_then_finalizes() {
let mut rs = ReadStream::new(
s_id(),
t_id(),
DeliveryControl::single_shot(),
Duration::ZERO,
);
rs.push_sample(PendingSample {
bytes: alloc::vec![1, 2],
});
rs.push_sample(PendingSample {
bytes: alloc::vec![3, 4],
});
let out = rs.pull_pending_samples(Duration::from_millis(1));
assert_eq!(out.len(), 1);
assert!(rs.is_finalized());
let out = rs.pull_pending_samples(Duration::from_millis(2));
assert!(out.is_empty());
}
#[test]
fn max_samples_cap_enforced() {
let dc = DeliveryControl {
max_samples: 3,
..Default::default()
};
let mut rs = ReadStream::new(s_id(), t_id(), dc, Duration::ZERO);
for i in 0..10 {
rs.push_sample(PendingSample {
bytes: alloc::vec![i as u8],
});
}
let out = rs.pull_pending_samples(Duration::from_millis(1));
assert_eq!(out.len(), 3);
assert!(rs.is_finalized());
}
#[test]
fn rate_limit_partitions_samples_over_time() {
let dc = DeliveryControl {
max_samples: 0,
max_elapsed_time: Duration::MAX,
max_bytes_per_second: 100, min_pace_period: Duration::ZERO,
};
let mut rs = ReadStream::new(s_id(), t_id(), dc, Duration::ZERO);
for _ in 0..5 {
rs.push_sample(PendingSample {
bytes: alloc::vec![0u8; 50],
});
}
let out = rs.pull_pending_samples(Duration::from_secs(1));
assert_eq!(out.len(), 2);
let out = rs.pull_pending_samples(Duration::from_secs(2));
assert_eq!(out.len(), 2);
}
#[test]
fn max_elapsed_time_finalizes() {
let dc = DeliveryControl {
max_samples: 0,
max_elapsed_time: Duration::from_secs(1),
max_bytes_per_second: 0,
min_pace_period: Duration::ZERO,
};
let mut rs = ReadStream::new(s_id(), t_id(), dc, Duration::ZERO);
rs.push_sample(PendingSample {
bytes: alloc::vec![1],
});
let out = rs.pull_pending_samples(Duration::from_millis(500));
assert_eq!(out.len(), 1);
assert!(!rs.is_finalized());
rs.push_sample(PendingSample {
bytes: alloc::vec![2],
});
let out = rs.pull_pending_samples(Duration::from_secs(2));
assert!(out.is_empty());
assert!(rs.is_finalized());
}
#[test]
fn stop_finalizes_immediately() {
let mut rs = ReadStream::new(s_id(), t_id(), DeliveryControl::default(), Duration::ZERO);
rs.push_sample(PendingSample {
bytes: alloc::vec![1],
});
rs.stop();
let out = rs.pull_pending_samples(Duration::from_millis(1));
assert!(out.is_empty());
assert!(rs.is_finalized());
}
#[test]
fn pacing_throttles_per_period() {
let dc = DeliveryControl {
max_samples: 0,
max_elapsed_time: Duration::MAX,
max_bytes_per_second: 0,
min_pace_period: Duration::from_millis(100),
};
let mut rs = ReadStream::new(s_id(), t_id(), dc, Duration::ZERO);
for _ in 0..5 {
rs.push_sample(PendingSample {
bytes: alloc::vec![1],
});
}
let out = rs.pull_pending_samples(Duration::from_millis(1));
assert_eq!(out.len(), 1);
let out = rs.pull_pending_samples(Duration::from_millis(50));
assert!(out.is_empty());
let out = rs.pull_pending_samples(Duration::from_millis(200));
assert_eq!(out.len(), 1);
}
}