use mqtt5::time::{Duration, SystemTime, UNIX_EPOCH};
use mqtt5::{broker::MqttBroker, ConnectOptions, ConnectionEvent, MqttClient};
use tokio::time::sleep;
#[tokio::main]
#[allow(clippy::too_many_lines)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let broker_url = std::env::var("MQTT_BROKER").ok();
let broker_handle = if broker_url.is_none() {
println!("🚀 Starting embedded MQTT broker...");
let broker = MqttBroker::bind("127.0.0.1:1883").await?;
println!("✅ Broker listening on mqtt://localhost:1883");
Some(tokio::spawn(async move {
let mut broker = broker;
if let Err(e) = broker.run().await {
eprintln!("❌ Broker error: {e}");
}
}))
} else {
None
};
let broker_url = broker_url.unwrap_or_else(|| "mqtt://localhost:1883".to_string());
println!("📡 Connecting to broker: {broker_url}");
let options = ConnectOptions::new("simple-client-demo")
.with_keep_alive(Duration::from_secs(60)) .with_clean_start(true) .with_automatic_reconnect(true);
let client = MqttClient::with_options(options);
client
.on_connection_event(|event| match event {
ConnectionEvent::Connecting => {
println!("🔌 Connecting to MQTT broker...");
}
ConnectionEvent::Connected { session_present } => {
println!("✅ Connected to MQTT broker (session_present: {session_present})");
}
ConnectionEvent::Disconnected { reason } => {
println!("❌ Disconnected from broker: {reason:?}");
}
ConnectionEvent::Reconnecting { attempt } => {
println!("🔄 Reconnecting... (attempt {attempt})");
}
ConnectionEvent::ReconnectFailed { error } => {
println!("💥 Reconnection failed: {error}");
}
})
.await?;
client.connect(&broker_url).await?;
println!("📬 Subscribing to topic 'demo/messages'");
let (packet_id, granted_qos) = client
.subscribe("demo/messages", |message| {
println!(
"📨 Received message on '{}': {}",
message.topic,
String::from_utf8_lossy(&message.payload)
);
})
.await?;
println!("✅ Subscribed with packet_id={packet_id}, QoS={granted_qos:?}");
println!("📬 Subscribing to topic 'demo/json'");
client
.subscribe("demo/json", |message| {
println!(
"📊 Received JSON on '{}': {}",
message.topic,
String::from_utf8_lossy(&message.payload)
);
if let Ok(json_value) = serde_json::from_slice::<serde_json::Value>(&message.payload) {
println!(" Parsed JSON: {json_value:#}");
}
})
.await?;
println!("📤 Publishing messages...");
client
.publish("demo/messages", b"Hello from Rust MQTT client!")
.await?;
println!(" ✅ Published simple message");
client
.publish_qos1("demo/messages", b"This is a QoS 1 message")
.await?;
println!(" ✅ Published QoS 1 message");
let json_data = serde_json::json!({
"sensor": "temperature",
"value": 23.5,
"unit": "celsius",
"timestamp": SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs()
});
client
.publish_qos1("demo/json", json_data.to_string().as_bytes())
.await?;
println!(" ✅ Published JSON message");
client
.publish_retain("demo/status", b"Client is running")
.await?;
println!(" ✅ Published retained status message");
println!("⏳ Waiting 30 seconds to receive messages...");
println!(" (You can publish to 'demo/messages' or 'demo/json' from another client to see them here)");
for i in 1..=6 {
sleep(Duration::from_secs(5)).await;
let message = format!("Counter message #{i}");
client.publish("demo/messages", message.as_bytes()).await?;
println!(" 📤 Published: {message}");
}
println!("👋 Disconnecting...");
client.disconnect().await?;
println!("✅ Disconnected successfully");
if let Some(handle) = broker_handle {
println!("🛑 Stopping embedded broker...");
handle.abort();
}
Ok(())
}