mqtt5_protocol/session/
flow_control.rs1use crate::time::{Duration, Instant};
2
3#[derive(Debug, Clone)]
4pub struct FlowControlConfig {
5 pub enable_backpressure: bool,
6 pub backpressure_timeout: Option<Duration>,
7 pub max_pending_queue_size: usize,
8 pub in_flight_timeout: Duration,
9}
10
11impl Default for FlowControlConfig {
12 fn default() -> Self {
13 Self {
14 enable_backpressure: true,
15 backpressure_timeout: Some(Duration::from_secs(30)),
16 max_pending_queue_size: 1000,
17 in_flight_timeout: Duration::from_secs(60),
18 }
19 }
20}
21
22#[derive(Debug, Clone)]
23pub struct FlowControlStats {
24 pub receive_maximum: u16,
25 pub in_flight_count: usize,
26 pub available_quota: usize,
27 pub pending_requests: usize,
28 pub oldest_in_flight: Option<Instant>,
29}
30
31#[cfg(test)]
32mod tests {
33 use super::*;
34
35 #[test]
36 fn test_flow_control_config_default() {
37 let config = FlowControlConfig::default();
38 assert!(config.enable_backpressure);
39 assert_eq!(config.backpressure_timeout, Some(Duration::from_secs(30)));
40 assert_eq!(config.max_pending_queue_size, 1000);
41 assert_eq!(config.in_flight_timeout, Duration::from_secs(60));
42 }
43
44 #[test]
45 fn test_flow_control_stats() {
46 let stats = FlowControlStats {
47 receive_maximum: 65535,
48 in_flight_count: 10,
49 available_quota: 100,
50 pending_requests: 5,
51 oldest_in_flight: Some(Instant::now()),
52 };
53
54 assert_eq!(stats.receive_maximum, 65535);
55 assert_eq!(stats.in_flight_count, 10);
56 assert_eq!(stats.available_quota, 100);
57 assert_eq!(stats.pending_requests, 5);
58 assert!(stats.oldest_in_flight.is_some());
59 }
60}