homie_device/
lib.rs

1//! `homie-device` is a library for creating devices implementing the
2//! [Homie convention](https://homieiot.github.io/) for IoT devices connecting to an MQTT broker.
3//!
4//! See the examples directory for examples of how to use it.
5
6use futures::FutureExt;
7use futures::future::try_join;
8
9use mac_address::get_mac_address;
10use rumqttc::{
11    self, AsyncClient, ClientError, ConnectionError, Event, EventLoop, Incoming, LastWill,
12    MqttOptions, QoS,
13};
14use std::fmt::{self, Debug, Display, Formatter};
15use std::future::Future;
16use std::pin::Pin;
17use std::str;
18use std::time::{Duration, Instant};
19use thiserror::Error;
20use tokio::task::{self, JoinError, JoinHandle};
21use tokio::time::sleep;
22
23mod types;
24pub use crate::types::{Datatype, Node, Property};
25mod values;
26pub use crate::values::{Color, ColorFormat, ColorHsv, ColorRgb};
27
/// Version of the Homie convention, published on the device's `$homie` topic.
const HOMIE_VERSION: &str = "4.0";
/// Implementation identifier, published on the device's `$implementation` topic.
const HOMIE_IMPLEMENTATION: &str = "homie-rs";
/// Delay between uptime publications; also advertised on `$stats/interval` (in seconds).
const STATS_INTERVAL: Duration = Duration::from_secs(60);
/// Capacity argument passed to `AsyncClient::new` when the MQTT client is created.
const REQUESTS_CAP: usize = 10;
32
/// Error type for futures representing tasks spawned by this crate.
#[derive(Error, Debug)]
pub enum SpawnError {
    /// An MQTT client request (publish, subscribe, ...) failed.
    #[error("{0}")]
    Client(#[from] ClientError),
    /// Polling the MQTT event loop returned a connection error.
    #[error("{0}")]
    Connection(#[from] ConnectionError),
    /// A spawned task could not be joined.
    #[error("Task failed: {0}")]
    Join(#[from] JoinError),
    /// An internal invariant was broken, e.g. the channel between the crate's own tasks closed.
    #[error("Internal error: {0}")]
    Internal(&'static str),
}
45
/// Lifecycle state of a Homie device, in the form it is sent over MQTT.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum State {
    /// The device is connected to the MQTT broker but is not yet ready to operate.
    Init,
    /// The device is connected and operational.
    Ready,
    /// The device has cleanly disconnected from the MQTT broker.
    Disconnected,
    /// The device is currently sleeping.
    Sleeping,
    /// The device was uncleanly disconnected from the MQTT broker. This could happen due to a
    /// network issue, power failure or some other unexpected failure.
    Lost,
    /// The device is connected to the MQTT broker but something is wrong and it may require human
    /// intervention.
    Alert,
}

impl State {
    /// Returns the lowercase wire representation of this state.
    fn as_str(&self) -> &'static str {
        match *self {
            State::Init => "init",
            State::Ready => "ready",
            State::Disconnected => "disconnected",
            State::Sleeping => "sleeping",
            State::Lost => "lost",
            State::Alert => "alert",
        }
    }
}

impl Display for State {
    /// Writes the wire representation verbatim, without applying any padding or alignment.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let label = self.as_str();
        f.write_str(label)
    }
}

impl From<State> for Vec<u8> {
    /// Converts the state into the UTF-8 bytes of its wire representation.
    fn from(state: State) -> Self {
        state.as_str().as_bytes().to_vec()
    }
}
88
/// Boxed, type-erased callback taking three `String` arguments and returning a boxed `Send`
/// future that resolves to an optional `String`.
type UpdateCallback = Box<
    dyn FnMut(String, String, String) -> Pin<Box<dyn Future<Output = Option<String>> + Send>>
        + Send
        + Sync,
>;
94
/// Builder for `HomieDevice` and associated objects.
pub struct HomieDeviceBuilder {
    /// Base MQTT topic of the device; all device topics are published underneath it.
    device_base: String,
    /// Human-readable device name.
    device_name: String,
    /// Firmware name to advertise, if one was set.
    firmware_name: Option<String>,
    /// Firmware version to advertise, if one was set.
    firmware_version: Option<String>,
    /// Options used to create the MQTT client.
    mqtt_options: MqttOptions,
    /// Optional callback registered through `set_update_callback`.
    update_callback: Option<UpdateCallback>,
}
104
105impl Debug for HomieDeviceBuilder {
106    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
107        f.debug_struct("HomieDeviceBuilder")
108            .field("device_base", &self.device_base)
109            .field("device_name", &self.device_name)
110            .field("firmware_name", &self.firmware_name)
111            .field("firmware_version", &self.firmware_version)
112            .field("mqtt_options", &self.mqtt_options)
113            .field(
114                "update_callback",
115                &self.update_callback.as_ref().map(|_| "..."),
116            )
117            .finish()
118    }
119}
120
121impl HomieDeviceBuilder {
122    /// Set the firmware name and version to be advertised for the Homie device.
123    ///
124    /// If this is not set, it will default to the cargo package name and version.
125    pub fn set_firmware(&mut self, firmware_name: &str, firmware_version: &str) {
126        self.firmware_name = Some(firmware_name.to_string());
127        self.firmware_version = Some(firmware_version.to_string());
128    }
129
130    pub fn set_update_callback<F, Fut>(&mut self, mut update_callback: F)
131    where
132        F: (FnMut(String, String, String) -> Fut) + Send + Sync + 'static,
133        Fut: Future<Output = Option<String>> + Send + 'static,
134    {
135        self.update_callback = Some(Box::new(
136            move |node_id: String, property_id: String, value: String| {
137                update_callback(node_id, property_id, value).boxed()
138            },
139        ));
140    }
141
142    /// Create a new Homie device, connect to the MQTT broker, and start a task to handle the MQTT
143    /// connection.
144    ///
145    /// # Return value
146    /// A pair of the `HomieDevice` itself, and a `Future` for the tasks which handle the MQTT
147    /// connection. You should join on this future to handle any errors it returns.
148    pub async fn spawn(
149        self,
150    ) -> Result<(HomieDevice, impl Future<Output = Result<(), SpawnError>>), ClientError> {
151        let (event_loop, mut homie, stats, firmware, update_callback) = self.build();
152
153        // This needs to be spawned before we wait for anything to be sent, as the start() calls below do.
154        let event_task = homie.spawn(event_loop, update_callback);
155
156        stats.start().await?;
157        if let Some(firmware) = firmware {
158            firmware.start().await?;
159        }
160        homie.start().await?;
161
162        let stats_task = stats.spawn();
163        let join_handle = try_join(event_task, stats_task).map(simplify_unit_pair);
164
165        Ok((homie, join_handle))
166    }
167
168    fn build(
169        self,
170    ) -> (
171        EventLoop,
172        HomieDevice,
173        HomieStats,
174        Option<HomieFirmware>,
175        Option<UpdateCallback>,
176    ) {
177        let mut mqtt_options = self.mqtt_options;
178        let last_will = LastWill::new(
179            format!("{}/$state", self.device_base),
180            State::Lost,
181            QoS::AtLeastOnce,
182            true,
183        );
184        mqtt_options.set_last_will(last_will);
185        let (client, event_loop) = AsyncClient::new(mqtt_options, REQUESTS_CAP);
186
187        let publisher = DevicePublisher::new(client, self.device_base);
188
189        let mut extension_ids = vec![HomieStats::EXTENSION_ID];
190        let stats = HomieStats::new(publisher.clone());
191        let firmware = if let (Some(firmware_name), Some(firmware_version)) =
192            (self.firmware_name, self.firmware_version)
193        {
194            extension_ids.push(HomieFirmware::EXTENSION_ID);
195            Some(HomieFirmware::new(
196                publisher.clone(),
197                firmware_name,
198                firmware_version,
199            ))
200        } else {
201            None
202        };
203
204        let homie = HomieDevice::new(publisher, self.device_name, &extension_ids);
205
206        (event_loop, homie, stats, firmware, self.update_callback)
207    }
208}
209
/// A Homie [device](https://homieiot.github.io/specification/#devices). This corresponds to a
/// single MQTT connection.
#[derive(Debug)]
pub struct HomieDevice {
    /// Publishes to and subscribes on topics beneath the device's base topic.
    publisher: DevicePublisher,
    /// Human-readable name, published on `$name`.
    device_name: String,
    /// Nodes currently added to the device.
    nodes: Vec<Node>,
    /// Last state set through `set_state`; starts out as `Disconnected`.
    state: State,
    /// Comma-separated extension IDs, published on `$extensions`.
    extension_ids: String,
}
220
221impl HomieDevice {
222    /// Create a builder to construct a new Homie device.
223    ///
224    /// # Arguments
225    /// * `device_base`: The base topic ID for the device, including the Homie base topic. This
226    ///   might be something like "homie/my-device-id" if you are using the default Homie
227    ///   [base topic](https://homieiot.github.io/specification/#base-topic). This must be
228    ///   unique per MQTT broker.
229    /// * `device_name`: The human-readable name of the device.
230    /// * `mqtt_options`: Options for the MQTT connection, including which server to connect to.
231    pub fn builder(
232        device_base: &str,
233        device_name: &str,
234        mqtt_options: MqttOptions,
235    ) -> HomieDeviceBuilder {
236        HomieDeviceBuilder {
237            device_base: device_base.to_string(),
238            device_name: device_name.to_string(),
239            firmware_name: None,
240            firmware_version: None,
241            mqtt_options,
242            update_callback: None,
243        }
244    }
245
246    fn new(publisher: DevicePublisher, device_name: String, extension_ids: &[&str]) -> HomieDevice {
247        HomieDevice {
248            publisher,
249            device_name,
250            nodes: vec![],
251            state: State::Disconnected,
252            extension_ids: extension_ids.join(","),
253        }
254    }
255
256    async fn start(&mut self) -> Result<(), ClientError> {
257        assert_eq!(self.state, State::Disconnected);
258        self.publisher
259            .publish_retained("$homie", HOMIE_VERSION)
260            .await?;
261        self.publisher
262            .publish_retained("$extensions", self.extension_ids.as_str())
263            .await?;
264        self.publisher
265            .publish_retained("$implementation", HOMIE_IMPLEMENTATION)
266            .await?;
267        self.publisher
268            .publish_retained("$name", self.device_name.as_str())
269            .await?;
270        self.set_state(State::Init).await?;
271        Ok(())
272    }
273
274    /// Spawn a task to handle the EventLoop.
275    fn spawn(
276        &self,
277        mut event_loop: EventLoop,
278        mut update_callback: Option<UpdateCallback>,
279    ) -> impl Future<Output = Result<(), SpawnError>> + use<> {
280        let device_base = format!("{}/", self.publisher.device_base);
281        let (incoming_tx, incoming_rx) = flume::unbounded();
282
283        let mqtt_task = task::spawn(async move {
284            loop {
285                let notification = event_loop.poll().await?;
286                log::trace!("Notification = {notification:?}");
287
288                if let Event::Incoming(incoming) = notification {
289                    incoming_tx.send_async(incoming).await.map_err(|_| {
290                        SpawnError::Internal("Incoming event channel receiver closed.")
291                    })?;
292                }
293            }
294        });
295
296        let publisher = self.publisher.clone();
297        let incoming_task: JoinHandle<Result<(), SpawnError>> = task::spawn(async move {
298            loop {
299                if let Incoming::Publish(publish) = incoming_rx
300                    .recv_async()
301                    .await
302                    .map_err(|_| SpawnError::Internal("Incoming event channel sender closed."))?
303                {
304                    if let Some(rest) = publish.topic.strip_prefix(&device_base) {
305                        if let ([node_id, property_id, "set"], Ok(payload)) = (
306                            rest.split('/').collect::<Vec<&str>>().as_slice(),
307                            str::from_utf8(&publish.payload),
308                        ) {
309                            log::trace!(
310                                "set node {node_id:?} property {property_id:?} to {payload:?}"
311                            );
312                            if let Some(callback) = update_callback.as_mut() {
313                                if let Some(value) = callback(
314                                    node_id.to_string(),
315                                    property_id.to_string(),
316                                    payload.to_string(),
317                                )
318                                .await
319                                {
320                                    publisher
321                                        .publish_retained(
322                                            &format!("{node_id}/{property_id}"),
323                                            value,
324                                        )
325                                        .await?;
326                                }
327                            }
328                        }
329                    } else {
330                        log::warn!("Unexpected publish: {publish:?}");
331                    }
332                }
333            }
334        });
335        try_join_unit_handles(mqtt_task, incoming_task)
336    }
337
338    /// Check whether a node with the given ID currently exists on the device.
339    pub fn has_node(&self, node_id: &str) -> bool {
340        self.nodes.iter().any(|n| n.id == node_id)
341    }
342
343    /// Add a node to the Homie device. It will immediately be published.
344    ///
345    /// This will panic if you attempt to add a node with the same ID as a node which was previously
346    /// added.
347    pub async fn add_node(&mut self, node: Node) -> Result<(), ClientError> {
348        // First check that there isn't already a node with the same ID.
349        if self.has_node(&node.id) {
350            panic!("Tried to add node with duplicate ID: {node:?}");
351        }
352        self.nodes.push(node);
353        // `node` was moved into the `nodes` vector, but we can safely get a reference to it because
354        // nothing else can modify `nodes` in the meantime.
355        let node = &self.nodes[self.nodes.len() - 1];
356
357        self.publish_node(node).await?;
358        self.publish_nodes().await
359    }
360
361    /// Remove the node with the given ID.
362    pub async fn remove_node(&mut self, node_id: &str) -> Result<(), ClientError> {
363        // Panic on attempt to remove a node which was never added.
364        let index = self.nodes.iter().position(|n| n.id == node_id).unwrap();
365        self.unpublish_node(&self.nodes[index]).await?;
366        self.nodes.remove(index);
367        self.publish_nodes().await
368    }
369
370    async fn publish_node(&self, node: &Node) -> Result<(), ClientError> {
371        self.publisher
372            .publish_retained(&format!("{}/$name", node.id), node.name.as_str())
373            .await?;
374        self.publisher
375            .publish_retained(&format!("{}/$type", node.id), node.node_type.as_str())
376            .await?;
377        let mut property_ids: Vec<&str> = vec![];
378        for property in &node.properties {
379            property_ids.push(&property.id);
380            self.publisher
381                .publish_retained(
382                    &format!("{}/{}/$name", node.id, property.id),
383                    property.name.as_str(),
384                )
385                .await?;
386            self.publisher
387                .publish_retained(
388                    &format!("{}/{}/$datatype", node.id, property.id),
389                    property.datatype,
390                )
391                .await?;
392            self.publisher
393                .publish_retained(
394                    &format!("{}/{}/$settable", node.id, property.id),
395                    if property.settable { "true" } else { "false" },
396                )
397                .await?;
398            self.publisher
399                .publish_retained(
400                    &format!("{}/{}/$retained", node.id, property.id),
401                    if property.retained { "true" } else { "false" },
402                )
403                .await?;
404            if let Some(unit) = &property.unit {
405                self.publisher
406                    .publish_retained(&format!("{}/{}/$unit", node.id, property.id), unit.as_str())
407                    .await?;
408            }
409            if let Some(format) = &property.format {
410                self.publisher
411                    .publish_retained(
412                        &format!("{}/{}/$format", node.id, property.id),
413                        format.as_str(),
414                    )
415                    .await?;
416            }
417            if property.settable {
418                self.publisher
419                    .subscribe(&format!("{}/{}/set", node.id, property.id))
420                    .await?;
421            }
422        }
423        self.publisher
424            .publish_retained(&format!("{}/$properties", node.id), property_ids.join(","))
425            .await?;
426        Ok(())
427    }
428
429    async fn unpublish_node(&self, node: &Node) -> Result<(), ClientError> {
430        for property in &node.properties {
431            if property.settable {
432                self.publisher
433                    .unsubscribe(&format!("{}/{}/set", node.id, property.id))
434                    .await?;
435            }
436        }
437        Ok(())
438    }
439
440    async fn publish_nodes(&mut self) -> Result<(), ClientError> {
441        let node_ids = self
442            .nodes
443            .iter()
444            .map(|node| node.id.as_str())
445            .collect::<Vec<&str>>()
446            .join(",");
447        self.publisher.publish_retained("$nodes", node_ids).await
448    }
449
450    async fn set_state(&mut self, state: State) -> Result<(), ClientError> {
451        self.state = state;
452        self.publisher.publish_retained("$state", self.state).await
453    }
454
    /// Update the [state](https://homieiot.github.io/specification/#device-lifecycle) of the Homie
    /// device to 'ready'. This should be called once it is ready to begin normal operation, or to
    /// return to normal operation after calling `sleep()` or `alert()`.
    ///
    /// # Panics
    /// Panics if the device is not currently in the 'init', 'sleeping' or 'alert' state.
    pub async fn ready(&mut self) -> Result<(), ClientError> {
        assert!(&[State::Init, State::Sleeping, State::Alert].contains(&self.state));
        self.set_state(State::Ready).await
    }
462
    /// Update the [state](https://homieiot.github.io/specification/#device-lifecycle) of the Homie
    /// device to 'sleeping'. This should only be called after `ready()`.
    ///
    /// # Panics
    /// Panics if the device is not currently in the 'ready' state.
    pub async fn sleep(&mut self) -> Result<(), ClientError> {
        assert_eq!(self.state, State::Ready);
        self.set_state(State::Sleeping).await
    }
469
    /// Update the [state](https://homieiot.github.io/specification/#device-lifecycle) of the Homie
    /// device to 'alert', to indicate that something wrong is happening and manual intervention may
    /// be required. This should only be called after `ready()`.
    ///
    /// # Panics
    /// Panics if the device is not currently in the 'ready' state.
    pub async fn alert(&mut self) -> Result<(), ClientError> {
        assert_eq!(self.state, State::Ready);
        self.set_state(State::Alert).await
    }
477
    /// Disconnect cleanly from the MQTT broker, after updating the state of the Homie device to
    /// 'disconnected'.
    ///
    /// This consumes the device. If publishing the state fails, the error is returned and the
    /// client is not asked to disconnect.
    pub async fn disconnect(mut self) -> Result<(), ClientError> {
        self.set_state(State::Disconnected).await?;
        self.publisher.client.disconnect().await
    }
484
485    /// Publish a new value for the given retained property of the given node of this device. The
486    /// caller is responsible for ensuring that the value is of the correct type.
487    pub async fn publish_value(
488        &self,
489        node_id: &str,
490        property_id: &str,
491        value: impl ToString,
492    ) -> Result<(), ClientError> {
493        self.publisher
494            .publish_retained(&format!("{node_id}/{property_id}"), value.to_string())
495            .await
496    }
497
498    /// Publish a new value for the given non-retained property of the given node of this device. The
499    /// caller is responsible for ensuring that the value is of the correct type.
500    pub async fn publish_nonretained_value(
501        &self,
502        node_id: &str,
503        property_id: &str,
504        value: impl ToString,
505    ) -> Result<(), ClientError> {
506        self.publisher
507            .publish_nonretained(&format!("{node_id}/{property_id}"), value.to_string())
508            .await
509    }
510}
511
512#[derive(Clone, Debug)]
513struct DevicePublisher {
514    pub client: AsyncClient,
515    device_base: String,
516}
517
518impl DevicePublisher {
519    fn new(client: AsyncClient, device_base: String) -> Self {
520        Self {
521            client,
522            device_base,
523        }
524    }
525
526    async fn publish_retained(
527        &self,
528        subtopic: &str,
529        value: impl Into<Vec<u8>>,
530    ) -> Result<(), ClientError> {
531        let topic = format!("{}/{}", self.device_base, subtopic);
532        self.client
533            .publish(topic, QoS::AtLeastOnce, true, value)
534            .await
535    }
536
537    async fn publish_nonretained(
538        &self,
539        subtopic: &str,
540        value: impl Into<Vec<u8>>,
541    ) -> Result<(), ClientError> {
542        let topic = format!("{}/{}", self.device_base, subtopic);
543        self.client
544            .publish(topic, QoS::AtLeastOnce, false, value)
545            .await
546    }
547
548    async fn subscribe(&self, subtopic: &str) -> Result<(), ClientError> {
549        let topic = format!("{}/{}", self.device_base, subtopic);
550        self.client.subscribe(topic, QoS::AtLeastOnce).await
551    }
552
553    async fn unsubscribe(&self, subtopic: &str) -> Result<(), ClientError> {
554        let topic = format!("{}/{}", self.device_base, subtopic);
555        self.client.unsubscribe(topic).await
556    }
557}
558
559/// Legacy stats extension.
560#[derive(Debug)]
561struct HomieStats {
562    publisher: DevicePublisher,
563    start_time: Instant,
564}
565
566impl HomieStats {
567    const EXTENSION_ID: &'static str = "org.homie.legacy-stats:0.1.1:[4.x]";
568
569    fn new(publisher: DevicePublisher) -> Self {
570        let now = Instant::now();
571        Self {
572            publisher,
573            start_time: now,
574        }
575    }
576
577    /// Send initial topics.
578    async fn start(&self) -> Result<(), ClientError> {
579        self.publisher
580            .publish_retained("$stats/interval", STATS_INTERVAL.as_secs().to_string())
581            .await
582    }
583
584    /// Periodically send stats.
585    fn spawn(self) -> impl Future<Output = Result<(), SpawnError>> {
586        let task: JoinHandle<Result<(), SpawnError>> = task::spawn(async move {
587            loop {
588                let uptime = Instant::now() - self.start_time;
589                self.publisher
590                    .publish_retained("$stats/uptime", uptime.as_secs().to_string())
591                    .await?;
592                sleep(STATS_INTERVAL).await;
593            }
594        });
595        task.map(|res| res?)
596    }
597}
598
599/// Legacy firmware extension.
600#[derive(Debug)]
601struct HomieFirmware {
602    publisher: DevicePublisher,
603    firmware_name: String,
604    firmware_version: String,
605}
606
607impl HomieFirmware {
608    const EXTENSION_ID: &'static str = "org.homie.legacy-firmware:0.1.1:[4.x]";
609
610    fn new(publisher: DevicePublisher, firmware_name: String, firmware_version: String) -> Self {
611        Self {
612            publisher,
613            firmware_name,
614            firmware_version,
615        }
616    }
617
618    /// Send initial topics.
619    async fn start(&self) -> Result<(), ClientError> {
620        self.publisher
621            .publish_retained("$localip", local_ipaddress::get().unwrap())
622            .await?;
623        self.publisher
624            .publish_retained("$mac", get_mac_address().unwrap().unwrap().to_string())
625            .await?;
626        self.publisher
627            .publish_retained("$fw/name", self.firmware_name.as_str())
628            .await?;
629        self.publisher
630            .publish_retained("$fw/version", self.firmware_version.as_str())
631            .await?;
632        Ok(())
633    }
634}
635
636fn try_join_handles<A, B, E>(
637    a: JoinHandle<Result<A, E>>,
638    b: JoinHandle<Result<B, E>>,
639) -> impl Future<Output = Result<(A, B), E>>
640where
641    E: From<JoinError>,
642{
643    // Unwrap the JoinHandle results to get to the real results.
644    try_join(a.map(|res| res?), b.map(|res| res?))
645}
646
647fn try_join_unit_handles<E>(
648    a: JoinHandle<Result<(), E>>,
649    b: JoinHandle<Result<(), E>>,
650) -> impl Future<Output = Result<(), E>>
651where
652    E: From<JoinError>,
653{
654    try_join_handles(a, b).map(simplify_unit_pair)
655}
656
/// Collapse a successful pair of units into a single unit, passing any error through unchanged.
fn simplify_unit_pair<E>(m: Result<((), ()), E>) -> Result<(), E> {
    match m {
        Ok(((), ())) => Ok(()),
        Err(error) => Err(error),
    }
}
660
/// Unit tests which drive a `HomieDevice` against an in-memory request channel instead of a real
/// MQTT connection.
#[cfg(test)]
mod tests {
    use super::*;
    use flume::Receiver;
    use rumqttc::Request;

    /// Build a device whose client sends its requests into an unbounded channel, returning the
    /// receiving end so that the caller can keep the channel open.
    fn make_test_device() -> (HomieDevice, Receiver<Request>) {
        let (requests_tx, requests_rx) = flume::unbounded();
        let client = AsyncClient::from_senders(requests_tx);
        let publisher = DevicePublisher::new(client, "homie/test-device".to_string());
        let device = HomieDevice::new(publisher, "Test device".to_string(), &[]);
        (device, requests_rx)
    }

    /// Adding a second node with an ID that is already present must panic.
    #[tokio::test]
    #[should_panic(expected = "Tried to add node with duplicate ID")]
    async fn add_node_fails_given_duplicate_id() {
        let (mut device, rx) = make_test_device();

        device
            .add_node(Node::new("id", "Name", "type", vec![]))
            .await
            .unwrap();
        device
            .add_node(Node::new("id", "Name 2", "type2", vec![]))
            .await
            .unwrap();

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
    }

    /// Calling `ready()` on a device that was never started must panic.
    #[tokio::test]
    #[should_panic(expected = "Init")]
    async fn ready_fails_if_called_before_start() {
        let (mut device, rx) = make_test_device();

        device.ready().await.unwrap();

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
    }

    /// A device with no nodes can be started and made ready.
    #[tokio::test]
    async fn start_succeeds_with_no_nodes() -> Result<(), ClientError> {
        let (mut device, rx) = make_test_device();

        device.start().await?;
        device.ready().await?;

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
        Ok(())
    }

    /// A ready device can sleep and then become ready again.
    #[tokio::test]
    async fn sleep_then_ready_again_succeeds() -> Result<(), ClientError> {
        let (mut device, rx) = make_test_device();

        device.start().await?;
        device.ready().await?;
        device.sleep().await?;
        device.ready().await?;

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
        Ok(())
    }

    /// A ready device can enter the alert state and then become ready again.
    #[tokio::test]
    async fn alert_then_ready_again_succeeds() -> Result<(), ClientError> {
        let (mut device, rx) = make_test_device();

        device.start().await?;
        device.ready().await?;
        device.alert().await?;
        device.ready().await?;

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
        Ok(())
    }

    /// A started device can be disconnected without ever becoming ready.
    #[tokio::test]
    async fn disconnect_succeeds_before_ready() -> Result<(), ClientError> {
        let (mut device, rx) = make_test_device();

        device.start().await?;
        device.disconnect().await?;

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
        Ok(())
    }

    /// A ready device can be disconnected.
    #[tokio::test]
    async fn disconnect_succeeds_after_ready() -> Result<(), ClientError> {
        let (mut device, rx) = make_test_device();

        device.start().await?;
        device.ready().await?;
        device.disconnect().await?;

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
        Ok(())
    }

    /// Building without firmware details yields a device and no firmware extension.
    #[tokio::test]
    async fn minimal_build_succeeds() -> Result<(), ClientError> {
        let builder = HomieDevice::builder(
            "homie/test-device",
            "Test device",
            MqttOptions::new("client_id", "hostname", 1234),
        );

        let (_event_loop, homie, _stats, firmware, _callback) = builder.build();

        assert_eq!(homie.device_name, "Test device");
        assert_eq!(homie.publisher.device_base, "homie/test-device");
        assert!(firmware.is_none());

        Ok(())
    }

    /// Building after `set_firmware` yields a firmware extension carrying the given details.
    #[tokio::test]
    async fn set_firmware_build_succeeds() -> Result<(), ClientError> {
        let mut builder = HomieDevice::builder(
            "homie/test-device",
            "Test device",
            MqttOptions::new("client_id", "hostname", 1234),
        );

        builder.set_firmware("firmware_name", "firmware_version");

        let (_event_loop, homie, _stats, firmware, _callback) = builder.build();

        assert_eq!(homie.device_name, "Test device");
        assert_eq!(homie.publisher.device_base, "homie/test-device");
        let firmware = firmware.unwrap();
        assert_eq!(firmware.firmware_name, "firmware_name");
        assert_eq!(firmware.firmware_version, "firmware_version");

        Ok(())
    }

    /// Nodes can be added both before the device is started and after it is ready.
    #[tokio::test]
    async fn add_node_succeeds_before_and_after_start() -> Result<(), ClientError> {
        let (mut device, rx) = make_test_device();

        device
            .add_node(Node::new("id", "Name", "type", vec![]))
            .await?;

        device.start().await?;
        device.ready().await?;

        // Add another node after starting.
        device
            .add_node(Node::new("id2", "Name 2", "type2", vec![]))
            .await?;

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
        Ok(())
    }

    /// Add a node, remove it, and add it back again.
    #[tokio::test]
    async fn add_node_succeeds_after_remove() -> Result<(), ClientError> {
        let (mut device, rx) = make_test_device();

        device
            .add_node(Node::new("id", "Name", "type", vec![]))
            .await?;

        device.remove_node("id").await?;

        // Adding it back shouldn't give an error.
        device
            .add_node(Node::new("id", "Name", "type", vec![]))
            .await?;

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
        Ok(())
    }

    /// Check that `has_node` works as expected.
    #[tokio::test]
    async fn has_node() -> Result<(), ClientError> {
        let (mut device, rx) = make_test_device();

        assert!(!device.has_node("id"));

        device
            .add_node(Node::new("id", "Name", "type", vec![]))
            .await?;
        assert!(device.has_node("id"));

        device.remove_node("id").await?;
        assert!(!device.has_node("id"));

        // Need to keep rx alive until here so that the channel isn't closed.
        drop(rx);
        Ok(())
    }
}