embassy_ha/
lib.rs

1//! MQTT Home Assistant integration library for the [Embassy](https://embassy.dev/) async runtime.
2//!
3//! # Features
4//!
5//! - Support for multiple entity types: sensors, buttons, switches, binary sensors, numbers, device trackers
6//! - Built on top of Embassy's async runtime for embedded systems
7//! - No-std compatible
8//! - Automatic MQTT discovery for Home Assistant
9//! - No runtime allocation
10//!
11//! # Installation
12//!
13//! ```bash
14//! cargo add embassy-ha
15//! ```
16//!
17//! # Quick Start
18//!
19//! This example does not compile as-is because it requires device-specific setup, but it should
20//! be easy to adapt if you already have Embassy running on your microcontroller.
21//!
22//! ```no_run
23//! use embassy_executor::Spawner;
24//! use embassy_ha::{DeviceConfig, SensorConfig, SensorClass, StateClass};
25//! use embassy_time::Timer;
26//! use static_cell::StaticCell;
27//!
28//! static HA_RESOURCES: StaticCell<embassy_ha::DeviceResources> = StaticCell::new();
29//!
30//! #[embassy_executor::main]
31//! async fn main(spawner: Spawner) {
32//!     // Initialize your network stack
33//!     // This is device specific
34//!     let stack: embassy_net::Stack<'static>;
35//! #   let stack = unsafe { core::mem::zeroed() };
36//!
37//!     // Create a Home Assistant device
38//!     let device = embassy_ha::new(
39//!         HA_RESOURCES.init(Default::default()),
40//!         DeviceConfig {
41//!             device_id: "my-device",
42//!             device_name: "My Device",
43//!             manufacturer: "ACME Corp",
44//!             model: "Model X",
45//!         },
46//!     );
47//!
48//!     // Create a temperature sensor
49//!     let sensor_config = SensorConfig {
50//!         class: SensorClass::Temperature,
51//!         state_class: StateClass::Measurement,
52//!         unit: Some(embassy_ha::constants::HA_UNIT_TEMPERATURE_CELSIUS),
53//!         ..Default::default()
54//!     };
55//!     let mut sensor = embassy_ha::create_sensor(&device, "temp-sensor", sensor_config);
56//!
57//!     // Spawn the Home Assistant communication task
58//!     spawner.spawn(ha_task(stack, device)).unwrap();
59//!
60//!     // Main loop - read and publish temperature
61//!     loop {
62//! #       let temperature = 0.0;
63//!         // let temperature = read_temperature().await;
64//!         sensor.publish(temperature);
65//!         Timer::after_secs(60).await;
66//!     }
67//! }
68//!
69//! #[embassy_executor::task]
70//! async fn ha_task(stack: embassy_net::Stack<'static>, device: embassy_ha::Device<'static>) {
71//!     embassy_ha::connect_and_run(stack, device, "mqtt-broker-address").await;
72//! }
73//! ```
74//!
75//! # Examples
76//!
77//! The repository includes several examples demonstrating different entity types. To run an example:
78//!
79//! ```bash
80//! export MQTT_ADDRESS="mqtt://your-mqtt-broker:1883"
81//! cargo run --example sensor
82//! ```
83//!
84//! Available examples:
85//! - `sensor` - Temperature and humidity sensors
86//! - `button` - Triggerable button entity
87//! - `switch` - On/off switch control
88//! - `binary_sensor` - Binary state sensor
89//! - `number` - Numeric input entity
90//! - `device_tracker` - Location tracking entity
91
92#![no_std]
93
94use core::{
95    cell::RefCell,
96    net::{Ipv4Addr, SocketAddrV4},
97    task::Waker,
98};
99
100use embassy_net::tcp::TcpSocket;
101use embassy_sync::waitqueue::AtomicWaker;
102use embassy_time::{Duration, Timer};
103use heapless::{
104    Vec, VecView,
105    string::{String, StringView},
106};
107use serde::Serialize;
108
109mod mqtt;
110
111mod log;
112#[allow(unused)]
113use log::Format;
114
115pub mod constants;
116
117mod binary_state;
118pub use binary_state::*;
119
120mod command_policy;
121pub use command_policy::*;
122
123mod entity;
124pub use entity::*;
125
126mod entity_binary_sensor;
127pub use entity_binary_sensor::*;
128
129mod entity_button;
130pub use entity_button::*;
131
132mod entity_category;
133pub use entity_category::*;
134
135mod entity_device_tracker;
136pub use entity_device_tracker::*;
137
138mod entity_number;
139pub use entity_number::*;
140
141mod entity_sensor;
142pub use entity_sensor::*;
143
144mod entity_switch;
145pub use entity_switch::*;
146
147mod transport;
148pub use transport::Transport;
149
150mod unit;
151pub use unit::*;
152
153const AVAILABLE_PAYLOAD: &str = "online";
154const NOT_AVAILABLE_PAYLOAD: &str = "offline";
155const DEFAULT_KEEPALIVE_TIME: u16 = 30;
156const MQTT_TIMEOUT: Duration = Duration::from_secs(30);
157
/// Error type returned by this crate's fallible operations.
///
/// Wraps a static, human-readable message. The `Display` implementation writes that
/// message verbatim.
#[derive(Debug)]
pub struct Error(&'static str);

impl Error {
    /// Creates an error carrying `message`.
    pub(crate) fn new(message: &'static str) -> Self {
        Error(message)
    }
}

impl core::fmt::Display for Error {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self(message) = self;
        f.write_str(message)
    }
}

impl core::error::Error for Error {}
174
/// Device description embedded in every entity discovery payload.
///
/// Serialized as the `device` field of [`EntityDiscovery`].
#[derive(Debug, Clone, Copy, Serialize)]
#[cfg_attr(feature = "defmt", derive(Format))]
struct DeviceDiscovery<'a> {
    // Filled with the single device id when the discovery payload is generated.
    identifiers: &'a [&'a str],
    name: &'a str,
    manufacturer: &'a str,
    model: &'a str,
}
183
/// Identifier of an entity, rendered as `<device_id>_<entity_id>`.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(Format))]
struct EntityIdDiscovery<'a> {
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for EntityIdDiscovery<'_> {
    /// Writes the device id and the entity id joined by an underscore.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            device_id,
            entity_id,
        } = self;
        f.write_fmt(format_args!("{}_{}", device_id, entity_id))
    }
}
196
197impl<'a> Serialize for EntityIdDiscovery<'a> {
198    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
199    where
200        S: serde::Serializer,
201    {
202        serializer.collect_str(self)
203    }
204}
205
/// Discovery payload for a single entity, serialized to JSON and published on the
/// entity's discovery topic.
///
/// Every `Option` field is left out of the serialized output when it is `None`.
#[derive(Debug, Serialize)]
#[cfg_attr(feature = "defmt", derive(Format))]
struct EntityDiscovery<'a> {
    // Serialized under the key `unique_id` as `<device_id>_<entity_id>`.
    #[serde(rename = "unique_id")]
    id: EntityIdDiscovery<'a>,

    #[serde(skip_serializing_if = "Option::is_none")]
    name: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    device_class: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    state_topic: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    command_topic: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    json_attributes_topic: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    unit_of_measurement: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    schema: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    platform: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    state_class: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    icon: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    entity_category: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    entity_picture: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    min: Option<f32>,

    #[serde(skip_serializing_if = "Option::is_none")]
    max: Option<f32>,

    #[serde(skip_serializing_if = "Option::is_none")]
    step: Option<f32>,

    #[serde(skip_serializing_if = "Option::is_none")]
    mode: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    suggested_display_precision: Option<u8>,

    #[serde(skip_serializing_if = "Option::is_none")]
    availability_topic: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    payload_available: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    payload_not_available: Option<&'a str>,

    // Always serialized: the device this entity belongs to.
    device: &'a DeviceDiscovery<'a>,
}
274
/// Formats an entity's discovery topic:
/// `homeassistant/<domain>/<device_id>_<entity_id>/config`.
struct DiscoveryTopicDisplay<'a> {
    domain: &'a str,
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for DiscoveryTopicDisplay<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            domain,
            device_id,
            entity_id,
        } = self;
        f.write_fmt(format_args!(
            "homeassistant/{}/{}_{}/config",
            domain, device_id, entity_id
        ))
    }
}
290
/// Formats an entity's state topic: `embassy-ha/<device_id>/<entity_id>/state`.
struct StateTopicDisplay<'a> {
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for StateTopicDisplay<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            device_id,
            entity_id,
        } = self;
        f.write_fmt(format_args!("embassy-ha/{}/{}/state", device_id, entity_id))
    }
}
301
/// Formats an entity's command topic: `embassy-ha/<device_id>/<entity_id>/command`.
struct CommandTopicDisplay<'a> {
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for CommandTopicDisplay<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            device_id,
            entity_id,
        } = self;
        f.write_fmt(format_args!(
            "embassy-ha/{}/{}/command",
            device_id, entity_id
        ))
    }
}
316
/// Formats an entity's attributes topic: `embassy-ha/<device_id>/<entity_id>/attributes`.
struct AttributesTopicDisplay<'a> {
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for AttributesTopicDisplay<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            device_id,
            entity_id,
        } = self;
        f.write_fmt(format_args!(
            "embassy-ha/{}/{}/attributes",
            device_id, entity_id
        ))
    }
}
331
/// Formats the device availability topic: `embassy-ha/<device_id>/availability`.
struct DeviceAvailabilityTopic<'a> {
    device_id: &'a str,
}

impl core::fmt::Display for DeviceAvailabilityTopic<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self { device_id } = self;
        f.write_fmt(format_args!("embassy-ha/{}/availability", device_id))
    }
}
341
/// Static description of the device that owns the entities.
///
/// These values are copied into the `device` section of every entity discovery payload,
/// and `device_id` is also used to build the MQTT topics.
pub struct DeviceConfig {
    /// Identifier of the device; part of every topic and of each entity's unique id.
    pub device_id: &'static str,
    /// Device name reported in the discovery payload.
    pub device_name: &'static str,
    /// Manufacturer reported in the discovery payload.
    pub manufacturer: &'static str,
    /// Model reported in the discovery payload.
    pub model: &'static str,
}
348
349#[derive(Default)]
350pub struct DeviceBuffersOwned {
351    pub publish: Vec<u8, 2048>,
352    pub subscribe: Vec<u8, 128>,
353    pub discovery: Vec<u8, 2048>,
354    pub availability_topic: String<128>,
355    pub discovery_topic: String<128>,
356    pub state_topic: String<128>,
357    pub command_topic: String<128>,
358    pub attributes_topic: String<128>,
359}
360
361impl DeviceBuffersOwned {
362    pub fn as_buffers_mut(&mut self) -> DeviceBuffers<'_> {
363        DeviceBuffers {
364            publish: &mut self.publish,
365            subscribe: &mut self.subscribe,
366            discovery: &mut self.discovery,
367            availability_topic: &mut self.availability_topic,
368            discovery_topic: &mut self.discovery_topic,
369            state_topic: &mut self.state_topic,
370            command_topic: &mut self.command_topic,
371            attributes_topic: &mut self.attributes_topic,
372        }
373    }
374}
375
376pub struct DeviceBuffers<'a> {
377    pub publish: &'a mut VecView<u8>,
378    pub subscribe: &'a mut VecView<u8>,
379    pub discovery: &'a mut VecView<u8>,
380    pub availability_topic: &'a mut StringView,
381    pub discovery_topic: &'a mut StringView,
382    pub state_topic: &'a mut StringView,
383    pub command_topic: &'a mut StringView,
384    pub attributes_topic: &'a mut StringView,
385}
386
387impl<'a> DeviceBuffers<'a> {
388    pub fn clear(&mut self) {
389        self.publish.clear();
390        self.subscribe.clear();
391        self.discovery.clear();
392        self.availability_topic.clear();
393        self.discovery_topic.clear();
394        self.state_topic.clear();
395        self.command_topic.clear();
396        self.attributes_topic.clear();
397    }
398}
399
400pub struct DeviceResources {
401    waker: AtomicWaker,
402    entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT],
403
404    mqtt_resources: mqtt::ClientResources,
405    buffers: DeviceBuffersOwned,
406}
407
408impl DeviceResources {
409    const ENTITY_LIMIT: usize = 16;
410}
411
412impl Default for DeviceResources {
413    fn default() -> Self {
414        Self {
415            waker: AtomicWaker::new(),
416            entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT],
417
418            mqtt_resources: Default::default(),
419            buffers: Default::default(),
420        }
421    }
422}
423
/// Runtime storage for a button entity (held in `EntityStorage::Button`).
#[derive(Debug, Default)]
pub(crate) struct ButtonStorage {
    // NOTE(review): presumably the time of the most recent press command — confirm in
    // the button entity module.
    pub timestamp: Option<embassy_time::Instant>,
    // NOTE(review): presumably marks whether that press was already handed to the
    // application — confirm in the button entity module.
    pub consumed: bool,
}
429
/// A command value recorded for a switch entity, together with a timestamp.
#[derive(Debug)]
pub(crate) struct SwitchCommand {
    pub value: BinaryState,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// A state value recorded for a switch entity, together with a timestamp.
#[derive(Debug)]
pub(crate) struct SwitchState {
    pub value: BinaryState,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Runtime storage for a switch entity (held in `EntityStorage::Switch`).
///
/// `state` and `command` default to `None`; `command_policy` is taken from the switch
/// configuration when the entity is created.
#[derive(Debug, Default)]
pub(crate) struct SwitchStorage {
    pub state: Option<SwitchState>,
    pub command: Option<SwitchCommand>,
    pub command_policy: CommandPolicy,
}
450
/// A state value recorded for a binary sensor entity, together with a timestamp.
#[derive(Debug)]
pub(crate) struct BinarySensorState {
    pub value: BinaryState,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Runtime storage for a binary sensor entity (held in `EntityStorage::BinarySensor`).
#[derive(Debug, Default)]
pub(crate) struct BinarySensorStorage {
    // `None` until a state has been stored.
    pub state: Option<BinarySensorState>,
}
462
/// A numeric value recorded for a sensor entity, together with a timestamp.
#[derive(Debug)]
pub(crate) struct NumericSensorState {
    pub value: f32,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Runtime storage for a sensor entity (held in `EntityStorage::NumericSensor`).
#[derive(Debug, Default)]
pub(crate) struct NumericSensorStorage {
    // `None` until a state has been stored.
    pub state: Option<NumericSensorState>,
}
474
/// A state value recorded for a number entity, together with a timestamp.
#[derive(Debug)]
pub(crate) struct NumberState {
    pub value: f32,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// A command value recorded for a number entity, together with a timestamp.
#[derive(Debug)]
pub(crate) struct NumberCommand {
    pub value: f32,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Runtime storage for a number entity (held in `EntityStorage::Number`).
///
/// `state` and `command` default to `None`; `command_policy` is taken from the number
/// configuration when the entity is created.
#[derive(Debug, Default)]
pub(crate) struct NumberStorage {
    pub state: Option<NumberState>,
    pub command: Option<NumberCommand>,
    pub command_policy: CommandPolicy,
}
495
/// Serializable position of a device tracker entity.
#[derive(Debug, Serialize)]
pub(crate) struct DeviceTrackerState {
    pub latitude: f32,
    pub longitude: f32,
    // Left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub gps_accuracy: Option<f32>,
}

/// Runtime storage for a device tracker entity (held in `EntityStorage::DeviceTracker`).
#[derive(Debug, Default)]
pub(crate) struct DeviceTrackerStorage {
    // `None` until a state has been stored.
    pub state: Option<DeviceTrackerState>,
}
508
509#[derive(Debug)]
510pub(crate) enum EntityStorage {
511    Button(ButtonStorage),
512    Switch(SwitchStorage),
513    BinarySensor(BinarySensorStorage),
514    NumericSensor(NumericSensorStorage),
515    Number(NumberStorage),
516    DeviceTracker(DeviceTrackerStorage),
517}
518
519impl EntityStorage {
520    pub fn as_button_mut(&mut self) -> &mut ButtonStorage {
521        match self {
522            EntityStorage::Button(storage) => storage,
523            _ => panic!("expected storage type to be button"),
524        }
525    }
526
527    pub fn as_switch_mut(&mut self) -> &mut SwitchStorage {
528        match self {
529            EntityStorage::Switch(storage) => storage,
530            _ => panic!("expected storage type to be switch"),
531        }
532    }
533
534    pub fn as_binary_sensor_mut(&mut self) -> &mut BinarySensorStorage {
535        match self {
536            EntityStorage::BinarySensor(storage) => storage,
537            _ => panic!("expected storage type to be binary_sensor"),
538        }
539    }
540
541    pub fn as_numeric_sensor_mut(&mut self) -> &mut NumericSensorStorage {
542        match self {
543            EntityStorage::NumericSensor(storage) => storage,
544            _ => panic!("expected storage type to be numeric_sensor"),
545        }
546    }
547
548    pub fn as_number_mut(&mut self) -> &mut NumberStorage {
549        match self {
550            EntityStorage::Number(storage) => storage,
551            _ => panic!("expected storage type to be number"),
552        }
553    }
554
555    pub fn as_device_tracker_mut(&mut self) -> &mut DeviceTrackerStorage {
556        match self {
557            EntityStorage::DeviceTracker(storage) => storage,
558            _ => panic!("expected storage type to be device tracker"),
559        }
560    }
561}
562
/// Everything stored in one occupied entity slot of a device.
struct EntityData {
    // Static configuration; used to generate the entity's discovery payload.
    config: EntityConfig,
    // Kind-specific runtime state.
    storage: EntityStorage,
    // Set by `Entity::queue_publish`.
    // NOTE(review): presumably cleared by the device loop once published — confirm.
    publish: bool,
    // Cleared by `Entity::wait_command` when it completes.
    // NOTE(review): presumably set by the device loop when a command arrives — confirm.
    command: bool,
    // Waker stored by `Entity::wait_command` while no command is pending.
    command_waker: Option<Waker>,
}
570
571pub(crate) struct Entity<'a> {
572    pub(crate) data: &'a RefCell<Option<EntityData>>,
573    pub(crate) waker: &'a AtomicWaker,
574}
575
576impl<'a> Entity<'a> {
577    pub fn queue_publish(&mut self) {
578        self.with_data(|data| data.publish = true);
579        self.waker.wake();
580    }
581
582    pub async fn wait_command(&mut self) {
583        struct Fut<'a, 'b>(&'a mut Entity<'b>);
584
585        impl<'a, 'b> core::future::Future for Fut<'a, 'b> {
586            type Output = ();
587
588            fn poll(
589                mut self: core::pin::Pin<&mut Self>,
590                cx: &mut core::task::Context<'_>,
591            ) -> core::task::Poll<Self::Output> {
592                let this = &mut self.as_mut().0;
593                this.with_data(|data| {
594                    let dirty = data.command;
595                    if dirty {
596                        data.command = false;
597                        data.command_waker = None;
598                        core::task::Poll::Ready(())
599                    } else {
600                        // TODO: avoid clone if waker would wake
601                        data.command_waker = Some(cx.waker().clone());
602                        core::task::Poll::Pending
603                    }
604                })
605            }
606        }
607
608        Fut(self).await
609    }
610
611    fn with_data<F, R>(&self, f: F) -> R
612    where
613        F: FnOnce(&mut EntityData) -> R,
614    {
615        f(self.data.borrow_mut().as_mut().unwrap())
616    }
617}
618
/// A Home Assistant device: its configuration plus borrowed access to the storage held
/// in [`DeviceResources`].
///
/// Created with [`new`]; entities are registered on it with the `create_*` functions.
pub struct Device<'a> {
    config: DeviceConfig,

    // resources
    // Woken whenever an entity queues a publish.
    waker: &'a AtomicWaker,
    // Entity slots; `None` marks a free slot.
    entities: &'a [RefCell<Option<EntityData>>],

    mqtt_resources: &'a mut mqtt::ClientResources,
    buffers: DeviceBuffers<'a>,
}
629
630pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> {
631    Device {
632        config,
633        waker: &resources.waker,
634        entities: &resources.entities,
635
636        mqtt_resources: &mut resources.mqtt_resources,
637        buffers: resources.buffers.as_buffers_mut(),
638    }
639}
640
641fn create_entity<'a>(
642    device: &Device<'a>,
643    config: EntityConfig,
644    storage: EntityStorage,
645) -> Entity<'a> {
646    let index = 'outer: {
647        for idx in 0..device.entities.len() {
648            if device.entities[idx].borrow().is_none() {
649                break 'outer idx;
650            }
651        }
652        panic!("device entity limit reached");
653    };
654
655    let data = EntityData {
656        config,
657        storage,
658        publish: false,
659        command: false,
660        command_waker: None,
661    };
662    device.entities[index].replace(Some(data));
663
664    Entity {
665        data: &device.entities[index],
666        waker: device.waker,
667    }
668}
669
670pub fn create_sensor<'a>(
671    device: &Device<'a>,
672    id: &'static str,
673    config: SensorConfig,
674) -> Sensor<'a> {
675    let mut entity_config = EntityConfig {
676        id,
677        ..Default::default()
678    };
679    config.populate(&mut entity_config);
680
681    let entity = create_entity(
682        device,
683        entity_config,
684        EntityStorage::NumericSensor(Default::default()),
685    );
686    Sensor::new(entity)
687}
688
689pub fn create_button<'a>(
690    device: &Device<'a>,
691    id: &'static str,
692    config: ButtonConfig,
693) -> Button<'a> {
694    let mut entity_config = EntityConfig {
695        id,
696        ..Default::default()
697    };
698    config.populate(&mut entity_config);
699
700    let entity = create_entity(
701        device,
702        entity_config,
703        EntityStorage::Button(Default::default()),
704    );
705    Button::new(entity)
706}
707
708pub fn create_number<'a>(
709    device: &Device<'a>,
710    id: &'static str,
711    config: NumberConfig,
712) -> Number<'a> {
713    let mut entity_config = EntityConfig {
714        id,
715        ..Default::default()
716    };
717    config.populate(&mut entity_config);
718
719    let entity = create_entity(
720        device,
721        entity_config,
722        EntityStorage::Number(NumberStorage {
723            command_policy: config.command_policy,
724            ..Default::default()
725        }),
726    );
727    Number::new(entity)
728}
729
730pub fn create_switch<'a>(
731    device: &Device<'a>,
732    id: &'static str,
733    config: SwitchConfig,
734) -> Switch<'a> {
735    let mut entity_config = EntityConfig {
736        id,
737        ..Default::default()
738    };
739    config.populate(&mut entity_config);
740
741    let entity = create_entity(
742        device,
743        entity_config,
744        EntityStorage::Switch(SwitchStorage {
745            command_policy: config.command_policy,
746            ..Default::default()
747        }),
748    );
749    Switch::new(entity)
750}
751
752pub fn create_binary_sensor<'a>(
753    device: &Device<'a>,
754    id: &'static str,
755    config: BinarySensorConfig,
756) -> BinarySensor<'a> {
757    let mut entity_config = EntityConfig {
758        id,
759        ..Default::default()
760    };
761    config.populate(&mut entity_config);
762
763    let entity = create_entity(
764        device,
765        entity_config,
766        EntityStorage::BinarySensor(Default::default()),
767    );
768    BinarySensor::new(entity)
769}
770
771pub fn create_device_tracker<'a>(
772    device: &Device<'a>,
773    id: &'static str,
774    config: DeviceTrackerConfig,
775) -> DeviceTracker<'a> {
776    let mut entity_config = EntityConfig {
777        id,
778        ..Default::default()
779    };
780    config.populate(&mut entity_config);
781
782    let entity = create_entity(
783        device,
784        entity_config,
785        EntityStorage::DeviceTracker(Default::default()),
786    );
787    DeviceTracker::new(entity)
788}
789
790async fn device_mqtt_subscribe<T: Transport>(
791    client: &mut mqtt::Client<'_, T>,
792    topic: impl core::fmt::Display,
793) -> Result<(), Error> {
794    use core::fmt::Write;
795
796    // Format topic to string for both subscribe call and logging
797    let mut topic_buffer = heapless::String::<128>::new();
798    write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small");
799    let topic_str = topic_buffer.as_str();
800
801    match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(topic_str)).await {
802        Ok(Ok(_)) => Ok(()),
803        Ok(Err(err)) => {
804            crate::log::error!(
805                "mqtt subscribe to '{}' failed with: {:?}",
806                topic_str,
807                crate::log::Debug2Format(&err)
808            );
809            Err(Error::new("mqtt subscribe failed"))
810        }
811        Err(_) => {
812            crate::log::error!("mqtt subscribe to '{}' timed out", topic_str);
813            Err(Error::new("mqtt subscribe timed out"))
814        }
815    }
816}
817
818async fn device_mqtt_publish<T: Transport>(
819    client: &mut mqtt::Client<'_, T>,
820    topic: impl core::fmt::Display,
821    data: &[u8],
822    retain: bool,
823) -> Result<(), Error> {
824    use core::fmt::Write;
825
826    // Format topic to string for both publish call and logging
827    let mut topic_buffer = heapless::String::<128>::new();
828    write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small");
829    let topic_str = topic_buffer.as_str();
830
831    let result = if retain {
832        embassy_time::with_timeout(
833            MQTT_TIMEOUT,
834            client.publish_with(
835                topic_str,
836                data,
837                mqtt::PublishParams {
838                    retain: true,
839                    ..Default::default()
840                },
841            ),
842        )
843        .await
844    } else {
845        embassy_time::with_timeout(MQTT_TIMEOUT, client.publish(topic_str, data)).await
846    };
847
848    match result {
849        Ok(Ok(_)) => Ok(()),
850        Ok(Err(err)) => {
851            crate::log::error!(
852                "mqtt publish to '{}' failed with: {:?}",
853                topic_str,
854                crate::log::Debug2Format(&err)
855            );
856            Err(Error::new("mqtt publish failed"))
857        }
858        Err(_) => {
859            crate::log::error!("mqtt publish to '{}' timed out", topic_str);
860            Err(Error::new("mqtt publish timed out"))
861        }
862    }
863}
864
865/// Receives MQTT publish data with timeout and proper error handling.
866///
867/// This helper function handles the common pattern of receiving MQTT data with:
868/// - Automatic timeout handling using MQTT_TIMEOUT
869/// - Consistent error logging
870/// - Size validation against buffer capacity
871///
872/// # Arguments
873/// * `client` - MQTT client for receiving data
874/// * `data_len` - Expected length of data to receive
875/// * `buffer` - Buffer to receive the data into
876///
877/// # Returns
878/// * `Ok(&[u8])` - Slice of the buffer containing the received data
879/// * `Err(Error)` - If operation fails, times out, or data exceeds buffer size
880///
881/// # Errors
882/// Returns error if:
883/// - `data_len` is greater than `buffer.len()` (buffer too small)
884/// - The receive operation times out after MQTT_TIMEOUT seconds
885/// - The underlying MQTT receive operation fails
886async fn mqtt_receive_data<'a, T: Transport>(
887    client: &mut mqtt::Client<'_, T>,
888    data_len: usize,
889    buffer: &'a mut [u8],
890) -> Result<&'a [u8], Error> {
891    // Validate buffer size - reject if too small (per user requirement)
892    if data_len > buffer.len() {
893        crate::log::warn!(
894            "mqtt publish payload is too large ({} bytes, buffer size {} bytes), rejecting",
895            data_len,
896            buffer.len()
897        );
898        return Err(Error::new("mqtt payload too large for buffer"));
899    }
900
901    crate::log::debug!("mqtt receiving {} bytes of data", data_len);
902
903    match embassy_time::with_timeout(MQTT_TIMEOUT, client.receive_data(&mut buffer[..data_len]))
904        .await
905    {
906        Ok(Ok(())) => Ok(&buffer[..data_len]),
907        Ok(Err(err)) => {
908            crate::log::error!(
909                "mqtt receive data failed with: {:?}",
910                crate::log::Debug2Format(&err)
911            );
912            Err(Error::new("mqtt receive data failed"))
913        }
914        Err(_) => {
915            crate::log::error!("mqtt receive data timed out");
916            Err(Error::new("mqtt receive data timed out"))
917        }
918    }
919}
920
921/// Publishes discovery messages for all entities in the device.
922///
923/// This function iterates over all entities, generates their discovery messages,
924/// publishes them to MQTT, and optionally subscribes to their command topics.
925///
926/// # Arguments
927/// * `client` - MQTT client for publishing and subscribing
928/// * `entities` - Slice of entities to publish discoveries for
929/// * `buffers` - Device buffers for generating discovery messages
930/// * `device_config` - Device configuration
931/// * `availability_topic` - The device availability topic string
932/// * `subscribe_to_commands` - Whether to subscribe to command topics (true for initial discovery, false for rediscovery)
933///
934/// # Returns
935/// `Ok(())` if all discoveries were published successfully, or an error if any operation fails
936async fn publish_entity_discoveries<T: Transport>(
937    client: &mut mqtt::Client<'_, T>,
938    entities: &[RefCell<Option<EntityData>>],
939    buffers: &mut DeviceBuffers<'_>,
940    device_config: &DeviceConfig,
941    availability_topic: &str,
942    subscribe_to_commands: bool,
943) -> Result<(), Error> {
944    crate::log::debug!("publishing entity discovery messages");
945
946    for entity in entities {
947        buffers.clear();
948
949        // Borrow the entity and fill out the buffers to be sent
950        // This should be done inside a block so that we do not hold the RefMut across an await
951        {
952            let mut entity = entity.borrow_mut();
953            let entity = match entity.as_mut() {
954                Some(entity) => entity,
955                None => break,
956            };
957
958            generate_entity_discovery(buffers, device_config, &entity.config, availability_topic);
959        }
960
961        let discovery_topic = buffers.discovery_topic.as_str();
962        crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
963        device_mqtt_publish(client, discovery_topic, buffers.discovery, false).await?;
964
965        if subscribe_to_commands {
966            let command_topic = buffers.command_topic.as_str();
967            crate::log::debug!("subscribing to command topic '{}'", command_topic);
968            device_mqtt_subscribe(client, command_topic).await?;
969        }
970    }
971
972    Ok(())
973}
974
975fn generate_entity_discovery(
976    buffers: &mut DeviceBuffers<'_>,
977    device_config: &DeviceConfig,
978    entity_config: &EntityConfig,
979    availability_topic: &str,
980) {
981    use core::fmt::Write;
982
983    let discovery_topic_display = DiscoveryTopicDisplay {
984        domain: entity_config.domain,
985        device_id: device_config.device_id,
986        entity_id: entity_config.id,
987    };
988    let state_topic_display = StateTopicDisplay {
989        device_id: device_config.device_id,
990        entity_id: entity_config.id,
991    };
992    let command_topic_display = CommandTopicDisplay {
993        device_id: device_config.device_id,
994        entity_id: entity_config.id,
995    };
996    let attributes_topic_display = AttributesTopicDisplay {
997        device_id: device_config.device_id,
998        entity_id: entity_config.id,
999    };
1000
1001    write!(buffers.discovery_topic, "{discovery_topic_display}")
1002        .expect("discovery topic buffer too small");
1003    write!(buffers.state_topic, "{state_topic_display}").expect("state topic buffer too small");
1004    write!(buffers.command_topic, "{command_topic_display}")
1005        .expect("command topic buffer too small");
1006    write!(buffers.attributes_topic, "{attributes_topic_display}")
1007        .expect("attributes topic buffer too small");
1008
1009    let device_discovery = DeviceDiscovery {
1010        identifiers: &[device_config.device_id],
1011        name: device_config.device_name,
1012        manufacturer: device_config.manufacturer,
1013        model: device_config.model,
1014    };
1015
1016    let discovery = EntityDiscovery {
1017        id: EntityIdDiscovery {
1018            device_id: device_config.device_id,
1019            entity_id: entity_config.id,
1020        },
1021        name: entity_config.name,
1022        device_class: entity_config.device_class,
1023        state_topic: Some(buffers.state_topic.as_str()),
1024        command_topic: Some(buffers.command_topic.as_str()),
1025        json_attributes_topic: Some(buffers.attributes_topic.as_str()),
1026        unit_of_measurement: entity_config.measurement_unit,
1027        schema: entity_config.schema,
1028        platform: entity_config.platform,
1029        state_class: entity_config.state_class,
1030        icon: entity_config.icon,
1031        entity_category: entity_config.category,
1032        entity_picture: entity_config.picture,
1033        min: entity_config.min,
1034        max: entity_config.max,
1035        step: entity_config.step,
1036        mode: entity_config.mode,
1037        suggested_display_precision: entity_config.suggested_display_precision,
1038        availability_topic: Some(availability_topic),
1039        payload_available: Some(AVAILABLE_PAYLOAD),
1040        payload_not_available: Some(NOT_AVAILABLE_PAYLOAD),
1041        device: &device_discovery,
1042    };
1043
1044    crate::log::debug!(
1045        "discovery for entity '{}': {:?}",
1046        entity_config.id,
1047        discovery
1048    );
1049
1050    buffers
1051        .discovery
1052        .resize(buffers.discovery.capacity(), 0)
1053        .unwrap();
1054    let n = serde_json_core::to_slice(&discovery, buffers.discovery)
1055        .expect("discovery buffer too small");
1056    buffers.discovery.truncate(n);
1057}
1058
1059/// Runs the main Home Assistant device event loop.
1060///
1061/// This function handles MQTT communication, entity discovery, and state updates. It will run
1062/// until the first error is encountered, at which point it returns immediately.
1063///
1064/// # Behavior
1065///
1066/// - Connects to the MQTT broker using the provided transport
1067/// - Publishes discovery messages for all entities
1068/// - Subscribes to command topics for controllable entities
1069/// - Enters the main event loop to handle state updates and commands
1070/// - Returns on the first error (connection loss, timeout, protocol error, etc.)
1071///
1072/// # Error Handling
1073///
1074/// This function should be called inside a retry loop, as any network error will cause this
1075/// function to fail. When an error occurs, the transport may be in an invalid state and should
1076/// be re-established before calling `run` again.
1077///
1078/// # Example
1079///
1080/// ```no_run
1081/// # use embassy_ha::{Device, Transport};
1082/// # async fn example(mut device: Device<'_>, mut transport: impl Transport) {
1083/// loop {
1084///     match embassy_ha::run(&mut device, &mut transport).await {
1085///         Ok(()) => {
1086///             // Normal exit (this shouldn't happen in practice)
1087///             break;
1088///         }
1089///         Err(err) => {
1090///             // Log error and retry after delay
1091///             // The transport connection should be re-established
1092///             embassy_time::Timer::after_secs(5).await;
1093///         }
1094///     }
1095/// }
1096/// # }
1097/// ```
1098///
1099/// For a higher-level alternative that handles retries automatically, see [`connect_and_run`].
    ///
    /// # Panics
    ///
    /// Panics if one of the fixed-size topic or payload buffers is too small for the data written
    /// into it (see the `expect` calls in the body).
1100pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> {
1101    use core::fmt::Write;
1102
        // Rebuild the device availability topic from the device id.
1103    device.buffers.availability_topic.clear();
1104    write!(
1105        device.buffers.availability_topic,
1106        "{}",
1107        DeviceAvailabilityTopic {
1108            device_id: device.config.device_id
1109        }
1110    )
1111    .expect("device availability buffer too small");
1112
1113    // Store availability_topic in a separate buffer to avoid borrow conflicts
1114    let mut availability_topic_copy = heapless::String::<128>::new();
1115    availability_topic_copy
1116        .push_str(device.buffers.availability_topic.as_str())
1117        .expect("availability topic too large");
1118    let availability_topic = availability_topic_copy.as_str();
1119
        // The ping ticker uses the same period (DEFAULT_KEEPALIVE_TIME seconds) as the keepalive
        // value sent to the broker in the connect parameters below.
1120    let mut ping_ticker =
1121        embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME)));
1122    let mut client = mqtt::Client::new(device.mqtt_resources, transport);
        // Last-will configuration: NOT_AVAILABLE_PAYLOAD on the availability topic, retained.
1123    let connect_params = mqtt::ConnectParams {
1124        will_topic: Some(availability_topic),
1125        will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()),
1126        will_retain: true,
1127        keepalive: Some(DEFAULT_KEEPALIVE_TIME),
1128        ..Default::default()
1129    };
        // The connect attempt is bounded by MQTT_TIMEOUT; both a protocol failure and a timeout are
        // logged and turned into an `Error`.
1130    match embassy_time::with_timeout(
1131        MQTT_TIMEOUT,
1132        client.connect_with(device.config.device_id, connect_params),
1133    )
1134    .await
1135    {
1136        Ok(Ok(())) => {}
1137        Ok(Err(err)) => {
1138            crate::log::error!(
1139                "mqtt connect failed with: {:?}",
1140                crate::log::Debug2Format(&err)
1141            );
1142            return Err(Error::new("mqtt connection failed"));
1143        }
1144        Err(_) => {
1145            crate::log::error!("mqtt connect timed out");
1146            return Err(Error::new("mqtt connect timed out"));
1147        }
1148    }
1149
        // Subscribe to the Home Assistant status topic; its messages are handled in the main loop
        // below to republish discoveries when Home Assistant comes online.
1150    device_mqtt_subscribe(&mut client, constants::HA_STATUS_TOPIC).await?;
1151
        // NOTE(review): the trailing `true` here (vs. `false` on the republish path further down)
        // is interpreted by `publish_entity_discoveries`; presumably it controls whether command
        // topics are subscribed as well — confirm against that function.
1152    publish_entity_discoveries(
1153        &mut client,
1154        device.entities,
1155        &mut device.buffers,
1156        &device.config,
1157        availability_topic,
1158        true,
1159    )
1160    .await?;
1161
        // Announce the device as available (the trailing `true` is presumably the retain flag —
        // confirm against `device_mqtt_publish`).
1162    device_mqtt_publish(
1163        &mut client,
1164        availability_topic,
1165        AVAILABLE_PAYLOAD.as_bytes(),
1166        true,
1167    )
1168    .await?;
1169
        // While set, the publish pass below sends the state of every entity that has one, not only
        // the entities whose `publish` flag is set.
1170    let mut first_iteration_push = true;
1171    'outer_loop: loop {
1172        use core::fmt::Write;
1173
            // --- Publish pass: send pending entity states ---
            // NOTE(review): an entity that sets `publish` while this pass is awaiting an earlier
            // publish may only be picked up on the next wake-up of the select below — confirm
            // whether the waker design covers this case.
1174        for entity in device.entities {
                // The block scopes the `RefCell` borrow so it is released before the publish
                // `.await` below; it evaluates to the topic to publish on.
1175            let publish_topic = {
1176                let mut entity = entity.borrow_mut();
                    // An empty slot ends the scan (presumably slots are filled front-to-back —
                    // confirm against the entity creation code).
1177                let entity = match entity.as_mut() {
1178                    Some(entity) => entity,
1179                    None => break,
1180                };
1181
1182                if !entity.publish && !first_iteration_push {
1183                    continue;
1184                }
1185
                    // Clear the request flag before serializing the payload.
1186                entity.publish = false;
1187                device.buffers.publish.clear();
1188
                    // Serialize the current state into the publish buffer. Device trackers are
                    // serialized as JSON and sent to the attributes topic; every other kind goes
                    // to the state topic.
1189                let mut publish_to_attributes = false;
1190                match &entity.storage {
1191                    EntityStorage::Switch(SwitchStorage {
1192                        state: Some(SwitchState { value, .. }),
1193                        ..
1194                    }) => device
1195                        .buffers
1196                        .publish
1197                        .extend_from_slice(value.as_str().as_bytes())
1198                        .expect("publish buffer too small for switch state payload"),
1199                    EntityStorage::BinarySensor(BinarySensorStorage {
1200                        state: Some(BinarySensorState { value, .. }),
1201                    }) => device
1202                        .buffers
1203                        .publish
1204                        .extend_from_slice(value.as_str().as_bytes())
1205                        .expect("publish buffer too small for binary sensor state payload"),
1206                    EntityStorage::NumericSensor(NumericSensorStorage {
1207                        state: Some(NumericSensorState { value, .. }),
1208                        ..
1209                    }) => write!(device.buffers.publish, "{}", value)
1210                        .expect("publish buffer too small for numeric sensor payload"),
1211                    EntityStorage::Number(NumberStorage {
1212                        state: Some(NumberState { value, .. }),
1213                        ..
1214                    }) => write!(device.buffers.publish, "{}", value)
1215                        .expect("publish buffer too small for number state payload"),
1216                    EntityStorage::DeviceTracker(DeviceTrackerStorage {
1217                        state: Some(tracker_state),
1218                    }) => {
1219                        publish_to_attributes = true;
                            // Grow the buffer to full capacity so the serializer has a slice to
                            // write into, then shrink it to the number of bytes actually written.
1220                        device
1221                            .buffers
1222                            .publish
1223                            .resize(device.buffers.publish.capacity(), 0)
1224                            .expect("resize to capacity should never fail");
1225                        let n = serde_json_core::to_slice(&tracker_state, device.buffers.publish)
1226                            .expect("publish buffer too small for tracker state payload");
1227                        device.buffers.publish.truncate(n);
1228                    }
                        // Anything else: the entity has no state set yet, or its storage kind has
                        // no state arm above. The warning is suppressed during a forced full push.
1229                    _ => {
1230                        if !first_iteration_push {
1231                            crate::log::warn!(
1232                                "entity '{}' requested state publish but its storage does not support it",
1233                                entity.config.id
1234                            );
1235                        }
1236                        continue;
1237                    }
1238                }
1239
                    // Format the destination topic into the matching shared buffer and hand back
                    // a borrow of it.
1240                if publish_to_attributes {
1241                    let attributes_topic_display = AttributesTopicDisplay {
1242                        device_id: device.config.device_id,
1243                        entity_id: entity.config.id,
1244                    };
1245                    device.buffers.attributes_topic.clear();
1246                    write!(
1247                        device.buffers.attributes_topic,
1248                        "{attributes_topic_display}"
1249                    )
1250                    .expect("attributes topic buffer too small");
1251                    device.buffers.attributes_topic.as_str()
1252                } else {
1253                    let state_topic_display = StateTopicDisplay {
1254                        device_id: device.config.device_id,
1255                        entity_id: entity.config.id,
1256                    };
1257                    device.buffers.state_topic.clear();
1258                    write!(device.buffers.state_topic, "{state_topic_display}")
1259                        .expect("state topic buffer too small");
1260                    device.buffers.state_topic.as_str()
1261                }
1262            };
1263
1264            device_mqtt_publish(&mut client, publish_topic, device.buffers.publish, false).await?;
1265        }
1266        first_iteration_push = false;
1267
            // --- Wait for the next event ---
            // Whichever completes first wins: an incoming MQTT packet, a wake-up through the
            // device waker, or the keepalive tick. Only an incoming PUBLISH packet falls through
            // to the handling code below; every other outcome restarts the loop (and with it the
            // publish pass) or returns an error.
1268        let receive = client.receive();
1269        let waker = wait_on_atomic_waker(device.waker);
1270        let publish =
1271            match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await {
1272                embassy_futures::select::Either3::First(packet) => match packet {
1273                    Ok(mqtt::Packet::Publish(publish)) => publish,
1274                    Err(err) => {
1275                        crate::log::error!(
1276                            "mqtt receive failed with: {:?}",
1277                            crate::log::Debug2Format(&err)
1278                        );
1279                        return Err(Error::new("mqtt receive failed"));
1280                    }
                        // Any other successfully received packet type is ignored.
1281                    _ => continue,
1282                },
                    // Woken through the device waker: go back to the publish pass.
1283                embassy_futures::select::Either3::Second(_) => continue,
                    // Keepalive tick: send a ping; a failed ping ends the run.
1284                embassy_futures::select::Either3::Third(_) => {
1285                    if let Err(err) = client.ping().await {
1286                        crate::log::error!(
1287                            "mqtt ping failed with: {:?}",
1288                            crate::log::Debug2Format(&err)
1289                        );
1290                        return Err(Error::new("mqtt ping failed"));
1291                    }
1292                    continue;
1293                }
1294            };
1295
            // --- Home Assistant status message ---
            // When the payload equals the "online" constant, republish all discoveries and force a
            // full state push on the next publish pass. Other payloads are ignored.
1296        if publish.topic == constants::HA_STATUS_TOPIC {
1297            let mut receive_buffer = [0u8; 64];
1298            let receive_data_len = publish.data_len;
1299            let receive_data =
1300                mqtt_receive_data(&mut client, receive_data_len, &mut receive_buffer).await?;
1301
1302            if receive_data == constants::HA_STATUS_PAYLOAD_ONLINE.as_bytes() {
1303                first_iteration_push = true;
1304
1305                crate::log::debug!("home assistant came online, republishing discoveries");
1306                publish_entity_discoveries(
1307                    &mut client,
1308                    device.entities,
1309                    &mut device.buffers,
1310                    &device.config,
1311                    availability_topic,
1312                    false,
1313                )
1314                .await?;
1315            }
1316            continue;
1317        }
1318
            // --- Command message ---
            // Find the entity whose command topic equals the received topic by formatting each
            // entity's command topic into the shared buffer and comparing bytes. Messages on any
            // other topic are dropped.
1319        let entity = 'entity_search_block: {
1320            for entity in device.entities {
1321                let mut data = entity.borrow_mut();
1322                let data = match data.as_mut() {
1323                    Some(data) => data,
1324                    None => break,
1325                };
1326
1327                let command_topic_display = CommandTopicDisplay {
1328                    device_id: device.config.device_id,
1329                    entity_id: data.config.id,
1330                };
1331                device.buffers.command_topic.clear();
1332                write!(device.buffers.command_topic, "{command_topic_display}")
1333                    .expect("command topic buffer too small");
1334
1335                if device.buffers.command_topic.as_bytes() == publish.topic.as_bytes() {
1336                    break 'entity_search_block entity;
1337                }
1338            }
1339            continue 'outer_loop;
1340        };
1341
            // Read the command payload into a 128-byte stack buffer.
            // NOTE(review): a read error is skipped silently here, whereas the status path above
            // propagates it with `?` — confirm this difference is intentional.
1342        let mut read_buffer = [0u8; 128];
1343        let data_len = publish.data_len;
1344        let receive_data = match mqtt_receive_data(&mut client, data_len, &mut read_buffer).await {
1345            Ok(data) => data,
1346            Err(_) => continue 'outer_loop,
1347        };
1348
1349        let command = match str::from_utf8(receive_data) {
1350            Ok(command) => command,
1351            Err(_) => {
1352                crate::log::warn!("mqtt message contained invalid utf-8, ignoring it");
1353                continue;
1354            }
1355        };
1356
            // The `unwrap` cannot fail for a slot returned by the search above: the search stops at
            // the first empty slot before it can select one.
1357        let mut entity = entity.borrow_mut();
1358        let data = entity.as_mut().unwrap();
1359
            // Decode the command according to the entity kind and record it in the entity storage.
            // Invalid payloads are logged and ignored; storage kinds without a command arm are
            // skipped.
1360        match &mut data.storage {
1361            EntityStorage::Button(button_storage) => {
1362                if command != constants::HA_BUTTON_PAYLOAD_PRESS {
1363                    crate::log::warn!(
1364                        "button '{}' received unexpected command '{}', expected '{}', ignoring it",
1365                        data.config.id,
1366                        command,
1367                        constants::HA_BUTTON_PAYLOAD_PRESS
1368                    );
1369                    continue;
1370                }
1371                button_storage.consumed = false;
1372                button_storage.timestamp = Some(embassy_time::Instant::now());
1373            }
1374            EntityStorage::Switch(switch_storage) => {
1375                let command = match command.parse::<BinaryState>() {
1376                    Ok(command) => command,
1377                    Err(_) => {
1378                        crate::log::warn!(
1379                            "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it",
1380                            data.config.id,
1381                            command
1382                        );
1383                        continue;
1384                    }
1385                };
1386                let timestamp = embassy_time::Instant::now();
                    // With the `PublishState` policy the commanded value is also stored as the
                    // state and flagged for publishing on the next pass.
1387                if switch_storage.command_policy == CommandPolicy::PublishState {
1388                    data.publish = true;
1389                    switch_storage.state = Some(SwitchState {
1390                        value: command,
1391                        timestamp,
1392                    });
1393                }
1394                switch_storage.command = Some(SwitchCommand {
1395                    value: command,
1396                    timestamp,
1397                });
1398            }
1399            EntityStorage::Number(number_storage) => {
1400                let command = match command.parse::<f32>() {
1401                    Ok(command) => command,
1402                    Err(_) => {
1403                        crate::log::warn!(
1404                            "number '{}' received invalid command '{}', expected a valid number, ignoring it",
1405                            data.config.id,
1406                            command
1407                        );
1408                        continue;
1409                    }
1410                };
1411                let timestamp = embassy_time::Instant::now();
                    // Same `PublishState` handling as for switches.
1412                if number_storage.command_policy == CommandPolicy::PublishState {
1413                    data.publish = true;
1414                    number_storage.state = Some(NumberState {
1415                        value: command,
1416                        timestamp,
1417                    });
1418                }
1419                number_storage.command = Some(NumberCommand {
1420                    value: command,
1421                    timestamp,
1422                });
1423            }
1424            _ => continue 'outer_loop,
1425        }
1426
            // Flag the entity as having a pending command and wake the registered command waker,
            // if there is one.
1427        data.command = true;
1428        if let Some(waker) = data.command_waker.take() {
1429            waker.wake();
1430        }
1431    }
1432}
1433
1434/// High-level function that manages TCP connections and runs the device event loop with automatic retries.
1435///
1436/// This is a convenience wrapper around [`run`] that handles:
1437/// - DNS resolution (if hostname is provided)
1438/// - TCP connection establishment
1439/// - Automatic reconnection on failure with 5-second delay
1440/// - Infinite retry loop
1441///
1442/// # Arguments
1443///
1444/// * `stack` - The Embassy network stack for TCP connections
1445/// * `device` - The Home Assistant device to run
1446/// * `address` - MQTT broker address in one of these formats:
1447///   - `"192.168.1.100"` - IPv4 address (uses default port 1883)
1448///   - `"192.168.1.100:1883"` - IPv4 address with explicit port
1449///   - `"mqtt.example.com"` - Hostname (uses default port 1883)
1450///   - `"mqtt.example.com:1883"` - Hostname with explicit port
1451///
1452/// # Returns
1453///
1454/// This function never returns normally (returns `!`). It runs indefinitely, automatically
1455/// reconnecting on any error.
1456///
1457/// # Example
1458///
1459/// ```no_run
1460/// # use embassy_executor::Spawner;
1461/// # use embassy_ha::{Device, DeviceConfig};
1462/// # use static_cell::StaticCell;
1463/// # static HA_RESOURCES: StaticCell<embassy_ha::DeviceResources> = StaticCell::new();
1464/// #[embassy_executor::task]
1465/// async fn ha_task(stack: embassy_net::Stack<'static>) {
1466///     let device = embassy_ha::new(
1467///         HA_RESOURCES.init(Default::default()),
1468///         DeviceConfig {
1469///             device_id: "my-device",
1470///             device_name: "My Device",
1471///             manufacturer: "ACME",
1472///             model: "X",
1473///         },
1474///     );
1475///
1476///     // This function never returns
1477///     embassy_ha::connect_and_run(stack, device, "mqtt.example.com:1883").await;
1478/// }
1479/// ```
1480pub async fn connect_and_run(
1481    stack: embassy_net::Stack<'_>,
1482    mut device: Device<'_>,
1483    address: &str,
1484) -> ! {
1485    const DEFAULT_MQTT_PORT: u16 = 1883;
1486
1487    let mut rx_buffer = [0u8; 1024];
1488    let mut tx_buffer = [0u8; 1024];
1489    let mut delay = false;
1490
1491    loop {
1492        if !delay {
1493            delay = true;
1494        } else {
1495            crate::log::info!("Retrying connection in 5 seconds...");
1496            Timer::after_secs(5).await;
1497        }
1498
1499        let addr = {
1500            // Try to parse as complete SocketAddrV4 first (e.g., "192.168.1.1:1883")
1501            if let Ok(sock_addr) = address.parse::<SocketAddrV4>() {
1502                sock_addr
1503            }
1504            // Try to parse as Ipv4Addr with default port (e.g., "192.168.1.1")
1505            else if let Ok(ip_addr) = address.parse::<Ipv4Addr>() {
1506                SocketAddrV4::new(ip_addr, DEFAULT_MQTT_PORT)
1507            }
1508            // Otherwise, parse as hostname:port or hostname
1509            else {
1510                let (addr_str, port) = match address.split_once(':') {
1511                    Some((addr_str, port_str)) => {
1512                        let port = port_str
1513                            .parse::<u16>()
1514                            .expect("Invalid port number in address");
1515                        (addr_str, port)
1516                    }
1517                    None => (address, DEFAULT_MQTT_PORT),
1518                };
1519
1520                let addrs = match stack
1521                    .dns_query(addr_str, embassy_net::dns::DnsQueryType::A)
1522                    .await
1523                {
1524                    Ok(addrs) => addrs,
1525                    Err(err) => {
1526                        crate::log::error!(
1527                            "DNS query for '{}' failed with: {:?}",
1528                            addr_str,
1529                            crate::log::Debug2Format(&err)
1530                        );
1531                        continue;
1532                    }
1533                };
1534
1535                #[allow(unreachable_patterns)]
1536                let ipv4_addr = match addrs
1537                    .iter()
1538                    .filter_map(|addr| match addr {
1539                        embassy_net::IpAddress::Ipv4(ipv4) => Some(*ipv4),
1540                        _ => None,
1541                    })
1542                    .next()
1543                {
1544                    Some(addr) => addr,
1545                    None => {
1546                        crate::log::error!(
1547                            "DNS query for '{}' returned no IPv4 addresses",
1548                            addr_str
1549                        );
1550                        continue;
1551                    }
1552                };
1553
1554                SocketAddrV4::new(ipv4_addr, port)
1555            }
1556        };
1557
1558        crate::log::info!("Connecting to MQTT broker at {}", addr);
1559
1560        let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
1561        socket.set_timeout(Some(embassy_time::Duration::from_secs(10)));
1562
1563        let connect_fut = embassy_time::with_timeout(Duration::from_secs(10), socket.connect(addr));
1564        match connect_fut.await {
1565            Ok(Err(err)) => {
1566                crate::log::error!(
1567                    "TCP connect to {} failed with: {:?}",
1568                    addr,
1569                    crate::log::Debug2Format(&err)
1570                );
1571                continue;
1572            }
1573            Err(_) => {
1574                crate::log::error!("TCP connect to {} timed out", addr);
1575                continue;
1576            }
1577            _ => {}
1578        }
1579
1580        socket.set_timeout(None);
1581
1582        if let Err(err) = run(&mut device, &mut socket).await {
1583            crate::log::error!(
1584                "Device run failed with: {:?}",
1585                crate::log::Debug2Format(&err)
1586            );
1587        }
1588    }
1589}
1590
1591async fn wait_on_atomic_waker(waker: &AtomicWaker) {
1592    struct F<'a>(&'a AtomicWaker, bool);
1593    impl<'a> core::future::Future for F<'a> {
1594        type Output = ();
1595
1596        fn poll(
1597            self: core::pin::Pin<&mut Self>,
1598            cx: &mut core::task::Context<'_>,
1599        ) -> core::task::Poll<Self::Output> {
1600            if !self.1 {
1601                self.0.register(cx.waker());
1602                self.get_mut().1 = true;
1603                core::task::Poll::Pending
1604            } else {
1605                core::task::Poll::Ready(())
1606            }
1607        }
1608    }
1609    F(waker, false).await
1610}