use mqtt5::error::MqttError;
use mqtt5::session::flow_control::{FlowControlConfig, FlowControlManager};
use mqtt5::time::Duration;
use tokio::time::timeout;
/// Walks a manager created with a receive maximum of 2 through its basic
/// accounting: every acquisition raises the in-flight count and lowers the
/// available permits, and a non-blocking acquisition on an exhausted manager
/// is rejected with `MqttError::FlowControlExceeded`.
#[tokio::test]
async fn test_flow_control_basic_functionality() {
    let manager = FlowControlManager::new(2);

    // Fresh manager: sending is allowed and both permits are free.
    assert!(manager.can_send());
    assert_eq!(manager.available_permits(), 2);

    // First acquisition consumes one permit.
    manager.acquire_send_quota(1).await.unwrap();
    assert_eq!(manager.in_flight_count().await, 1);
    assert_eq!(manager.available_permits(), 1);

    // Second acquisition consumes the last permit.
    manager.acquire_send_quota(2).await.unwrap();
    assert_eq!(manager.in_flight_count().await, 2);
    assert_eq!(manager.available_permits(), 0);
    assert!(!manager.can_send());

    // With nothing left, the `try_` variant must fail instead of succeeding.
    let over_quota = manager.try_acquire_send_quota(3).await;
    assert!(matches!(over_quota, Err(MqttError::FlowControlExceeded)));
}
/// Verifies that acknowledging an in-flight packet id makes quota available
/// again, and that acknowledging an id that was never acquired is reported as
/// `MqttError::PacketIdNotFound` carrying that id.
#[tokio::test]
async fn test_flow_control_acknowledgment() {
    let manager = FlowControlManager::new(2);

    // Use up the whole quota.
    manager.acquire_send_quota(1).await.unwrap();
    manager.acquire_send_quota(2).await.unwrap();
    assert!(!manager.can_send());

    // Acknowledging id 1 hands exactly one slot back.
    manager.acknowledge(1).await.unwrap();
    assert_eq!(manager.in_flight_count().await, 1);
    assert_eq!(manager.available_permits(), 1);
    assert!(manager.can_send());

    // The freed slot can be taken by a new packet id.
    manager.acquire_send_quota(3).await.unwrap();
    assert!(!manager.can_send());

    // Id 999 was never acquired in this test.
    let unknown_ack = manager.acknowledge(999).await;
    assert!(matches!(
        unknown_ack,
        Err(MqttError::PacketIdNotFound(999))
    ));
}
/// Covers a manager created with a receive maximum of 0. The assertions pin
/// the observed contract for that value: 1000 acquisitions all succeed,
/// `can_send` stays true, and the reported in-flight count remains 0.
#[tokio::test]
async fn test_flow_control_unlimited() {
    let manager = FlowControlManager::new(0);
    assert!(manager.can_send());

    // None of these acquisitions may fail.
    for packet_id in 1..=1000 {
        manager.acquire_send_quota(packet_id).await.unwrap();
    }

    assert!(manager.can_send());
    // The in-flight count is still reported as zero after all acquisitions.
    assert_eq!(manager.in_flight_count().await, 0);
}
/// With backpressure enabled and a 100 ms backpressure timeout, a blocking
/// acquisition on an exhausted manager must fail with
/// `MqttError::FlowControlExceeded` after waiting at least 100 ms and less
/// than 200 ms (measured on the wall clock).
#[tokio::test]
async fn test_flow_control_backpressure() {
    let manager = FlowControlManager::with_config(
        1,
        FlowControlConfig {
            enable_backpressure: true,
            backpressure_timeout: Some(Duration::from_millis(100)),
            max_pending_queue_size: 10,
            in_flight_timeout: Duration::from_secs(60),
        },
    );

    // Occupy the single slot so the next acquisition has to wait.
    manager.acquire_send_quota(1).await.unwrap();
    assert!(!manager.can_send());

    let started = std::time::Instant::now();
    let outcome = manager.acquire_send_quota(2).await;
    let waited = started.elapsed();

    assert!(matches!(outcome, Err(MqttError::FlowControlExceeded)));

    // The wait must cover the configured timeout without overshooting much.
    assert!(waited >= Duration::from_millis(100));
    assert!(waited < Duration::from_millis(200));
}
/// With backpressure enabled (500 ms timeout), a blocked acquisition must
/// complete as soon as another task acknowledges the in-flight packet.
///
/// A background task acknowledges packet id 1 after 100 ms; the acquisition of
/// packet id 2 is therefore expected to finish after at least 100 ms and in
/// under 200 ms (wall clock).
#[tokio::test]
async fn test_flow_control_backpressure_release() {
    let config = FlowControlConfig {
        enable_backpressure: true,
        backpressure_timeout: Some(Duration::from_millis(500)),
        max_pending_queue_size: 10,
        in_flight_timeout: Duration::from_secs(60),
    };
    let fc = FlowControlManager::with_config(1, config);

    // Occupy the single slot so the next acquisition has to wait.
    fc.acquire_send_quota(1).await.unwrap();

    // Keep the handle: a dropped handle would silently swallow a panic raised
    // inside the task (e.g. a failing `acknowledge`).
    let fc_clone = fc.clone();
    let releaser = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        fc_clone.acknowledge(1).await.unwrap();
    });

    let start = std::time::Instant::now();
    let acquired = fc.acquire_send_quota(2).await;
    let elapsed = start.elapsed();

    // Check the background task first so that a failure there is reported as
    // such, rather than as a misleading acquisition timeout.
    releaser.await.expect("acknowledging task panicked");
    acquired.unwrap();

    assert!(elapsed >= Duration::from_millis(100));
    assert!(elapsed < Duration::from_millis(200));
}
/// Changes the receive maximum while two packets are in flight and checks how
/// `can_send` and `available_permits` follow each adjustment.
#[tokio::test]
async fn test_flow_control_receive_maximum_adjustment() {
    let mut manager = FlowControlManager::new(2);

    // Two packets in flight against a limit of two: nothing left.
    manager.acquire_send_quota(1).await.unwrap();
    manager.acquire_send_quota(2).await.unwrap();
    assert!(!manager.can_send());

    // Raising the limit to 3 must expose exactly one permit.
    manager.set_receive_maximum(3).await;
    assert!(manager.can_send());
    assert_eq!(manager.available_permits(), 1);

    // Lowering the limit below the in-flight count must leave no permits.
    manager.set_receive_maximum(1).await;
    assert!(!manager.can_send());
    assert_eq!(manager.available_permits(), 0);

    // A limit of 0 must allow sending again.
    manager.set_receive_maximum(0).await;
    assert!(manager.can_send());
}
/// Uses a 100 ms in-flight timeout to check `cleanup_expired`: packets acquired
/// before a 150 ms pause are returned as expired, a packet acquired after the
/// pause is kept, and the counters reflect the removal.
#[tokio::test]
async fn test_flow_control_expired_cleanup() {
    let manager = FlowControlManager::with_config(
        3,
        FlowControlConfig {
            enable_backpressure: true,
            backpressure_timeout: Some(Duration::from_secs(30)),
            max_pending_queue_size: 1000,
            in_flight_timeout: Duration::from_millis(100),
        },
    );

    // These two are older than the in-flight timeout once the sleep is over.
    manager.acquire_send_quota(1).await.unwrap();
    manager.acquire_send_quota(2).await.unwrap();
    tokio::time::sleep(Duration::from_millis(150)).await;

    // This one is acquired after the pause and must survive the cleanup.
    manager.acquire_send_quota(3).await.unwrap();

    let removed = manager.cleanup_expired().await;
    assert_eq!(removed.len(), 2);
    assert!(removed.contains(&1));
    assert!(removed.contains(&2));
    assert!(!removed.contains(&3));

    // Only packet 3 is still in flight; the two expired slots are free again.
    assert_eq!(manager.in_flight_count().await, 1);
    assert_eq!(manager.available_permits(), 2);
}
/// Checks the snapshot returned by `get_stats` after three acquisitions on a
/// manager with a receive maximum of 5.
#[tokio::test]
async fn test_flow_control_statistics() {
    let manager = FlowControlManager::new(5);

    // Put packet ids 1, 2 and 3 in flight.
    for packet_id in 1..=3 {
        manager.acquire_send_quota(packet_id).await.unwrap();
    }

    let snapshot = manager.get_stats().await;
    assert_eq!(snapshot.receive_maximum, 5);
    assert_eq!(snapshot.in_flight_count, 3);
    assert_eq!(snapshot.available_quota, 2);
    assert_eq!(snapshot.pending_requests, 0);
    // With packets in flight, an oldest entry must be reported.
    assert!(snapshot.oldest_in_flight.is_some());
}
/// Checks that `clear` empties the in-flight set.
///
/// The final assertion pins `available_permits` at 1, which is the value it
/// already had before `clear` was called (3 permits minus 2 acquisitions).
/// NOTE(review): this suggests `clear` does not return permits; confirm
/// against the `FlowControlManager` implementation that this is intended.
#[tokio::test]
async fn test_flow_control_clear() {
    let manager = FlowControlManager::new(3);

    manager.acquire_send_quota(1).await.unwrap();
    manager.acquire_send_quota(2).await.unwrap();
    assert_eq!(manager.in_flight_count().await, 2);

    manager.clear().await;

    assert_eq!(manager.in_flight_count().await, 0);
    assert_eq!(manager.available_permits(), 1);
}
/// Spawns 20 tasks that each attempt a non-blocking acquisition against a
/// shared manager with a receive maximum of 10, and checks that exactly 10
/// succeed, 10 fail, and the counters end up fully exhausted.
#[tokio::test]
async fn test_flow_control_concurrent_access() {
    let shared = std::sync::Arc::new(FlowControlManager::new(10));

    // One task per packet id, all spawned before any result is awaited.
    let tasks: Vec<_> = (1..=20)
        .map(|packet_id| {
            let manager = shared.clone();
            tokio::spawn(async move { manager.try_acquire_send_quota(packet_id).await })
        })
        .collect();

    // Tally the outcomes; a panicked task fails the test through `unwrap`.
    let mut granted = 0;
    let mut rejected = 0;
    for task in tasks {
        if task.await.unwrap().is_ok() {
            granted += 1;
        } else {
            rejected += 1;
        }
    }

    assert_eq!(granted, 10);
    assert_eq!(rejected, 10);
    assert_eq!(shared.in_flight_count().await, 10);
    assert_eq!(shared.available_permits(), 0);
}
/// Exercises `register_send`: two registrations fill a manager with a receive
/// maximum of 2, and a third registration must return an error.
#[tokio::test]
async fn test_flow_control_legacy_register_send() {
    let manager = FlowControlManager::new(2);

    manager.register_send(1).await.unwrap();
    manager.register_send(2).await.unwrap();

    // Both slots are taken now.
    assert!(!manager.can_send());
    assert_eq!(manager.in_flight_count().await, 2);

    // Only failure is checked here, not the specific error variant.
    let third = manager.register_send(3).await;
    assert!(third.is_err());
}
/// Uses a configuration with backpressure disabled and no backpressure
/// timeout, then checks that a second acquisition on an exhausted manager
/// does not complete within 100 ms: the surrounding `timeout` must return
/// an error.
#[tokio::test]
async fn test_flow_control_config_variations() {
    let manager = FlowControlManager::with_config(
        1,
        FlowControlConfig {
            enable_backpressure: false,
            backpressure_timeout: None,
            max_pending_queue_size: 100,
            in_flight_timeout: Duration::from_secs(30),
        },
    );

    // Take the only slot.
    manager.acquire_send_quota(1).await.unwrap();

    // The pending acquisition is bounded from outside by a 100 ms timeout.
    let pending = manager.acquire_send_quota(2);
    let outcome = timeout(Duration::from_millis(100), pending).await;
    assert!(outcome.is_err());
}
/// Two boundary scenarios on managers with a receive maximum of 1:
/// reusing the single slot after an acknowledgment, and using the largest
/// packet id (`u16::MAX`).
#[tokio::test]
async fn test_flow_control_edge_cases() {
    // Scenario 1: acquire, acknowledge, then acquire again with a new id.
    let single_slot = FlowControlManager::new(1);
    single_slot.acquire_send_quota(1).await.unwrap();
    assert!(!single_slot.can_send());
    single_slot.acknowledge(1).await.unwrap();
    assert!(single_slot.can_send());
    single_slot.acquire_send_quota(2).await.unwrap();
    assert!(!single_slot.can_send());

    // Scenario 2: the maximum packet id can be acquired and acknowledged.
    let max_id = FlowControlManager::new(1);
    max_id.acquire_send_quota(u16::MAX).await.unwrap();
    max_id.acknowledge(u16::MAX).await.unwrap();
    assert!(max_id.can_send());
}
/// Checks `get_expired` with an explicit age threshold: after a 300 ms pause,
/// all three in-flight packets are older than 100 ms, and none is older than
/// 10 s.
#[tokio::test]
async fn test_flow_control_get_expired() {
    let manager = FlowControlManager::new(5);

    // Put packet ids 1, 2 and 3 in flight, then let them age.
    for packet_id in 1..=3 {
        manager.acquire_send_quota(packet_id).await.unwrap();
    }
    tokio::time::sleep(Duration::from_millis(300)).await;

    // Short threshold: every packet qualifies.
    let stale = manager.get_expired(Duration::from_millis(100)).await;
    assert_eq!(stale.len(), 3);
    assert!(stale.contains(&1));
    assert!(stale.contains(&2));
    assert!(stale.contains(&3));

    // Long threshold: nothing qualifies.
    let stale = manager.get_expired(Duration::from_secs(10)).await;
    assert_eq!(stale.len(), 0);
}