mod common;
use mqtt5::broker::config::{StorageBackend, StorageConfig};
use mqtt5::broker::{BrokerConfig, MqttBroker};
use mqtt5::client::MqttClient;
use mqtt5::time::Duration;
use tokio::time::sleep;
#[tokio::test]
async fn test_real_client_connection_limits() {
let storage_config = StorageConfig {
backend: StorageBackend::Memory,
enable_persistence: true,
..Default::default()
};
let config = BrokerConfig::default()
.with_bind_address("127.0.0.1:0".parse::<std::net::SocketAddr>().unwrap())
.with_max_clients(2)
.with_storage(storage_config);
let mut broker = MqttBroker::with_config(config).await.unwrap();
let resource_monitor = broker.resource_monitor();
let broker_addr = broker.local_addr().expect("Failed to get broker address");
let broker_handle = tokio::spawn(async move {
let _ = broker.run().await;
});
sleep(Duration::from_millis(100)).await;
let mut clients = Vec::new();
for i in 0..2 {
let client_id = format!("client-{i}");
let client = MqttClient::new(&client_id);
match client.connect(&broker_addr.to_string()).await {
Ok(()) => {
println!("Client {client_id} connected successfully");
clients.push(client);
}
Err(e) => {
panic!("Client {client_id} should have connected but failed: {e}");
}
}
sleep(Duration::from_millis(50)).await;
}
let stats = resource_monitor.get_stats().await;
assert_eq!(stats.current_connections, 2);
assert!((stats.connection_utilization() - 100.0).abs() < f64::EPSILON);
let third_client = MqttClient::new("client-3");
let result = third_client.connect(&broker_addr.to_string()).await;
assert!(
result.is_err(),
"Third client should not be able to connect"
);
let final_stats = resource_monitor.get_stats().await;
assert_eq!(final_stats.current_connections, 2);
for client in clients {
let _ = client.disconnect().await;
}
broker_handle.abort();
}
#[tokio::test]
async fn test_real_client_rate_limiting() {
let storage_config = StorageConfig {
backend: StorageBackend::Memory,
enable_persistence: true,
..Default::default()
};
let config = BrokerConfig::default()
.with_bind_address("127.0.0.1:0".parse::<std::net::SocketAddr>().unwrap())
.with_max_clients(5)
.with_storage(storage_config);
let mut broker = MqttBroker::with_config(config).await.unwrap();
let resource_monitor = broker.resource_monitor();
let broker_addr = broker.local_addr().expect("Failed to get broker address");
let broker_handle = tokio::spawn(async move {
let _ = broker.run().await;
});
sleep(Duration::from_millis(100)).await;
let client = MqttClient::new("rate-test-client");
client.connect(&broker_addr.to_string()).await.unwrap();
let initial_stats = resource_monitor.get_stats().await;
assert_eq!(initial_stats.current_connections, 1);
let mut success_count = 0;
let mut error_count = 0;
for i in 0..20 {
match client
.publish(&format!("test/message/{i}"), "test payload")
.await
{
Ok(_) => {
success_count += 1;
}
Err(_) => {
error_count += 1;
}
}
sleep(Duration::from_millis(1)).await;
}
println!("Published {success_count} messages successfully, {error_count} failed");
let final_stats = resource_monitor.get_stats().await;
assert!(
final_stats.total_messages > 0,
"Some messages should have been tracked"
);
assert!(
final_stats.total_bytes > 0,
"Some bytes should have been tracked"
);
assert!(
success_count > 10,
"Most messages should succeed with default limits"
);
let _ = client.disconnect().await;
broker_handle.abort();
}
#[tokio::test]
async fn test_client_registration_unregistration() {
let storage_config = StorageConfig {
backend: StorageBackend::Memory,
enable_persistence: true,
..Default::default()
};
let config = BrokerConfig::default()
.with_bind_address("127.0.0.1:0".parse::<std::net::SocketAddr>().unwrap())
.with_max_clients(10)
.with_storage(storage_config);
let mut broker = MqttBroker::with_config(config).await.unwrap();
let resource_monitor = broker.resource_monitor();
let broker_addr = broker.local_addr().expect("Failed to get broker address");
let broker_handle = tokio::spawn(async move {
let _ = broker.run().await;
});
sleep(Duration::from_millis(100)).await;
let initial_stats = resource_monitor.get_stats().await;
assert_eq!(initial_stats.current_connections, 0);
let client = MqttClient::new("reg-test-client");
client.connect(&broker_addr.to_string()).await.unwrap();
sleep(Duration::from_millis(50)).await;
let connected_stats = resource_monitor.get_stats().await;
assert_eq!(connected_stats.current_connections, 1);
assert_eq!(connected_stats.unique_ips, 1);
client.disconnect().await.unwrap();
sleep(Duration::from_millis(50)).await;
let disconnected_stats = resource_monitor.get_stats().await;
assert_eq!(disconnected_stats.current_connections, 0);
broker_handle.abort();
}