1use peat_schema::event::v1::{EventPriority, PeatEvent};
23use std::collections::VecDeque;
24use std::time::Instant;
25
26#[derive(Debug, Clone, Copy)]
28pub struct BandwidthAllocation {
29 pub critical_reserved_bps: u64,
31
32 pub high_min_bps: u64,
34
35 pub normal_min_bps: u64,
37
38 pub low_min_bps: u64,
40
41 pub total_available_bps: u64,
43}
44
45impl Default for BandwidthAllocation {
46 fn default() -> Self {
47 let total = 1_000_000; Self {
50 critical_reserved_bps: total / 10, high_min_bps: (total * 9 / 10) * 50 / 100, normal_min_bps: (total * 9 / 10) * 35 / 100, low_min_bps: (total * 9 / 10) * 15 / 100, total_available_bps: total,
55 }
56 }
57}
58
59impl BandwidthAllocation {
60 pub fn new(total_bps: u64) -> Self {
62 let non_critical = total_bps * 90 / 100; Self {
64 critical_reserved_bps: total_bps / 10,
65 high_min_bps: non_critical * 50 / 100,
66 normal_min_bps: non_critical * 35 / 100,
67 low_min_bps: non_critical * 15 / 100,
68 total_available_bps: total_bps,
69 }
70 }
71
72 pub fn with_percentages(
74 total_bps: u64,
75 critical_pct: u8,
76 high_pct: u8,
77 normal_pct: u8,
78 low_pct: u8,
79 ) -> Self {
80 assert!(
81 critical_pct + high_pct + normal_pct + low_pct <= 100,
82 "Percentages must sum to <= 100"
83 );
84 Self {
85 critical_reserved_bps: total_bps * critical_pct as u64 / 100,
86 high_min_bps: total_bps * high_pct as u64 / 100,
87 normal_min_bps: total_bps * normal_pct as u64 / 100,
88 low_min_bps: total_bps * low_pct as u64 / 100,
89 total_available_bps: total_bps,
90 }
91 }
92}
93
94#[derive(Debug)]
96struct TokenBucket {
97 tokens: f64,
99
100 capacity: f64,
102
103 rate: f64,
105
106 last_refill: Instant,
108}
109
110impl TokenBucket {
111 fn new(capacity: f64, rate: f64) -> Self {
113 Self {
114 tokens: capacity,
115 capacity,
116 rate,
117 last_refill: Instant::now(),
118 }
119 }
120
121 fn try_consume(&mut self, count: f64) -> bool {
125 self.refill();
126 if self.tokens >= count {
127 self.tokens -= count;
128 true
129 } else {
130 false
131 }
132 }
133
134 fn available(&mut self) -> f64 {
136 self.refill();
137 self.tokens
138 }
139
140 fn refill(&mut self) {
142 let now = Instant::now();
143 let elapsed = now.duration_since(self.last_refill).as_secs_f64();
144 self.tokens = (self.tokens + elapsed * self.rate).min(self.capacity);
145 self.last_refill = now;
146 }
147}
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151pub enum OverflowPolicy {
152 RejectNew,
154 RemoveOldest,
156 RemoveLowestPriority,
158}
159
160pub struct EventTransmitter {
167 queues: [VecDeque<PeatEvent>; 4],
169
170 max_queue_sizes: [usize; 4],
172
173 buckets: [TokenBucket; 4],
175
176 #[allow(dead_code)]
178 allocation: BandwidthAllocation,
179
180 overflow_policy: OverflowPolicy,
182
183 stats: TransmitterStats,
185}
186
187#[derive(Debug, Default, Clone)]
189pub struct TransmitterStats {
190 pub transmitted: [u64; 4],
192
193 pub dropped: [u64; 4],
195
196 pub bytes_transmitted: [u64; 4],
198}
199
200impl EventTransmitter {
201 pub fn new(allocation: BandwidthAllocation) -> Self {
203 let critical_bucket = TokenBucket::new(
205 allocation.critical_reserved_bps as f64,
206 allocation.critical_reserved_bps as f64,
207 );
208 let high_bucket = TokenBucket::new(
209 allocation.high_min_bps as f64,
210 allocation.high_min_bps as f64,
211 );
212 let normal_bucket = TokenBucket::new(
213 allocation.normal_min_bps as f64,
214 allocation.normal_min_bps as f64,
215 );
216 let low_bucket =
217 TokenBucket::new(allocation.low_min_bps as f64, allocation.low_min_bps as f64);
218
219 Self {
220 queues: Default::default(),
221 max_queue_sizes: [100, 1000, 1000, 1000], buckets: [critical_bucket, high_bucket, normal_bucket, low_bucket],
223 allocation,
224 overflow_policy: OverflowPolicy::RemoveLowestPriority,
225 stats: TransmitterStats::default(),
226 }
227 }
228
229 pub fn with_defaults() -> Self {
231 Self::new(BandwidthAllocation::default())
232 }
233
234 pub fn set_max_queue_size(&mut self, priority: EventPriority, size: usize) {
236 self.max_queue_sizes[priority_to_level(priority)] = size;
237 }
238
239 pub fn set_overflow_policy(&mut self, policy: OverflowPolicy) {
241 self.overflow_policy = policy;
242 }
243
244 pub fn enqueue(&mut self, event: PeatEvent) -> bool {
248 let level = self.get_level(&event);
249
250 if self.queues[level].len() >= self.max_queue_sizes[level] {
252 match self.overflow_policy {
253 OverflowPolicy::RejectNew => {
254 self.stats.dropped[level] += 1;
255 return false;
256 }
257 OverflowPolicy::RemoveOldest => {
258 self.queues[level].pop_front();
259 self.stats.dropped[level] += 1;
260 }
261 OverflowPolicy::RemoveLowestPriority => {
262 let dropped = self.drop_lowest_priority();
264 if !dropped {
265 self.stats.dropped[level] += 1;
267 return false;
268 }
269 }
270 }
271 }
272
273 self.queues[level].push_back(event);
274 true
275 }
276
277 pub fn transmit(&mut self, max_events: usize) -> Vec<PeatEvent> {
281 let mut result = Vec::with_capacity(max_events);
282 let mut remaining = max_events;
283
284 while remaining > 0 {
286 if let Some(event) = self.queues[0].front() {
287 let size = estimate_event_size(event);
288 if self.buckets[0].try_consume(size as f64) {
289 let event = self.queues[0].pop_front().unwrap();
290 self.stats.transmitted[0] += 1;
291 self.stats.bytes_transmitted[0] += size as u64;
292 result.push(event);
293 remaining -= 1;
294 } else {
295 break; }
297 } else {
298 break; }
300 }
301
302 if remaining == 0 {
303 return result;
304 }
305
306 let high_alloc = (remaining * 50) / 100;
309 let normal_alloc = (remaining * 35) / 100;
310 let mut high_remaining = high_alloc;
314 while high_remaining > 0 {
315 if let Some(event) = self.queues[1].front() {
316 let size = estimate_event_size(event);
317 if self.buckets[1].try_consume(size as f64) {
318 let event = self.queues[1].pop_front().unwrap();
319 self.stats.transmitted[1] += 1;
320 self.stats.bytes_transmitted[1] += size as u64;
321 result.push(event);
322 high_remaining -= 1;
323 remaining -= 1;
324 } else {
325 break; }
327 } else {
328 break;
329 }
330 }
331 let high_unused = high_alloc - (high_alloc - high_remaining);
333
334 let mut normal_remaining = normal_alloc + high_unused;
336 while normal_remaining > 0 && remaining > 0 {
337 if let Some(event) = self.queues[2].front() {
338 let size = estimate_event_size(event);
339 if self.buckets[2].try_consume(size as f64) {
340 let event = self.queues[2].pop_front().unwrap();
341 self.stats.transmitted[2] += 1;
342 self.stats.bytes_transmitted[2] += size as u64;
343 result.push(event);
344 normal_remaining -= 1;
345 remaining -= 1;
346 } else {
347 break;
348 }
349 } else {
350 break;
351 }
352 }
353
354 while remaining > 0 {
356 if let Some(event) = self.queues[3].front() {
357 let size = estimate_event_size(event);
358 if self.buckets[3].try_consume(size as f64) {
359 let event = self.queues[3].pop_front().unwrap();
360 self.stats.transmitted[3] += 1;
361 self.stats.bytes_transmitted[3] += size as u64;
362 result.push(event);
363 remaining -= 1;
364 } else {
365 break;
366 }
367 } else {
368 break;
369 }
370 }
371
372 result
373 }
374
375 pub fn transmit_critical(&mut self) -> Vec<PeatEvent> {
377 let mut result = Vec::new();
378
379 while let Some(event) = self.queues[0].front() {
380 let size = estimate_event_size(event);
381 if self.buckets[0].try_consume(size as f64) {
382 let event = self.queues[0].pop_front().unwrap();
383 self.stats.transmitted[0] += 1;
384 self.stats.bytes_transmitted[0] += size as u64;
385 result.push(event);
386 } else {
387 break;
388 }
389 }
390
391 result
392 }
393
394 pub fn has_critical(&self) -> bool {
396 !self.queues[0].is_empty()
397 }
398
399 pub fn queue_lengths(&self) -> [usize; 4] {
401 [
402 self.queues[0].len(),
403 self.queues[1].len(),
404 self.queues[2].len(),
405 self.queues[3].len(),
406 ]
407 }
408
409 pub fn total_queued(&self) -> usize {
411 self.queues.iter().map(|q| q.len()).sum()
412 }
413
414 pub fn stats(&self) -> &TransmitterStats {
416 &self.stats
417 }
418
419 pub fn reset_stats(&mut self) {
421 self.stats = TransmitterStats::default();
422 }
423
424 pub fn available_bandwidth(&mut self) -> [f64; 4] {
426 [
427 self.buckets[0].available(),
428 self.buckets[1].available(),
429 self.buckets[2].available(),
430 self.buckets[3].available(),
431 ]
432 }
433
434 fn get_level(&self, event: &PeatEvent) -> usize {
437 let priority = event
438 .routing
439 .as_ref()
440 .map(|r| EventPriority::try_from(r.priority).unwrap_or(EventPriority::PriorityNormal))
441 .unwrap_or(EventPriority::PriorityNormal);
442 priority_to_level(priority)
443 }
444
445 fn drop_lowest_priority(&mut self) -> bool {
446 if !self.queues[3].is_empty() {
448 self.queues[3].pop_front();
449 self.stats.dropped[3] += 1;
450 return true;
451 }
452 if !self.queues[2].is_empty() {
454 self.queues[2].pop_front();
455 self.stats.dropped[2] += 1;
456 return true;
457 }
458 if !self.queues[1].is_empty() {
460 self.queues[1].pop_front();
461 self.stats.dropped[1] += 1;
462 return true;
463 }
464 false
466 }
467}
468
469fn priority_to_level(priority: EventPriority) -> usize {
471 match priority {
472 EventPriority::PriorityCritical => 0,
473 EventPriority::PriorityHigh => 1,
474 EventPriority::PriorityNormal => 2,
475 EventPriority::PriorityLow => 3,
476 }
477}
478
479fn estimate_event_size(event: &PeatEvent) -> usize {
481 let base_overhead = 200; base_overhead + event.payload_value.len()
484}
485
486#[cfg(test)]
487mod tests {
488 use super::*;
489 use peat_schema::event::v1::AggregationPolicy;
490
491 fn make_event(id: &str, priority: EventPriority, payload_size: usize) -> PeatEvent {
492 PeatEvent {
493 event_id: id.to_string(),
494 timestamp: None,
495 source_node_id: "node-1".to_string(),
496 source_formation_id: "squad-1".to_string(),
497 source_instance_id: None,
498 event_class: peat_schema::event::v1::EventClass::Product as i32,
499 event_type: "test".to_string(),
500 routing: Some(AggregationPolicy {
501 propagation: peat_schema::event::v1::PropagationMode::PropagationFull as i32,
502 priority: priority as i32,
503 ttl_seconds: 300,
504 aggregation_window_ms: 0,
505 }),
506 payload_type_url: String::new(),
507 payload_value: vec![0u8; payload_size],
508 }
509 }
510
511 #[test]
512 fn test_bandwidth_allocation_default() {
513 let alloc = BandwidthAllocation::default();
514 assert_eq!(alloc.total_available_bps, 1_000_000);
515 assert!(alloc.critical_reserved_bps > 0);
516 assert!(alloc.high_min_bps > 0);
517 assert!(alloc.normal_min_bps > 0);
518 assert!(alloc.low_min_bps > 0);
519 }
520
521 #[test]
522 fn test_bandwidth_allocation_custom() {
523 let alloc = BandwidthAllocation::with_percentages(1_000_000, 10, 45, 30, 15);
524 assert_eq!(alloc.critical_reserved_bps, 100_000);
525 assert_eq!(alloc.high_min_bps, 450_000);
526 assert_eq!(alloc.normal_min_bps, 300_000);
527 assert_eq!(alloc.low_min_bps, 150_000);
528 }
529
530 #[test]
531 fn test_token_bucket_basic() {
532 let mut bucket = TokenBucket::new(1000.0, 100.0); assert!(bucket.try_consume(500.0));
536 assert!(bucket.tokens >= 499.0 && bucket.tokens <= 501.0);
538
539 assert!(!bucket.try_consume(600.0));
541 assert!(bucket.tokens >= 499.0 && bucket.tokens <= 501.0);
543
544 assert!(bucket.try_consume(400.0));
546 assert!(bucket.tokens >= 99.0 && bucket.tokens <= 110.0);
548 }
549
550 #[test]
551 fn test_transmitter_enqueue() {
552 let mut tx = EventTransmitter::with_defaults();
553
554 let event = make_event("e1", EventPriority::PriorityNormal, 100);
555 assert!(tx.enqueue(event));
556
557 assert_eq!(tx.queue_lengths()[2], 1); }
559
560 #[test]
561 fn test_transmitter_critical_preemption() {
562 let mut tx = EventTransmitter::with_defaults();
563
564 tx.enqueue(make_event("low", EventPriority::PriorityLow, 100));
566 tx.enqueue(make_event("normal", EventPriority::PriorityNormal, 100));
567 tx.enqueue(make_event("high", EventPriority::PriorityHigh, 100));
568 tx.enqueue(make_event("critical", EventPriority::PriorityCritical, 100));
569
570 let events = tx.transmit(4);
572 assert!(!events.is_empty());
573 assert_eq!(events[0].event_id, "critical");
574 }
575
576 #[test]
577 fn test_transmitter_has_critical() {
578 let mut tx = EventTransmitter::with_defaults();
579
580 assert!(!tx.has_critical());
581
582 tx.enqueue(make_event("normal", EventPriority::PriorityNormal, 100));
583 assert!(!tx.has_critical());
584
585 tx.enqueue(make_event("critical", EventPriority::PriorityCritical, 100));
586 assert!(tx.has_critical());
587
588 tx.transmit_critical();
589 assert!(!tx.has_critical());
590 }
591
592 #[test]
593 fn test_transmitter_overflow_drop_incoming() {
594 let mut tx = EventTransmitter::with_defaults();
595 tx.set_max_queue_size(EventPriority::PriorityNormal, 2);
596 tx.set_overflow_policy(OverflowPolicy::RejectNew);
597
598 assert!(tx.enqueue(make_event("e1", EventPriority::PriorityNormal, 100)));
599 assert!(tx.enqueue(make_event("e2", EventPriority::PriorityNormal, 100)));
600 assert!(!tx.enqueue(make_event("e3", EventPriority::PriorityNormal, 100)));
601
602 assert_eq!(tx.queue_lengths()[2], 2);
603 assert_eq!(tx.stats.dropped[2], 1);
604 }
605
606 #[test]
607 fn test_transmitter_overflow_drop_oldest() {
608 let mut tx = EventTransmitter::with_defaults();
609 tx.set_max_queue_size(EventPriority::PriorityNormal, 2);
610 tx.set_overflow_policy(OverflowPolicy::RemoveOldest);
611
612 tx.enqueue(make_event("e1", EventPriority::PriorityNormal, 100));
613 tx.enqueue(make_event("e2", EventPriority::PriorityNormal, 100));
614 tx.enqueue(make_event("e3", EventPriority::PriorityNormal, 100));
615
616 assert_eq!(tx.queue_lengths()[2], 2);
617 assert_eq!(tx.stats.dropped[2], 1);
618
619 let events = tx.transmit(10);
621 assert!(events.iter().any(|e| e.event_id == "e2"));
622 assert!(events.iter().any(|e| e.event_id == "e3"));
623 }
624
625 #[test]
626 fn test_transmitter_overflow_drop_lowest() {
627 let mut tx = EventTransmitter::with_defaults();
628 tx.set_max_queue_size(EventPriority::PriorityHigh, 2);
629 tx.set_overflow_policy(OverflowPolicy::RemoveLowestPriority);
630
631 tx.enqueue(make_event("low1", EventPriority::PriorityLow, 100));
633 tx.enqueue(make_event("low2", EventPriority::PriorityLow, 100));
634
635 tx.enqueue(make_event("high1", EventPriority::PriorityHigh, 100));
637 tx.enqueue(make_event("high2", EventPriority::PriorityHigh, 100));
638
639 tx.enqueue(make_event("high3", EventPriority::PriorityHigh, 100));
641
642 assert_eq!(tx.queue_lengths()[1], 3); assert_eq!(tx.queue_lengths()[3], 1); assert_eq!(tx.stats.dropped[3], 1);
645 }
646
647 #[test]
648 fn test_transmitter_stats() {
649 let mut tx = EventTransmitter::with_defaults();
650
651 tx.enqueue(make_event("c1", EventPriority::PriorityCritical, 100));
652 tx.enqueue(make_event("h1", EventPriority::PriorityHigh, 200));
653
654 tx.transmit(10);
655
656 let stats = tx.stats();
657 assert_eq!(stats.transmitted[0], 1); assert_eq!(stats.transmitted[1], 1); assert!(stats.bytes_transmitted[0] > 0);
660 assert!(stats.bytes_transmitted[1] > 0);
661 }
662
663 #[test]
664 fn test_transmitter_weighted_distribution() {
665 let mut tx = EventTransmitter::with_defaults();
666
667 for i in 0..20 {
669 tx.enqueue(make_event(
670 &format!("h{}", i),
671 EventPriority::PriorityHigh,
672 50,
673 ));
674 tx.enqueue(make_event(
675 &format!("n{}", i),
676 EventPriority::PriorityNormal,
677 50,
678 ));
679 tx.enqueue(make_event(
680 &format!("l{}", i),
681 EventPriority::PriorityLow,
682 50,
683 ));
684 }
685
686 let events = tx.transmit(10);
688
689 let high_count = events
691 .iter()
692 .filter(|e| e.event_id.starts_with('h'))
693 .count();
694 let normal_count = events
695 .iter()
696 .filter(|e| e.event_id.starts_with('n'))
697 .count();
698 let low_count = events
699 .iter()
700 .filter(|e| e.event_id.starts_with('l'))
701 .count();
702
703 assert!(high_count >= 3, "high_count={}", high_count);
705 assert!(normal_count >= 2, "normal_count={}", normal_count);
706 assert!(high_count >= low_count, "high >= low");
707 }
708}