1use futures::FutureExt;
7use futures::future::try_join;
8
9use mac_address::get_mac_address;
10use rumqttc::{
11 self, AsyncClient, ClientError, ConnectionError, Event, EventLoop, Incoming, LastWill,
12 MqttOptions, QoS,
13};
14use std::fmt::{self, Debug, Display, Formatter};
15use std::future::Future;
16use std::pin::Pin;
17use std::str;
18use std::time::{Duration, Instant};
19use thiserror::Error;
20use tokio::task::{self, JoinError, JoinHandle};
21use tokio::time::sleep;
22
23mod types;
24pub use crate::types::{Datatype, Node, Property};
25mod values;
26pub use crate::values::{Color, ColorFormat, ColorHsv, ColorRgb};
27
28const HOMIE_VERSION: &str = "4.0";
29const HOMIE_IMPLEMENTATION: &str = "homie-rs";
30const STATS_INTERVAL: Duration = Duration::from_secs(60);
31const REQUESTS_CAP: usize = 10;
32
33#[derive(Error, Debug)]
35pub enum SpawnError {
36 #[error("{0}")]
37 Client(#[from] ClientError),
38 #[error("{0}")]
39 Connection(#[from] ConnectionError),
40 #[error("Task failed: {0}")]
41 Join(#[from] JoinError),
42 #[error("Internal error: {0}")]
43 Internal(&'static str),
44}
45
/// Lifecycle state of a device, as published on its `$state` topic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Set by `HomieDevice::start` once the device attributes have been published.
    Init,
    /// Set by `HomieDevice::ready`.
    Ready,
    /// Initial state of a freshly constructed device, and the state set by
    /// `HomieDevice::disconnect`.
    Disconnected,
    /// Set by `HomieDevice::sleep`.
    Sleeping,
    /// Payload of the MQTT last will registered by `HomieDeviceBuilder::build`.
    Lost,
    /// Set by `HomieDevice::alert`.
    Alert,
}

impl State {
    /// Returns the lowercase name used for this state on the wire.
    fn as_str(&self) -> &'static str {
        match *self {
            State::Init => "init",
            State::Ready => "ready",
            State::Disconnected => "disconnected",
            State::Sleeping => "sleeping",
            State::Lost => "lost",
            State::Alert => "alert",
        }
    }
}

impl Display for State {
    /// Writes the wire name verbatim; width and padding flags are not applied.
    fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
        let wire_name = self.as_str();
        formatter.write_str(wire_name)
    }
}

impl From<State> for Vec<u8> {
    /// Converts the state into the bytes of its wire name, ready to be used as an MQTT payload.
    fn from(state: State) -> Self {
        state.as_str().as_bytes().to_vec()
    }
}
88
/// Boxed future returned by an [`UpdateCallback`].
///
/// It resolves to `Some(value)` when a value should be published back for the property, or to
/// `None` when nothing should be published.
type UpdateFuture = Pin<Box<dyn Future<Output = Option<String>> + Send>>;

/// Type-erased handler for property `set` requests.
///
/// It is called with the node ID, the property ID and the requested value, in that order.
type UpdateCallback = Box<dyn FnMut(String, String, String) -> UpdateFuture + Send + Sync>;
94
95pub struct HomieDeviceBuilder {
97 device_base: String,
98 device_name: String,
99 firmware_name: Option<String>,
100 firmware_version: Option<String>,
101 mqtt_options: MqttOptions,
102 update_callback: Option<UpdateCallback>,
103}
104
105impl Debug for HomieDeviceBuilder {
106 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
107 f.debug_struct("HomieDeviceBuilder")
108 .field("device_base", &self.device_base)
109 .field("device_name", &self.device_name)
110 .field("firmware_name", &self.firmware_name)
111 .field("firmware_version", &self.firmware_version)
112 .field("mqtt_options", &self.mqtt_options)
113 .field(
114 "update_callback",
115 &self.update_callback.as_ref().map(|_| "..."),
116 )
117 .finish()
118 }
119}
120
121impl HomieDeviceBuilder {
122 pub fn set_firmware(&mut self, firmware_name: &str, firmware_version: &str) {
126 self.firmware_name = Some(firmware_name.to_string());
127 self.firmware_version = Some(firmware_version.to_string());
128 }
129
130 pub fn set_update_callback<F, Fut>(&mut self, mut update_callback: F)
131 where
132 F: (FnMut(String, String, String) -> Fut) + Send + Sync + 'static,
133 Fut: Future<Output = Option<String>> + Send + 'static,
134 {
135 self.update_callback = Some(Box::new(
136 move |node_id: String, property_id: String, value: String| {
137 update_callback(node_id, property_id, value).boxed()
138 },
139 ));
140 }
141
142 pub async fn spawn(
149 self,
150 ) -> Result<(HomieDevice, impl Future<Output = Result<(), SpawnError>>), ClientError> {
151 let (event_loop, mut homie, stats, firmware, update_callback) = self.build();
152
153 let event_task = homie.spawn(event_loop, update_callback);
155
156 stats.start().await?;
157 if let Some(firmware) = firmware {
158 firmware.start().await?;
159 }
160 homie.start().await?;
161
162 let stats_task = stats.spawn();
163 let join_handle = try_join(event_task, stats_task).map(simplify_unit_pair);
164
165 Ok((homie, join_handle))
166 }
167
168 fn build(
169 self,
170 ) -> (
171 EventLoop,
172 HomieDevice,
173 HomieStats,
174 Option<HomieFirmware>,
175 Option<UpdateCallback>,
176 ) {
177 let mut mqtt_options = self.mqtt_options;
178 let last_will = LastWill::new(
179 format!("{}/$state", self.device_base),
180 State::Lost,
181 QoS::AtLeastOnce,
182 true,
183 );
184 mqtt_options.set_last_will(last_will);
185 let (client, event_loop) = AsyncClient::new(mqtt_options, REQUESTS_CAP);
186
187 let publisher = DevicePublisher::new(client, self.device_base);
188
189 let mut extension_ids = vec![HomieStats::EXTENSION_ID];
190 let stats = HomieStats::new(publisher.clone());
191 let firmware = if let (Some(firmware_name), Some(firmware_version)) =
192 (self.firmware_name, self.firmware_version)
193 {
194 extension_ids.push(HomieFirmware::EXTENSION_ID);
195 Some(HomieFirmware::new(
196 publisher.clone(),
197 firmware_name,
198 firmware_version,
199 ))
200 } else {
201 None
202 };
203
204 let homie = HomieDevice::new(publisher, self.device_name, &extension_ids);
205
206 (event_loop, homie, stats, firmware, self.update_callback)
207 }
208}
209
210#[derive(Debug)]
213pub struct HomieDevice {
214 publisher: DevicePublisher,
215 device_name: String,
216 nodes: Vec<Node>,
217 state: State,
218 extension_ids: String,
219}
220
221impl HomieDevice {
222 pub fn builder(
232 device_base: &str,
233 device_name: &str,
234 mqtt_options: MqttOptions,
235 ) -> HomieDeviceBuilder {
236 HomieDeviceBuilder {
237 device_base: device_base.to_string(),
238 device_name: device_name.to_string(),
239 firmware_name: None,
240 firmware_version: None,
241 mqtt_options,
242 update_callback: None,
243 }
244 }
245
246 fn new(publisher: DevicePublisher, device_name: String, extension_ids: &[&str]) -> HomieDevice {
247 HomieDevice {
248 publisher,
249 device_name,
250 nodes: vec![],
251 state: State::Disconnected,
252 extension_ids: extension_ids.join(","),
253 }
254 }
255
256 async fn start(&mut self) -> Result<(), ClientError> {
257 assert_eq!(self.state, State::Disconnected);
258 self.publisher
259 .publish_retained("$homie", HOMIE_VERSION)
260 .await?;
261 self.publisher
262 .publish_retained("$extensions", self.extension_ids.as_str())
263 .await?;
264 self.publisher
265 .publish_retained("$implementation", HOMIE_IMPLEMENTATION)
266 .await?;
267 self.publisher
268 .publish_retained("$name", self.device_name.as_str())
269 .await?;
270 self.set_state(State::Init).await?;
271 Ok(())
272 }
273
274 fn spawn(
276 &self,
277 mut event_loop: EventLoop,
278 mut update_callback: Option<UpdateCallback>,
279 ) -> impl Future<Output = Result<(), SpawnError>> + use<> {
280 let device_base = format!("{}/", self.publisher.device_base);
281 let (incoming_tx, incoming_rx) = flume::unbounded();
282
283 let mqtt_task = task::spawn(async move {
284 loop {
285 let notification = event_loop.poll().await?;
286 log::trace!("Notification = {notification:?}");
287
288 if let Event::Incoming(incoming) = notification {
289 incoming_tx.send_async(incoming).await.map_err(|_| {
290 SpawnError::Internal("Incoming event channel receiver closed.")
291 })?;
292 }
293 }
294 });
295
296 let publisher = self.publisher.clone();
297 let incoming_task: JoinHandle<Result<(), SpawnError>> = task::spawn(async move {
298 loop {
299 if let Incoming::Publish(publish) = incoming_rx
300 .recv_async()
301 .await
302 .map_err(|_| SpawnError::Internal("Incoming event channel sender closed."))?
303 {
304 if let Some(rest) = publish.topic.strip_prefix(&device_base) {
305 if let ([node_id, property_id, "set"], Ok(payload)) = (
306 rest.split('/').collect::<Vec<&str>>().as_slice(),
307 str::from_utf8(&publish.payload),
308 ) {
309 log::trace!(
310 "set node {node_id:?} property {property_id:?} to {payload:?}"
311 );
312 if let Some(callback) = update_callback.as_mut() {
313 if let Some(value) = callback(
314 node_id.to_string(),
315 property_id.to_string(),
316 payload.to_string(),
317 )
318 .await
319 {
320 publisher
321 .publish_retained(
322 &format!("{node_id}/{property_id}"),
323 value,
324 )
325 .await?;
326 }
327 }
328 }
329 } else {
330 log::warn!("Unexpected publish: {publish:?}");
331 }
332 }
333 }
334 });
335 try_join_unit_handles(mqtt_task, incoming_task)
336 }
337
338 pub fn has_node(&self, node_id: &str) -> bool {
340 self.nodes.iter().any(|n| n.id == node_id)
341 }
342
343 pub async fn add_node(&mut self, node: Node) -> Result<(), ClientError> {
348 if self.has_node(&node.id) {
350 panic!("Tried to add node with duplicate ID: {node:?}");
351 }
352 self.nodes.push(node);
353 let node = &self.nodes[self.nodes.len() - 1];
356
357 self.publish_node(node).await?;
358 self.publish_nodes().await
359 }
360
361 pub async fn remove_node(&mut self, node_id: &str) -> Result<(), ClientError> {
363 let index = self.nodes.iter().position(|n| n.id == node_id).unwrap();
365 self.unpublish_node(&self.nodes[index]).await?;
366 self.nodes.remove(index);
367 self.publish_nodes().await
368 }
369
370 async fn publish_node(&self, node: &Node) -> Result<(), ClientError> {
371 self.publisher
372 .publish_retained(&format!("{}/$name", node.id), node.name.as_str())
373 .await?;
374 self.publisher
375 .publish_retained(&format!("{}/$type", node.id), node.node_type.as_str())
376 .await?;
377 let mut property_ids: Vec<&str> = vec![];
378 for property in &node.properties {
379 property_ids.push(&property.id);
380 self.publisher
381 .publish_retained(
382 &format!("{}/{}/$name", node.id, property.id),
383 property.name.as_str(),
384 )
385 .await?;
386 self.publisher
387 .publish_retained(
388 &format!("{}/{}/$datatype", node.id, property.id),
389 property.datatype,
390 )
391 .await?;
392 self.publisher
393 .publish_retained(
394 &format!("{}/{}/$settable", node.id, property.id),
395 if property.settable { "true" } else { "false" },
396 )
397 .await?;
398 self.publisher
399 .publish_retained(
400 &format!("{}/{}/$retained", node.id, property.id),
401 if property.retained { "true" } else { "false" },
402 )
403 .await?;
404 if let Some(unit) = &property.unit {
405 self.publisher
406 .publish_retained(&format!("{}/{}/$unit", node.id, property.id), unit.as_str())
407 .await?;
408 }
409 if let Some(format) = &property.format {
410 self.publisher
411 .publish_retained(
412 &format!("{}/{}/$format", node.id, property.id),
413 format.as_str(),
414 )
415 .await?;
416 }
417 if property.settable {
418 self.publisher
419 .subscribe(&format!("{}/{}/set", node.id, property.id))
420 .await?;
421 }
422 }
423 self.publisher
424 .publish_retained(&format!("{}/$properties", node.id), property_ids.join(","))
425 .await?;
426 Ok(())
427 }
428
429 async fn unpublish_node(&self, node: &Node) -> Result<(), ClientError> {
430 for property in &node.properties {
431 if property.settable {
432 self.publisher
433 .unsubscribe(&format!("{}/{}/set", node.id, property.id))
434 .await?;
435 }
436 }
437 Ok(())
438 }
439
440 async fn publish_nodes(&mut self) -> Result<(), ClientError> {
441 let node_ids = self
442 .nodes
443 .iter()
444 .map(|node| node.id.as_str())
445 .collect::<Vec<&str>>()
446 .join(",");
447 self.publisher.publish_retained("$nodes", node_ids).await
448 }
449
450 async fn set_state(&mut self, state: State) -> Result<(), ClientError> {
451 self.state = state;
452 self.publisher.publish_retained("$state", self.state).await
453 }
454
455 pub async fn ready(&mut self) -> Result<(), ClientError> {
459 assert!(&[State::Init, State::Sleeping, State::Alert].contains(&self.state));
460 self.set_state(State::Ready).await
461 }
462
463 pub async fn sleep(&mut self) -> Result<(), ClientError> {
466 assert_eq!(self.state, State::Ready);
467 self.set_state(State::Sleeping).await
468 }
469
470 pub async fn alert(&mut self) -> Result<(), ClientError> {
474 assert_eq!(self.state, State::Ready);
475 self.set_state(State::Alert).await
476 }
477
478 pub async fn disconnect(mut self) -> Result<(), ClientError> {
481 self.set_state(State::Disconnected).await?;
482 self.publisher.client.disconnect().await
483 }
484
485 pub async fn publish_value(
488 &self,
489 node_id: &str,
490 property_id: &str,
491 value: impl ToString,
492 ) -> Result<(), ClientError> {
493 self.publisher
494 .publish_retained(&format!("{node_id}/{property_id}"), value.to_string())
495 .await
496 }
497
498 pub async fn publish_nonretained_value(
501 &self,
502 node_id: &str,
503 property_id: &str,
504 value: impl ToString,
505 ) -> Result<(), ClientError> {
506 self.publisher
507 .publish_nonretained(&format!("{node_id}/{property_id}"), value.to_string())
508 .await
509 }
510}
511
512#[derive(Clone, Debug)]
513struct DevicePublisher {
514 pub client: AsyncClient,
515 device_base: String,
516}
517
518impl DevicePublisher {
519 fn new(client: AsyncClient, device_base: String) -> Self {
520 Self {
521 client,
522 device_base,
523 }
524 }
525
526 async fn publish_retained(
527 &self,
528 subtopic: &str,
529 value: impl Into<Vec<u8>>,
530 ) -> Result<(), ClientError> {
531 let topic = format!("{}/{}", self.device_base, subtopic);
532 self.client
533 .publish(topic, QoS::AtLeastOnce, true, value)
534 .await
535 }
536
537 async fn publish_nonretained(
538 &self,
539 subtopic: &str,
540 value: impl Into<Vec<u8>>,
541 ) -> Result<(), ClientError> {
542 let topic = format!("{}/{}", self.device_base, subtopic);
543 self.client
544 .publish(topic, QoS::AtLeastOnce, false, value)
545 .await
546 }
547
548 async fn subscribe(&self, subtopic: &str) -> Result<(), ClientError> {
549 let topic = format!("{}/{}", self.device_base, subtopic);
550 self.client.subscribe(topic, QoS::AtLeastOnce).await
551 }
552
553 async fn unsubscribe(&self, subtopic: &str) -> Result<(), ClientError> {
554 let topic = format!("{}/{}", self.device_base, subtopic);
555 self.client.unsubscribe(topic).await
556 }
557}
558
559#[derive(Debug)]
561struct HomieStats {
562 publisher: DevicePublisher,
563 start_time: Instant,
564}
565
566impl HomieStats {
567 const EXTENSION_ID: &'static str = "org.homie.legacy-stats:0.1.1:[4.x]";
568
569 fn new(publisher: DevicePublisher) -> Self {
570 let now = Instant::now();
571 Self {
572 publisher,
573 start_time: now,
574 }
575 }
576
577 async fn start(&self) -> Result<(), ClientError> {
579 self.publisher
580 .publish_retained("$stats/interval", STATS_INTERVAL.as_secs().to_string())
581 .await
582 }
583
584 fn spawn(self) -> impl Future<Output = Result<(), SpawnError>> {
586 let task: JoinHandle<Result<(), SpawnError>> = task::spawn(async move {
587 loop {
588 let uptime = Instant::now() - self.start_time;
589 self.publisher
590 .publish_retained("$stats/uptime", uptime.as_secs().to_string())
591 .await?;
592 sleep(STATS_INTERVAL).await;
593 }
594 });
595 task.map(|res| res?)
596 }
597}
598
599#[derive(Debug)]
601struct HomieFirmware {
602 publisher: DevicePublisher,
603 firmware_name: String,
604 firmware_version: String,
605}
606
607impl HomieFirmware {
608 const EXTENSION_ID: &'static str = "org.homie.legacy-firmware:0.1.1:[4.x]";
609
610 fn new(publisher: DevicePublisher, firmware_name: String, firmware_version: String) -> Self {
611 Self {
612 publisher,
613 firmware_name,
614 firmware_version,
615 }
616 }
617
618 async fn start(&self) -> Result<(), ClientError> {
620 self.publisher
621 .publish_retained("$localip", local_ipaddress::get().unwrap())
622 .await?;
623 self.publisher
624 .publish_retained("$mac", get_mac_address().unwrap().unwrap().to_string())
625 .await?;
626 self.publisher
627 .publish_retained("$fw/name", self.firmware_name.as_str())
628 .await?;
629 self.publisher
630 .publish_retained("$fw/version", self.firmware_version.as_str())
631 .await?;
632 Ok(())
633 }
634}
635
636fn try_join_handles<A, B, E>(
637 a: JoinHandle<Result<A, E>>,
638 b: JoinHandle<Result<B, E>>,
639) -> impl Future<Output = Result<(A, B), E>>
640where
641 E: From<JoinError>,
642{
643 try_join(a.map(|res| res?), b.map(|res| res?))
645}
646
647fn try_join_unit_handles<E>(
648 a: JoinHandle<Result<(), E>>,
649 b: JoinHandle<Result<(), E>>,
650) -> impl Future<Output = Result<(), E>>
651where
652 E: From<JoinError>,
653{
654 try_join_handles(a, b).map(simplify_unit_pair)
655}
656
/// Collapses a successful pair of units into a single unit, passing any error through unchanged.
fn simplify_unit_pair<E>(m: Result<((), ()), E>) -> Result<(), E> {
    match m {
        Ok(((), ())) => Ok(()),
        Err(error) => Err(error),
    }
}
660
#[cfg(test)]
mod tests {
    use super::*;
    use flume::Receiver;
    use rumqttc::Request;

    /// Creates a device whose client sends its requests into an in-memory channel.
    ///
    /// The returned receiver must be kept alive for as long as the device is used, otherwise
    /// the channel is closed and requests made through the client fail.
    fn make_test_device() -> (HomieDevice, Receiver<Request>) {
        let (requests_tx, requests_rx) = flume::unbounded();
        let publisher = DevicePublisher::new(
            AsyncClient::from_senders(requests_tx),
            "homie/test-device".to_string(),
        );
        (
            HomieDevice::new(publisher, "Test device".to_string(), &[]),
            requests_rx,
        )
    }

    /// Creates a builder with the same base topic and name as the test device.
    fn make_test_builder() -> HomieDeviceBuilder {
        HomieDevice::builder(
            "homie/test-device",
            "Test device",
            MqttOptions::new("client_id", "hostname", 1234),
        )
    }

    /// Creates a node without any properties.
    fn empty_node(id: &str, name: &str, node_type: &str) -> Node {
        Node::new(id, name, node_type, vec![])
    }

    #[tokio::test]
    #[should_panic(expected = "Tried to add node with duplicate ID")]
    async fn add_node_fails_given_duplicate_id() {
        // `_requests_rx` stays alive until the end of the test, keeping the channel open.
        let (mut device, _requests_rx) = make_test_device();

        device
            .add_node(empty_node("id", "Name", "type"))
            .await
            .unwrap();
        device
            .add_node(empty_node("id", "Name 2", "type2"))
            .await
            .unwrap();
    }

    #[tokio::test]
    #[should_panic(expected = "Init")]
    async fn ready_fails_if_called_before_start() {
        let (mut device, _requests_rx) = make_test_device();

        device.ready().await.unwrap();
    }

    #[tokio::test]
    async fn start_succeeds_with_no_nodes() -> Result<(), ClientError> {
        let (mut device, _requests_rx) = make_test_device();

        device.start().await?;
        device.ready().await?;

        Ok(())
    }

    #[tokio::test]
    async fn sleep_then_ready_again_succeeds() -> Result<(), ClientError> {
        let (mut device, _requests_rx) = make_test_device();

        device.start().await?;
        device.ready().await?;
        device.sleep().await?;
        device.ready().await?;

        Ok(())
    }

    #[tokio::test]
    async fn alert_then_ready_again_succeeds() -> Result<(), ClientError> {
        let (mut device, _requests_rx) = make_test_device();

        device.start().await?;
        device.ready().await?;
        device.alert().await?;
        device.ready().await?;

        Ok(())
    }

    #[tokio::test]
    async fn disconnect_succeeds_before_ready() -> Result<(), ClientError> {
        let (mut device, _requests_rx) = make_test_device();

        device.start().await?;
        device.disconnect().await?;

        Ok(())
    }

    #[tokio::test]
    async fn disconnect_succeeds_after_ready() -> Result<(), ClientError> {
        let (mut device, _requests_rx) = make_test_device();

        device.start().await?;
        device.ready().await?;
        device.disconnect().await?;

        Ok(())
    }

    #[tokio::test]
    async fn minimal_build_succeeds() -> Result<(), ClientError> {
        let builder = make_test_builder();

        let (_event_loop, homie, _stats, firmware, _callback) = builder.build();

        assert_eq!(homie.device_name, "Test device");
        assert_eq!(homie.publisher.device_base, "homie/test-device");
        // Without `set_firmware` no firmware publisher is created.
        assert!(firmware.is_none());

        Ok(())
    }

    #[tokio::test]
    async fn set_firmware_build_succeeds() -> Result<(), ClientError> {
        let mut builder = make_test_builder();
        builder.set_firmware("firmware_name", "firmware_version");

        let (_event_loop, homie, _stats, firmware, _callback) = builder.build();

        assert_eq!(homie.device_name, "Test device");
        assert_eq!(homie.publisher.device_base, "homie/test-device");
        let firmware = firmware.unwrap();
        assert_eq!(firmware.firmware_name, "firmware_name");
        assert_eq!(firmware.firmware_version, "firmware_version");

        Ok(())
    }

    #[tokio::test]
    async fn add_node_succeeds_before_and_after_start() -> Result<(), ClientError> {
        let (mut device, _requests_rx) = make_test_device();

        device.add_node(empty_node("id", "Name", "type")).await?;

        device.start().await?;
        device.ready().await?;

        // Adding a further node once the device is ready must work as well.
        device.add_node(empty_node("id2", "Name 2", "type2")).await?;

        Ok(())
    }

    #[tokio::test]
    async fn add_node_succeeds_after_remove() -> Result<(), ClientError> {
        let (mut device, _requests_rx) = make_test_device();

        device.add_node(empty_node("id", "Name", "type")).await?;

        device.remove_node("id").await?;

        // The ID is free again after the removal, so this must not be seen as a duplicate.
        device.add_node(empty_node("id", "Name", "type")).await?;

        Ok(())
    }

    #[tokio::test]
    async fn has_node() -> Result<(), ClientError> {
        let (mut device, _requests_rx) = make_test_device();

        assert!(!device.has_node("id"));

        device.add_node(empty_node("id", "Name", "type")).await?;
        assert!(device.has_node("id"));

        device.remove_node("id").await?;
        assert!(!device.has_node("id"));

        Ok(())
    }
}