1use crate::cell::messaging::MessagePriority;
62use crate::qos::QoSClass;
63use crate::Result;
64use std::sync::atomic::{AtomicU64, Ordering};
65use std::sync::Arc;
66use std::time::{Duration, Instant};
67use tokio::sync::Mutex;
68use tracing::{debug, instrument, warn};
69
/// Per-second rate ceiling for one routing level.
///
/// `FlowController::new` turns each field into a token bucket, using the value
/// both as the bucket's capacity and as its refill rate per second.
#[derive(Debug, Clone, Copy)]
pub struct BandwidthLimit {
    /// Message budget per second.
    pub messages_per_sec: usize,
    /// Byte budget per second.
    pub bytes_per_sec: usize,
}
78
79impl BandwidthLimit {
80 pub fn new(messages_per_sec: usize, bytes_per_sec: usize) -> Self {
82 Self {
83 messages_per_sec,
84 bytes_per_sec,
85 }
86 }
87
88 pub fn cell_default() -> Self {
90 Self {
91 messages_per_sec: 100,
92 bytes_per_sec: 100_000, }
94 }
95
96 pub fn zone_default() -> Self {
98 Self {
99 messages_per_sec: 50,
100 bytes_per_sec: 50_000, }
102 }
103}
104
/// Routing tier a message is charged against.
///
/// `FlowController` keeps a separate pair of limiters and a separate pair of
/// sent-counters for each variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingLevel {
    /// Charged against the cell limiters and cell counters.
    Cell,
    /// Charged against the zone limiters and zone counters.
    Zone,
}
113
/// Strategy consulted by `FlowController::should_drop` to decide whether a
/// message of a given priority may be discarded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageDropPolicy {
    /// `Low` and `Normal` priority messages are reported as droppable.
    DropLowPriority,
    /// `should_drop` always answers `false` under this policy.
    /// NOTE(review): presumably the queue owner evicts its oldest entry
    /// instead; confirm against the caller.
    DropOldest,
    /// No message is ever reported as droppable.
    NeverDrop,
}
124
/// Internal record of whether backpressure is currently engaged.
#[derive(Debug)]
struct BackpressureState {
    /// `true` while backpressure is in effect.
    active: bool,
    /// Start of the current backpressure episode; `None` while inactive.
    started_at: Option<Instant>,
    /// Drop counter; set to zero on construction and not read by the code
    /// visible here, hence the lint exemption.
    #[allow(dead_code)]
    dropped_count: u64,
}
136
137impl BackpressureState {
138 fn new() -> Self {
139 Self {
140 active: false,
141 started_at: None,
142 dropped_count: 0,
143 }
144 }
145
146 fn activate(&mut self) {
147 if !self.active {
148 self.active = true;
149 self.started_at = Some(Instant::now());
150 debug!("Backpressure activated");
151 }
152 }
153
154 fn deactivate(&mut self) {
155 if self.active {
156 self.active = false;
157 let duration = self
158 .started_at
159 .map(|s| s.elapsed().as_millis())
160 .unwrap_or(0);
161 debug!("Backpressure released after {}ms", duration);
162 self.started_at = None;
163 }
164 }
165}
166
/// Token-bucket limiter: a balance that is spent by consumers and topped up
/// over time at `refill_rate`, never exceeding `capacity`.
struct TokenBucket {
    /// Current token balance.
    tokens: Arc<Mutex<f64>>,
    /// Upper bound on the balance; also the initial balance.
    capacity: f64,
    /// Tokens credited per second of elapsed time.
    refill_rate: f64,
    /// Instant up to which elapsed time has already been credited.
    last_refill: Arc<Mutex<Instant>>,
}
181
182impl TokenBucket {
183 fn new(capacity: f64, refill_rate: f64) -> Self {
185 Self {
186 tokens: Arc::new(Mutex::new(capacity)),
187 capacity,
188 refill_rate,
189 last_refill: Arc::new(Mutex::new(Instant::now())),
190 }
191 }
192
193 async fn try_consume(&self, amount: f64) -> bool {
195 self.refill().await;
197
198 let mut tokens = self.tokens.lock().await;
199 if *tokens >= amount {
200 *tokens -= amount;
201 true
202 } else {
203 false
204 }
205 }
206
207 async fn consume(&self, amount: f64) -> Result<()> {
209 loop {
210 if self.try_consume(amount).await {
211 return Ok(());
212 }
213 tokio::time::sleep(Duration::from_millis(10)).await;
215 }
216 }
217
218 async fn refill(&self) {
220 let mut last_refill = self.last_refill.lock().await;
221 let elapsed = last_refill.elapsed().as_secs_f64();
222
223 if elapsed > 0.0 {
224 let mut tokens = self.tokens.lock().await;
225 let new_tokens = elapsed * self.refill_rate;
226 *tokens = (*tokens + new_tokens).min(self.capacity);
227 *last_refill = Instant::now();
228 }
229 }
230
231 async fn available_tokens(&self) -> f64 {
233 self.refill().await;
234 *self.tokens.lock().await
235 }
236}
237
238pub struct FlowController {
242 cell_message_limiter: Arc<TokenBucket>,
244 cell_byte_limiter: Arc<TokenBucket>,
246 zone_message_limiter: Arc<TokenBucket>,
248 zone_byte_limiter: Arc<TokenBucket>,
250 backpressure: Arc<Mutex<BackpressureState>>,
252 drop_policy: MessageDropPolicy,
254 metrics: Arc<FlowMetricsInner>,
256}
257
/// Atomic counters updated by `FlowController`; `get_metrics` copies them
/// into a plain `FlowMetrics` value.
struct FlowMetricsInner {
    /// Permits granted at cell level.
    cell_messages_sent: AtomicU64,
    /// Sum of `message_size` over permits granted at cell level.
    cell_bytes_sent: AtomicU64,
    /// Permits granted at zone level.
    zone_messages_sent: AtomicU64,
    /// Sum of `message_size` over permits granted at zone level.
    zone_bytes_sent: AtomicU64,
    /// Number of `record_drop` calls.
    messages_dropped: AtomicU64,
    /// Number of times backpressure was applied.
    backpressure_events: AtomicU64,
}
267
268impl FlowController {
269 pub fn new(
271 cell_limit: BandwidthLimit,
272 zone_limit: BandwidthLimit,
273 drop_policy: MessageDropPolicy,
274 ) -> Self {
275 Self {
276 cell_message_limiter: Arc::new(TokenBucket::new(
277 cell_limit.messages_per_sec as f64,
278 cell_limit.messages_per_sec as f64,
279 )),
280 cell_byte_limiter: Arc::new(TokenBucket::new(
281 cell_limit.bytes_per_sec as f64,
282 cell_limit.bytes_per_sec as f64,
283 )),
284 zone_message_limiter: Arc::new(TokenBucket::new(
285 zone_limit.messages_per_sec as f64,
286 zone_limit.messages_per_sec as f64,
287 )),
288 zone_byte_limiter: Arc::new(TokenBucket::new(
289 zone_limit.bytes_per_sec as f64,
290 zone_limit.bytes_per_sec as f64,
291 )),
292 backpressure: Arc::new(Mutex::new(BackpressureState::new())),
293 drop_policy,
294 metrics: Arc::new(FlowMetricsInner {
295 cell_messages_sent: AtomicU64::new(0),
296 cell_bytes_sent: AtomicU64::new(0),
297 zone_messages_sent: AtomicU64::new(0),
298 zone_bytes_sent: AtomicU64::new(0),
299 messages_dropped: AtomicU64::new(0),
300 backpressure_events: AtomicU64::new(0),
301 }),
302 }
303 }
304
305 #[instrument(skip(self))]
309 pub async fn acquire_permit(
310 &self,
311 level: RoutingLevel,
312 message_size: usize,
313 priority: MessagePriority,
314 ) -> Result<Permit> {
315 let (msg_limiter, byte_limiter) = match level {
316 RoutingLevel::Cell => (&self.cell_message_limiter, &self.cell_byte_limiter),
317 RoutingLevel::Zone => (&self.zone_message_limiter, &self.zone_byte_limiter),
318 };
319
320 let priority_multiplier = match priority {
323 MessagePriority::Critical => 0.5, MessagePriority::High => 0.75, MessagePriority::Normal => 1.0, MessagePriority::Low => 1.5, };
328
329 let message_tokens = 1.0 * priority_multiplier;
330 let byte_tokens = message_size as f64 * priority_multiplier;
331
332 let acquired = msg_limiter.try_consume(message_tokens).await
334 && byte_limiter.try_consume(byte_tokens).await;
335
336 if !acquired {
337 self.apply_backpressure_internal(level).await;
339
340 msg_limiter.consume(message_tokens).await?;
342 byte_limiter.consume(byte_tokens).await?;
343 }
344
345 match level {
347 RoutingLevel::Cell => {
348 self.metrics
349 .cell_messages_sent
350 .fetch_add(1, Ordering::Relaxed);
351 self.metrics
352 .cell_bytes_sent
353 .fetch_add(message_size as u64, Ordering::Relaxed);
354 }
355 RoutingLevel::Zone => {
356 self.metrics
357 .zone_messages_sent
358 .fetch_add(1, Ordering::Relaxed);
359 self.metrics
360 .zone_bytes_sent
361 .fetch_add(message_size as u64, Ordering::Relaxed);
362 }
363 }
364
365 Ok(Permit { _private: () })
366 }
367
368 pub async fn has_backpressure(&self) -> bool {
370 let state = self.backpressure.lock().await;
371 state.active
372 }
373
374 #[instrument(skip(self))]
400 pub async fn acquire_permit_qos(
401 &self,
402 level: RoutingLevel,
403 message_size: usize,
404 qos_class: QoSClass,
405 ) -> Result<Permit> {
406 let priority: MessagePriority = qos_class.into();
408 self.acquire_permit(level, message_size, priority).await
409 }
410
411 pub fn should_drop_qos(&self, qos_class: QoSClass) -> bool {
416 let priority: MessagePriority = qos_class.into();
417 self.should_drop(priority)
418 }
419
420 async fn apply_backpressure_internal(&self, level: RoutingLevel) {
422 let mut state = self.backpressure.lock().await;
423 state.activate();
424 self.metrics
425 .backpressure_events
426 .fetch_add(1, Ordering::Relaxed);
427
428 warn!("Backpressure applied at {:?} level", level);
429 }
430
431 pub async fn release_backpressure(&self) {
433 let mut state = self.backpressure.lock().await;
434 state.deactivate();
435 }
436
437 pub fn should_drop(&self, priority: MessagePriority) -> bool {
439 match self.drop_policy {
440 MessageDropPolicy::DropLowPriority => {
441 matches!(priority, MessagePriority::Low | MessagePriority::Normal)
443 }
444 MessageDropPolicy::DropOldest => {
445 false
447 }
448 MessageDropPolicy::NeverDrop => false,
449 }
450 }
451
452 pub fn record_drop(&self) {
454 self.metrics
455 .messages_dropped
456 .fetch_add(1, Ordering::Relaxed);
457 }
458
459 pub fn get_metrics(&self) -> FlowMetrics {
461 FlowMetrics {
462 cell_messages_sent: self.metrics.cell_messages_sent.load(Ordering::Relaxed),
463 cell_bytes_sent: self.metrics.cell_bytes_sent.load(Ordering::Relaxed),
464 zone_messages_sent: self.metrics.zone_messages_sent.load(Ordering::Relaxed),
465 zone_bytes_sent: self.metrics.zone_bytes_sent.load(Ordering::Relaxed),
466 messages_dropped: self.metrics.messages_dropped.load(Ordering::Relaxed),
467 backpressure_events: self.metrics.backpressure_events.load(Ordering::Relaxed),
468 }
469 }
470
471 pub async fn available_capacity(&self, level: RoutingLevel) -> CapacityInfo {
473 let (msg_limiter, byte_limiter) = match level {
474 RoutingLevel::Cell => (&self.cell_message_limiter, &self.cell_byte_limiter),
475 RoutingLevel::Zone => (&self.zone_message_limiter, &self.zone_byte_limiter),
476 };
477
478 CapacityInfo {
479 available_messages: msg_limiter.available_tokens().await as usize,
480 available_bytes: byte_limiter.available_tokens().await as usize,
481 }
482 }
483}
484
/// Proof that `FlowController::acquire_permit` admitted a message.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug)]
pub struct Permit {
    _private: (),
}
491
/// Plain-value copy of the controller's counters, as returned by
/// `FlowController::get_metrics`.
#[derive(Debug, Clone, Copy)]
pub struct FlowMetrics {
    /// Permits granted at cell level.
    pub cell_messages_sent: u64,
    /// Total `message_size` of permits granted at cell level.
    pub cell_bytes_sent: u64,
    /// Permits granted at zone level.
    pub zone_messages_sent: u64,
    /// Total `message_size` of permits granted at zone level.
    pub zone_bytes_sent: u64,
    /// Number of drops recorded through `FlowController::record_drop`.
    pub messages_dropped: u64,
    /// Number of times backpressure was applied.
    pub backpressure_events: u64,
}
508
/// Remaining budget at one routing level, as returned by
/// `FlowController::available_capacity`.
#[derive(Debug, Clone, Copy)]
pub struct CapacityInfo {
    /// Whole message tokens currently available.
    pub available_messages: usize,
    /// Whole byte tokens currently available.
    pub available_bytes: usize,
}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Controller using the preset cell/zone limits and the given drop policy.
    fn preset_controller(policy: MessageDropPolicy) -> FlowController {
        FlowController::new(
            BandwidthLimit::cell_default(),
            BandwidthLimit::zone_default(),
            policy,
        )
    }

    /// Controller with small limits: cell 10 msg/s and 1000 B/s, zone 5 msg/s
    /// and 500 B/s, using the `DropLowPriority` policy.
    fn small_controller() -> FlowController {
        FlowController::new(
            BandwidthLimit::new(10, 1000),
            BandwidthLimit::new(5, 500),
            MessageDropPolicy::DropLowPriority,
        )
    }

    #[tokio::test]
    async fn test_token_bucket_creation() {
        let bucket = TokenBucket::new(100.0, 10.0);

        // A fresh bucket starts at capacity.
        assert_eq!(bucket.available_tokens().await, 100.0);
    }

    #[tokio::test]
    async fn test_token_bucket_consume() {
        let bucket = TokenBucket::new(100.0, 10.0);

        assert!(bucket.try_consume(10.0).await);
        let remaining = bucket.available_tokens().await;
        assert!((remaining - 90.0).abs() < 0.01);

        assert!(bucket.try_consume(50.0).await);
        let remaining = bucket.available_tokens().await;
        assert!((remaining - 40.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_token_bucket_overflow() {
        let bucket = TokenBucket::new(100.0, 10.0);

        // Asking for more than the bucket holds fails and leaves the balance intact.
        assert!(!bucket.try_consume(150.0).await);
        assert_eq!(bucket.available_tokens().await, 100.0);
    }

    #[tokio::test]
    async fn test_token_bucket_refill() {
        let bucket = TokenBucket::new(100.0, 100.0);

        assert!(bucket.try_consume(100.0).await);
        let tokens_after_consume = bucket.available_tokens().await;
        assert!(tokens_after_consume < 1.0);

        tokio::time::sleep(Duration::from_millis(500)).await;

        // Half a second at 100 tokens/s should restore roughly 50 tokens.
        let tokens = bucket.available_tokens().await;
        assert!((40.0..=60.0).contains(&tokens));
    }

    #[tokio::test]
    async fn test_flow_controller_creation() {
        let controller = preset_controller(MessageDropPolicy::DropLowPriority);

        assert!(!controller.has_backpressure().await);

        let metrics = controller.get_metrics();
        assert_eq!(metrics.cell_messages_sent, 0);
        assert_eq!(metrics.zone_messages_sent, 0);
    }

    #[tokio::test]
    async fn test_acquire_permit() {
        let controller = small_controller();

        let _permit = controller
            .acquire_permit(RoutingLevel::Cell, 100, MessagePriority::Normal)
            .await
            .unwrap();

        let metrics = controller.get_metrics();
        assert_eq!(metrics.cell_messages_sent, 1);
        assert_eq!(metrics.cell_bytes_sent, 100);
    }

    #[tokio::test]
    async fn test_priority_preferential_treatment() {
        let controller = small_controller();

        for priority in [MessagePriority::Critical, MessagePriority::Low] {
            let _permit = controller
                .acquire_permit(RoutingLevel::Cell, 100, priority)
                .await
                .unwrap();
        }

        assert_eq!(controller.get_metrics().cell_messages_sent, 2);
    }

    #[tokio::test]
    async fn test_message_drop_policy() {
        let controller = preset_controller(MessageDropPolicy::DropLowPriority);

        assert!(controller.should_drop(MessagePriority::Low));
        assert!(controller.should_drop(MessagePriority::Normal));

        assert!(!controller.should_drop(MessagePriority::High));
        assert!(!controller.should_drop(MessagePriority::Critical));
    }

    #[tokio::test]
    async fn test_never_drop_policy() {
        let controller = preset_controller(MessageDropPolicy::NeverDrop);

        assert!(!controller.should_drop(MessagePriority::Low));
        assert!(!controller.should_drop(MessagePriority::Normal));
        assert!(!controller.should_drop(MessagePriority::High));
        assert!(!controller.should_drop(MessagePriority::Critical));
    }

    #[tokio::test]
    async fn test_capacity_info() {
        let controller = FlowController::new(
            BandwidthLimit::new(100, 10000),
            BandwidthLimit::new(50, 5000),
            MessageDropPolicy::DropLowPriority,
        );

        let cell = controller.available_capacity(RoutingLevel::Cell).await;
        assert_eq!(cell.available_messages, 100);
        assert_eq!(cell.available_bytes, 10000);

        let zone = controller.available_capacity(RoutingLevel::Zone).await;
        assert_eq!(zone.available_messages, 50);
        assert_eq!(zone.available_bytes, 5000);
    }

    #[tokio::test]
    async fn test_record_drop() {
        let controller = preset_controller(MessageDropPolicy::DropLowPriority);

        controller.record_drop();
        controller.record_drop();

        assert_eq!(controller.get_metrics().messages_dropped, 2);
    }

    #[tokio::test]
    async fn test_backpressure_activation() {
        // One message per second: the first permit drains the message bucket.
        let controller = Arc::new(FlowController::new(
            BandwidthLimit::new(1, 100),
            BandwidthLimit::new(1, 100),
            MessageDropPolicy::DropLowPriority,
        ));

        let _p1 = controller
            .acquire_permit(RoutingLevel::Cell, 50, MessagePriority::Normal)
            .await
            .unwrap();
        assert!(!controller.has_backpressure().await);

        // The second request must run against the SAME controller, otherwise
        // it would hit a fresh, full bucket and never trigger backpressure.
        let waiter = tokio::spawn({
            let controller = Arc::clone(&controller);
            async move {
                let _ = controller
                    .acquire_permit(RoutingLevel::Cell, 50, MessagePriority::Normal)
                    .await;
            }
        });

        // Far shorter than the ~1 s refill time, so the waiter is still blocked.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert!(controller.has_backpressure().await);
        let metrics = controller.get_metrics();
        assert_eq!(metrics.backpressure_events, 1);
        assert_eq!(metrics.cell_messages_sent, 1);

        waiter.abort();

        controller.release_backpressure().await;
        assert!(!controller.has_backpressure().await);
    }

    #[tokio::test]
    async fn test_acquire_permit_qos() {
        let controller = small_controller();

        let _permit = controller
            .acquire_permit_qos(RoutingLevel::Cell, 100, QoSClass::Critical)
            .await
            .unwrap();

        let metrics = controller.get_metrics();
        assert_eq!(metrics.cell_messages_sent, 1);
        assert_eq!(metrics.cell_bytes_sent, 100);
    }

    #[tokio::test]
    async fn test_qos_class_preferential_treatment() {
        let controller = small_controller();

        let _p1 = controller
            .acquire_permit_qos(RoutingLevel::Cell, 100, QoSClass::Critical)
            .await
            .unwrap();

        let _p2 = controller
            .acquire_permit_qos(RoutingLevel::Cell, 100, QoSClass::Bulk)
            .await
            .unwrap();

        assert_eq!(controller.get_metrics().cell_messages_sent, 2);
    }

    #[test]
    fn test_should_drop_qos() {
        let controller = preset_controller(MessageDropPolicy::DropLowPriority);

        assert!(!controller.should_drop_qos(QoSClass::Critical));
        assert!(!controller.should_drop_qos(QoSClass::High));

        assert!(controller.should_drop_qos(QoSClass::Normal));
        assert!(controller.should_drop_qos(QoSClass::Low));
        assert!(controller.should_drop_qos(QoSClass::Bulk));
    }

    #[test]
    fn test_should_drop_qos_never_drop_policy() {
        let controller = preset_controller(MessageDropPolicy::NeverDrop);

        assert!(!controller.should_drop_qos(QoSClass::Critical));
        assert!(!controller.should_drop_qos(QoSClass::Bulk));
    }
}