homie_controller/
lib.rs

1//! `homie-controller` is a library for creating controllers to interact via an MQTT broker with IoT
2//! devices implementing the [Homie convention](https://homieiot.github.io/).
3
4use rumqttc::{
5    AsyncClient, ClientError, ConnectionError, EventLoop, Incoming, MqttOptions, Publish, QoS,
6};
7use std::collections::HashMap;
8use std::num::{ParseFloatError, ParseIntError};
9use std::str;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12use thiserror::Error;
13
14mod types;
15pub use types::{Datatype, Device, Extension, Node, Property, State};
16use types::{ParseDatatypeError, ParseExtensionError, ParseStateError};
17
18mod values;
19pub use values::{
20    ColorFormat, ColorHsv, ColorRgb, EnumValue, ParseColorError, ParseEnumError, Value, ValueError,
21};
22
23const REQUESTS_CAP: usize = 1000;
24
25/// An error encountered while polling a `HomieController`.
26#[derive(Error, Debug)]
27pub enum PollError {
28    /// Error sending to the MQTT broker.
29    #[error("{0}")]
30    Client(#[from] ClientError),
31    /// Error connecting to or communicating with the MQTT broker.
32    #[error("{0}")]
33    Connection(#[from] ConnectionError),
34}
35
36/// An event from a Homie device, either because of a property change or because something new has
37/// been discovered.
38#[derive(Clone, Debug, Eq, PartialEq)]
39pub enum Event {
40    /// A new device has been discovered, or an attribute of the device has been updated.
41    DeviceUpdated {
42        device_id: String,
43        has_required_attributes: bool,
44    },
45    /// An attribute of a node on a device has been updated.
46    NodeUpdated {
47        device_id: String,
48        node_id: String,
49        has_required_attributes: bool,
50    },
51    /// An attribute of a property on a node has been updated.
52    PropertyUpdated {
53        device_id: String,
54        node_id: String,
55        property_id: String,
56        has_required_attributes: bool,
57    },
58    /// The value of a property has changed.
59    PropertyValueChanged {
60        device_id: String,
61        node_id: String,
62        property_id: String,
63        /// The new value.
64        value: String,
65        /// Whether the new value is fresh, i.e. it has just been sent by the device, as opposed to
66        /// being the initial value because the controller just connected to the MQTT broker.
67        fresh: bool,
68    },
69    /// Connected to the MQTT broker. This could be either the initial connection or a reconnection
70    /// after the connection was dropped for some reason.
71    Connected,
72}
73
74impl Event {
75    fn device_updated(device: &Device) -> Self {
76        Event::DeviceUpdated {
77            device_id: device.id.to_owned(),
78            has_required_attributes: device.has_required_attributes(),
79        }
80    }
81
82    fn node_updated(device_id: &str, node: &Node) -> Self {
83        Event::NodeUpdated {
84            device_id: device_id.to_owned(),
85            node_id: node.id.to_owned(),
86            has_required_attributes: node.has_required_attributes(),
87        }
88    }
89
90    fn property_updated(device_id: &str, node_id: &str, property: &Property) -> Self {
91        Event::PropertyUpdated {
92            device_id: device_id.to_owned(),
93            node_id: node_id.to_owned(),
94            property_id: property.id.to_owned(),
95            has_required_attributes: property.has_required_attributes(),
96        }
97    }
98
99    fn property_value(device_id: &str, node_id: &str, property: &Property, fresh: bool) -> Self {
100        Event::PropertyValueChanged {
101            device_id: device_id.to_owned(),
102            node_id: node_id.to_owned(),
103            property_id: property.id.to_owned(),
104            value: property.value.to_owned().unwrap(),
105            fresh,
106        }
107    }
108}
109
110/// A Homie controller, which connects to an MQTT broker and interacts with Homie devices.
111#[derive(Debug)]
112pub struct HomieController {
113    mqtt_client: AsyncClient,
114    base_topic: String,
115    /// The set of Homie devices which have been discovered so far, keyed by their IDs.
116    // TODO: Consider using Mutex<im::HashMap<...>> instead.
117    devices: Mutex<Arc<HashMap<String, Device>>>,
118    /// temporarily holds retained property payloads that were received before their nodes'
119    /// $properties. The stored payloads are consumed when $properties is received.
120    early_property_values: Mutex<HashMap<String, String>>,
121}
122
123pub struct HomieEventLoop {
124    event_loop: EventLoop,
125}
126
127impl HomieEventLoop {
128    fn new(event_loop: EventLoop) -> HomieEventLoop {
129        HomieEventLoop { event_loop }
130    }
131}
132
133/// Internal struct for the return value of HomieController::handle_publish_sync()
134struct PublishResponse {
135    events: Vec<Event>,
136    topics_to_subscribe: Vec<String>,
137    topics_to_unsubscribe: Vec<String>,
138}
139
140impl HomieController {
141    /// Create a new `HomieController` connected to an MQTT broker.
142    ///
143    /// # Arguments
144    /// * `base_topic`: The Homie [base topic](https://homieiot.github.io/specification/#base-topic)
145    ///   under which to look for Homie devices. "homie" is the recommended default.
146    /// * `mqtt_options`: Options for the MQTT connection, including which broker to connect to.
147    pub fn new(mqtt_options: MqttOptions, base_topic: &str) -> (HomieController, HomieEventLoop) {
148        let (mqtt_client, event_loop) = AsyncClient::new(mqtt_options, REQUESTS_CAP);
149        let controller = HomieController {
150            mqtt_client,
151            base_topic: base_topic.to_string(),
152            devices: Mutex::new(Arc::new(HashMap::new())),
153            early_property_values: Mutex::new(HashMap::new()),
154        };
155        (controller, HomieEventLoop::new(event_loop))
156    }
157
158    /// Get a snapshot of the set of Homie devices which have been discovered so far, keyed by their
159    /// IDs.
160    pub fn devices(&self) -> Arc<HashMap<String, Device>> {
161        self.devices.lock().unwrap().clone()
162    }
163
164    /// Get the Homie base topic which the controller was configured to use.
165    pub fn base_topic(&self) -> &str {
166        &self.base_topic
167    }
168
169    /// Poll the `EventLoop`, and maybe return a Homie event.
170    pub async fn poll(&self, event_loop: &mut HomieEventLoop) -> Result<Vec<Event>, PollError> {
171        let notification = event_loop.event_loop.poll().await?;
172        log::trace!("Notification = {notification:?}");
173
174        if let rumqttc::Event::Incoming(incoming) = notification {
175            self.handle_event(incoming).await
176        } else {
177            Ok(vec![])
178        }
179    }
180
181    async fn handle_event(&self, incoming: Incoming) -> Result<Vec<Event>, PollError> {
182        match incoming {
183            Incoming::Publish(publish) => match self.handle_publish(publish).await {
184                Err(HandleError::Warning(err)) => {
185                    // These error strings indicate some issue with parsing the publish
186                    // event from the network, perhaps due to a malfunctioning device,
187                    // so should just be logged and ignored.
188                    log::warn!("{err}");
189                    Ok(vec![])
190                }
191                Err(HandleError::Fatal(e)) => Err(e.into()),
192                Ok(events) => Ok(events),
193            },
194            Incoming::ConnAck(_) => {
195                // We have connected or reconnected, so make our initial subscription to start
196                // discovering Homie devices.
197                self.start().await?;
198                Ok(vec![Event::Connected])
199            }
200            _ => Ok(vec![]),
201        }
202    }
203
204    /// Handle a publish event received from the MQTT broker, updating the devices and our
205    /// subscriptions as appropriate and possibly returning an event to send back to the controller
206    /// application.
207    async fn handle_publish(&self, publish: Publish) -> Result<Vec<Event>, HandleError> {
208        let PublishResponse {
209            events,
210            topics_to_subscribe,
211            topics_to_unsubscribe,
212        } = self.handle_publish_sync(publish)?;
213
214        for topic in topics_to_subscribe {
215            log::trace!("Subscribe to {topic}");
216            self.mqtt_client.subscribe(topic, QoS::AtLeastOnce).await?;
217        }
218        for topic in topics_to_unsubscribe {
219            log::trace!("Unsubscribe from {topic}");
220            self.mqtt_client.unsubscribe(topic).await?;
221        }
222
223        Ok(events)
224    }
225
226    /// Handle a publish event, update the devices, and return any event and any new topics which
227    /// should be subscribed to or unsubscribed from.
228    ///
229    /// This is separate from `handle_publish` because it takes the `devices` lock, to ensure that
230    /// no async operations are awaited while the lock is held.
231    fn handle_publish_sync(&self, publish: Publish) -> Result<PublishResponse, HandleError> {
232        let base_topic = format!("{}/", self.base_topic);
233        let payload = str::from_utf8(&publish.payload)
234            .map_err(|e| format!("Payload not valid UTF-8: {e}"))?;
235        let subtopic = publish
236            .topic
237            .strip_prefix(&base_topic)
238            .ok_or_else(|| format!("Publish with unexpected topic: {publish:?}"))?;
239
240        // If there are no other references to the devices this will give us a mutable reference
241        // directly. If there are other references it will clone the underlying HashMap and update
242        // our Arc to point to that, so that it is now a unique reference.
243        let devices = &mut *self.devices.lock().unwrap();
244        let devices = Arc::make_mut(devices);
245
246        let early_property_values = &mut *self.early_property_values.lock().unwrap();
247
248        // Collect MQTT topics to which we need to subscribe or unsubscribe here, so that the
249        // subscription can happen after the devices lock has been released.
250        let mut topics_to_subscribe: Vec<String> = vec![];
251        let mut topics_to_unsubscribe: Vec<String> = vec![];
252
253        let parts = subtopic.split('/').collect::<Vec<&str>>();
254        let events = match parts.as_slice() {
255            [device_id, "$homie"] => {
256                if !devices.contains_key(*device_id) {
257                    log::trace!("Homie device '{device_id}' version '{payload}'");
258                    devices.insert((*device_id).to_owned(), Device::new(device_id, payload));
259                    topics_to_subscribe.push(format!("{}/{}/+", self.base_topic, device_id));
260                    topics_to_subscribe.push(format!("{}/{}/$fw/+", self.base_topic, device_id));
261                    topics_to_subscribe.push(format!("{}/{}/$stats/+", self.base_topic, device_id));
262                    vec![Event::DeviceUpdated {
263                        device_id: (*device_id).to_owned(),
264                        has_required_attributes: false,
265                    }]
266                } else {
267                    vec![]
268                }
269            }
270            [device_id, "$name"] => {
271                let device = get_mut_device_for(devices, "Got name for", device_id)?;
272                device.name = Some(payload.to_owned());
273                vec![Event::device_updated(device)]
274            }
275            [device_id, "$state"] => {
276                let state = payload.parse()?;
277                let device = get_mut_device_for(devices, "Got state for", device_id)?;
278                device.state = state;
279                vec![Event::device_updated(device)]
280            }
281            [device_id, "$implementation"] => {
282                let device = get_mut_device_for(devices, "Got implementation for", device_id)?;
283                device.implementation = Some(payload.to_owned());
284                vec![Event::device_updated(device)]
285            }
286            [device_id, "$extensions"] => {
287                let device = get_mut_device_for(devices, "Got extensions for", device_id)?;
288                device.extensions = payload
289                    .split(',')
290                    .map(|part| part.parse())
291                    .collect::<Result<Vec<_>, _>>()?;
292                vec![Event::device_updated(device)]
293            }
294            [device_id, "$localip"] => {
295                let device = get_mut_device_for(devices, "Got localip for", device_id)?;
296                device.local_ip = Some(payload.to_owned());
297                vec![Event::device_updated(device)]
298            }
299            [device_id, "$mac"] => {
300                let device = get_mut_device_for(devices, "Got mac for", device_id)?;
301                device.mac = Some(payload.to_owned());
302                vec![Event::device_updated(device)]
303            }
304            [device_id, "$fw", "name"] => {
305                let device = get_mut_device_for(devices, "Got fw/name for", device_id)?;
306                device.firmware_name = Some(payload.to_owned());
307                vec![Event::device_updated(device)]
308            }
309            [device_id, "$fw", "version"] => {
310                let device = get_mut_device_for(devices, "Got fw/version for", device_id)?;
311                device.firmware_version = Some(payload.to_owned());
312                vec![Event::device_updated(device)]
313            }
314            [_device_id, "$stats"] => {
315                // Homie 3.0 list of available stats. We don't need this, so ignore it without
316                // logging a warning.
317                vec![]
318            }
319            [device_id, "$stats", "interval"] => {
320                let interval = payload.parse()?;
321                let device = get_mut_device_for(devices, "Got stats/interval for", device_id)?;
322                device.stats_interval = Some(Duration::from_secs(interval));
323                vec![Event::device_updated(device)]
324            }
325            [device_id, "$stats", "uptime"] => {
326                let uptime = payload.parse()?;
327                let device = get_mut_device_for(devices, "Got stats/uptime for", device_id)?;
328                device.stats_uptime = Some(Duration::from_secs(uptime));
329                vec![Event::device_updated(device)]
330            }
331            [device_id, "$stats", "signal"] => {
332                let signal = payload.parse()?;
333                let device = get_mut_device_for(devices, "Got stats/signal for", device_id)?;
334                device.stats_signal = Some(signal);
335                vec![Event::device_updated(device)]
336            }
337            [device_id, "$stats", "cputemp"] => {
338                let cputemp = payload.parse()?;
339                let device = get_mut_device_for(devices, "Got stats/cputemp for", device_id)?;
340                device.stats_cputemp = Some(cputemp);
341                vec![Event::device_updated(device)]
342            }
343            [device_id, "$stats", "cpuload"] => {
344                let cpuload = payload.parse()?;
345                let device = get_mut_device_for(devices, "Got stats/cpuload for", device_id)?;
346                device.stats_cpuload = Some(cpuload);
347                vec![Event::device_updated(device)]
348            }
349            [device_id, "$stats", "battery"] => {
350                let battery = payload.parse()?;
351                let device = get_mut_device_for(devices, "Got stats/battery for", device_id)?;
352                device.stats_battery = Some(battery);
353                vec![Event::device_updated(device)]
354            }
355            [device_id, "$stats", "freeheap"] => {
356                let freeheap = payload.parse()?;
357                let device = get_mut_device_for(devices, "Got stats/freeheap for", device_id)?;
358                device.stats_freeheap = Some(freeheap);
359                vec![Event::device_updated(device)]
360            }
361            [device_id, "$stats", "supply"] => {
362                let supply = payload.parse()?;
363                let device = get_mut_device_for(devices, "Got stats/supply for", device_id)?;
364                device.stats_supply = Some(supply);
365                vec![Event::device_updated(device)]
366            }
367            [device_id, "$nodes"] => {
368                let nodes: Vec<_> = payload.split(',').collect();
369                let device = get_mut_device_for(devices, "Got nodes for", device_id)?;
370
371                // Remove nodes which aren't in the new list.
372                device.nodes.retain(|node_id, node| {
373                    let kept = nodes.contains(&node_id.as_ref());
374                    if !kept {
375                        // The node has been removed, so unsubscribe from its topics and those of its properties
376                        let node_topic = format!("{}/{}/{}/+", self.base_topic, device_id, node_id);
377                        topics_to_unsubscribe.push(node_topic);
378                        for property_id in node.properties.keys() {
379                            let topic = format!(
380                                "{}/{}/{}/{}/+",
381                                self.base_topic, device_id, node_id, property_id
382                            );
383                            topics_to_unsubscribe.push(topic);
384                        }
385                    }
386                    kept
387                });
388
389                // Add new nodes.
390                for node_id in nodes {
391                    if !device.nodes.contains_key(node_id) {
392                        device.add_node(Node::new(node_id));
393                        let topic = format!("{}/{}/{}/+", self.base_topic, device_id, node_id);
394                        topics_to_subscribe.push(topic);
395                    }
396                }
397
398                vec![Event::device_updated(device)]
399            }
400            [device_id, node_id, "$name"] => {
401                let node = get_mut_node_for(devices, "Got node name for", device_id, node_id)?;
402                node.name = Some(payload.to_owned());
403                vec![Event::node_updated(device_id, node)]
404            }
405            [device_id, node_id, "$type"] => {
406                let node = get_mut_node_for(devices, "Got node type for", device_id, node_id)?;
407                node.node_type = Some(payload.to_owned());
408                vec![Event::node_updated(device_id, node)]
409            }
410            [device_id, node_id, "$properties"] => {
411                let properties: Vec<_> = payload.split(',').collect();
412                let node = get_mut_node_for(devices, "Got properties for", device_id, node_id)?;
413
414                // Remove properties which aren't in the new list.
415                node.properties.retain(|property_id, _| {
416                    let kept = properties.contains(&property_id.as_ref());
417                    if !kept {
418                        // The property has been removed, so unsubscribe from its topics.
419                        let topic = format!(
420                            "{}/{}/{}/{}/+",
421                            self.base_topic, device_id, node_id, property_id
422                        );
423                        topics_to_unsubscribe.push(topic);
424                    }
425                    kept
426                });
427
428                let mut events = vec![Event::node_updated(device_id, node)];
429
430                // Add new properties.
431                for property_id in properties {
432                    if !node.properties.contains_key(property_id) {
433                        let mut new_prop = Property::new(property_id);
434
435                        let key = format!("{device_id}/{node_id}/{property_id}");
436                        new_prop.value = early_property_values.remove(&key);
437
438                        if let Some(value) = new_prop.value.clone() {
439                            events.push(Event::PropertyValueChanged {
440                                device_id: device_id.to_string(),
441                                node_id: node_id.to_string(),
442                                property_id: property_id.to_string(),
443                                value,
444                                fresh: false,
445                            });
446                        }
447
448                        node.add_property(new_prop);
449                        let topic = format!(
450                            "{}/{}/{}/{}/+",
451                            self.base_topic, device_id, node_id, property_id
452                        );
453                        topics_to_subscribe.push(topic);
454                    }
455                }
456
457                events
458            }
459            [device_id, node_id, property_id, "$name"] => {
460                let property = get_mut_property_for(
461                    devices,
462                    "Got property name for",
463                    device_id,
464                    node_id,
465                    property_id,
466                )?;
467                property.name = Some(payload.to_owned());
468                vec![Event::property_updated(device_id, node_id, property)]
469            }
470            [device_id, node_id, property_id, "$datatype"] => {
471                let datatype = payload.parse()?;
472                let property = get_mut_property_for(
473                    devices,
474                    "Got property datatype for",
475                    device_id,
476                    node_id,
477                    property_id,
478                )?;
479                property.datatype = Some(datatype);
480                vec![Event::property_updated(device_id, node_id, property)]
481            }
482            [device_id, node_id, property_id, "$unit"] => {
483                let property = get_mut_property_for(
484                    devices,
485                    "Got property unit for",
486                    device_id,
487                    node_id,
488                    property_id,
489                )?;
490                property.unit = Some(payload.to_owned());
491                vec![Event::property_updated(device_id, node_id, property)]
492            }
493            [device_id, node_id, property_id, "$format"] => {
494                let property = get_mut_property_for(
495                    devices,
496                    "Got property format for",
497                    device_id,
498                    node_id,
499                    property_id,
500                )?;
501                property.format = Some(payload.to_owned());
502                vec![Event::property_updated(device_id, node_id, property)]
503            }
504            [device_id, node_id, property_id, "$settable"] => {
505                let settable = payload
506                    .parse()
507                    .map_err(|_| format!("Invalid boolean '{payload}' for $settable."))?;
508                let property = get_mut_property_for(
509                    devices,
510                    "Got property settable for",
511                    device_id,
512                    node_id,
513                    property_id,
514                )?;
515                property.settable = settable;
516                vec![Event::property_updated(device_id, node_id, property)]
517            }
518            [device_id, node_id, property_id, "$retained"] => {
519                let retained = payload
520                    .parse()
521                    .map_err(|_| format!("Invalid boolean '{payload}' for $retained."))?;
522                let property = get_mut_property_for(
523                    devices,
524                    "Got property retained for",
525                    device_id,
526                    node_id,
527                    property_id,
528                )?;
529                property.retained = retained;
530                vec![Event::property_updated(device_id, node_id, property)]
531            }
532            [device_id, node_id, property_id]
533                if !device_id.starts_with('$')
534                    && !node_id.starts_with('$')
535                    && !property_id.starts_with('$') =>
536            {
537                match get_mut_property_for(
538                    devices,
539                    "Got property value for",
540                    device_id,
541                    node_id,
542                    property_id,
543                ) {
544                    Ok(property) => {
545                        property.value = Some(payload.to_owned());
546                        vec![Event::property_value(
547                            device_id,
548                            node_id,
549                            property,
550                            !publish.retain,
551                        )]
552                    }
553
554                    Err(_) if publish.retain => {
555                        // temporarily store payloads for unknown properties to prevent
556                        // a race condition when the broker sends out the property
557                        // payloads before $properties
558                        early_property_values.insert(subtopic.to_owned(), payload.to_owned());
559
560                        vec![]
561                    }
562
563                    Err(e) => return Err(e.into()),
564                }
565            }
566            [_device_id, _node_id, _property_id, "set"] => {
567                // Value set message may have been sent by us or another controller. Either way,
568                // ignore it, it is only for the device.
569                vec![]
570            }
571            _ => {
572                log::warn!("Unexpected subtopic {subtopic} = {payload}");
573                vec![]
574            }
575        };
576
577        Ok(PublishResponse {
578            events,
579            topics_to_subscribe,
580            topics_to_unsubscribe,
581        })
582    }
583
584    /// Start discovering Homie devices.
585    async fn start(&self) -> Result<(), ClientError> {
586        // Clear set of known devices so that we correctly subscribe to their topics again.
587        *self.devices.lock().unwrap() = Arc::new(HashMap::new());
588
589        let topic = format!("{}/+/$homie", self.base_topic);
590        log::trace!("Subscribe to {topic}");
591        self.mqtt_client.subscribe(topic, QoS::AtLeastOnce).await
592    }
593
594    /// Attempt to set the state of a settable property of a device. If this succeeds the device
595    /// will update the value of the property.
596    pub async fn set(
597        &self,
598        device_id: &str,
599        node_id: &str,
600        property_id: &str,
601        value: impl Value,
602    ) -> Result<(), ClientError> {
603        let topic = format!(
604            "{}/{}/{}/{}/set",
605            self.base_topic, device_id, node_id, property_id
606        );
607        self.mqtt_client
608            .publish(topic, QoS::AtLeastOnce, false, value.to_string())
609            .await
610    }
611
612    /// Disconnect from the MQTT broker.
613    pub async fn disconnect(&self) -> Result<(), ClientError> {
614        self.mqtt_client.disconnect().await
615    }
616}
617
618fn get_mut_device_for<'a>(
619    devices: &'a mut HashMap<String, Device>,
620    err_prefix: &str,
621    device_id: &str,
622) -> Result<&'a mut Device, String> {
623    devices
624        .get_mut(device_id)
625        .ok_or_else(|| format!("{err_prefix} unknown device '{device_id}'"))
626}
627
628fn get_mut_node_for<'a>(
629    devices: &'a mut HashMap<String, Device>,
630    err_prefix: &str,
631    device_id: &str,
632    node_id: &str,
633) -> Result<&'a mut Node, String> {
634    let device = get_mut_device_for(devices, err_prefix, device_id)?;
635    device
636        .nodes
637        .get_mut(node_id)
638        .ok_or_else(|| format!("{err_prefix} unknown node '{device_id}/{node_id}'"))
639}
640
641fn get_mut_property_for<'a>(
642    devices: &'a mut HashMap<String, Device>,
643    err_prefix: &str,
644    device_id: &str,
645    node_id: &str,
646    property_id: &str,
647) -> Result<&'a mut Property, String> {
648    let node = get_mut_node_for(devices, err_prefix, device_id, node_id)?;
649    node.properties.get_mut(property_id).ok_or_else(|| {
650        format!("{err_prefix} unknown property '{device_id}/{node_id}/{property_id}'")
651    })
652}
653
654#[derive(Error, Debug)]
655enum HandleError {
656    #[error("{0}")]
657    Warning(String),
658    #[error("{0}")]
659    Fatal(#[from] ClientError),
660}
661
662impl From<String> for HandleError {
663    fn from(s: String) -> Self {
664        HandleError::Warning(s)
665    }
666}
667
668impl From<ParseStateError> for HandleError {
669    fn from(e: ParseStateError) -> Self {
670        HandleError::Warning(e.to_string())
671    }
672}
673
674impl From<ParseDatatypeError> for HandleError {
675    fn from(e: ParseDatatypeError) -> Self {
676        HandleError::Warning(e.to_string())
677    }
678}
679
680impl From<ParseExtensionError> for HandleError {
681    fn from(e: ParseExtensionError) -> Self {
682        HandleError::Warning(e.to_string())
683    }
684}
685
686impl From<ParseIntError> for HandleError {
687    fn from(e: ParseIntError) -> Self {
688        HandleError::Warning(format!("Invalid integer: {e}"))
689    }
690}
691
692impl From<ParseFloatError> for HandleError {
693    fn from(e: ParseFloatError) -> Self {
694        HandleError::Warning(format!("Invalid float: {e}"))
695    }
696}
697
698#[cfg(test)]
699mod tests {
700    use super::*;
701    use flume::Receiver;
702    use rumqttc::{ConnAck, Packet, Request, Subscribe};
703
704    fn make_test_controller() -> (HomieController, Receiver<Request>) {
705        let (requests_tx, requests_rx) = flume::unbounded();
706        let mqtt_client = AsyncClient::from_senders(requests_tx);
707        let controller = HomieController {
708            base_topic: "base_topic".to_owned(),
709            mqtt_client,
710            devices: Mutex::new(Arc::new(HashMap::new())),
711            early_property_values: Mutex::new(HashMap::new()),
712        };
713        (controller, requests_rx)
714    }
715
716    fn expect_subscriptions(requests_rx: &Receiver<Request>, subscription_topics: &[&str]) {
717        let requests: Vec<_> = subscription_topics
718            .iter()
719            .map(|_| requests_rx.try_recv().unwrap())
720            .collect();
721
722        for topic in subscription_topics {
723            let expected = Request::Subscribe(Subscribe::new(*topic, QoS::AtLeastOnce));
724            assert!(requests.contains(&expected));
725        }
726    }
727
728    async fn connect(controller: &HomieController) -> Result<Vec<Event>, PollError> {
729        controller
730            .handle_event(Packet::ConnAck(ConnAck::new(
731                rumqttc::ConnectReturnCode::Success,
732                false,
733            )))
734            .await
735    }
736
737    async fn publish(
738        controller: &HomieController,
739        topic: &str,
740        payload: &str,
741    ) -> Result<Vec<Event>, PollError> {
742        controller
743            .handle_event(Packet::Publish(Publish::new(
744                topic,
745                QoS::AtLeastOnce,
746                payload,
747            )))
748            .await
749    }
750
751    async fn publish_retained(
752        controller: &HomieController,
753        topic: &str,
754        payload: &str,
755    ) -> Result<Vec<Event>, PollError> {
756        let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
757
758        publish.retain = true;
759
760        controller.handle_event(Packet::Publish(publish)).await
761    }
762
763    fn property_set(properties: Vec<Property>) -> HashMap<String, Property> {
764        properties
765            .into_iter()
766            .map(|property| (property.id.clone(), property))
767            .collect()
768    }
769
770    fn node_set(nodes: Vec<Node>) -> HashMap<String, Node> {
771        nodes
772            .into_iter()
773            .map(|node| (node.id.clone(), node))
774            .collect()
775    }
776
777    #[tokio::test]
778    async fn subscribes_to_things() -> Result<(), Box<dyn std::error::Error>> {
779        let (controller, requests_rx) = make_test_controller();
780
781        // Connecting should start discovering.
782        connect(&controller).await?;
783        expect_subscriptions(&requests_rx, &["base_topic/+/$homie"]);
784
785        // Discover a new device.
786        publish(&controller, "base_topic/device_id/$homie", "4.0").await?;
787        expect_subscriptions(
788            &requests_rx,
789            &[
790                "base_topic/device_id/+",
791                "base_topic/device_id/$fw/+",
792                "base_topic/device_id/$stats/+",
793            ],
794        );
795
796        // Discover a node on the device.
797        publish(&controller, "base_topic/device_id/$nodes", "node_id").await?;
798        expect_subscriptions(&requests_rx, &["base_topic/device_id/node_id/+"]);
799
800        // Discover a property on the node.
801        publish(
802            &controller,
803            "base_topic/device_id/node_id/$properties",
804            "property_id",
805        )
806        .await?;
807        expect_subscriptions(
808            &requests_rx,
809            &["base_topic/device_id/node_id/property_id/+"],
810        );
811
812        // No more subscriptions.
813        assert!(requests_rx.is_empty());
814
815        Ok(())
816    }
817
818    #[tokio::test]
819    async fn retained_payloads_before_properties() -> Result<(), Box<dyn std::error::Error>> {
820        let (controller, _requests_rx) = make_test_controller();
821
822        // Connecting should start discovering.
823        connect(&controller).await?;
824
825        // Discover a new device.
826        publish_retained(&controller, "base_topic/device_id/$homie", "4.0").await?;
827
828        // Discover a node on the device.
829        publish_retained(
830            &controller,
831            "base_topic/device_id/$nodes",
832            "node_id,second_node",
833        )
834        .await?;
835
836        // Get the property payload before $properties
837        publish_retained(
838            &controller,
839            "base_topic/device_id/node_id/property_id",
840            "HELLO WORLD",
841        )
842        .await?;
843
844        // discover the property after its payload
845        publish_retained(
846            &controller,
847            "base_topic/device_id/node_id/$properties",
848            "property_id",
849        )
850        .await?;
851
852        publish_retained(
853            &controller,
854            "base_topic/device_id/second_node/property_id",
855            "hello again",
856        )
857        .await?;
858
859        // discover the property after its payload
860        publish_retained(
861            &controller,
862            "base_topic/device_id/second_node/$properties",
863            "property_id",
864        )
865        .await?;
866
867        assert_eq!(
868            controller
869                .devices()
870                .get("device_id")
871                .unwrap()
872                .nodes
873                .get("node_id")
874                .unwrap()
875                .properties
876                .get("property_id")
877                .unwrap()
878                .value
879                .as_deref(),
880            Some("HELLO WORLD")
881        );
882
883        assert_eq!(
884            controller
885                .devices()
886                .get("device_id")
887                .unwrap()
888                .nodes
889                .get("second_node")
890                .unwrap()
891                .properties
892                .get("property_id")
893                .unwrap()
894                .value
895                .as_deref(),
896            Some("hello again")
897        );
898
899        Ok(())
900    }
901
902    #[tokio::test]
903    async fn emits_appropriate_events() -> Result<(), Box<dyn std::error::Error>> {
904        let (controller, _requests_rx) = make_test_controller();
905
906        // Start discovering.
907        assert_eq!(connect(&controller).await?, vec![Event::Connected]);
908
909        // Discover a new device.
910        assert_eq!(
911            publish(&controller, "base_topic/device_id/$homie", "4.0").await?,
912            vec![Event::DeviceUpdated {
913                device_id: "device_id".to_owned(),
914                has_required_attributes: false
915            }]
916        );
917        assert_eq!(
918            publish(&controller, "base_topic/device_id/$name", "Device name").await?,
919            vec![Event::DeviceUpdated {
920                device_id: "device_id".to_owned(),
921                has_required_attributes: false
922            }]
923        );
924        assert_eq!(
925            publish(&controller, "base_topic/device_id/$state", "ready").await?,
926            vec![Event::DeviceUpdated {
927                device_id: "device_id".to_owned(),
928                has_required_attributes: true
929            }]
930        );
931        let mut expected_device = Device::new("device_id", "4.0");
932        expected_device.state = State::Ready;
933        expected_device.name = Some("Device name".to_owned());
934        assert_eq!(
935            controller.devices().get("device_id").unwrap().to_owned(),
936            expected_device
937        );
938
939        // A node on the device.
940        assert_eq!(
941            publish(&controller, "base_topic/device_id/$nodes", "node_id").await?,
942            vec![Event::DeviceUpdated {
943                device_id: "device_id".to_owned(),
944                has_required_attributes: false
945            }]
946        );
947        assert_eq!(
948            publish(
949                &controller,
950                "base_topic/device_id/node_id/$name",
951                "Node name"
952            )
953            .await?,
954            vec![Event::NodeUpdated {
955                device_id: "device_id".to_owned(),
956                node_id: "node_id".to_owned(),
957                has_required_attributes: false
958            }]
959        );
960        assert_eq!(
961            publish(
962                &controller,
963                "base_topic/device_id/node_id/$type",
964                "Node type"
965            )
966            .await?,
967            vec![Event::NodeUpdated {
968                device_id: "device_id".to_owned(),
969                node_id: "node_id".to_owned(),
970                has_required_attributes: false
971            }]
972        );
973
974        // A property on the node.
975        assert_eq!(
976            publish(
977                &controller,
978                "base_topic/device_id/node_id/$properties",
979                "property_id"
980            )
981            .await?,
982            vec![Event::NodeUpdated {
983                device_id: "device_id".to_owned(),
984                node_id: "node_id".to_owned(),
985                has_required_attributes: false
986            }]
987        );
988        assert_eq!(
989            publish(
990                &controller,
991                "base_topic/device_id/node_id/property_id/$name",
992                "Property name"
993            )
994            .await?,
995            vec![Event::PropertyUpdated {
996                device_id: "device_id".to_owned(),
997                node_id: "node_id".to_owned(),
998                property_id: "property_id".to_owned(),
999                has_required_attributes: false
1000            }]
1001        );
1002        assert_eq!(
1003            publish(
1004                &controller,
1005                "base_topic/device_id/node_id/property_id/$datatype",
1006                "integer"
1007            )
1008            .await?,
1009            vec![Event::PropertyUpdated {
1010                device_id: "device_id".to_owned(),
1011                node_id: "node_id".to_owned(),
1012                property_id: "property_id".to_owned(),
1013                has_required_attributes: true
1014            }]
1015        );
1016
1017        Ok(())
1018    }
1019
1020    #[tokio::test]
1021    async fn constructs_device_tree() -> Result<(), Box<dyn std::error::Error>> {
1022        let (controller, _requests_rx) = make_test_controller();
1023
1024        // Discover a new device with a node with a property.
1025
1026        connect(&controller).await?;
1027        publish(&controller, "base_topic/device_id/$homie", "4.0").await?;
1028        publish(&controller, "base_topic/device_id/$name", "Device name").await?;
1029        publish(&controller, "base_topic/device_id/$state", "ready").await?;
1030        publish(&controller, "base_topic/device_id/$nodes", "node_id").await?;
1031
1032        publish(
1033            &controller,
1034            "base_topic/device_id/node_id/$name",
1035            "Node name",
1036        )
1037        .await?;
1038        publish(
1039            &controller,
1040            "base_topic/device_id/node_id/$type",
1041            "Node type",
1042        )
1043        .await?;
1044        publish(
1045            &controller,
1046            "base_topic/device_id/node_id/$properties",
1047            "property_id",
1048        )
1049        .await?;
1050
1051        publish(
1052            &controller,
1053            "base_topic/device_id/node_id/property_id/$name",
1054            "Property name",
1055        )
1056        .await?;
1057        publish(
1058            &controller,
1059            "base_topic/device_id/node_id/property_id/$datatype",
1060            "integer",
1061        )
1062        .await?;
1063
1064        let expected_property = Property {
1065            name: Some("Property name".to_owned()),
1066            datatype: Some(Datatype::Integer),
1067            ..Property::new("property_id")
1068        };
1069        let expected_node = Node {
1070            name: Some("Node name".to_owned()),
1071            node_type: Some("Node type".to_owned()),
1072            properties: property_set(vec![expected_property]),
1073            ..Node::new("node_id")
1074        };
1075        let expected_device = Device {
1076            name: Some("Device name".to_owned()),
1077            state: State::Ready,
1078            nodes: node_set(vec![expected_node]),
1079            ..Device::new("device_id", "4.0")
1080        };
1081
1082        assert_eq!(
1083            controller.devices().get("device_id").unwrap().to_owned(),
1084            expected_device
1085        );
1086
1087        Ok(())
1088    }
1089}