Skip to main content

peat_protocol/event/
transmitter.rs

1//! Event Transmitter with Bandwidth Control (ADR-027 Phase 4)
2//!
3//! Implements weighted fair queuing and bandwidth allocation for priority-based
4//! event transmission.
5//!
6//! ## Bandwidth Allocation
7//!
8//! ```text
9//! Total Bandwidth
10//!     ├── CRITICAL (reserved, preempts all)
11//!     └── Remaining
12//!         ├── HIGH    (50%)
13//!         ├── NORMAL  (35%)
14//!         └── LOW     (15%)
15//! ```
16//!
17//! ## Token Bucket Rate Limiting
18//!
19//! Each priority level uses a token bucket to enforce bandwidth limits.
20//! Tokens are replenished at a configurable rate.
21
22use peat_schema::event::v1::{EventPriority, PeatEvent};
23use std::collections::VecDeque;
24use std::time::Instant;
25
/// Bandwidth allocation configuration.
///
/// Every rate in this struct is expressed in **bytes per second**. The
/// per-priority values are later used as both the refill rate and the burst
/// capacity of that priority's token bucket.
#[derive(Debug, Clone, Copy)]
pub struct BandwidthAllocation {
    /// Reserved bandwidth for CRITICAL events (bytes/second)
    pub critical_reserved_bps: u64,

    /// Minimum bandwidth for HIGH priority (bytes/second)
    pub high_min_bps: u64,

    /// Minimum bandwidth for NORMAL priority (bytes/second)
    pub normal_min_bps: u64,

    /// Minimum bandwidth for LOW priority (bytes/second)
    pub low_min_bps: u64,

    /// Total available bandwidth (bytes/second)
    pub total_available_bps: u64,
}

impl Default for BandwidthAllocation {
    /// Default allocation: 1,000,000 bytes/second in total, 10% reserved for
    /// CRITICAL and the remaining 90% split 50/35/15 between HIGH/NORMAL/LOW.
    fn default() -> Self {
        // Delegate so the default can never drift from the arithmetic in `new`.
        Self::new(1_000_000)
    }
}

impl BandwidthAllocation {
    /// Create a new bandwidth allocation using the standard split.
    ///
    /// 10% of `total_bps` is reserved for CRITICAL; the remaining 90% is
    /// divided 50% / 35% / 15% between HIGH / NORMAL / LOW. All divisions
    /// round down. Any `u64` value is accepted without overflow.
    pub fn new(total_bps: u64) -> Self {
        let non_critical = Self::percent_of(total_bps, 90);
        Self {
            critical_reserved_bps: Self::percent_of(total_bps, 10),
            high_min_bps: Self::percent_of(non_critical, 50),
            normal_min_bps: Self::percent_of(non_critical, 35),
            low_min_bps: Self::percent_of(non_critical, 15),
            total_available_bps: total_bps,
        }
    }

    /// Create with custom percentages.
    ///
    /// Unlike [`BandwidthAllocation::new`], every percentage here is taken
    /// directly from `total_bps` (not from the non-critical remainder).
    ///
    /// # Panics
    ///
    /// Panics with `"Percentages must sum to <= 100"` if the four percentages
    /// add up to more than 100.
    pub fn with_percentages(
        total_bps: u64,
        critical_pct: u8,
        high_pct: u8,
        normal_pct: u8,
        low_pct: u8,
    ) -> Self {
        // Sum in a wider type: four `u8` values can reach 1020, which would
        // overflow `u8` and either panic with an unrelated message (debug) or
        // wrap around and slip past the check (release).
        let pct_sum = u32::from(critical_pct)
            + u32::from(high_pct)
            + u32::from(normal_pct)
            + u32::from(low_pct);
        assert!(pct_sum <= 100, "Percentages must sum to <= 100");
        Self {
            critical_reserved_bps: Self::percent_of(total_bps, u64::from(critical_pct)),
            high_min_bps: Self::percent_of(total_bps, u64::from(high_pct)),
            normal_min_bps: Self::percent_of(total_bps, u64::from(normal_pct)),
            low_min_bps: Self::percent_of(total_bps, u64::from(low_pct)),
            total_available_bps: total_bps,
        }
    }

    /// Return `value * pct / 100` (rounded down), computed in 128-bit
    /// arithmetic so the intermediate product cannot overflow.
    fn percent_of(value: u64, pct: u64) -> u64 {
        let scaled = u128::from(value) * u128::from(pct) / 100;
        // For pct <= 100 the result always fits; saturate defensively otherwise.
        u64::try_from(scaled).unwrap_or(u64::MAX)
    }
}
93
/// Token bucket used for rate limiting.
///
/// The bucket starts full, is drained by [`TokenBucket::try_consume`], and is
/// topped up lazily (on every query) in proportion to the wall-clock time
/// elapsed since the previous top-up, never exceeding `capacity`.
#[derive(Debug)]
struct TokenBucket {
    /// Tokens currently available
    tokens: f64,

    /// Upper bound on `tokens` (bucket size)
    capacity: f64,

    /// Refill speed in tokens per second
    rate: f64,

    /// Moment of the most recent refill
    last_refill: Instant,
}

impl TokenBucket {
    /// Build a bucket that starts full.
    fn new(capacity: f64, rate: f64) -> Self {
        let created_at = Instant::now();
        Self {
            tokens: capacity,
            capacity,
            rate,
            last_refill: created_at,
        }
    }

    /// Attempt to take `count` tokens out of the bucket.
    ///
    /// The bucket is refilled first. Returns `true` and deducts the tokens
    /// when enough are available; otherwise returns `false` and leaves the
    /// token count untouched.
    fn try_consume(&mut self, count: f64) -> bool {
        self.refill();
        let has_enough = self.tokens >= count;
        if has_enough {
            self.tokens -= count;
        }
        has_enough
    }

    /// Refill, then report how many tokens are available right now.
    fn available(&mut self) -> f64 {
        self.refill();
        self.tokens
    }

    /// Credit tokens for the time elapsed since the last refill, capped at
    /// `capacity`, and move the refill timestamp forward.
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed_secs = now.duration_since(self.last_refill).as_secs_f64();
        let replenished = self.tokens + elapsed_secs * self.rate;
        self.tokens = replenished.min(self.capacity);
        self.last_refill = now;
    }
}
148
/// Queue overflow policy
///
/// Decides what `EventTransmitter::enqueue` does when the queue an incoming
/// event belongs to has already reached its configured maximum size.
/// `EventTransmitter::new` selects `RemoveLowestPriority`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Reject incoming event if queue full
    ///
    /// The incoming event is counted as dropped and `enqueue` returns `false`.
    RejectNew,
    /// Remove oldest event in queue
    ///
    /// The front of the *same* priority queue is popped (and counted as
    /// dropped) to make room for the incoming event.
    RemoveOldest,
    /// Remove lowest priority event
    ///
    /// The oldest event of the lowest-priority non-empty queue is dropped,
    /// searching LOW, then NORMAL, then HIGH; CRITICAL events are never
    /// dropped. If nothing can be dropped the incoming event is rejected.
    /// Because the victim may come from a different queue than the one that
    /// is full, the target queue can end up above its configured maximum.
    RemoveLowestPriority,
}
159
/// Event transmitter with bandwidth control
///
/// Manages event queues with:
/// - Weighted fair queuing for priority levels
/// - Token bucket rate limiting
/// - Queue overflow handling
///
/// All per-priority arrays are indexed by queue level:
/// `0` = CRITICAL, `1` = HIGH, `2` = NORMAL, `3` = LOW
/// (the mapping implemented by `priority_to_level`).
pub struct EventTransmitter {
    /// Priority queues for events (FIFO within each level)
    queues: [VecDeque<PeatEvent>; 4],

    /// Maximum queue sizes per priority
    ///
    /// Checked by `enqueue` before an event is appended.
    max_queue_sizes: [usize; 4],

    /// Token buckets for rate limiting
    ///
    /// One bucket per level; each transmitted event is charged its estimated
    /// size in bytes.
    buckets: [TokenBucket; 4],

    /// Bandwidth allocation (stored for reference/debugging)
    #[allow(dead_code)]
    allocation: BandwidthAllocation,

    /// Queue overflow policy
    overflow_policy: OverflowPolicy,

    /// Statistics
    stats: TransmitterStats,
}
186
/// Transmission statistics
///
/// Every array is indexed by queue level:
/// `0` = CRITICAL, `1` = HIGH, `2` = NORMAL, `3` = LOW.
#[derive(Debug, Default, Clone)]
pub struct TransmitterStats {
    /// Events transmitted by priority
    pub transmitted: [u64; 4],

    /// Events dropped by priority
    ///
    /// Counts both rejected incoming events and queued events evicted by the
    /// overflow policy.
    pub dropped: [u64; 4],

    /// Bytes transmitted by priority
    ///
    /// Based on the *estimated* event size (fixed overhead plus payload
    /// length), not on bytes actually written to a wire.
    pub bytes_transmitted: [u64; 4],
}
199
200impl EventTransmitter {
201    /// Create a new event transmitter
202    pub fn new(allocation: BandwidthAllocation) -> Self {
203        // Token bucket capacity = 1 second worth of bandwidth
204        let critical_bucket = TokenBucket::new(
205            allocation.critical_reserved_bps as f64,
206            allocation.critical_reserved_bps as f64,
207        );
208        let high_bucket = TokenBucket::new(
209            allocation.high_min_bps as f64,
210            allocation.high_min_bps as f64,
211        );
212        let normal_bucket = TokenBucket::new(
213            allocation.normal_min_bps as f64,
214            allocation.normal_min_bps as f64,
215        );
216        let low_bucket =
217            TokenBucket::new(allocation.low_min_bps as f64, allocation.low_min_bps as f64);
218
219        Self {
220            queues: Default::default(),
221            max_queue_sizes: [100, 1000, 1000, 1000], // Default limits
222            buckets: [critical_bucket, high_bucket, normal_bucket, low_bucket],
223            allocation,
224            overflow_policy: OverflowPolicy::RemoveLowestPriority,
225            stats: TransmitterStats::default(),
226        }
227    }
228
229    /// Create with default allocation
230    pub fn with_defaults() -> Self {
231        Self::new(BandwidthAllocation::default())
232    }
233
234    /// Set maximum queue size for a priority
235    pub fn set_max_queue_size(&mut self, priority: EventPriority, size: usize) {
236        self.max_queue_sizes[priority_to_level(priority)] = size;
237    }
238
    /// Set overflow policy
    ///
    /// The policy is consulted by `enqueue` whenever the target queue is
    /// already at its maximum size.
    pub fn set_overflow_policy(&mut self, policy: OverflowPolicy) {
        self.overflow_policy = policy;
    }
243
244    /// Enqueue an event for transmission
245    ///
246    /// Returns true if event was accepted, false if dropped due to overflow.
247    pub fn enqueue(&mut self, event: PeatEvent) -> bool {
248        let level = self.get_level(&event);
249
250        // Check for overflow
251        if self.queues[level].len() >= self.max_queue_sizes[level] {
252            match self.overflow_policy {
253                OverflowPolicy::RejectNew => {
254                    self.stats.dropped[level] += 1;
255                    return false;
256                }
257                OverflowPolicy::RemoveOldest => {
258                    self.queues[level].pop_front();
259                    self.stats.dropped[level] += 1;
260                }
261                OverflowPolicy::RemoveLowestPriority => {
262                    // Try to drop from LOW, then NORMAL, then HIGH
263                    let dropped = self.drop_lowest_priority();
264                    if !dropped {
265                        // Can't drop anything, drop incoming
266                        self.stats.dropped[level] += 1;
267                        return false;
268                    }
269                }
270            }
271        }
272
273        self.queues[level].push_back(event);
274        true
275    }
276
277    /// Transmit events within bandwidth limits
278    ///
279    /// Returns events ready for transmission, respecting bandwidth allocation.
280    pub fn transmit(&mut self, max_events: usize) -> Vec<PeatEvent> {
281        let mut result = Vec::with_capacity(max_events);
282        let mut remaining = max_events;
283
284        // CRITICAL: Always transmit all pending (preempt)
285        while remaining > 0 {
286            if let Some(event) = self.queues[0].front() {
287                let size = estimate_event_size(event);
288                if self.buckets[0].try_consume(size as f64) {
289                    let event = self.queues[0].pop_front().unwrap();
290                    self.stats.transmitted[0] += 1;
291                    self.stats.bytes_transmitted[0] += size as u64;
292                    result.push(event);
293                    remaining -= 1;
294                } else {
295                    break; // No more critical bandwidth
296                }
297            } else {
298                break; // No more critical events
299            }
300        }
301
302        if remaining == 0 {
303            return result;
304        }
305
306        // Weighted fair queuing for non-critical
307        // Calculate allocations based on remaining capacity
308        let high_alloc = (remaining * 50) / 100;
309        let normal_alloc = (remaining * 35) / 100;
310        // LOW gets remainder
311
312        // HIGH
313        let mut high_remaining = high_alloc;
314        while high_remaining > 0 {
315            if let Some(event) = self.queues[1].front() {
316                let size = estimate_event_size(event);
317                if self.buckets[1].try_consume(size as f64) {
318                    let event = self.queues[1].pop_front().unwrap();
319                    self.stats.transmitted[1] += 1;
320                    self.stats.bytes_transmitted[1] += size as u64;
321                    result.push(event);
322                    high_remaining -= 1;
323                    remaining -= 1;
324                } else {
325                    break; // No more high bandwidth
326                }
327            } else {
328                break;
329            }
330        }
331        // Unused high allocation rolls over
332        let high_unused = high_alloc - (high_alloc - high_remaining);
333
334        // NORMAL
335        let mut normal_remaining = normal_alloc + high_unused;
336        while normal_remaining > 0 && remaining > 0 {
337            if let Some(event) = self.queues[2].front() {
338                let size = estimate_event_size(event);
339                if self.buckets[2].try_consume(size as f64) {
340                    let event = self.queues[2].pop_front().unwrap();
341                    self.stats.transmitted[2] += 1;
342                    self.stats.bytes_transmitted[2] += size as u64;
343                    result.push(event);
344                    normal_remaining -= 1;
345                    remaining -= 1;
346                } else {
347                    break;
348                }
349            } else {
350                break;
351            }
352        }
353
354        // LOW gets everything remaining
355        while remaining > 0 {
356            if let Some(event) = self.queues[3].front() {
357                let size = estimate_event_size(event);
358                if self.buckets[3].try_consume(size as f64) {
359                    let event = self.queues[3].pop_front().unwrap();
360                    self.stats.transmitted[3] += 1;
361                    self.stats.bytes_transmitted[3] += size as u64;
362                    result.push(event);
363                    remaining -= 1;
364                } else {
365                    break;
366                }
367            } else {
368                break;
369            }
370        }
371
372        result
373    }
374
375    /// Transmit all critical events immediately (preempt)
376    pub fn transmit_critical(&mut self) -> Vec<PeatEvent> {
377        let mut result = Vec::new();
378
379        while let Some(event) = self.queues[0].front() {
380            let size = estimate_event_size(event);
381            if self.buckets[0].try_consume(size as f64) {
382                let event = self.queues[0].pop_front().unwrap();
383                self.stats.transmitted[0] += 1;
384                self.stats.bytes_transmitted[0] += size as u64;
385                result.push(event);
386            } else {
387                break;
388            }
389        }
390
391        result
392    }
393
394    /// Check if there are critical events pending
395    pub fn has_critical(&self) -> bool {
396        !self.queues[0].is_empty()
397    }
398
399    /// Get queue lengths
400    pub fn queue_lengths(&self) -> [usize; 4] {
401        [
402            self.queues[0].len(),
403            self.queues[1].len(),
404            self.queues[2].len(),
405            self.queues[3].len(),
406        ]
407    }
408
409    /// Get total queued events
410    pub fn total_queued(&self) -> usize {
411        self.queues.iter().map(|q| q.len()).sum()
412    }
413
    /// Get transmission statistics
    ///
    /// Returns a borrow of the live counters; clone the result to keep a
    /// snapshot across later `enqueue` / `transmit` calls.
    pub fn stats(&self) -> &TransmitterStats {
        &self.stats
    }
418
419    /// Reset statistics
420    pub fn reset_stats(&mut self) {
421        self.stats = TransmitterStats::default();
422    }
423
424    /// Get available bandwidth tokens per priority
425    pub fn available_bandwidth(&mut self) -> [f64; 4] {
426        [
427            self.buckets[0].available(),
428            self.buckets[1].available(),
429            self.buckets[2].available(),
430            self.buckets[3].available(),
431        ]
432    }
433
434    // Internal helpers
435
436    fn get_level(&self, event: &PeatEvent) -> usize {
437        let priority = event
438            .routing
439            .as_ref()
440            .map(|r| EventPriority::try_from(r.priority).unwrap_or(EventPriority::PriorityNormal))
441            .unwrap_or(EventPriority::PriorityNormal);
442        priority_to_level(priority)
443    }
444
445    fn drop_lowest_priority(&mut self) -> bool {
446        // Try LOW first
447        if !self.queues[3].is_empty() {
448            self.queues[3].pop_front();
449            self.stats.dropped[3] += 1;
450            return true;
451        }
452        // Then NORMAL
453        if !self.queues[2].is_empty() {
454            self.queues[2].pop_front();
455            self.stats.dropped[2] += 1;
456            return true;
457        }
458        // Then HIGH
459        if !self.queues[1].is_empty() {
460            self.queues[1].pop_front();
461            self.stats.dropped[1] += 1;
462            return true;
463        }
464        // Never drop CRITICAL
465        false
466    }
467}
468
/// Convert EventPriority to queue level index
///
/// Lower indices are more urgent: CRITICAL = 0, HIGH = 1, NORMAL = 2,
/// LOW = 3. The result is used to index the transmitter's per-priority
/// arrays (queues, size limits, token buckets and statistics).
fn priority_to_level(priority: EventPriority) -> usize {
    match priority {
        EventPriority::PriorityCritical => 0,
        EventPriority::PriorityHigh => 1,
        EventPriority::PriorityNormal => 2,
        EventPriority::PriorityLow => 3,
    }
}
478
479/// Estimate event size in bytes
480fn estimate_event_size(event: &PeatEvent) -> usize {
481    // Approximate size: base overhead + payload
482    let base_overhead = 200; // Headers, routing, metadata
483    base_overhead + event.payload_value.len()
484}
485
#[cfg(test)]
mod tests {
    use super::*;
    use peat_schema::event::v1::AggregationPolicy;

    /// Build a test event with the given id, priority and payload length.
    fn make_event(id: &str, priority: EventPriority, payload_size: usize) -> PeatEvent {
        PeatEvent {
            event_id: id.to_string(),
            timestamp: None,
            source_node_id: "node-1".to_string(),
            source_formation_id: "squad-1".to_string(),
            source_instance_id: None,
            event_class: peat_schema::event::v1::EventClass::Product as i32,
            event_type: "test".to_string(),
            routing: Some(AggregationPolicy {
                propagation: peat_schema::event::v1::PropagationMode::PropagationFull as i32,
                priority: priority as i32,
                ttl_seconds: 300,
                aggregation_window_ms: 0,
            }),
            payload_type_url: String::new(),
            payload_value: vec![0u8; payload_size],
        }
    }

    #[test]
    fn test_bandwidth_allocation_default() {
        let alloc = BandwidthAllocation::default();
        assert_eq!(alloc.total_available_bps, 1_000_000);
        assert!(alloc.critical_reserved_bps > 0);
        assert!(alloc.high_min_bps > 0);
        assert!(alloc.normal_min_bps > 0);
        assert!(alloc.low_min_bps > 0);
    }

    #[test]
    fn test_bandwidth_allocation_custom() {
        let alloc = BandwidthAllocation::with_percentages(1_000_000, 10, 45, 30, 15);
        assert_eq!(alloc.critical_reserved_bps, 100_000);
        assert_eq!(alloc.high_min_bps, 450_000);
        assert_eq!(alloc.normal_min_bps, 300_000);
        assert_eq!(alloc.low_min_bps, 150_000);
    }

    #[test]
    fn test_token_bucket_basic() {
        // A zero refill rate makes the arithmetic independent of how much
        // wall-clock time passes between calls, so the test cannot flake on a
        // slow or heavily loaded machine.
        let mut bucket = TokenBucket::new(1000.0, 0.0); // 1000 capacity, no refill

        // Initial consumption
        assert!(bucket.try_consume(500.0));
        assert_eq!(bucket.tokens, 500.0);

        // Try to consume more than available (600 > 500)
        assert!(!bucket.try_consume(600.0));
        // A failed attempt must not change the token count
        assert_eq!(bucket.tokens, 500.0);

        // Consume most of remaining
        assert!(bucket.try_consume(400.0));
        assert_eq!(bucket.tokens, 100.0);
    }

    #[test]
    fn test_token_bucket_refill_never_exceeds_capacity() {
        // Extremely fast refill: whatever time elapses, the bucket must stay
        // within [0, capacity].
        let mut bucket = TokenBucket::new(1000.0, 1.0e12);
        assert!(bucket.try_consume(1000.0));

        let available = bucket.available();
        assert!(
            (0.0..=1000.0).contains(&available),
            "available={}",
            available
        );
    }

    #[test]
    fn test_transmitter_enqueue() {
        let mut tx = EventTransmitter::with_defaults();

        let event = make_event("e1", EventPriority::PriorityNormal, 100);
        assert!(tx.enqueue(event));

        assert_eq!(tx.queue_lengths()[2], 1); // NORMAL is level 2
    }

    #[test]
    fn test_transmitter_critical_preemption() {
        let mut tx = EventTransmitter::with_defaults();

        // Add events of different priorities
        tx.enqueue(make_event("low", EventPriority::PriorityLow, 100));
        tx.enqueue(make_event("normal", EventPriority::PriorityNormal, 100));
        tx.enqueue(make_event("high", EventPriority::PriorityHigh, 100));
        tx.enqueue(make_event("critical", EventPriority::PriorityCritical, 100));

        // Transmit should return critical first
        let events = tx.transmit(4);
        assert!(!events.is_empty());
        assert_eq!(events[0].event_id, "critical");
    }

    #[test]
    fn test_transmitter_has_critical() {
        let mut tx = EventTransmitter::with_defaults();

        assert!(!tx.has_critical());

        tx.enqueue(make_event("normal", EventPriority::PriorityNormal, 100));
        assert!(!tx.has_critical());

        tx.enqueue(make_event("critical", EventPriority::PriorityCritical, 100));
        assert!(tx.has_critical());

        tx.transmit_critical();
        assert!(!tx.has_critical());
    }

    #[test]
    fn test_transmitter_overflow_drop_incoming() {
        let mut tx = EventTransmitter::with_defaults();
        tx.set_max_queue_size(EventPriority::PriorityNormal, 2);
        tx.set_overflow_policy(OverflowPolicy::RejectNew);

        assert!(tx.enqueue(make_event("e1", EventPriority::PriorityNormal, 100)));
        assert!(tx.enqueue(make_event("e2", EventPriority::PriorityNormal, 100)));
        assert!(!tx.enqueue(make_event("e3", EventPriority::PriorityNormal, 100)));

        assert_eq!(tx.queue_lengths()[2], 2);
        assert_eq!(tx.stats.dropped[2], 1);
    }

    #[test]
    fn test_transmitter_overflow_drop_oldest() {
        let mut tx = EventTransmitter::with_defaults();
        tx.set_max_queue_size(EventPriority::PriorityNormal, 2);
        tx.set_overflow_policy(OverflowPolicy::RemoveOldest);

        tx.enqueue(make_event("e1", EventPriority::PriorityNormal, 100));
        tx.enqueue(make_event("e2", EventPriority::PriorityNormal, 100));
        tx.enqueue(make_event("e3", EventPriority::PriorityNormal, 100));

        assert_eq!(tx.queue_lengths()[2], 2);
        assert_eq!(tx.stats.dropped[2], 1);

        // e1 should be dropped, e2 and e3 remain
        let events = tx.transmit(10);
        assert!(events.iter().any(|e| e.event_id == "e2"));
        assert!(events.iter().any(|e| e.event_id == "e3"));
    }

    #[test]
    fn test_transmitter_overflow_drop_lowest() {
        let mut tx = EventTransmitter::with_defaults();
        tx.set_max_queue_size(EventPriority::PriorityHigh, 2);
        tx.set_overflow_policy(OverflowPolicy::RemoveLowestPriority);

        // Fill LOW queue first
        tx.enqueue(make_event("low1", EventPriority::PriorityLow, 100));
        tx.enqueue(make_event("low2", EventPriority::PriorityLow, 100));

        // Fill HIGH queue
        tx.enqueue(make_event("high1", EventPriority::PriorityHigh, 100));
        tx.enqueue(make_event("high2", EventPriority::PriorityHigh, 100));

        // Overflow HIGH - should drop LOW
        tx.enqueue(make_event("high3", EventPriority::PriorityHigh, 100));

        assert_eq!(tx.queue_lengths()[1], 3); // HIGH queue has 3
        assert_eq!(tx.queue_lengths()[3], 1); // LOW queue lost one
        assert_eq!(tx.stats.dropped[3], 1);
    }

    #[test]
    fn test_transmitter_stats() {
        let mut tx = EventTransmitter::with_defaults();

        tx.enqueue(make_event("c1", EventPriority::PriorityCritical, 100));
        tx.enqueue(make_event("h1", EventPriority::PriorityHigh, 200));

        tx.transmit(10);

        let stats = tx.stats();
        assert_eq!(stats.transmitted[0], 1); // CRITICAL
        assert_eq!(stats.transmitted[1], 1); // HIGH
        assert!(stats.bytes_transmitted[0] > 0);
        assert!(stats.bytes_transmitted[1] > 0);
    }

    #[test]
    fn test_transmitter_weighted_distribution() {
        let mut tx = EventTransmitter::with_defaults();

        // Add many events at each priority
        for i in 0..20 {
            tx.enqueue(make_event(
                &format!("h{}", i),
                EventPriority::PriorityHigh,
                50,
            ));
            tx.enqueue(make_event(
                &format!("n{}", i),
                EventPriority::PriorityNormal,
                50,
            ));
            tx.enqueue(make_event(
                &format!("l{}", i),
                EventPriority::PriorityLow,
                50,
            ));
        }

        // Transmit some events
        let events = tx.transmit(10);

        // Count by priority
        let high_count = events
            .iter()
            .filter(|e| e.event_id.starts_with('h'))
            .count();
        let normal_count = events
            .iter()
            .filter(|e| e.event_id.starts_with('n'))
            .count();
        let low_count = events
            .iter()
            .filter(|e| e.event_id.starts_with('l'))
            .count();

        // Should roughly follow 50/35/15 distribution (with some variance)
        assert!(high_count >= 3, "high_count={}", high_count);
        assert!(normal_count >= 2, "normal_count={}", normal_count);
        assert!(high_count >= low_count, "high >= low");
    }
}