use mqtt5::{ConnectOptions, MqttClient, PublishOptions, PublishResult, QoS};
/// Verifies that a QoS 1 publish is accepted when queue-on-disconnect is enabled.
///
/// The client is built with `clean_start(false)`; no connect call is made in this test.
/// It must report queue-on-disconnect as enabled, and the publish must yield
/// `PublishResult::QoS1Or2` carrying a non-zero packet id.
#[tokio::test]
async fn test_message_queuing_when_disconnected() {
    let options = ConnectOptions::new("test-client").with_clean_start(false);
    let client = MqttClient::with_options(options);
    assert!(client.is_queue_on_disconnect().await);

    let options = PublishOptions {
        qos: QoS::AtLeastOnce,
        ..Default::default()
    };
    // `expect` instead of `assert!(is_ok())` + `unwrap()`: a single check whose failure
    // output includes the underlying error value.
    let outcome = client
        .publish_with_options("test/topic", "queued message", options)
        .await
        .expect("QoS 1 publish should succeed when queue-on-disconnect is enabled");

    match outcome {
        PublishResult::QoS1Or2 { packet_id } => assert!(packet_id > 0),
        PublishResult::QoS0 => panic!("Expected QoS1Or2 result"),
    }
}
#[tokio::test]
async fn test_message_queuing_disabled() {
let options = ConnectOptions::new("test-client").with_clean_start(true);
let client = MqttClient::with_options(options);
assert!(!client.is_queue_on_disconnect().await);
let options = PublishOptions {
qos: QoS::AtLeastOnce,
..Default::default()
};
let result = client
.publish_with_options("test/topic", "message", options)
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_qos0_not_queued() {
let client = MqttClient::new("test-client");
let options = PublishOptions::default();
let result = client
.publish_with_options("test/topic", "qos0 message", options)
.await;
assert!(result.is_err()); }
/// Verifies that several QoS 1 publishes each receive a distinct packet id.
///
/// Five messages are published on a `clean_start(false)` client (no connect call is made
/// in this test). Every publish must succeed with `PublishResult::QoS1Or2`, and the
/// collected packet ids must contain no duplicates.
#[tokio::test]
async fn test_queue_multiple_messages() {
    let options = ConnectOptions::new("test-client").with_clean_start(false);
    let client = MqttClient::with_options(options);

    let mut packet_ids = Vec::new();
    for i in 0..5 {
        let options = PublishOptions {
            qos: QoS::AtLeastOnce,
            ..Default::default()
        };
        // Report which publish failed and why, instead of a bare `is_ok()` assertion
        // followed by a second check in `unwrap()`.
        let outcome = client
            .publish_with_options(format!("test/topic/{i}"), format!("message {i}"), options)
            .await
            .unwrap_or_else(|err| panic!("publish {i} should succeed, got error: {err:?}"));

        match outcome {
            PublishResult::QoS1Or2 { packet_id } => packet_ids.push(packet_id),
            PublishResult::QoS0 => panic!("Expected QoS1Or2 result"),
        }
    }

    // Sorting a copy and removing adjacent duplicates shrinks it iff an id repeated.
    let mut unique_ids = packet_ids.clone();
    unique_ids.sort_unstable();
    unique_ids.dedup();
    assert_eq!(
        packet_ids.len(),
        unique_ids.len(),
        "packet ids must be unique"
    );
}
/// Exercises switching queue-on-disconnect off and back on at runtime.
///
/// Starting from a `clean_start(false)` client (queuing reported as enabled), the test
/// disables queuing and expects a QoS 1 publish to return an error, then re-enables it
/// and expects the same publish to return `Ok`.
#[tokio::test]
async fn test_toggle_queue_on_disconnect() {
    // Both publishes use identical QoS 1 options, built fresh for each call.
    let qos1_options = || PublishOptions {
        qos: QoS::AtLeastOnce,
        ..Default::default()
    };

    let client =
        MqttClient::with_options(ConnectOptions::new("test-client").with_clean_start(false));
    assert!(client.is_queue_on_disconnect().await);

    // Queuing switched off: the publish must be rejected.
    client.set_queue_on_disconnect(false).await;
    assert!(!client.is_queue_on_disconnect().await);
    let rejected = client
        .publish_with_options("test/topic", "message", qos1_options())
        .await;
    assert!(rejected.is_err());

    // Queuing switched back on: the publish must be accepted.
    client.set_queue_on_disconnect(true).await;
    assert!(client.is_queue_on_disconnect().await);
    let accepted = client
        .publish_with_options("test/topic", "message", qos1_options())
        .await;
    assert!(accepted.is_ok());
}
/// Publishes a mix of QoS 1 and QoS 2 messages on a `clean_start(false)` client and
/// checks that each publish returns a packet id.
///
/// NOTE(review): despite the name, this test makes no connect or reconnect call and does
/// not observe any replay; it only checks that all three publishes are accepted.
#[tokio::test]
async fn test_message_replay_on_reconnect() {
    let options = ConnectOptions::new("test-client").with_clean_start(false);
    let client = MqttClient::with_options(options);

    // Fixed-size literal list: an array avoids the heap allocation `vec!` would make.
    let messages = [
        ("test/1", "message 1", QoS::AtLeastOnce),
        ("test/2", "message 2", QoS::ExactlyOnce),
        ("test/3", "message 3", QoS::AtLeastOnce),
    ];

    let mut packet_ids = Vec::new();
    for (topic, payload, qos) in messages {
        let options = PublishOptions {
            qos,
            ..Default::default()
        };
        // Surface the topic and the underlying error on failure rather than asserting
        // `is_ok()` and then unwrapping separately.
        let outcome = client
            .publish_with_options(topic, payload, options)
            .await
            .unwrap_or_else(|err| panic!("publish to {topic} should succeed, got error: {err:?}"));

        match outcome {
            PublishResult::QoS1Or2 { packet_id } => packet_ids.push(packet_id),
            PublishResult::QoS0 => panic!("Expected QoS1Or2 result"),
        }
    }

    assert_eq!(packet_ids.len(), 3);
}
#[tokio::test]
async fn test_retained_message_queuing() {
let options = ConnectOptions::new("test-client").with_clean_start(false);
let client = MqttClient::with_options(options);
let options = PublishOptions {
qos: QoS::AtLeastOnce,
retain: true,
..Default::default()
};
let result = client
.publish_with_options("test/retained", "retained message", options)
.await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_clean_session_no_queuing() {
let options = ConnectOptions::new("clean-client").with_clean_start(true);
let client = MqttClient::with_options(options);
assert!(!client.is_queue_on_disconnect().await);
client.set_queue_on_disconnect(true).await;
assert!(client.is_queue_on_disconnect().await);
let pub_opts = PublishOptions {
qos: QoS::AtLeastOnce,
..Default::default()
};
let result = client
.publish_with_options("test/topic", "message", pub_opts)
.await;
assert!(result.is_ok());
}