#![allow(unused_imports)]
#![allow(dead_code)]
use embassy_executor::Spawner;
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
use embassy_sync::mutex::Mutex;
use embassy_time::{Duration, Timer};
use log::{error, info};
use mqtt_async_embedded::client::{MqttClient, MqttOptions};
use mqtt_async_embedded::packet::QoS;
use mqtt_async_embedded::transport::std_tcp::StdTcpTransport;
use static_cell::StaticCell;
/// MQTT client type used throughout this file: `MqttClient` over the std TCP transport.
///
/// NOTE(review): the two `1024` const arguments are presumably buffer capacities — confirm
/// against `MqttClient`'s definition.
type Client<'a> = MqttClient<'a, StdTcpTransport, 1024, 1024>;
/// Static storage for the mutex-wrapped client; `main` fills it via `CLIENT.init`.
static CLIENT: StaticCell<Mutex<NoopRawMutex, Client>> = StaticCell::new();
#[embassy_executor::main]
async fn main(spawner: Spawner) {
env_logger::builder()
.filter(None, log::LevelFilter::Info)
.init();
spawner.spawn(mqtt_poll_task()).unwrap();
let transport = StdTcpTransport::new("localhost:1883")
.await
.expect("Failed to connect to broker");
let options = MqttOptions::new("desktop-client-123");
let client = MqttClient::new(transport, options);
let client = CLIENT.init(Mutex::new(client));
info!("Connecting to MQTT broker...");
{
let mut client_guard = client.lock().await;
client_guard.connect().await.unwrap();
}
info!("Connected!");
info!("Subscribing to 'test/topic'...");
{
let mut client_guard = client.lock().await;
client_guard
.subscribe("test/topic", QoS::AtMostOnce)
.await
.unwrap();
}
info!("Subscribed!");
let mut count = 0;
loop {
let msg = format!("Hello from desktop! Count: {}", count);
info!("Publishing: '{}'", &msg);
{
let mut client_guard = client.lock().await;
client_guard
.publish("test/topic", msg.as_bytes(), QoS::AtMostOnce)
.await
.unwrap();
}
count += 1;
Timer::after(Duration::from_secs(5)).await;
}
}
#[embassy_executor::task]
async fn mqtt_poll_task() {
let client = CLIENT.get();
loop {
let mut client_guard = client.lock().await;
match client_guard.poll().await {
Ok(Some(packet)) => {
info!("Received packet: {:?}", packet);
}
Ok(None) => {
}
Err(e) => {
error!("MQTT poll error: {:?}", e);
Timer::after(Duration::from_secs(1)).await;
}
}
}
}