use mqtt5::error::MqttError;
use mqtt5::time::Duration;
use mqtt5::{ConnectOptions, MqttClient, PublishOptions, QoS, SubscribeOptions};
use tokio::time::timeout;
#[tokio::test]
async fn test_connection_refused_wrong_port() {
let client = MqttClient::new("error-test-1");
let result = client.connect("mqtt://localhost:12345").await;
assert!(result.is_err());
match result.unwrap_err() {
MqttError::ConnectionError(_) => {} other => panic!("Expected ConnectionError, got: {other:?}"),
}
}
#[tokio::test]
async fn test_connection_timeout() {
let options = ConnectOptions::new("timeout-test");
let client = MqttClient::with_options(options);
let result = timeout(
Duration::from_secs(2),
client.connect("mqtt://192.0.2.1:1883"), )
.await;
assert!(result.is_err() || result.unwrap().is_err());
}
#[tokio::test]
async fn test_invalid_client_id() {
let mut options = ConnectOptions::new("");
options.clean_start = false; let client = MqttClient::with_options(options);
let result = client.connect("mqtt://localhost:1883").await;
if result.is_ok() {
client.disconnect().await.ok();
}
}
#[tokio::test]
async fn test_publish_before_connect() {
let client = MqttClient::new("not-connected");
let result = client.publish("test/topic", "message").await;
assert!(result.is_err());
match result.unwrap_err() {
MqttError::NotConnected => {} other => panic!("Expected NotConnected error, got: {other:?}"),
}
}
#[tokio::test]
async fn test_subscribe_before_connect() {
let client = MqttClient::new("not-connected-2");
let result = client.subscribe("test/topic", |_| {}).await;
assert!(result.is_err());
match result.unwrap_err() {
MqttError::NotConnected => {} other => panic!("Expected NotConnected error, got: {other:?}"),
}
}
#[tokio::test]
async fn test_invalid_topic_name_publish() {
let client = MqttClient::new("invalid-topic-test");
let result = client.publish("test/+/invalid", "message").await;
assert!(result.is_err(), "Topic with wildcard should be rejected");
let result = client.publish("", "message").await;
assert!(result.is_err(), "Empty topic should be rejected");
let result = client.publish("test\0topic", "message").await;
assert!(
result.is_err(),
"Topic with null character should be rejected"
);
}
#[tokio::test]
async fn test_invalid_topic_filter_subscribe() {
let client = MqttClient::new("invalid-filter-test");
let result = client.subscribe("", |_| {}).await;
assert!(result.is_err(), "Empty topic filter should be rejected");
let result = client.subscribe("test\0filter", |_| {}).await;
assert!(
result.is_err(),
"Topic filter with null character should be rejected"
);
}
#[tokio::test]
async fn test_packet_too_large() {
let mut options = ConnectOptions::new("large-packet-test");
options.properties.maximum_packet_size = Some(1024);
let client = MqttClient::with_options(options);
let large_payload = vec![0u8; 2048]; let result = client.publish("test/large", &large_payload[..]).await;
assert!(result.is_err(), "Large packet should be rejected");
match result.unwrap_err() {
MqttError::NotConnected | MqttError::PacketTooLarge { .. } => {} other => panic!("Expected NotConnected or PacketTooLarge error, got: {other:?}"),
}
}
#[tokio::test]
async fn test_duplicate_connect() {
let client = MqttClient::new("duplicate-connect-test");
let result1 = client.connect("mqtt://localhost:19999").await;
let result2 = client.connect("mqtt://localhost:19999").await;
assert!(
result1.is_err(),
"First connect should fail (no broker on port 19999)"
);
assert!(
result2.is_err(),
"Second connect should fail (no broker on port 19999)"
);
}
#[tokio::test]
async fn test_disconnect_not_connected() {
let client = MqttClient::new("disconnect-test");
let result = client.disconnect().await;
assert!(result.is_err());
match result.unwrap_err() {
MqttError::NotConnected => {} other => panic!("Expected NotConnected error, got: {other:?}"),
}
}
#[tokio::test]
async fn test_qos2_timeout() {
let client = MqttClient::new("qos2-timeout-test");
let options = PublishOptions {
qos: QoS::ExactlyOnce,
..Default::default()
};
let result = client
.publish_with_options("test/qos2/timeout", "test", options)
.await;
assert!(
result.is_err(),
"QoS 2 publish should fail when not connected"
);
match result.unwrap_err() {
MqttError::NotConnected => {} other => panic!("Expected NotConnected error, got: {other:?}"),
}
}
#[tokio::test]
async fn test_invalid_qos_value() {
let qos = QoS::from(3u8); assert_eq!(qos, QoS::AtMostOnce);
let qos = QoS::from(255u8); assert_eq!(qos, QoS::AtMostOnce); }
#[tokio::test]
async fn test_network_disconnection_during_publish() {
let client = MqttClient::new("network-disconnect-test");
let result = client.publish("test/topic", "message").await;
assert!(result.is_err(), "Publish should fail when not connected");
match result.unwrap_err() {
MqttError::NotConnected => {} other => panic!("Expected NotConnected error, got: {other:?}"),
}
}
#[tokio::test]
async fn test_malformed_connack_handling() {
let client = MqttClient::new("malformed-test");
let result = client.connect("mqtt://localhost:19999").await;
assert!(
result.is_err(),
"Connection should fail with no broker on port 19999"
);
match result.unwrap_err() {
MqttError::ConnectionError(_) => {} other => println!("Got error type: {other:?}"), }
}
#[tokio::test]
async fn test_subscribe_with_invalid_qos() {
let client = MqttClient::new("invalid-qos-sub");
let options = SubscribeOptions {
qos: QoS::ExactlyOnce, ..Default::default()
};
let result = client
.subscribe_with_options("test/topic", options, |_| {})
.await;
assert!(result.is_err(), "Subscribe should fail when not connected");
match result.unwrap_err() {
MqttError::NotConnected => {} other => panic!("Expected NotConnected error, got: {other:?}"),
}
}
#[tokio::test]
async fn test_connection_lost_callback() {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
let disconnected = Arc::new(AtomicBool::new(false));
let options = ConnectOptions::new("callback-test");
let client = MqttClient::with_options(options);
let result = client.disconnect().await;
assert!(result.is_err(), "Disconnect should fail when not connected");
match result.unwrap_err() {
MqttError::NotConnected => {} other => panic!("Expected NotConnected error, got: {other:?}"),
}
assert!(!disconnected.load(Ordering::Relaxed));
}
#[tokio::test]
async fn test_multiple_error_conditions() {
let client = MqttClient::new("multi-error-test");
assert!(client.publish("test", "msg").await.is_err());
assert!(client.subscribe("test", |_| {}).await.is_err());
assert!(client.connect("not-a-valid-url").await.is_err());
assert!(client.disconnect().await.is_err());
}
#[tokio::test]
async fn test_flow_control_exceeded() {
let mut options = ConnectOptions::new("flow-control-test");
options.properties.receive_maximum = Some(2);
let client = MqttClient::with_options(options);
let pub_options = PublishOptions {
qos: QoS::AtLeastOnce,
..Default::default()
};
let result = client
.publish_with_options("test/flow/1", "message 1", pub_options)
.await;
assert!(result.is_err(), "Publish should fail when not connected");
match result.unwrap_err() {
MqttError::NotConnected => {} other => panic!("Expected NotConnected error, got: {other:?}"),
}
}