mqtt5_protocol/session/
flow_control.rs

1use crate::time::{Duration, Instant};
2
/// Configuration values for session flow control.
///
/// This is a plain data carrier; the logic that interprets these settings is
/// not part of this definition. `PartialEq` is derived so that whole
/// configurations can be compared directly (for example with `assert_eq!`).
#[derive(Debug, Clone, PartialEq)]
pub struct FlowControlConfig {
    /// Switch for backpressure handling.
    pub enable_backpressure: bool,
    /// Backpressure timeout. `None` presumably means "no timeout" — confirm
    /// against the code that reads this field.
    pub backpressure_timeout: Option<Duration>,
    /// Upper bound on the size of the pending queue.
    pub max_pending_queue_size: usize,
    /// Timeout associated with in-flight entries. The exact expiry semantics
    /// are defined by the consumer — TODO confirm.
    pub in_flight_timeout: Duration,
}
10
11impl Default for FlowControlConfig {
12    fn default() -> Self {
13        Self {
14            enable_backpressure: true,
15            backpressure_timeout: Some(Duration::from_secs(30)),
16            max_pending_queue_size: 1000,
17            in_flight_timeout: Duration::from_secs(60),
18        }
19    }
20}
21
/// Snapshot of flow-control counters.
///
/// NOTE(review): the field descriptions below are read off the field names and
/// types; the code that populates this struct is not part of this definition —
/// confirm the exact semantics there.
#[derive(Clone, Debug)]
pub struct FlowControlStats {
    /// Receive-maximum value in effect (presumably the MQTT v5 Receive Maximum
    /// property, which would explain the `u16` — confirm).
    pub receive_maximum: u16,
    /// Number of entries currently in flight.
    pub in_flight_count: usize,
    /// Quota still available (presumably how many more entries may go in
    /// flight — confirm).
    pub available_quota: usize,
    /// Number of pending requests.
    pub pending_requests: usize,
    /// Instant tied to the oldest in-flight entry, if any; `None` presumably
    /// means nothing is in flight — confirm.
    pub oldest_in_flight: Option<Instant>,
}
30
#[cfg(test)]
mod tests {
    use super::*;

    /// `FlowControlConfig::default()` must return the values hard-coded in
    /// its `Default` impl.
    #[test]
    fn test_flow_control_config_default() {
        let defaults = FlowControlConfig::default();

        let expected_backpressure_timeout = Some(Duration::from_secs(30));
        let expected_in_flight_timeout = Duration::from_secs(60);

        assert!(defaults.enable_backpressure);
        assert_eq!(defaults.backpressure_timeout, expected_backpressure_timeout);
        assert_eq!(defaults.max_pending_queue_size, 1000);
        assert_eq!(defaults.in_flight_timeout, expected_in_flight_timeout);
    }

    /// A hand-built `FlowControlStats` must expose exactly the values it was
    /// constructed with.
    #[test]
    fn test_flow_control_stats() {
        let started_at = Instant::now();
        let snapshot = FlowControlStats {
            receive_maximum: 65535,
            in_flight_count: 10,
            available_quota: 100,
            pending_requests: 5,
            oldest_in_flight: Some(started_at),
        };

        assert_eq!(snapshot.receive_maximum, 65535);
        assert_eq!(snapshot.in_flight_count, 10);
        assert_eq!(snapshot.available_quota, 100);
        assert_eq!(snapshot.pending_requests, 5);
        assert!(snapshot.oldest_in_flight.is_some());
    }
}