use peat_schema::event::v1::{EventPriority, PeatEvent};
use std::collections::VecDeque;
use std::time::Instant;
/// Bandwidth split across the four event-priority classes, in bits per second.
///
/// Critical bandwidth is carved out first; the remaining budget is divided
/// between the high / normal / low classes.
#[derive(Debug, Clone, Copy)]
pub struct BandwidthAllocation {
    /// Bandwidth reserved exclusively for critical events.
    pub critical_reserved_bps: u64,
    /// Minimum bandwidth guaranteed to high-priority events.
    pub high_min_bps: u64,
    /// Minimum bandwidth guaranteed to normal-priority events.
    pub normal_min_bps: u64,
    /// Minimum bandwidth guaranteed to low-priority events.
    pub low_min_bps: u64,
    /// Total link bandwidth the allocation was derived from.
    pub total_available_bps: u64,
}

impl Default for BandwidthAllocation {
    /// 1 Mbps total with the standard 10% critical / 50-35-15 split.
    /// Delegates to `new`, which produces identical values for this total.
    fn default() -> Self {
        Self::new(1_000_000)
    }
}

impl BandwidthAllocation {
    /// Splits `total_bps` as 10% critical, then 50% / 35% / 15% of the
    /// remaining 90% for high / normal / low respectively.
    pub fn new(total_bps: u64) -> Self {
        let non_critical = total_bps * 90 / 100;
        Self {
            critical_reserved_bps: total_bps / 10,
            high_min_bps: non_critical * 50 / 100,
            normal_min_bps: non_critical * 35 / 100,
            low_min_bps: non_critical * 15 / 100,
            total_available_bps: total_bps,
        }
    }

    /// Splits `total_bps` using explicit per-class percentages.
    ///
    /// # Panics
    /// Panics if the percentages sum to more than 100.
    pub fn with_percentages(
        total_bps: u64,
        critical_pct: u8,
        high_pct: u8,
        normal_pct: u8,
        low_pct: u8,
    ) -> Self {
        // Widen before summing: four u8 percentages can total up to 1020,
        // which overflows u8 (panic in debug builds, silent wrap-around in
        // release builds — a wrapped sum could sneak past the <= 100 check).
        let sum = critical_pct as u16 + high_pct as u16 + normal_pct as u16 + low_pct as u16;
        assert!(sum <= 100, "Percentages must sum to <= 100");
        Self {
            critical_reserved_bps: total_bps * critical_pct as u64 / 100,
            high_min_bps: total_bps * high_pct as u64 / 100,
            normal_min_bps: total_bps * normal_pct as u64 / 100,
            low_min_bps: total_bps * low_pct as u64 / 100,
            total_available_bps: total_bps,
        }
    }
}
/// Token bucket used to rate-limit transmitted bytes per priority class.
/// Starts full and earns `rate` tokens per second, capped at `capacity`.
#[derive(Debug)]
struct TokenBucket {
    /// Tokens currently available for consumption.
    tokens: f64,
    /// Upper bound on `tokens` (burst size).
    capacity: f64,
    /// Refill rate in tokens per second.
    rate: f64,
    /// Timestamp of the most recent refill.
    last_refill: Instant,
}

impl TokenBucket {
    /// Creates a bucket that begins completely full.
    fn new(capacity: f64, rate: f64) -> Self {
        Self {
            tokens: capacity,
            capacity,
            rate,
            last_refill: Instant::now(),
        }
    }

    /// Attempts to spend `count` tokens; returns whether the spend succeeded.
    fn try_consume(&mut self, count: f64) -> bool {
        self.refill();
        let affordable = self.tokens >= count;
        if affordable {
            self.tokens -= count;
        }
        affordable
    }

    /// Current token balance after crediting elapsed time.
    fn available(&mut self) -> f64 {
        self.refill();
        self.tokens
    }

    /// Credits tokens earned since the last refill, capped at capacity.
    fn refill(&mut self) {
        let now = Instant::now();
        let earned = now.duration_since(self.last_refill).as_secs_f64() * self.rate;
        self.last_refill = now;
        self.tokens = self.capacity.min(self.tokens + earned);
    }
}
/// Strategy applied when an event arrives for a priority queue that is
/// already at its configured maximum size.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Drop the incoming event; queues are left untouched.
    RejectNew,
    /// Drop the oldest event in the same queue to make room.
    RemoveOldest,
    /// Drop one event from the lowest-priority non-empty queue
    /// (low, then normal, then high — critical is never dropped);
    /// reject the incoming event if all three are empty.
    RemoveLowestPriority,
}
/// Priority-aware outbound event scheduler.
///
/// Keeps one FIFO queue and one token bucket per priority level
/// (index 0 = critical, 1 = high, 2 = normal, 3 = low) and drains them
/// according to the configured bandwidth allocation and overflow policy.
pub struct EventTransmitter {
    // One queue per level: [critical, high, normal, low].
    queues: [VecDeque<PeatEvent>; 4],
    // Per-level queue capacity, enforced on enqueue.
    max_queue_sizes: [usize; 4],
    // Per-level byte-rate limiters (same index order as `queues`).
    buckets: [TokenBucket; 4],
    // Retained for reference/inspection only; not read after construction.
    #[allow(dead_code)]
    allocation: BandwidthAllocation,
    overflow_policy: OverflowPolicy,
    stats: TransmitterStats,
}
/// Per-priority counters, indexed 0 = critical … 3 = low.
#[derive(Debug, Default, Clone)]
pub struct TransmitterStats {
    /// Events handed to the caller by the transmit methods.
    pub transmitted: [u64; 4],
    /// Events dropped because of queue overflow.
    pub dropped: [u64; 4],
    /// Estimated bytes of transmitted events (payload plus fixed overhead).
    pub bytes_transmitted: [u64; 4],
}
impl EventTransmitter {
    /// Creates a transmitter whose per-level token buckets are sized and
    /// refilled from `allocation`. Capacity equals the refill rate, i.e.
    /// each level can burst roughly one second's worth of bytes.
    pub fn new(allocation: BandwidthAllocation) -> Self {
        let bucket = |bps: u64| TokenBucket::new(bps as f64, bps as f64);
        Self {
            queues: Default::default(),
            // Critical queue is kept small so stale critical events cannot
            // pile up; the other levels default to 1000 entries.
            max_queue_sizes: [100, 1000, 1000, 1000],
            buckets: [
                bucket(allocation.critical_reserved_bps),
                bucket(allocation.high_min_bps),
                bucket(allocation.normal_min_bps),
                bucket(allocation.low_min_bps),
            ],
            allocation,
            overflow_policy: OverflowPolicy::RemoveLowestPriority,
            stats: TransmitterStats::default(),
        }
    }

    /// Convenience constructor using `BandwidthAllocation::default()`.
    pub fn with_defaults() -> Self {
        Self::new(BandwidthAllocation::default())
    }

    /// Sets the maximum queue length for one priority level.
    pub fn set_max_queue_size(&mut self, priority: EventPriority, size: usize) {
        self.max_queue_sizes[priority_to_level(priority)] = size;
    }

    /// Sets the policy applied when a queue is full on enqueue.
    pub fn set_overflow_policy(&mut self, policy: OverflowPolicy) {
        self.overflow_policy = policy;
    }

    /// Queues `event` on its priority queue. Returns `false` if the event
    /// was rejected because the queue was full and the overflow policy did
    /// not make room.
    ///
    /// NOTE: under `RemoveLowestPriority`, room may be made by dropping
    /// from a *different*, lower-priority queue; the target queue is then
    /// deliberately allowed to exceed its configured size (the limits act
    /// as a shared budget — see the unit tests).
    pub fn enqueue(&mut self, event: PeatEvent) -> bool {
        let level = self.get_level(&event);
        if self.queues[level].len() >= self.max_queue_sizes[level] {
            match self.overflow_policy {
                OverflowPolicy::RejectNew => {
                    self.stats.dropped[level] += 1;
                    return false;
                }
                OverflowPolicy::RemoveOldest => {
                    self.queues[level].pop_front();
                    self.stats.dropped[level] += 1;
                }
                OverflowPolicy::RemoveLowestPriority => {
                    let dropped = self.drop_lowest_priority();
                    if !dropped {
                        // Nothing droppable (only critical events queued):
                        // reject the incoming event instead.
                        self.stats.dropped[level] += 1;
                        return false;
                    }
                }
            }
        }
        self.queues[level].push_back(event);
        true
    }

    /// Drains queues into a batch of at most `max_events` events.
    ///
    /// Critical events always go first, bounded only by their token bucket
    /// and `max_events`. The remaining budget is split 50% / 35% / 15%
    /// between the high / normal / low queues; leftover capacity (integer
    /// rounding slack plus any unused shares) is then handed out in strict
    /// priority order. This guarantees that a small `max_events` (where the
    /// percentage shares round down to 0) can never let low-priority events
    /// jump ahead of queued higher-priority ones.
    pub fn transmit(&mut self, max_events: usize) -> Vec<PeatEvent> {
        let mut result = Vec::with_capacity(max_events);
        let mut remaining = max_events;

        // Critical preempts everything else.
        remaining -= self.drain_queue(0, remaining, &mut result);
        if remaining == 0 {
            return result;
        }

        // Weighted shares of the non-critical budget.
        let high_alloc = remaining * 50 / 100;
        let normal_alloc = remaining * 35 / 100;
        let low_alloc = remaining * 15 / 100;

        remaining -= self.drain_queue(1, high_alloc.min(remaining), &mut result);
        remaining -= self.drain_queue(2, normal_alloc.min(remaining), &mut result);
        remaining -= self.drain_queue(3, low_alloc.min(remaining), &mut result);

        // Leftover capacity goes out in priority order: high, normal, low.
        for level in 1..4 {
            if remaining == 0 {
                break;
            }
            remaining -= self.drain_queue(level, remaining, &mut result);
        }
        result
    }

    /// Drains every critical event the critical token bucket can afford,
    /// ignoring the other queues entirely.
    pub fn transmit_critical(&mut self) -> Vec<PeatEvent> {
        let mut result = Vec::new();
        self.drain_queue(0, usize::MAX, &mut result);
        result
    }

    /// Pops up to `max` events from `queues[level]`, charging each event's
    /// estimated size against the level's token bucket and recording stats.
    /// Stops early when the queue empties or tokens run out. Returns the
    /// number of events appended to `out`.
    fn drain_queue(&mut self, level: usize, max: usize, out: &mut Vec<PeatEvent>) -> usize {
        let mut sent = 0;
        while sent < max {
            let size = match self.queues[level].front() {
                Some(event) => estimate_event_size(event),
                None => break,
            };
            if !self.buckets[level].try_consume(size as f64) {
                break;
            }
            let event = self.queues[level].pop_front().unwrap();
            self.stats.transmitted[level] += 1;
            self.stats.bytes_transmitted[level] += size as u64;
            out.push(event);
            sent += 1;
        }
        sent
    }

    /// Whether any critical events are waiting.
    pub fn has_critical(&self) -> bool {
        !self.queues[0].is_empty()
    }

    /// Current queue lengths, indexed 0 = critical … 3 = low.
    pub fn queue_lengths(&self) -> [usize; 4] {
        [
            self.queues[0].len(),
            self.queues[1].len(),
            self.queues[2].len(),
            self.queues[3].len(),
        ]
    }

    /// Total number of events waiting across all queues.
    pub fn total_queued(&self) -> usize {
        self.queues.iter().map(|q| q.len()).sum()
    }

    /// Read-only access to the transmit/drop counters.
    pub fn stats(&self) -> &TransmitterStats {
        &self.stats
    }

    /// Resets all counters to zero.
    pub fn reset_stats(&mut self) {
        self.stats = TransmitterStats::default();
    }

    /// Current token balances per level. Takes `&mut self` because
    /// reading a bucket refills it first.
    pub fn available_bandwidth(&mut self) -> [f64; 4] {
        [
            self.buckets[0].available(),
            self.buckets[1].available(),
            self.buckets[2].available(),
            self.buckets[3].available(),
        ]
    }

    /// Maps an event to its queue index via its routing priority; missing
    /// or unrecognized priorities fall back to Normal.
    fn get_level(&self, event: &PeatEvent) -> usize {
        let priority = event
            .routing
            .as_ref()
            .map(|r| EventPriority::try_from(r.priority).unwrap_or(EventPriority::PriorityNormal))
            .unwrap_or(EventPriority::PriorityNormal);
        priority_to_level(priority)
    }

    /// Drops one event from the lowest-priority non-empty queue, scanning
    /// low → normal → high. Critical (level 0) is never dropped. Returns
    /// whether anything was removed.
    fn drop_lowest_priority(&mut self) -> bool {
        for level in (1..4).rev() {
            if self.queues[level].pop_front().is_some() {
                self.stats.dropped[level] += 1;
                return true;
            }
        }
        false
    }
}
/// Maps an event priority onto its queue/bucket index
/// (0 = critical … 3 = low). The match is exhaustive on purpose so
/// adding a new priority variant becomes a compile error here.
fn priority_to_level(priority: EventPriority) -> usize {
    match priority {
        EventPriority::PriorityLow => 3,
        EventPriority::PriorityNormal => 2,
        EventPriority::PriorityHigh => 1,
        EventPriority::PriorityCritical => 0,
    }
}
/// Rough wire-size estimate for an event: payload bytes plus a fixed
/// per-event envelope overhead (ids, routing metadata, framing).
fn estimate_event_size(event: &PeatEvent) -> usize {
    const ENVELOPE_OVERHEAD: usize = 200;
    event.payload_value.len() + ENVELOPE_OVERHEAD
}
// Unit tests for the allocation math, the token bucket, and the
// transmitter's enqueue/overflow/transmit behavior.
#[cfg(test)]
mod tests {
    use super::*;
    use peat_schema::event::v1::AggregationPolicy;

    // Builds a minimal event with the given id, priority, and payload size.
    fn make_event(id: &str, priority: EventPriority, payload_size: usize) -> PeatEvent {
        PeatEvent {
            event_id: id.to_string(),
            timestamp: None,
            source_node_id: "node-1".to_string(),
            source_formation_id: "squad-1".to_string(),
            source_instance_id: None,
            event_class: peat_schema::event::v1::EventClass::Product as i32,
            event_type: "test".to_string(),
            routing: Some(AggregationPolicy {
                propagation: peat_schema::event::v1::PropagationMode::PropagationFull as i32,
                priority: priority as i32,
                ttl_seconds: 300,
                aggregation_window_ms: 0,
            }),
            payload_type_url: String::new(),
            payload_value: vec![0u8; payload_size],
        }
    }

    // Default allocation: 1 Mbps total, every class gets a non-zero share.
    #[test]
    fn test_bandwidth_allocation_default() {
        let alloc = BandwidthAllocation::default();
        assert_eq!(alloc.total_available_bps, 1_000_000);
        assert!(alloc.critical_reserved_bps > 0);
        assert!(alloc.high_min_bps > 0);
        assert!(alloc.normal_min_bps > 0);
        assert!(alloc.low_min_bps > 0);
    }

    // Explicit percentages are applied to the full total, per class.
    #[test]
    fn test_bandwidth_allocation_custom() {
        let alloc = BandwidthAllocation::with_percentages(1_000_000, 10, 45, 30, 15);
        assert_eq!(alloc.critical_reserved_bps, 100_000);
        assert_eq!(alloc.high_min_bps, 450_000);
        assert_eq!(alloc.normal_min_bps, 300_000);
        assert_eq!(alloc.low_min_bps, 150_000);
    }

    // Bucket starts full, refuses over-draw, and drains towards the refill
    // floor. Ranges allow for the small refill earned between calls.
    #[test]
    fn test_token_bucket_basic() {
        let mut bucket = TokenBucket::new(1000.0, 100.0);
        assert!(bucket.try_consume(500.0));
        assert!(bucket.tokens >= 499.0 && bucket.tokens <= 501.0);
        assert!(!bucket.try_consume(600.0));
        assert!(bucket.tokens >= 499.0 && bucket.tokens <= 501.0);
        assert!(bucket.try_consume(400.0));
        assert!(bucket.tokens >= 99.0 && bucket.tokens <= 110.0);
    }

    // A normal-priority event lands in queue index 2.
    #[test]
    fn test_transmitter_enqueue() {
        let mut tx = EventTransmitter::with_defaults();
        let event = make_event("e1", EventPriority::PriorityNormal, 100);
        assert!(tx.enqueue(event));
        assert_eq!(tx.queue_lengths()[2], 1);
    }

    // Critical events are transmitted before anything else.
    #[test]
    fn test_transmitter_critical_preemption() {
        let mut tx = EventTransmitter::with_defaults();
        tx.enqueue(make_event("low", EventPriority::PriorityLow, 100));
        tx.enqueue(make_event("normal", EventPriority::PriorityNormal, 100));
        tx.enqueue(make_event("high", EventPriority::PriorityHigh, 100));
        tx.enqueue(make_event("critical", EventPriority::PriorityCritical, 100));
        let events = tx.transmit(4);
        assert!(!events.is_empty());
        assert_eq!(events[0].event_id, "critical");
    }

    // has_critical tracks only the critical queue; transmit_critical drains it.
    #[test]
    fn test_transmitter_has_critical() {
        let mut tx = EventTransmitter::with_defaults();
        assert!(!tx.has_critical());
        tx.enqueue(make_event("normal", EventPriority::PriorityNormal, 100));
        assert!(!tx.has_critical());
        tx.enqueue(make_event("critical", EventPriority::PriorityCritical, 100));
        assert!(tx.has_critical());
        tx.transmit_critical();
        assert!(!tx.has_critical());
    }

    // RejectNew: the incoming event is refused once the queue is full.
    #[test]
    fn test_transmitter_overflow_drop_incoming() {
        let mut tx = EventTransmitter::with_defaults();
        tx.set_max_queue_size(EventPriority::PriorityNormal, 2);
        tx.set_overflow_policy(OverflowPolicy::RejectNew);
        assert!(tx.enqueue(make_event("e1", EventPriority::PriorityNormal, 100)));
        assert!(tx.enqueue(make_event("e2", EventPriority::PriorityNormal, 100)));
        assert!(!tx.enqueue(make_event("e3", EventPriority::PriorityNormal, 100)));
        assert_eq!(tx.queue_lengths()[2], 2);
        assert_eq!(tx.stats.dropped[2], 1);
    }

    // RemoveOldest: e1 is evicted so e2 and e3 survive.
    #[test]
    fn test_transmitter_overflow_drop_oldest() {
        let mut tx = EventTransmitter::with_defaults();
        tx.set_max_queue_size(EventPriority::PriorityNormal, 2);
        tx.set_overflow_policy(OverflowPolicy::RemoveOldest);
        tx.enqueue(make_event("e1", EventPriority::PriorityNormal, 100));
        tx.enqueue(make_event("e2", EventPriority::PriorityNormal, 100));
        tx.enqueue(make_event("e3", EventPriority::PriorityNormal, 100));
        assert_eq!(tx.queue_lengths()[2], 2);
        assert_eq!(tx.stats.dropped[2], 1);
        let events = tx.transmit(10);
        assert!(events.iter().any(|e| e.event_id == "e2"));
        assert!(events.iter().any(|e| e.event_id == "e3"));
    }

    // RemoveLowestPriority: a low event is dropped to admit high3, and the
    // high queue is allowed to exceed its configured size (shared budget).
    #[test]
    fn test_transmitter_overflow_drop_lowest() {
        let mut tx = EventTransmitter::with_defaults();
        tx.set_max_queue_size(EventPriority::PriorityHigh, 2);
        tx.set_overflow_policy(OverflowPolicy::RemoveLowestPriority);
        tx.enqueue(make_event("low1", EventPriority::PriorityLow, 100));
        tx.enqueue(make_event("low2", EventPriority::PriorityLow, 100));
        tx.enqueue(make_event("high1", EventPriority::PriorityHigh, 100));
        tx.enqueue(make_event("high2", EventPriority::PriorityHigh, 100));
        tx.enqueue(make_event("high3", EventPriority::PriorityHigh, 100));
        assert_eq!(tx.queue_lengths()[1], 3);
        assert_eq!(tx.queue_lengths()[3], 1);
        assert_eq!(tx.stats.dropped[3], 1);
    }

    // Transmitting one critical + one high event updates both counter sets.
    #[test]
    fn test_transmitter_stats() {
        let mut tx = EventTransmitter::with_defaults();
        tx.enqueue(make_event("c1", EventPriority::PriorityCritical, 100));
        tx.enqueue(make_event("h1", EventPriority::PriorityHigh, 200));
        tx.transmit(10);
        let stats = tx.stats();
        assert_eq!(stats.transmitted[0], 1);
        assert_eq!(stats.transmitted[1], 1);
        assert!(stats.bytes_transmitted[0] > 0);
        assert!(stats.bytes_transmitted[1] > 0);
    }

    // With all queues saturated, a batch of 10 should favor high over
    // normal over low, per the weighted shares.
    #[test]
    fn test_transmitter_weighted_distribution() {
        let mut tx = EventTransmitter::with_defaults();
        for i in 0..20 {
            tx.enqueue(make_event(
                &format!("h{}", i),
                EventPriority::PriorityHigh,
                50,
            ));
            tx.enqueue(make_event(
                &format!("n{}", i),
                EventPriority::PriorityNormal,
                50,
            ));
            tx.enqueue(make_event(
                &format!("l{}", i),
                EventPriority::PriorityLow,
                50,
            ));
        }
        let events = tx.transmit(10);
        let high_count = events
            .iter()
            .filter(|e| e.event_id.starts_with('h'))
            .count();
        let normal_count = events
            .iter()
            .filter(|e| e.event_id.starts_with('n'))
            .count();
        let low_count = events
            .iter()
            .filter(|e| e.event_id.starts_with('l'))
            .count();
        assert!(high_count >= 3, "high_count={}", high_count);
        assert!(normal_count >= 2, "normal_count={}", normal_count);
        assert!(high_count >= low_count, "high >= low");
    }
}