#![allow(clippy::large_futures)]
mod common;
use common::TestBroker;
use mqtt5::time::Duration;
use mqtt5::{MqttClient, PublishOptions, QoS};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::time::timeout;
#[tokio::test]
async fn test_retained_message_delivery() -> Result<(), Box<dyn std::error::Error>> {
let broker = TestBroker::start().await;
println!("Step 1: Publishing retained message...");
let publisher = MqttClient::new("publisher");
publisher.connect(broker.address()).await?;
let options = PublishOptions {
retain: true,
qos: QoS::AtLeastOnce,
..Default::default()
};
publisher
.publish_with_options(
"test/retained/topic",
b"This is a retained message!",
options,
)
.await?;
println!("✓ Retained message published");
publisher.disconnect().await?;
tokio::time::sleep(Duration::from_millis(500)).await;
println!("\nStep 2: New subscriber connecting...");
let subscriber = MqttClient::new("subscriber");
subscriber.connect(broker.address()).await?;
let received = Arc::new(AtomicBool::new(false));
let received_clone = received.clone();
subscriber
.subscribe("test/retained/topic", move |msg| {
println!(
"✓ Received message: {}",
String::from_utf8_lossy(&msg.payload)
);
println!(" Topic: {}", msg.topic);
println!(" Retained: {}", msg.retain);
received_clone.store(true, Ordering::Relaxed);
})
.await?;
let result = timeout(Duration::from_secs(2), async {
while !received.load(Ordering::Relaxed) {
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await;
match result {
Ok(()) => println!("\n✅ SUCCESS: Retained message was delivered to new subscriber!"),
Err(_) => println!("\n❌ FAILED: Retained message was NOT delivered to new subscriber"),
}
subscriber.disconnect().await?;
Ok(())
}