#![allow(clippy::doc_markdown)]
mod common;
use common::cli_helpers::*;
use common::TestBroker;
use mqtt5::time::Duration;
#[tokio::test]
async fn test_cli_pub_sub_basic() {
let broker = TestBroker::start().await;
let broker_url = broker.address();
let result = verify_pub_sub_delivery(broker_url, "test/basic", "Hello MQTT", &[]).await;
assert!(result.is_ok(), "Basic pub/sub failed: {:?}", result.err());
assert!(result.unwrap(), "Message not delivered");
println!("✅ Basic pub/sub works with verified message delivery");
}
#[tokio::test]
async fn test_cli_session_persistence() {
let broker = TestBroker::start().await;
let broker_url = broker.address();
let client_id = "persist-test";
let result = verify_session_persistence(broker_url, client_id).await;
match result {
Ok(true) => {
println!("✅ Session persistence verified - session was resumed");
}
Ok(false) => {
println!("⚠️ Session not resumed - broker may not support persistence");
}
Err(e) => {
println!("❌ Session persistence test failed: {e}");
panic!("Session persistence test failed");
}
}
}
#[tokio::test]
async fn test_cli_will_message_delivery() {
let broker = TestBroker::start().await;
let broker_url = broker.address();
let result = verify_will_delivery(broker_url, "test/will/status", "disconnected").await;
match result {
Ok(true) => {
println!("✅ Will message was delivered successfully");
}
Ok(false) => {
println!("❌ Will message was not delivered");
panic!("Will message delivery failed");
}
Err(e) => {
println!("❌ Will message test error: {e}");
panic!("Will message test failed");
}
}
}
#[tokio::test]
async fn test_cli_qos_delivery() {
let broker = TestBroker::start().await;
let broker_url = broker.address();
for qos in 0..=2 {
let result = verify_qos_delivery(broker_url, qos).await;
assert!(result.is_ok(), "QoS {qos} test failed: {:?}", result.err());
assert!(result.unwrap(), "QoS {qos} message not delivered");
}
println!("✅ All QoS levels work correctly with verified delivery");
}
#[tokio::test]
async fn test_cli_wildcard_subscriptions() {
let broker = TestBroker::start().await;
let broker_url = broker.address();
let sub_handle = run_cli_sub_async(broker_url, "test/wild/+/data", 2, &["--verbose"]);
tokio::time::sleep(Duration::from_millis(500)).await;
for sensor in ["temp", "humidity"] {
let pub_result = run_cli_pub(
broker_url,
&format!("test/wild/{sensor}/data"),
&format!("{sensor}_value"),
&[],
)
.await;
assert!(pub_result.success, "Failed to publish to {sensor}");
tokio::time::sleep(Duration::from_millis(100)).await;
}
let sub_result = tokio::time::timeout(Duration::from_secs(3), sub_handle)
.await
.expect("Timeout waiting for messages")
.expect("Subscribe task failed");
assert!(
sub_result.stdout_contains("temp_value"),
"Should receive temp message"
);
assert!(
sub_result.stdout_contains("humidity_value"),
"Should receive humidity message"
);
println!("✅ Wildcard subscriptions work with verified routing");
}
#[tokio::test]
async fn test_cli_with_auth() {
let broker = TestBroker::start().await;
let broker_url = broker.address();
let result = verify_authentication(
broker_url, "user1", "pass1", true, )
.await;
assert!(
result.is_ok(),
"Authentication test failed: {:?}",
result.err()
);
println!("✅ Authentication parameters work correctly");
}
#[tokio::test]
async fn test_cli_stdin_message() {
let broker = TestBroker::start().await;
let broker_url = broker.address();
let sub_handle = run_cli_sub_async(broker_url, "test/stdin", 1, &[]);
tokio::time::sleep(Duration::from_millis(500)).await;
let pub_result = run_cli_with_stdin(
&[
"pub",
"--url",
broker_url,
"--topic",
"test/stdin",
"--stdin",
"--non-interactive",
],
"Message from stdin\n",
)
.await;
assert!(pub_result.success, "Failed to publish from stdin");
let sub_result = tokio::time::timeout(Duration::from_secs(2), sub_handle)
.await
.expect("Timeout")
.expect("Subscribe task failed");
assert!(
sub_result.stdout_contains("Message from stdin"),
"Stdin message not received"
);
println!("✅ Stdin message input works with verified delivery");
}
#[tokio::test]
async fn test_cli_file_message() {
let broker = TestBroker::start().await;
let broker_url = broker.address();
let temp_file = "/tmp/mqtt_test_msg.txt";
std::fs::write(temp_file, "Message from file").expect("Failed to write temp file");
let sub_handle = run_cli_sub_async(broker_url, "test/file", 1, &[]);
tokio::time::sleep(Duration::from_millis(500)).await;
let pub_result = run_cli_command(&[
"pub",
"--url",
broker_url,
"--topic",
"test/file",
"--file",
temp_file,
"--non-interactive",
])
.await;
assert!(pub_result.success, "Failed to publish from file");
let sub_result = tokio::time::timeout(Duration::from_secs(2), sub_handle)
.await
.expect("Timeout")
.expect("Subscribe task failed");
assert!(
sub_result.stdout_contains("Message from file"),
"File message not received"
);
let _ = std::fs::remove_file(temp_file);
println!("✅ File message input works with verified delivery");
}