Skip to main content

peat_protocol/hierarchy/
flow_control.rs

1//! Message flow control for hierarchical routing
2//!
3//! This module implements bandwidth limiting and backpressure mechanisms
4//! to prevent congestion in the hierarchical messaging system.
5//!
6//! # Architecture
7//!
8//! Flow control operates at two levels:
9//! - **Cell Level**: Controls message flow within cells
10//! - **Zone Level**: Controls message flow at zone coordinator level
11//!
12//! ## Token Bucket Algorithm
13//!
14//! Rate limiting uses a token bucket algorithm:
15//! - Tokens represent message/bandwidth capacity
16//! - Tokens refill at a constant rate
17//! - Messages consume tokens based on size and priority
18//! - When bucket is empty, backpressure is applied
19//!
20//! ## Backpressure Strategy
21//!
22//! When congestion is detected:
23//! 1. **Drop low-priority messages** (based on policy)
24//! 2. **Slow down message generation** (apply backpressure)
25//! 3. **Signal upstream** (propagate backpressure up hierarchy)
26//!
27//! # Example
28//!
29//! ```
30//! use peat_protocol::hierarchy::flow_control::{FlowController, BandwidthLimit, MessageDropPolicy, RoutingLevel};
31//! use peat_protocol::cell::messaging::{MessagePriority, CellMessage};
32//!
33//! # async fn example() -> peat_protocol::Result<()> {
34//! let cell_limit = BandwidthLimit {
35//!     messages_per_sec: 100,
36//!     bytes_per_sec: 10_000,
37//! };
38//!
39//! let zone_limit = BandwidthLimit {
40//!     messages_per_sec: 50,
41//!     bytes_per_sec: 5_000,
42//! };
43//!
44//! let controller = FlowController::new(
45//!     cell_limit,
46//!     zone_limit,
47//!     MessageDropPolicy::DropLowPriority,
48//! );
49//!
50//! // Acquire permit before sending
51//! let permit = controller.acquire_permit(RoutingLevel::Cell, 100, MessagePriority::Normal).await?;
52//!
53//! // Check if backpressure is active
54//! if controller.has_backpressure().await {
55//!     // Slow down message generation
56//! }
57//! # Ok(())
58//! # }
59//! ```
60
61use crate::cell::messaging::MessagePriority;
62use crate::qos::QoSClass;
63use crate::Result;
64use std::sync::atomic::{AtomicU64, Ordering};
65use std::sync::Arc;
66use std::time::{Duration, Instant};
67use tokio::sync::Mutex;
68use tracing::{debug, instrument, warn};
69
/// Throughput ceiling for one routing level.
///
/// A limit has two independent dimensions: how many messages and how many
/// bytes may be sent per second.
#[derive(Debug, Clone, Copy)]
pub struct BandwidthLimit {
    /// Upper bound on messages per second
    pub messages_per_sec: usize,
    /// Upper bound on bytes per second
    pub bytes_per_sec: usize,
}

impl BandwidthLimit {
    /// Build a limit from an explicit message rate and byte rate.
    pub fn new(messages_per_sec: usize, bytes_per_sec: usize) -> Self {
        BandwidthLimit {
            messages_per_sec,
            bytes_per_sec,
        }
    }

    /// Stock cell-level limit: 100 messages/s and 100 KB/s.
    pub fn cell_default() -> Self {
        Self::new(100, 100_000)
    }

    /// Stock zone-level limit: 50 messages/s and 50 KB/s, i.e. half of the
    /// cell-level default.
    pub fn zone_default() -> Self {
        Self::new(50, 50_000)
    }
}
104
/// Tier of the hierarchy at which flow control is applied.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RoutingLevel {
    /// Traffic that stays inside a single cell
    Cell,
    /// Traffic exchanged between a cell and its zone
    Zone,
}
113
/// Strategy for shedding messages while backpressure is in effect.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MessageDropPolicy {
    /// Shed the least important traffic first (Low, then Normal priority)
    DropLowPriority,
    /// Shed the oldest queued messages first (FIFO order)
    DropOldest,
    /// Keep every message and rely on maximum backpressure instead
    NeverDrop,
}
124
125/// Backpressure state
126#[derive(Debug)]
127struct BackpressureState {
128    /// Is backpressure currently active?
129    active: bool,
130    /// When backpressure started
131    started_at: Option<Instant>,
132    /// Number of messages dropped due to backpressure
133    #[allow(dead_code)] // Reserved for future use
134    dropped_count: u64,
135}
136
137impl BackpressureState {
138    fn new() -> Self {
139        Self {
140            active: false,
141            started_at: None,
142            dropped_count: 0,
143        }
144    }
145
146    fn activate(&mut self) {
147        if !self.active {
148            self.active = true;
149            self.started_at = Some(Instant::now());
150            debug!("Backpressure activated");
151        }
152    }
153
154    fn deactivate(&mut self) {
155        if self.active {
156            self.active = false;
157            let duration = self
158                .started_at
159                .map(|s| s.elapsed().as_millis())
160                .unwrap_or(0);
161            debug!("Backpressure released after {}ms", duration);
162            self.started_at = None;
163        }
164    }
165}
166
167/// Token bucket rate limiter
168///
169/// Implements the token bucket algorithm for rate limiting.
170/// Tokens represent message/bandwidth capacity.
171struct TokenBucket {
172    /// Current token count
173    tokens: Arc<Mutex<f64>>,
174    /// Maximum bucket capacity
175    capacity: f64,
176    /// Token refill rate (tokens per second)
177    refill_rate: f64,
178    /// Last refill time
179    last_refill: Arc<Mutex<Instant>>,
180}
181
182impl TokenBucket {
183    /// Create a new token bucket
184    fn new(capacity: f64, refill_rate: f64) -> Self {
185        Self {
186            tokens: Arc::new(Mutex::new(capacity)),
187            capacity,
188            refill_rate,
189            last_refill: Arc::new(Mutex::new(Instant::now())),
190        }
191    }
192
193    /// Try to consume tokens (non-blocking)
194    async fn try_consume(&self, amount: f64) -> bool {
195        // Refill tokens based on elapsed time
196        self.refill().await;
197
198        let mut tokens = self.tokens.lock().await;
199        if *tokens >= amount {
200            *tokens -= amount;
201            true
202        } else {
203            false
204        }
205    }
206
207    /// Wait for tokens to be available (blocking)
208    async fn consume(&self, amount: f64) -> Result<()> {
209        loop {
210            if self.try_consume(amount).await {
211                return Ok(());
212            }
213            // Wait a bit before retrying
214            tokio::time::sleep(Duration::from_millis(10)).await;
215        }
216    }
217
218    /// Refill tokens based on elapsed time
219    async fn refill(&self) {
220        let mut last_refill = self.last_refill.lock().await;
221        let elapsed = last_refill.elapsed().as_secs_f64();
222
223        if elapsed > 0.0 {
224            let mut tokens = self.tokens.lock().await;
225            let new_tokens = elapsed * self.refill_rate;
226            *tokens = (*tokens + new_tokens).min(self.capacity);
227            *last_refill = Instant::now();
228        }
229    }
230
231    /// Get current token count
232    async fn available_tokens(&self) -> f64 {
233        self.refill().await;
234        *self.tokens.lock().await
235    }
236}
237
238/// Flow control for hierarchical message routing
239///
240/// Provides bandwidth limiting and backpressure for cell and zone level routing.
241pub struct FlowController {
242    /// Cell-level rate limiter (message count)
243    cell_message_limiter: Arc<TokenBucket>,
244    /// Cell-level rate limiter (byte count)
245    cell_byte_limiter: Arc<TokenBucket>,
246    /// Zone-level rate limiter (message count)
247    zone_message_limiter: Arc<TokenBucket>,
248    /// Zone-level rate limiter (byte count)
249    zone_byte_limiter: Arc<TokenBucket>,
250    /// Backpressure state
251    backpressure: Arc<Mutex<BackpressureState>>,
252    /// Message dropping policy
253    drop_policy: MessageDropPolicy,
254    /// Flow control metrics
255    metrics: Arc<FlowMetricsInner>,
256}
257
/// Atomic counters backing the public `FlowMetrics` snapshot.
struct FlowMetricsInner {
    /// Messages admitted at cell level
    cell_messages_sent: AtomicU64,
    /// Bytes admitted at cell level
    cell_bytes_sent: AtomicU64,
    /// Messages admitted at zone level
    zone_messages_sent: AtomicU64,
    /// Bytes admitted at zone level
    zone_bytes_sent: AtomicU64,
    /// Messages reported as dropped
    messages_dropped: AtomicU64,
    /// Times backpressure was applied
    backpressure_events: AtomicU64,
}
267
268impl FlowController {
269    /// Create a new flow controller
270    pub fn new(
271        cell_limit: BandwidthLimit,
272        zone_limit: BandwidthLimit,
273        drop_policy: MessageDropPolicy,
274    ) -> Self {
275        Self {
276            cell_message_limiter: Arc::new(TokenBucket::new(
277                cell_limit.messages_per_sec as f64,
278                cell_limit.messages_per_sec as f64,
279            )),
280            cell_byte_limiter: Arc::new(TokenBucket::new(
281                cell_limit.bytes_per_sec as f64,
282                cell_limit.bytes_per_sec as f64,
283            )),
284            zone_message_limiter: Arc::new(TokenBucket::new(
285                zone_limit.messages_per_sec as f64,
286                zone_limit.messages_per_sec as f64,
287            )),
288            zone_byte_limiter: Arc::new(TokenBucket::new(
289                zone_limit.bytes_per_sec as f64,
290                zone_limit.bytes_per_sec as f64,
291            )),
292            backpressure: Arc::new(Mutex::new(BackpressureState::new())),
293            drop_policy,
294            metrics: Arc::new(FlowMetricsInner {
295                cell_messages_sent: AtomicU64::new(0),
296                cell_bytes_sent: AtomicU64::new(0),
297                zone_messages_sent: AtomicU64::new(0),
298                zone_bytes_sent: AtomicU64::new(0),
299                messages_dropped: AtomicU64::new(0),
300                backpressure_events: AtomicU64::new(0),
301            }),
302        }
303    }
304
305    /// Acquire permit to send a message
306    ///
307    /// This will block if rate limits are exceeded, applying backpressure.
308    #[instrument(skip(self))]
309    pub async fn acquire_permit(
310        &self,
311        level: RoutingLevel,
312        message_size: usize,
313        priority: MessagePriority,
314    ) -> Result<Permit> {
315        let (msg_limiter, byte_limiter) = match level {
316            RoutingLevel::Cell => (&self.cell_message_limiter, &self.cell_byte_limiter),
317            RoutingLevel::Zone => (&self.zone_message_limiter, &self.zone_byte_limiter),
318        };
319
320        // Adjust token consumption based on priority
321        // Higher priority messages consume fewer tokens (get preferential treatment)
322        let priority_multiplier = match priority {
323            MessagePriority::Critical => 0.5, // Consumes half tokens
324            MessagePriority::High => 0.75,    // Consumes 75% tokens
325            MessagePriority::Normal => 1.0,   // Consumes normal tokens
326            MessagePriority::Low => 1.5,      // Consumes 50% more tokens
327        };
328
329        let message_tokens = 1.0 * priority_multiplier;
330        let byte_tokens = message_size as f64 * priority_multiplier;
331
332        // Try to acquire tokens (will block if needed)
333        let acquired = msg_limiter.try_consume(message_tokens).await
334            && byte_limiter.try_consume(byte_tokens).await;
335
336        if !acquired {
337            // Apply backpressure
338            self.apply_backpressure_internal(level).await;
339
340            // Wait for tokens
341            msg_limiter.consume(message_tokens).await?;
342            byte_limiter.consume(byte_tokens).await?;
343        }
344
345        // Update metrics
346        match level {
347            RoutingLevel::Cell => {
348                self.metrics
349                    .cell_messages_sent
350                    .fetch_add(1, Ordering::Relaxed);
351                self.metrics
352                    .cell_bytes_sent
353                    .fetch_add(message_size as u64, Ordering::Relaxed);
354            }
355            RoutingLevel::Zone => {
356                self.metrics
357                    .zone_messages_sent
358                    .fetch_add(1, Ordering::Relaxed);
359                self.metrics
360                    .zone_bytes_sent
361                    .fetch_add(message_size as u64, Ordering::Relaxed);
362            }
363        }
364
365        Ok(Permit { _private: () })
366    }
367
368    /// Check if backpressure is currently active
369    pub async fn has_backpressure(&self) -> bool {
370        let state = self.backpressure.lock().await;
371        state.active
372    }
373
374    /// Acquire permit using QoSClass instead of MessagePriority
375    ///
376    /// This method provides QoS-aware flow control by converting QoSClass
377    /// to MessagePriority for the underlying rate limiter. Higher QoS classes
378    /// (Critical, High) receive preferential treatment with lower token consumption.
379    ///
380    /// # Arguments
381    ///
382    /// * `level` - The routing level (Cell or Zone)
383    /// * `message_size` - Size of the message in bytes
384    /// * `qos_class` - The QoS classification for priority handling
385    ///
386    /// # Example
387    ///
388    /// ```rust,ignore
389    /// use peat_protocol::hierarchy::flow_control::{FlowController, RoutingLevel};
390    /// use peat_protocol::qos::QoSClass;
391    ///
392    /// let controller = FlowController::new(/* ... */);
393    /// let permit = controller.acquire_permit_qos(
394    ///     RoutingLevel::Cell,
395    ///     1024,
396    ///     QoSClass::Critical,
397    /// ).await?;
398    /// ```
399    #[instrument(skip(self))]
400    pub async fn acquire_permit_qos(
401        &self,
402        level: RoutingLevel,
403        message_size: usize,
404        qos_class: QoSClass,
405    ) -> Result<Permit> {
406        // Convert QoSClass to MessagePriority for the underlying implementation
407        let priority: MessagePriority = qos_class.into();
408        self.acquire_permit(level, message_size, priority).await
409    }
410
411    /// Determine if a message should be dropped based on QoS class
412    ///
413    /// Higher priority classes (Critical, High) are never dropped.
414    /// Lower priority classes may be dropped under backpressure.
415    pub fn should_drop_qos(&self, qos_class: QoSClass) -> bool {
416        let priority: MessagePriority = qos_class.into();
417        self.should_drop(priority)
418    }
419
420    /// Apply backpressure at a specific routing level
421    async fn apply_backpressure_internal(&self, level: RoutingLevel) {
422        let mut state = self.backpressure.lock().await;
423        state.activate();
424        self.metrics
425            .backpressure_events
426            .fetch_add(1, Ordering::Relaxed);
427
428        warn!("Backpressure applied at {:?} level", level);
429    }
430
431    /// Release backpressure
432    pub async fn release_backpressure(&self) {
433        let mut state = self.backpressure.lock().await;
434        state.deactivate();
435    }
436
437    /// Determine if a message should be dropped based on policy
438    pub fn should_drop(&self, priority: MessagePriority) -> bool {
439        match self.drop_policy {
440            MessageDropPolicy::DropLowPriority => {
441                // Drop Low and Normal priority messages when under pressure
442                matches!(priority, MessagePriority::Low | MessagePriority::Normal)
443            }
444            MessageDropPolicy::DropOldest => {
445                // This is handled externally (queue management)
446                false
447            }
448            MessageDropPolicy::NeverDrop => false,
449        }
450    }
451
452    /// Record a dropped message
453    pub fn record_drop(&self) {
454        self.metrics
455            .messages_dropped
456            .fetch_add(1, Ordering::Relaxed);
457    }
458
459    /// Get flow control metrics
460    pub fn get_metrics(&self) -> FlowMetrics {
461        FlowMetrics {
462            cell_messages_sent: self.metrics.cell_messages_sent.load(Ordering::Relaxed),
463            cell_bytes_sent: self.metrics.cell_bytes_sent.load(Ordering::Relaxed),
464            zone_messages_sent: self.metrics.zone_messages_sent.load(Ordering::Relaxed),
465            zone_bytes_sent: self.metrics.zone_bytes_sent.load(Ordering::Relaxed),
466            messages_dropped: self.metrics.messages_dropped.load(Ordering::Relaxed),
467            backpressure_events: self.metrics.backpressure_events.load(Ordering::Relaxed),
468        }
469    }
470
471    /// Get current available capacity
472    pub async fn available_capacity(&self, level: RoutingLevel) -> CapacityInfo {
473        let (msg_limiter, byte_limiter) = match level {
474            RoutingLevel::Cell => (&self.cell_message_limiter, &self.cell_byte_limiter),
475            RoutingLevel::Zone => (&self.zone_message_limiter, &self.zone_byte_limiter),
476        };
477
478        CapacityInfo {
479            available_messages: msg_limiter.available_tokens().await as usize,
480            available_bytes: byte_limiter.available_tokens().await as usize,
481        }
482    }
483}
484
/// Permit to send a message
///
/// Holding this permit means rate limit tokens have been consumed for one
/// message. The permit is a plain marker: this module defines no `Drop`
/// behaviour for it, so tokens are not handed back when it goes out of scope.
/// The private field prevents construction outside this module.
#[derive(Debug)]
pub struct Permit {
    _private: (),
}
491
/// Point-in-time snapshot of the flow control counters.
#[derive(Clone, Copy, Debug)]
pub struct FlowMetrics {
    /// Number of messages admitted at cell level
    pub cell_messages_sent: u64,
    /// Number of bytes admitted at cell level
    pub cell_bytes_sent: u64,
    /// Number of messages admitted at zone level
    pub zone_messages_sent: u64,
    /// Number of bytes admitted at zone level
    pub zone_bytes_sent: u64,
    /// Number of messages reported as dropped
    pub messages_dropped: u64,
    /// Number of times backpressure was applied
    pub backpressure_events: u64,
}
508
/// Remaining headroom at one routing level.
#[derive(Clone, Copy, Debug)]
pub struct CapacityInfo {
    /// Message tokens still available
    pub available_messages: usize,
    /// Byte tokens still available
    pub available_bytes: usize,
}
517
#[cfg(test)]
mod tests {
    use super::*;

    /// Controller with the stock cell/zone limits and the given drop policy.
    fn default_controller(policy: MessageDropPolicy) -> FlowController {
        FlowController::new(
            BandwidthLimit::cell_default(),
            BandwidthLimit::zone_default(),
            policy,
        )
    }

    /// Controller with small limits (cell 10 msg/s, 1000 B/s; zone 5 msg/s, 500 B/s).
    fn small_controller() -> FlowController {
        FlowController::new(
            BandwidthLimit::new(10, 1000),
            BandwidthLimit::new(5, 500),
            MessageDropPolicy::DropLowPriority,
        )
    }

    #[tokio::test]
    async fn test_token_bucket_creation() {
        let bucket = TokenBucket::new(100.0, 10.0);
        let tokens = bucket.available_tokens().await;
        assert_eq!(tokens, 100.0);
    }

    #[tokio::test]
    async fn test_token_bucket_consume() {
        let bucket = TokenBucket::new(100.0, 10.0);

        // Consume some tokens
        assert!(bucket.try_consume(10.0).await);
        let tokens = bucket.available_tokens().await;
        assert!((tokens - 90.0).abs() < 0.01);

        // Consume more tokens
        assert!(bucket.try_consume(50.0).await);
        let tokens = bucket.available_tokens().await;
        assert!((tokens - 40.0).abs() < 0.01);
    }

    #[tokio::test]
    async fn test_token_bucket_overflow() {
        let bucket = TokenBucket::new(100.0, 10.0);

        // Try to consume more than available
        assert!(!bucket.try_consume(150.0).await);

        // Available should be unchanged
        let tokens = bucket.available_tokens().await;
        assert_eq!(tokens, 100.0);
    }

    #[tokio::test]
    async fn test_token_bucket_refill() {
        let bucket = TokenBucket::new(100.0, 100.0); // 100 tokens/sec

        // Consume all tokens
        assert!(bucket.try_consume(100.0).await);
        let tokens_after_consume = bucket.available_tokens().await;
        assert!(tokens_after_consume < 1.0); // Should be near zero

        // Wait for refill
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Should have ~50 tokens (0.5 sec * 100 tokens/sec)
        let tokens = bucket.available_tokens().await;
        assert!((40.0..=60.0).contains(&tokens)); // Allow some timing variance
    }

    #[tokio::test]
    async fn test_flow_controller_creation() {
        let controller = default_controller(MessageDropPolicy::DropLowPriority);

        assert!(!controller.has_backpressure().await);
        let metrics = controller.get_metrics();
        assert_eq!(metrics.cell_messages_sent, 0);
        assert_eq!(metrics.zone_messages_sent, 0);
    }

    #[tokio::test]
    async fn test_acquire_permit() {
        let controller = small_controller();

        // Acquire a permit
        let _permit = controller
            .acquire_permit(RoutingLevel::Cell, 100, MessagePriority::Normal)
            .await
            .unwrap();

        let metrics = controller.get_metrics();
        assert_eq!(metrics.cell_messages_sent, 1);
        assert_eq!(metrics.cell_bytes_sent, 100);
    }

    #[tokio::test]
    async fn test_priority_preferential_treatment() {
        let controller = small_controller();

        // Critical priority consumes fewer tokens
        let _permit1 = controller
            .acquire_permit(RoutingLevel::Cell, 100, MessagePriority::Critical)
            .await
            .unwrap();

        // Low priority consumes more tokens
        let _permit2 = controller
            .acquire_permit(RoutingLevel::Cell, 100, MessagePriority::Low)
            .await
            .unwrap();

        let metrics = controller.get_metrics();
        assert_eq!(metrics.cell_messages_sent, 2);
    }

    #[tokio::test]
    async fn test_message_drop_policy() {
        let controller = default_controller(MessageDropPolicy::DropLowPriority);

        // Low priority should be droppable
        assert!(controller.should_drop(MessagePriority::Low));
        assert!(controller.should_drop(MessagePriority::Normal));

        // High priority should not be droppable
        assert!(!controller.should_drop(MessagePriority::High));
        assert!(!controller.should_drop(MessagePriority::Critical));
    }

    #[tokio::test]
    async fn test_never_drop_policy() {
        let controller = default_controller(MessageDropPolicy::NeverDrop);

        // Nothing should be droppable
        assert!(!controller.should_drop(MessagePriority::Low));
        assert!(!controller.should_drop(MessagePriority::Normal));
        assert!(!controller.should_drop(MessagePriority::High));
        assert!(!controller.should_drop(MessagePriority::Critical));
    }

    #[tokio::test]
    async fn test_capacity_info() {
        let controller = FlowController::new(
            BandwidthLimit::new(100, 10000),
            BandwidthLimit::new(50, 5000),
            MessageDropPolicy::DropLowPriority,
        );

        let capacity = controller.available_capacity(RoutingLevel::Cell).await;
        assert_eq!(capacity.available_messages, 100);
        assert_eq!(capacity.available_bytes, 10000);

        let capacity = controller.available_capacity(RoutingLevel::Zone).await;
        assert_eq!(capacity.available_messages, 50);
        assert_eq!(capacity.available_bytes, 5000);
    }

    #[tokio::test]
    async fn test_record_drop() {
        let controller = default_controller(MessageDropPolicy::DropLowPriority);

        controller.record_drop();
        controller.record_drop();

        let metrics = controller.get_metrics();
        assert_eq!(metrics.messages_dropped, 2);
    }

    #[tokio::test]
    async fn test_backpressure_activation() {
        // 5 messages/s keeps the blocked sender waiting for only ~200ms.
        let controller = Arc::new(FlowController::new(
            BandwidthLimit::new(5, 1000),
            BandwidthLimit::new(5, 1000),
            MessageDropPolicy::DropLowPriority,
        ));

        // Consume all available message tokens
        for _ in 0..5 {
            let _permit = controller
                .acquire_permit(RoutingLevel::Cell, 50, MessagePriority::Normal)
                .await
                .unwrap();
        }
        assert!(!controller.has_backpressure().await);

        // The next sender shares the SAME controller, so it finds the bucket
        // empty and has to wait (in background).
        let blocked_sender = tokio::spawn({
            let controller = Arc::clone(&controller);
            async move {
                controller
                    .acquire_permit(RoutingLevel::Cell, 50, MessagePriority::Normal)
                    .await
                    .is_ok()
            }
        });

        // Give it time to hit the limiter
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert!(controller.has_backpressure().await);
        assert_eq!(controller.get_metrics().backpressure_events, 1);

        // Once the bucket refills the blocked sender gets its permit
        assert!(blocked_sender.await.expect("sender task panicked"));
        assert_eq!(controller.get_metrics().cell_messages_sent, 6);

        // Backpressure stays on until explicitly released
        assert!(controller.has_backpressure().await);
        controller.release_backpressure().await;
        assert!(!controller.has_backpressure().await);
    }

    #[tokio::test]
    async fn test_acquire_permit_qos() {
        let controller = small_controller();

        // Acquire permit using QoSClass
        let _permit = controller
            .acquire_permit_qos(RoutingLevel::Cell, 100, QoSClass::Critical)
            .await
            .unwrap();

        let metrics = controller.get_metrics();
        assert_eq!(metrics.cell_messages_sent, 1);
        assert_eq!(metrics.cell_bytes_sent, 100);
    }

    #[tokio::test]
    async fn test_qos_class_preferential_treatment() {
        let controller = small_controller();

        // Critical QoS class should work
        let _p1 = controller
            .acquire_permit_qos(RoutingLevel::Cell, 100, QoSClass::Critical)
            .await
            .unwrap();

        // Bulk QoS class should also work (consumes more tokens)
        let _p2 = controller
            .acquire_permit_qos(RoutingLevel::Cell, 100, QoSClass::Bulk)
            .await
            .unwrap();

        let metrics = controller.get_metrics();
        assert_eq!(metrics.cell_messages_sent, 2);
    }

    #[test]
    fn test_should_drop_qos() {
        let controller = default_controller(MessageDropPolicy::DropLowPriority);

        // Critical and High should never be dropped
        assert!(!controller.should_drop_qos(QoSClass::Critical));
        assert!(!controller.should_drop_qos(QoSClass::High));

        // Normal and below may be dropped under policy
        assert!(controller.should_drop_qos(QoSClass::Normal));
        assert!(controller.should_drop_qos(QoSClass::Low));
        assert!(controller.should_drop_qos(QoSClass::Bulk));
    }

    #[test]
    fn test_should_drop_qos_never_drop_policy() {
        let controller = default_controller(MessageDropPolicy::NeverDrop);

        // With NeverDrop policy, nothing should be dropped
        assert!(!controller.should_drop_qos(QoSClass::Critical));
        assert!(!controller.should_drop_qos(QoSClass::Bulk));
    }
}