1#![no_std]
93
94use core::{
95 cell::RefCell,
96 net::{Ipv4Addr, SocketAddrV4},
97 task::Waker,
98};
99
100use embassy_net::tcp::TcpSocket;
101use embassy_sync::waitqueue::AtomicWaker;
102use embassy_time::{Duration, Timer};
103use heapless::{
104 Vec, VecView,
105 string::{String, StringView},
106};
107use serde::Serialize;
108
109mod mqtt;
110
111mod log;
112#[allow(unused)]
113use log::Format;
114
115pub mod constants;
116
117mod binary_state;
118pub use binary_state::*;
119
120mod command_policy;
121pub use command_policy::*;
122
123mod entity;
124pub use entity::*;
125
126mod entity_binary_sensor;
127pub use entity_binary_sensor::*;
128
129mod entity_button;
130pub use entity_button::*;
131
132mod entity_category;
133pub use entity_category::*;
134
135mod entity_device_tracker;
136pub use entity_device_tracker::*;
137
138mod entity_number;
139pub use entity_number::*;
140
141mod entity_sensor;
142pub use entity_sensor::*;
143
144mod entity_switch;
145pub use entity_switch::*;
146
147mod transport;
148pub use transport::Transport;
149
150mod unit;
151pub use unit::*;
152
/// Availability payload advertised in discovery as `payload_available` and
/// published by `run` on the availability topic after connecting.
const AVAILABLE_PAYLOAD: &str = "online";
/// Availability payload advertised in discovery as `payload_not_available` and
/// registered by `run` as the MQTT last-will payload.
const NOT_AVAILABLE_PAYLOAD: &str = "offline";
/// MQTT keepalive, in seconds; `run` also uses it as the interval of its ping ticker.
const DEFAULT_KEEPALIVE_TIME: u16 = 30;
/// Upper bound applied to individual MQTT operations (connect, subscribe, publish,
/// receive data) via `embassy_time::with_timeout`.
const MQTT_TIMEOUT: Duration = Duration::from_secs(30);
157
/// Error type returned by the fallible operations of this crate.
///
/// It carries nothing but a static, human-readable description.
#[derive(Debug)]
pub struct Error(&'static str);

impl Error {
    /// Builds an error whose description is `message`.
    pub(crate) fn new(message: &'static str) -> Self {
        Error(message)
    }
}

impl core::fmt::Display for Error {
    /// Writes the wrapped description verbatim (formatter padding is not applied).
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Error(message) = self;
        f.write_str(message)
    }
}

impl core::error::Error for Error {}
174
/// Device description serialized under the `device` key of an entity discovery
/// payload (see the `device` field of `EntityDiscovery`).
///
/// Field order is the order in which the derived `Serialize` emits the keys.
#[derive(Debug, Clone, Copy, Serialize)]
#[cfg_attr(feature = "defmt", derive(Format))]
struct DeviceDiscovery<'a> {
    /// Identifiers of the device; `generate_entity_discovery` passes a single
    /// element, the configured device id.
    identifiers: &'a [&'a str],
    /// Taken from `DeviceConfig::device_name`.
    name: &'a str,
    /// Taken from `DeviceConfig::manufacturer`.
    manufacturer: &'a str,
    /// Taken from `DeviceConfig::model`.
    model: &'a str,
}
183
/// Identifier of an entity, rendered as `<device_id>_<entity_id>`.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(Format))]
struct EntityIdDiscovery<'a> {
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for EntityIdDiscovery<'_> {
    /// Joins the device id and the entity id with a single underscore.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            device_id,
            entity_id,
        } = self;
        write!(f, "{device_id}_{entity_id}")
    }
}
196
197impl<'a> Serialize for EntityIdDiscovery<'a> {
198 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
199 where
200 S: serde::Serializer,
201 {
202 serializer.collect_str(self)
203 }
204}
205
/// Discovery payload for a single entity, serialized to JSON by
/// `generate_entity_discovery`.
///
/// Every optional field is omitted from the output when it is `None`. Field order
/// is the order in which the derived `Serialize` emits the keys.
#[derive(Debug, Serialize)]
#[cfg_attr(feature = "defmt", derive(Format))]
struct EntityDiscovery<'a> {
    /// Emitted under the key `unique_id`, rendered as `<device_id>_<entity_id>`.
    #[serde(rename = "unique_id")]
    id: EntityIdDiscovery<'a>,

    #[serde(skip_serializing_if = "Option::is_none")]
    name: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    device_class: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    state_topic: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    command_topic: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    json_attributes_topic: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    unit_of_measurement: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    schema: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    platform: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    state_class: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    icon: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    entity_category: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    entity_picture: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    min: Option<f32>,

    #[serde(skip_serializing_if = "Option::is_none")]
    max: Option<f32>,

    #[serde(skip_serializing_if = "Option::is_none")]
    step: Option<f32>,

    #[serde(skip_serializing_if = "Option::is_none")]
    mode: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    suggested_display_precision: Option<u8>,

    #[serde(skip_serializing_if = "Option::is_none")]
    availability_topic: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    payload_available: Option<&'a str>,

    #[serde(skip_serializing_if = "Option::is_none")]
    payload_not_available: Option<&'a str>,

    /// Description of the device this entity belongs to; always emitted.
    device: &'a DeviceDiscovery<'a>,
}
274
/// Renders the discovery topic of an entity:
/// `homeassistant/<domain>/<device_id>_<entity_id>/config`.
struct DiscoveryTopicDisplay<'a> {
    domain: &'a str,
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for DiscoveryTopicDisplay<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            domain,
            device_id,
            entity_id,
        } = self;
        write!(f, "homeassistant/{domain}/{device_id}_{entity_id}/config")
    }
}
290
/// Renders the state topic of an entity: `embassy-ha/<device_id>/<entity_id>/state`.
struct StateTopicDisplay<'a> {
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for StateTopicDisplay<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            device_id,
            entity_id,
        } = self;
        write!(f, "embassy-ha/{device_id}/{entity_id}/state")
    }
}
301
/// Renders the command topic of an entity: `embassy-ha/<device_id>/<entity_id>/command`.
struct CommandTopicDisplay<'a> {
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for CommandTopicDisplay<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            device_id,
            entity_id,
        } = self;
        write!(f, "embassy-ha/{device_id}/{entity_id}/command")
    }
}
316
/// Renders the attributes topic of an entity:
/// `embassy-ha/<device_id>/<entity_id>/attributes`.
struct AttributesTopicDisplay<'a> {
    device_id: &'a str,
    entity_id: &'a str,
}

impl core::fmt::Display for AttributesTopicDisplay<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self {
            device_id,
            entity_id,
        } = self;
        write!(f, "embassy-ha/{device_id}/{entity_id}/attributes")
    }
}
331
/// Renders the availability topic of a device: `embassy-ha/<device_id>/availability`.
struct DeviceAvailabilityTopic<'a> {
    device_id: &'a str,
}

impl core::fmt::Display for DeviceAvailabilityTopic<'_> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let Self { device_id } = self;
        write!(f, "embassy-ha/{device_id}/availability")
    }
}
341
/// Static description of the device, supplied by the caller of `new`.
pub struct DeviceConfig {
    /// Identifier embedded in every MQTT topic built by this file and in each
    /// entity's `unique_id`; `run` also passes it as the first argument of
    /// `connect_with` (presumably the MQTT client id — confirm against `mqtt`).
    pub device_id: &'static str,
    /// Emitted as the device `name` in discovery payloads.
    pub device_name: &'static str,
    /// Emitted as the device `manufacturer` in discovery payloads.
    pub manufacturer: &'static str,
    /// Emitted as the device `model` in discovery payloads.
    pub model: &'static str,
}
348
349#[derive(Default)]
350pub struct DeviceBuffersOwned {
351 pub publish: Vec<u8, 2048>,
352 pub subscribe: Vec<u8, 128>,
353 pub discovery: Vec<u8, 2048>,
354 pub availability_topic: String<128>,
355 pub discovery_topic: String<128>,
356 pub state_topic: String<128>,
357 pub command_topic: String<128>,
358 pub attributes_topic: String<128>,
359}
360
361impl DeviceBuffersOwned {
362 pub fn as_buffers_mut(&mut self) -> DeviceBuffers<'_> {
363 DeviceBuffers {
364 publish: &mut self.publish,
365 subscribe: &mut self.subscribe,
366 discovery: &mut self.discovery,
367 availability_topic: &mut self.availability_topic,
368 discovery_topic: &mut self.discovery_topic,
369 state_topic: &mut self.state_topic,
370 command_topic: &mut self.command_topic,
371 attributes_topic: &mut self.attributes_topic,
372 }
373 }
374}
375
376pub struct DeviceBuffers<'a> {
377 pub publish: &'a mut VecView<u8>,
378 pub subscribe: &'a mut VecView<u8>,
379 pub discovery: &'a mut VecView<u8>,
380 pub availability_topic: &'a mut StringView,
381 pub discovery_topic: &'a mut StringView,
382 pub state_topic: &'a mut StringView,
383 pub command_topic: &'a mut StringView,
384 pub attributes_topic: &'a mut StringView,
385}
386
387impl<'a> DeviceBuffers<'a> {
388 pub fn clear(&mut self) {
389 self.publish.clear();
390 self.subscribe.clear();
391 self.discovery.clear();
392 self.availability_topic.clear();
393 self.discovery_topic.clear();
394 self.state_topic.clear();
395 self.command_topic.clear();
396 self.attributes_topic.clear();
397 }
398}
399
400pub struct DeviceResources {
401 waker: AtomicWaker,
402 entities: [RefCell<Option<EntityData>>; Self::ENTITY_LIMIT],
403
404 mqtt_resources: mqtt::ClientResources,
405 buffers: DeviceBuffersOwned,
406}
407
408impl DeviceResources {
409 const ENTITY_LIMIT: usize = 16;
410}
411
412impl Default for DeviceResources {
413 fn default() -> Self {
414 Self {
415 waker: AtomicWaker::new(),
416 entities: [const { RefCell::new(None) }; Self::ENTITY_LIMIT],
417
418 mqtt_resources: Default::default(),
419 buffers: Default::default(),
420 }
421 }
422}
423
/// Storage of a button entity.
#[derive(Debug, Default)]
pub(crate) struct ButtonStorage {
    /// Set by `run` to the time a valid press command was received.
    pub timestamp: Option<embassy_time::Instant>,
    /// Reset to `false` by `run` on every valid press command; presumably set to
    /// `true` by the button handle once the press was observed — confirm in
    /// `entity_button`.
    pub consumed: bool,
}
429
/// Last command received for a switch entity.
#[derive(Debug)]
pub(crate) struct SwitchCommand {
    pub value: BinaryState,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Last known state of a switch entity; `run` publishes `value` on the state topic.
#[derive(Debug)]
pub(crate) struct SwitchState {
    pub value: BinaryState,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Storage of a switch entity.
#[derive(Debug, Default)]
pub(crate) struct SwitchStorage {
    pub state: Option<SwitchState>,
    pub command: Option<SwitchCommand>,
    /// When this is `CommandPolicy::PublishState`, `run` copies a received
    /// command into `state` and queues a state publish.
    pub command_policy: CommandPolicy,
}
450
/// Last known state of a binary sensor; `run` publishes `value` on the state topic.
#[derive(Debug)]
pub(crate) struct BinarySensorState {
    pub value: BinaryState,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Storage of a binary sensor entity.
#[derive(Debug, Default)]
pub(crate) struct BinarySensorStorage {
    /// `None` until a state is set; `run` publishes nothing while it is `None`.
    pub state: Option<BinarySensorState>,
}
462
/// Last known value of a numeric sensor; `run` publishes `value` formatted with
/// `{}` on the state topic.
#[derive(Debug)]
pub(crate) struct NumericSensorState {
    pub value: f32,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Storage of a numeric sensor entity.
#[derive(Debug, Default)]
pub(crate) struct NumericSensorStorage {
    /// `None` until a state is set; `run` publishes nothing while it is `None`.
    pub state: Option<NumericSensorState>,
}
474
/// Last known value of a number entity; `run` publishes `value` formatted with
/// `{}` on the state topic.
#[derive(Debug)]
pub(crate) struct NumberState {
    pub value: f32,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Last command received for a number entity, parsed by `run` as an `f32`.
#[derive(Debug)]
pub(crate) struct NumberCommand {
    pub value: f32,
    #[allow(unused)]
    pub timestamp: embassy_time::Instant,
}

/// Storage of a number entity.
#[derive(Debug, Default)]
pub(crate) struct NumberStorage {
    pub state: Option<NumberState>,
    pub command: Option<NumberCommand>,
    /// When this is `CommandPolicy::PublishState`, `run` copies a received
    /// command into `state` and queues a state publish.
    pub command_policy: CommandPolicy,
}
495
/// Position of a device tracker; `run` serializes it to JSON and publishes it on
/// the entity's attributes topic.
#[derive(Debug, Serialize)]
pub(crate) struct DeviceTrackerState {
    pub latitude: f32,
    pub longitude: f32,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub gps_accuracy: Option<f32>,
}

/// Storage of a device tracker entity.
#[derive(Debug, Default)]
pub(crate) struct DeviceTrackerStorage {
    /// `None` until a position is set; `run` publishes nothing while it is `None`.
    pub state: Option<DeviceTrackerState>,
}
508
509#[derive(Debug)]
510pub(crate) enum EntityStorage {
511 Button(ButtonStorage),
512 Switch(SwitchStorage),
513 BinarySensor(BinarySensorStorage),
514 NumericSensor(NumericSensorStorage),
515 Number(NumberStorage),
516 DeviceTracker(DeviceTrackerStorage),
517}
518
519impl EntityStorage {
520 pub fn as_button_mut(&mut self) -> &mut ButtonStorage {
521 match self {
522 EntityStorage::Button(storage) => storage,
523 _ => panic!("expected storage type to be button"),
524 }
525 }
526
527 pub fn as_switch_mut(&mut self) -> &mut SwitchStorage {
528 match self {
529 EntityStorage::Switch(storage) => storage,
530 _ => panic!("expected storage type to be switch"),
531 }
532 }
533
534 pub fn as_binary_sensor_mut(&mut self) -> &mut BinarySensorStorage {
535 match self {
536 EntityStorage::BinarySensor(storage) => storage,
537 _ => panic!("expected storage type to be binary_sensor"),
538 }
539 }
540
541 pub fn as_numeric_sensor_mut(&mut self) -> &mut NumericSensorStorage {
542 match self {
543 EntityStorage::NumericSensor(storage) => storage,
544 _ => panic!("expected storage type to be numeric_sensor"),
545 }
546 }
547
548 pub fn as_number_mut(&mut self) -> &mut NumberStorage {
549 match self {
550 EntityStorage::Number(storage) => storage,
551 _ => panic!("expected storage type to be number"),
552 }
553 }
554
555 pub fn as_device_tracker_mut(&mut self) -> &mut DeviceTrackerStorage {
556 match self {
557 EntityStorage::DeviceTracker(storage) => storage,
558 _ => panic!("expected storage type to be device tracker"),
559 }
560 }
561}
562
/// Everything stored in one entity slot of `DeviceResources`.
struct EntityData {
    /// Static configuration used to build the discovery payload and topics.
    config: EntityConfig,
    /// Kind-specific state / command storage.
    storage: EntityStorage,
    /// Set when a state publish is requested; cleared by `run` when it handles it.
    publish: bool,
    /// Set by `run` when a command was stored; cleared by `Entity::wait_command`.
    command: bool,
    /// Waker of the task currently parked in `Entity::wait_command`, if any.
    command_waker: Option<Waker>,
}
570
571pub(crate) struct Entity<'a> {
572 pub(crate) data: &'a RefCell<Option<EntityData>>,
573 pub(crate) waker: &'a AtomicWaker,
574}
575
576impl<'a> Entity<'a> {
577 pub fn queue_publish(&mut self) {
578 self.with_data(|data| data.publish = true);
579 self.waker.wake();
580 }
581
582 pub async fn wait_command(&mut self) {
583 struct Fut<'a, 'b>(&'a mut Entity<'b>);
584
585 impl<'a, 'b> core::future::Future for Fut<'a, 'b> {
586 type Output = ();
587
588 fn poll(
589 mut self: core::pin::Pin<&mut Self>,
590 cx: &mut core::task::Context<'_>,
591 ) -> core::task::Poll<Self::Output> {
592 let this = &mut self.as_mut().0;
593 this.with_data(|data| {
594 let dirty = data.command;
595 if dirty {
596 data.command = false;
597 data.command_waker = None;
598 core::task::Poll::Ready(())
599 } else {
600 data.command_waker = Some(cx.waker().clone());
602 core::task::Poll::Pending
603 }
604 })
605 }
606 }
607
608 Fut(self).await
609 }
610
611 fn with_data<F, R>(&self, f: F) -> R
612 where
613 F: FnOnce(&mut EntityData) -> R,
614 {
615 f(self.data.borrow_mut().as_mut().unwrap())
616 }
617}
618
/// A device and its entities, borrowing all of its storage from `DeviceResources`.
pub struct Device<'a> {
    /// Static description of the device (ids, names) used in topics and discovery.
    config: DeviceConfig,

    /// Waker shared with every entity handle; `run` waits on it between MQTT events.
    waker: &'a AtomicWaker,
    /// Entity slots; `create_entity` fills the first empty one.
    entities: &'a [RefCell<Option<EntityData>>],

    /// Resources handed to the MQTT client created by `run`.
    mqtt_resources: &'a mut mqtt::ClientResources,
    /// Scratch buffers for topics and payloads.
    buffers: DeviceBuffers<'a>,
}
629
630pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Device<'a> {
631 Device {
632 config,
633 waker: &resources.waker,
634 entities: &resources.entities,
635
636 mqtt_resources: &mut resources.mqtt_resources,
637 buffers: resources.buffers.as_buffers_mut(),
638 }
639}
640
641fn create_entity<'a>(
642 device: &Device<'a>,
643 config: EntityConfig,
644 storage: EntityStorage,
645) -> Entity<'a> {
646 let index = 'outer: {
647 for idx in 0..device.entities.len() {
648 if device.entities[idx].borrow().is_none() {
649 break 'outer idx;
650 }
651 }
652 panic!("device entity limit reached");
653 };
654
655 let data = EntityData {
656 config,
657 storage,
658 publish: false,
659 command: false,
660 command_waker: None,
661 };
662 device.entities[index].replace(Some(data));
663
664 Entity {
665 data: &device.entities[index],
666 waker: device.waker,
667 }
668}
669
670pub fn create_sensor<'a>(
671 device: &Device<'a>,
672 id: &'static str,
673 config: SensorConfig,
674) -> Sensor<'a> {
675 let mut entity_config = EntityConfig {
676 id,
677 ..Default::default()
678 };
679 config.populate(&mut entity_config);
680
681 let entity = create_entity(
682 device,
683 entity_config,
684 EntityStorage::NumericSensor(Default::default()),
685 );
686 Sensor::new(entity)
687}
688
689pub fn create_button<'a>(
690 device: &Device<'a>,
691 id: &'static str,
692 config: ButtonConfig,
693) -> Button<'a> {
694 let mut entity_config = EntityConfig {
695 id,
696 ..Default::default()
697 };
698 config.populate(&mut entity_config);
699
700 let entity = create_entity(
701 device,
702 entity_config,
703 EntityStorage::Button(Default::default()),
704 );
705 Button::new(entity)
706}
707
708pub fn create_number<'a>(
709 device: &Device<'a>,
710 id: &'static str,
711 config: NumberConfig,
712) -> Number<'a> {
713 let mut entity_config = EntityConfig {
714 id,
715 ..Default::default()
716 };
717 config.populate(&mut entity_config);
718
719 let entity = create_entity(
720 device,
721 entity_config,
722 EntityStorage::Number(NumberStorage {
723 command_policy: config.command_policy,
724 ..Default::default()
725 }),
726 );
727 Number::new(entity)
728}
729
730pub fn create_switch<'a>(
731 device: &Device<'a>,
732 id: &'static str,
733 config: SwitchConfig,
734) -> Switch<'a> {
735 let mut entity_config = EntityConfig {
736 id,
737 ..Default::default()
738 };
739 config.populate(&mut entity_config);
740
741 let entity = create_entity(
742 device,
743 entity_config,
744 EntityStorage::Switch(SwitchStorage {
745 command_policy: config.command_policy,
746 ..Default::default()
747 }),
748 );
749 Switch::new(entity)
750}
751
752pub fn create_binary_sensor<'a>(
753 device: &Device<'a>,
754 id: &'static str,
755 config: BinarySensorConfig,
756) -> BinarySensor<'a> {
757 let mut entity_config = EntityConfig {
758 id,
759 ..Default::default()
760 };
761 config.populate(&mut entity_config);
762
763 let entity = create_entity(
764 device,
765 entity_config,
766 EntityStorage::BinarySensor(Default::default()),
767 );
768 BinarySensor::new(entity)
769}
770
771pub fn create_device_tracker<'a>(
772 device: &Device<'a>,
773 id: &'static str,
774 config: DeviceTrackerConfig,
775) -> DeviceTracker<'a> {
776 let mut entity_config = EntityConfig {
777 id,
778 ..Default::default()
779 };
780 config.populate(&mut entity_config);
781
782 let entity = create_entity(
783 device,
784 entity_config,
785 EntityStorage::DeviceTracker(Default::default()),
786 );
787 DeviceTracker::new(entity)
788}
789
790async fn device_mqtt_subscribe<T: Transport>(
791 client: &mut mqtt::Client<'_, T>,
792 topic: impl core::fmt::Display,
793) -> Result<(), Error> {
794 use core::fmt::Write;
795
796 let mut topic_buffer = heapless::String::<128>::new();
798 write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small");
799 let topic_str = topic_buffer.as_str();
800
801 match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(topic_str)).await {
802 Ok(Ok(_)) => Ok(()),
803 Ok(Err(err)) => {
804 crate::log::error!(
805 "mqtt subscribe to '{}' failed with: {:?}",
806 topic_str,
807 crate::log::Debug2Format(&err)
808 );
809 Err(Error::new("mqtt subscribe failed"))
810 }
811 Err(_) => {
812 crate::log::error!("mqtt subscribe to '{}' timed out", topic_str);
813 Err(Error::new("mqtt subscribe timed out"))
814 }
815 }
816}
817
818async fn device_mqtt_publish<T: Transport>(
819 client: &mut mqtt::Client<'_, T>,
820 topic: impl core::fmt::Display,
821 data: &[u8],
822 retain: bool,
823) -> Result<(), Error> {
824 use core::fmt::Write;
825
826 let mut topic_buffer = heapless::String::<128>::new();
828 write!(&mut topic_buffer, "{}", topic).expect("topic buffer too small");
829 let topic_str = topic_buffer.as_str();
830
831 let result = if retain {
832 embassy_time::with_timeout(
833 MQTT_TIMEOUT,
834 client.publish_with(
835 topic_str,
836 data,
837 mqtt::PublishParams {
838 retain: true,
839 ..Default::default()
840 },
841 ),
842 )
843 .await
844 } else {
845 embassy_time::with_timeout(MQTT_TIMEOUT, client.publish(topic_str, data)).await
846 };
847
848 match result {
849 Ok(Ok(_)) => Ok(()),
850 Ok(Err(err)) => {
851 crate::log::error!(
852 "mqtt publish to '{}' failed with: {:?}",
853 topic_str,
854 crate::log::Debug2Format(&err)
855 );
856 Err(Error::new("mqtt publish failed"))
857 }
858 Err(_) => {
859 crate::log::error!("mqtt publish to '{}' timed out", topic_str);
860 Err(Error::new("mqtt publish timed out"))
861 }
862 }
863}
864
865async fn mqtt_receive_data<'a, T: Transport>(
887 client: &mut mqtt::Client<'_, T>,
888 data_len: usize,
889 buffer: &'a mut [u8],
890) -> Result<&'a [u8], Error> {
891 if data_len > buffer.len() {
893 crate::log::warn!(
894 "mqtt publish payload is too large ({} bytes, buffer size {} bytes), rejecting",
895 data_len,
896 buffer.len()
897 );
898 return Err(Error::new("mqtt payload too large for buffer"));
899 }
900
901 crate::log::debug!("mqtt receiving {} bytes of data", data_len);
902
903 match embassy_time::with_timeout(MQTT_TIMEOUT, client.receive_data(&mut buffer[..data_len]))
904 .await
905 {
906 Ok(Ok(())) => Ok(&buffer[..data_len]),
907 Ok(Err(err)) => {
908 crate::log::error!(
909 "mqtt receive data failed with: {:?}",
910 crate::log::Debug2Format(&err)
911 );
912 Err(Error::new("mqtt receive data failed"))
913 }
914 Err(_) => {
915 crate::log::error!("mqtt receive data timed out");
916 Err(Error::new("mqtt receive data timed out"))
917 }
918 }
919}
920
921async fn publish_entity_discoveries<T: Transport>(
937 client: &mut mqtt::Client<'_, T>,
938 entities: &[RefCell<Option<EntityData>>],
939 buffers: &mut DeviceBuffers<'_>,
940 device_config: &DeviceConfig,
941 availability_topic: &str,
942 subscribe_to_commands: bool,
943) -> Result<(), Error> {
944 crate::log::debug!("publishing entity discovery messages");
945
946 for entity in entities {
947 buffers.clear();
948
949 {
952 let mut entity = entity.borrow_mut();
953 let entity = match entity.as_mut() {
954 Some(entity) => entity,
955 None => break,
956 };
957
958 generate_entity_discovery(buffers, device_config, &entity.config, availability_topic);
959 }
960
961 let discovery_topic = buffers.discovery_topic.as_str();
962 crate::log::debug!("sending discovery to topic '{}'", discovery_topic);
963 device_mqtt_publish(client, discovery_topic, buffers.discovery, false).await?;
964
965 if subscribe_to_commands {
966 let command_topic = buffers.command_topic.as_str();
967 crate::log::debug!("subscribing to command topic '{}'", command_topic);
968 device_mqtt_subscribe(client, command_topic).await?;
969 }
970 }
971
972 Ok(())
973}
974
975fn generate_entity_discovery(
976 buffers: &mut DeviceBuffers<'_>,
977 device_config: &DeviceConfig,
978 entity_config: &EntityConfig,
979 availability_topic: &str,
980) {
981 use core::fmt::Write;
982
983 let discovery_topic_display = DiscoveryTopicDisplay {
984 domain: entity_config.domain,
985 device_id: device_config.device_id,
986 entity_id: entity_config.id,
987 };
988 let state_topic_display = StateTopicDisplay {
989 device_id: device_config.device_id,
990 entity_id: entity_config.id,
991 };
992 let command_topic_display = CommandTopicDisplay {
993 device_id: device_config.device_id,
994 entity_id: entity_config.id,
995 };
996 let attributes_topic_display = AttributesTopicDisplay {
997 device_id: device_config.device_id,
998 entity_id: entity_config.id,
999 };
1000
1001 write!(buffers.discovery_topic, "{discovery_topic_display}")
1002 .expect("discovery topic buffer too small");
1003 write!(buffers.state_topic, "{state_topic_display}").expect("state topic buffer too small");
1004 write!(buffers.command_topic, "{command_topic_display}")
1005 .expect("command topic buffer too small");
1006 write!(buffers.attributes_topic, "{attributes_topic_display}")
1007 .expect("attributes topic buffer too small");
1008
1009 let device_discovery = DeviceDiscovery {
1010 identifiers: &[device_config.device_id],
1011 name: device_config.device_name,
1012 manufacturer: device_config.manufacturer,
1013 model: device_config.model,
1014 };
1015
1016 let discovery = EntityDiscovery {
1017 id: EntityIdDiscovery {
1018 device_id: device_config.device_id,
1019 entity_id: entity_config.id,
1020 },
1021 name: entity_config.name,
1022 device_class: entity_config.device_class,
1023 state_topic: Some(buffers.state_topic.as_str()),
1024 command_topic: Some(buffers.command_topic.as_str()),
1025 json_attributes_topic: Some(buffers.attributes_topic.as_str()),
1026 unit_of_measurement: entity_config.measurement_unit,
1027 schema: entity_config.schema,
1028 platform: entity_config.platform,
1029 state_class: entity_config.state_class,
1030 icon: entity_config.icon,
1031 entity_category: entity_config.category,
1032 entity_picture: entity_config.picture,
1033 min: entity_config.min,
1034 max: entity_config.max,
1035 step: entity_config.step,
1036 mode: entity_config.mode,
1037 suggested_display_precision: entity_config.suggested_display_precision,
1038 availability_topic: Some(availability_topic),
1039 payload_available: Some(AVAILABLE_PAYLOAD),
1040 payload_not_available: Some(NOT_AVAILABLE_PAYLOAD),
1041 device: &device_discovery,
1042 };
1043
1044 crate::log::debug!(
1045 "discovery for entity '{}': {:?}",
1046 entity_config.id,
1047 discovery
1048 );
1049
1050 buffers
1051 .discovery
1052 .resize(buffers.discovery.capacity(), 0)
1053 .unwrap();
1054 let n = serde_json_core::to_slice(&discovery, buffers.discovery)
1055 .expect("discovery buffer too small");
1056 buffers.discovery.truncate(n);
1057}
1058
1059pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> {
1101 use core::fmt::Write;
1102
1103 device.buffers.availability_topic.clear();
1104 write!(
1105 device.buffers.availability_topic,
1106 "{}",
1107 DeviceAvailabilityTopic {
1108 device_id: device.config.device_id
1109 }
1110 )
1111 .expect("device availability buffer too small");
1112
1113 let mut availability_topic_copy = heapless::String::<128>::new();
1115 availability_topic_copy
1116 .push_str(device.buffers.availability_topic.as_str())
1117 .expect("availability topic too large");
1118 let availability_topic = availability_topic_copy.as_str();
1119
1120 let mut ping_ticker =
1121 embassy_time::Ticker::every(Duration::from_secs(u64::from(DEFAULT_KEEPALIVE_TIME)));
1122 let mut client = mqtt::Client::new(device.mqtt_resources, transport);
1123 let connect_params = mqtt::ConnectParams {
1124 will_topic: Some(availability_topic),
1125 will_payload: Some(NOT_AVAILABLE_PAYLOAD.as_bytes()),
1126 will_retain: true,
1127 keepalive: Some(DEFAULT_KEEPALIVE_TIME),
1128 ..Default::default()
1129 };
1130 match embassy_time::with_timeout(
1131 MQTT_TIMEOUT,
1132 client.connect_with(device.config.device_id, connect_params),
1133 )
1134 .await
1135 {
1136 Ok(Ok(())) => {}
1137 Ok(Err(err)) => {
1138 crate::log::error!(
1139 "mqtt connect failed with: {:?}",
1140 crate::log::Debug2Format(&err)
1141 );
1142 return Err(Error::new("mqtt connection failed"));
1143 }
1144 Err(_) => {
1145 crate::log::error!("mqtt connect timed out");
1146 return Err(Error::new("mqtt connect timed out"));
1147 }
1148 }
1149
1150 device_mqtt_subscribe(&mut client, constants::HA_STATUS_TOPIC).await?;
1151
1152 publish_entity_discoveries(
1153 &mut client,
1154 device.entities,
1155 &mut device.buffers,
1156 &device.config,
1157 availability_topic,
1158 true,
1159 )
1160 .await?;
1161
1162 device_mqtt_publish(
1163 &mut client,
1164 availability_topic,
1165 AVAILABLE_PAYLOAD.as_bytes(),
1166 true,
1167 )
1168 .await?;
1169
1170 let mut first_iteration_push = true;
1171 'outer_loop: loop {
1172 use core::fmt::Write;
1173
1174 for entity in device.entities {
1175 let publish_topic = {
1176 let mut entity = entity.borrow_mut();
1177 let entity = match entity.as_mut() {
1178 Some(entity) => entity,
1179 None => break,
1180 };
1181
1182 if !entity.publish && !first_iteration_push {
1183 continue;
1184 }
1185
1186 entity.publish = false;
1187 device.buffers.publish.clear();
1188
1189 let mut publish_to_attributes = false;
1190 match &entity.storage {
1191 EntityStorage::Switch(SwitchStorage {
1192 state: Some(SwitchState { value, .. }),
1193 ..
1194 }) => device
1195 .buffers
1196 .publish
1197 .extend_from_slice(value.as_str().as_bytes())
1198 .expect("publish buffer too small for switch state payload"),
1199 EntityStorage::BinarySensor(BinarySensorStorage {
1200 state: Some(BinarySensorState { value, .. }),
1201 }) => device
1202 .buffers
1203 .publish
1204 .extend_from_slice(value.as_str().as_bytes())
1205 .expect("publish buffer too small for binary sensor state payload"),
1206 EntityStorage::NumericSensor(NumericSensorStorage {
1207 state: Some(NumericSensorState { value, .. }),
1208 ..
1209 }) => write!(device.buffers.publish, "{}", value)
1210 .expect("publish buffer too small for numeric sensor payload"),
1211 EntityStorage::Number(NumberStorage {
1212 state: Some(NumberState { value, .. }),
1213 ..
1214 }) => write!(device.buffers.publish, "{}", value)
1215 .expect("publish buffer too small for number state payload"),
1216 EntityStorage::DeviceTracker(DeviceTrackerStorage {
1217 state: Some(tracker_state),
1218 }) => {
1219 publish_to_attributes = true;
1220 device
1221 .buffers
1222 .publish
1223 .resize(device.buffers.publish.capacity(), 0)
1224 .expect("resize to capacity should never fail");
1225 let n = serde_json_core::to_slice(&tracker_state, device.buffers.publish)
1226 .expect("publish buffer too small for tracker state payload");
1227 device.buffers.publish.truncate(n);
1228 }
1229 _ => {
1230 if !first_iteration_push {
1231 crate::log::warn!(
1232 "entity '{}' requested state publish but its storage does not support it",
1233 entity.config.id
1234 );
1235 }
1236 continue;
1237 }
1238 }
1239
1240 if publish_to_attributes {
1241 let attributes_topic_display = AttributesTopicDisplay {
1242 device_id: device.config.device_id,
1243 entity_id: entity.config.id,
1244 };
1245 device.buffers.attributes_topic.clear();
1246 write!(
1247 device.buffers.attributes_topic,
1248 "{attributes_topic_display}"
1249 )
1250 .expect("attributes topic buffer too small");
1251 device.buffers.attributes_topic.as_str()
1252 } else {
1253 let state_topic_display = StateTopicDisplay {
1254 device_id: device.config.device_id,
1255 entity_id: entity.config.id,
1256 };
1257 device.buffers.state_topic.clear();
1258 write!(device.buffers.state_topic, "{state_topic_display}")
1259 .expect("state topic buffer too small");
1260 device.buffers.state_topic.as_str()
1261 }
1262 };
1263
1264 device_mqtt_publish(&mut client, publish_topic, device.buffers.publish, false).await?;
1265 }
1266 first_iteration_push = false;
1267
1268 let receive = client.receive();
1269 let waker = wait_on_atomic_waker(device.waker);
1270 let publish =
1271 match embassy_futures::select::select3(receive, waker, ping_ticker.next()).await {
1272 embassy_futures::select::Either3::First(packet) => match packet {
1273 Ok(mqtt::Packet::Publish(publish)) => publish,
1274 Err(err) => {
1275 crate::log::error!(
1276 "mqtt receive failed with: {:?}",
1277 crate::log::Debug2Format(&err)
1278 );
1279 return Err(Error::new("mqtt receive failed"));
1280 }
1281 _ => continue,
1282 },
1283 embassy_futures::select::Either3::Second(_) => continue,
1284 embassy_futures::select::Either3::Third(_) => {
1285 if let Err(err) = client.ping().await {
1286 crate::log::error!(
1287 "mqtt ping failed with: {:?}",
1288 crate::log::Debug2Format(&err)
1289 );
1290 return Err(Error::new("mqtt ping failed"));
1291 }
1292 continue;
1293 }
1294 };
1295
1296 if publish.topic == constants::HA_STATUS_TOPIC {
1297 let mut receive_buffer = [0u8; 64];
1298 let receive_data_len = publish.data_len;
1299 let receive_data =
1300 mqtt_receive_data(&mut client, receive_data_len, &mut receive_buffer).await?;
1301
1302 if receive_data == constants::HA_STATUS_PAYLOAD_ONLINE.as_bytes() {
1303 first_iteration_push = true;
1304
1305 crate::log::debug!("home assistant came online, republishing discoveries");
1306 publish_entity_discoveries(
1307 &mut client,
1308 device.entities,
1309 &mut device.buffers,
1310 &device.config,
1311 availability_topic,
1312 false,
1313 )
1314 .await?;
1315 }
1316 continue;
1317 }
1318
1319 let entity = 'entity_search_block: {
1320 for entity in device.entities {
1321 let mut data = entity.borrow_mut();
1322 let data = match data.as_mut() {
1323 Some(data) => data,
1324 None => break,
1325 };
1326
1327 let command_topic_display = CommandTopicDisplay {
1328 device_id: device.config.device_id,
1329 entity_id: data.config.id,
1330 };
1331 device.buffers.command_topic.clear();
1332 write!(device.buffers.command_topic, "{command_topic_display}")
1333 .expect("command topic buffer too small");
1334
1335 if device.buffers.command_topic.as_bytes() == publish.topic.as_bytes() {
1336 break 'entity_search_block entity;
1337 }
1338 }
1339 continue 'outer_loop;
1340 };
1341
1342 let mut read_buffer = [0u8; 128];
1343 let data_len = publish.data_len;
1344 let receive_data = match mqtt_receive_data(&mut client, data_len, &mut read_buffer).await {
1345 Ok(data) => data,
1346 Err(_) => continue 'outer_loop,
1347 };
1348
1349 let command = match str::from_utf8(receive_data) {
1350 Ok(command) => command,
1351 Err(_) => {
1352 crate::log::warn!("mqtt message contained invalid utf-8, ignoring it");
1353 continue;
1354 }
1355 };
1356
1357 let mut entity = entity.borrow_mut();
1358 let data = entity.as_mut().unwrap();
1359
1360 match &mut data.storage {
1361 EntityStorage::Button(button_storage) => {
1362 if command != constants::HA_BUTTON_PAYLOAD_PRESS {
1363 crate::log::warn!(
1364 "button '{}' received unexpected command '{}', expected '{}', ignoring it",
1365 data.config.id,
1366 command,
1367 constants::HA_BUTTON_PAYLOAD_PRESS
1368 );
1369 continue;
1370 }
1371 button_storage.consumed = false;
1372 button_storage.timestamp = Some(embassy_time::Instant::now());
1373 }
1374 EntityStorage::Switch(switch_storage) => {
1375 let command = match command.parse::<BinaryState>() {
1376 Ok(command) => command,
1377 Err(_) => {
1378 crate::log::warn!(
1379 "switch '{}' received invalid command '{}', expected 'ON' or 'OFF', ignoring it",
1380 data.config.id,
1381 command
1382 );
1383 continue;
1384 }
1385 };
1386 let timestamp = embassy_time::Instant::now();
1387 if switch_storage.command_policy == CommandPolicy::PublishState {
1388 data.publish = true;
1389 switch_storage.state = Some(SwitchState {
1390 value: command,
1391 timestamp,
1392 });
1393 }
1394 switch_storage.command = Some(SwitchCommand {
1395 value: command,
1396 timestamp,
1397 });
1398 }
1399 EntityStorage::Number(number_storage) => {
1400 let command = match command.parse::<f32>() {
1401 Ok(command) => command,
1402 Err(_) => {
1403 crate::log::warn!(
1404 "number '{}' received invalid command '{}', expected a valid number, ignoring it",
1405 data.config.id,
1406 command
1407 );
1408 continue;
1409 }
1410 };
1411 let timestamp = embassy_time::Instant::now();
1412 if number_storage.command_policy == CommandPolicy::PublishState {
1413 data.publish = true;
1414 number_storage.state = Some(NumberState {
1415 value: command,
1416 timestamp,
1417 });
1418 }
1419 number_storage.command = Some(NumberCommand {
1420 value: command,
1421 timestamp,
1422 });
1423 }
1424 _ => continue 'outer_loop,
1425 }
1426
1427 data.command = true;
1428 if let Some(waker) = data.command_waker.take() {
1429 waker.wake();
1430 }
1431 }
1432}
1433
1434pub async fn connect_and_run(
1481 stack: embassy_net::Stack<'_>,
1482 mut device: Device<'_>,
1483 address: &str,
1484) -> ! {
1485 const DEFAULT_MQTT_PORT: u16 = 1883;
1486
1487 let mut rx_buffer = [0u8; 1024];
1488 let mut tx_buffer = [0u8; 1024];
1489 let mut delay = false;
1490
1491 loop {
1492 if !delay {
1493 delay = true;
1494 } else {
1495 crate::log::info!("Retrying connection in 5 seconds...");
1496 Timer::after_secs(5).await;
1497 }
1498
1499 let addr = {
1500 if let Ok(sock_addr) = address.parse::<SocketAddrV4>() {
1502 sock_addr
1503 }
1504 else if let Ok(ip_addr) = address.parse::<Ipv4Addr>() {
1506 SocketAddrV4::new(ip_addr, DEFAULT_MQTT_PORT)
1507 }
1508 else {
1510 let (addr_str, port) = match address.split_once(':') {
1511 Some((addr_str, port_str)) => {
1512 let port = port_str
1513 .parse::<u16>()
1514 .expect("Invalid port number in address");
1515 (addr_str, port)
1516 }
1517 None => (address, DEFAULT_MQTT_PORT),
1518 };
1519
1520 let addrs = match stack
1521 .dns_query(addr_str, embassy_net::dns::DnsQueryType::A)
1522 .await
1523 {
1524 Ok(addrs) => addrs,
1525 Err(err) => {
1526 crate::log::error!(
1527 "DNS query for '{}' failed with: {:?}",
1528 addr_str,
1529 crate::log::Debug2Format(&err)
1530 );
1531 continue;
1532 }
1533 };
1534
1535 #[allow(unreachable_patterns)]
1536 let ipv4_addr = match addrs
1537 .iter()
1538 .filter_map(|addr| match addr {
1539 embassy_net::IpAddress::Ipv4(ipv4) => Some(*ipv4),
1540 _ => None,
1541 })
1542 .next()
1543 {
1544 Some(addr) => addr,
1545 None => {
1546 crate::log::error!(
1547 "DNS query for '{}' returned no IPv4 addresses",
1548 addr_str
1549 );
1550 continue;
1551 }
1552 };
1553
1554 SocketAddrV4::new(ipv4_addr, port)
1555 }
1556 };
1557
1558 crate::log::info!("Connecting to MQTT broker at {}", addr);
1559
1560 let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
1561 socket.set_timeout(Some(embassy_time::Duration::from_secs(10)));
1562
1563 let connect_fut = embassy_time::with_timeout(Duration::from_secs(10), socket.connect(addr));
1564 match connect_fut.await {
1565 Ok(Err(err)) => {
1566 crate::log::error!(
1567 "TCP connect to {} failed with: {:?}",
1568 addr,
1569 crate::log::Debug2Format(&err)
1570 );
1571 continue;
1572 }
1573 Err(_) => {
1574 crate::log::error!("TCP connect to {} timed out", addr);
1575 continue;
1576 }
1577 _ => {}
1578 }
1579
1580 socket.set_timeout(None);
1581
1582 if let Err(err) = run(&mut device, &mut socket).await {
1583 crate::log::error!(
1584 "Device run failed with: {:?}",
1585 crate::log::Debug2Format(&err)
1586 );
1587 }
1588 }
1589}
1590
1591async fn wait_on_atomic_waker(waker: &AtomicWaker) {
1592 struct F<'a>(&'a AtomicWaker, bool);
1593 impl<'a> core::future::Future for F<'a> {
1594 type Output = ();
1595
1596 fn poll(
1597 self: core::pin::Pin<&mut Self>,
1598 cx: &mut core::task::Context<'_>,
1599 ) -> core::task::Poll<Self::Output> {
1600 if !self.1 {
1601 self.0.register(cx.waker());
1602 self.get_mut().1 = true;
1603 core::task::Poll::Pending
1604 } else {
1605 core::task::Poll::Ready(())
1606 }
1607 }
1608 }
1609 F(waker, false).await
1610}