1use rumqttc::{
5 AsyncClient, ClientError, ConnectionError, EventLoop, Incoming, MqttOptions, Publish, QoS,
6};
7use std::collections::HashMap;
8use std::num::{ParseFloatError, ParseIntError};
9use std::str;
10use std::sync::{Arc, Mutex};
11use std::time::Duration;
12use thiserror::Error;
13
14mod types;
15pub use types::{Datatype, Device, Extension, Node, Property, State};
16use types::{ParseDatatypeError, ParseExtensionError, ParseStateError};
17
18mod values;
19pub use values::{
20 ColorFormat, ColorHsv, ColorRgb, EnumValue, ParseColorError, ParseEnumError, Value, ValueError,
21};
22
/// Capacity value handed to `AsyncClient::new` when the MQTT client is created.
const REQUESTS_CAP: usize = 1000;
24
/// Error returned by [`HomieController::poll`].
///
/// Both variants display the message of the error they wrap.
#[derive(Error, Debug)]
pub enum PollError {
    /// The MQTT client reported an error, e.g. while subscribing or unsubscribing.
    #[error("{0}")]
    Client(#[from] ClientError),
    /// Polling the MQTT event loop failed.
    #[error("{0}")]
    Connection(#[from] ConnectionError),
}
35
/// A change reported to the caller of [`HomieController::poll`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Event {
    /// A device was first seen, or one of its attributes was updated.
    DeviceUpdated {
        /// ID of the device.
        device_id: String,
        /// Value of `Device::has_required_attributes()` after the update. Always `false` in the
        /// event emitted when the device is first discovered.
        has_required_attributes: bool,
    },
    /// An attribute of a node was updated.
    NodeUpdated {
        /// ID of the device owning the node.
        device_id: String,
        /// ID of the node.
        node_id: String,
        /// Value of `Node::has_required_attributes()` after the update.
        has_required_attributes: bool,
    },
    /// An attribute of a property was updated.
    PropertyUpdated {
        /// ID of the device owning the node.
        device_id: String,
        /// ID of the node owning the property.
        node_id: String,
        /// ID of the property.
        property_id: String,
        /// Value of `Property::has_required_attributes()` after the update.
        has_required_attributes: bool,
    },
    /// A value was received for a property.
    PropertyValueChanged {
        /// ID of the device owning the node.
        device_id: String,
        /// ID of the node owning the property.
        node_id: String,
        /// ID of the property.
        property_id: String,
        /// The payload that was received, as text.
        value: String,
        /// `true` when the publish carrying the value was not flagged as retained. `false` for
        /// retained publishes, including values that were received before the property was known
        /// and are only reported once it is announced.
        fresh: bool,
    },
    /// A ConnAck was received from the broker and the initial subscription has been requested.
    Connected,
}
73
74impl Event {
75 fn device_updated(device: &Device) -> Self {
76 Event::DeviceUpdated {
77 device_id: device.id.to_owned(),
78 has_required_attributes: device.has_required_attributes(),
79 }
80 }
81
82 fn node_updated(device_id: &str, node: &Node) -> Self {
83 Event::NodeUpdated {
84 device_id: device_id.to_owned(),
85 node_id: node.id.to_owned(),
86 has_required_attributes: node.has_required_attributes(),
87 }
88 }
89
90 fn property_updated(device_id: &str, node_id: &str, property: &Property) -> Self {
91 Event::PropertyUpdated {
92 device_id: device_id.to_owned(),
93 node_id: node_id.to_owned(),
94 property_id: property.id.to_owned(),
95 has_required_attributes: property.has_required_attributes(),
96 }
97 }
98
99 fn property_value(device_id: &str, node_id: &str, property: &Property, fresh: bool) -> Self {
100 Event::PropertyValueChanged {
101 device_id: device_id.to_owned(),
102 node_id: node_id.to_owned(),
103 property_id: property.id.to_owned(),
104 value: property.value.to_owned().unwrap(),
105 fresh,
106 }
107 }
108}
109
/// Tracks the devices announced under one base topic and lets the caller set property values.
///
/// The controller is driven by repeatedly calling [`HomieController::poll`] with the
/// [`HomieEventLoop`] returned alongside it from [`HomieController::new`].
#[derive(Debug)]
pub struct HomieController {
    /// Client used to subscribe, unsubscribe, publish and disconnect.
    mqtt_client: AsyncClient,
    /// Topic prefix, without a trailing slash, under which all device topics live.
    base_topic: String,
    /// Devices discovered so far, keyed by device ID. The map sits behind an `Arc` so that
    /// `devices()` can hand out a snapshot by cloning the `Arc`; updates go through
    /// `Arc::make_mut`.
    devices: Mutex<Arc<HashMap<String, Device>>>,
    /// Retained property values that arrived before the property itself was known, keyed by
    /// `device_id/node_id/property_id`. An entry is consumed when the property is announced.
    early_property_values: Mutex<HashMap<String, String>>,
}
122
/// Wrapper around the rumqttc [`EventLoop`] that must be passed to [`HomieController::poll`].
pub struct HomieEventLoop {
    /// The underlying MQTT event loop that `poll` drives.
    event_loop: EventLoop,
}
126
127impl HomieEventLoop {
128 fn new(event_loop: EventLoop) -> HomieEventLoop {
129 HomieEventLoop { event_loop }
130 }
131}
132
/// Outcome of processing one publish synchronously: what to report and which subscription
/// changes still have to be sent to the broker.
struct PublishResponse {
    /// Events to return to the caller of `poll`.
    events: Vec<Event>,
    /// Topic filters that should be subscribed to as a result of the publish.
    topics_to_subscribe: Vec<String>,
    /// Topic filters that should be unsubscribed from as a result of the publish.
    topics_to_unsubscribe: Vec<String>,
}
139
140impl HomieController {
141 pub fn new(mqtt_options: MqttOptions, base_topic: &str) -> (HomieController, HomieEventLoop) {
148 let (mqtt_client, event_loop) = AsyncClient::new(mqtt_options, REQUESTS_CAP);
149 let controller = HomieController {
150 mqtt_client,
151 base_topic: base_topic.to_string(),
152 devices: Mutex::new(Arc::new(HashMap::new())),
153 early_property_values: Mutex::new(HashMap::new()),
154 };
155 (controller, HomieEventLoop::new(event_loop))
156 }
157
158 pub fn devices(&self) -> Arc<HashMap<String, Device>> {
161 self.devices.lock().unwrap().clone()
162 }
163
164 pub fn base_topic(&self) -> &str {
166 &self.base_topic
167 }
168
169 pub async fn poll(&self, event_loop: &mut HomieEventLoop) -> Result<Vec<Event>, PollError> {
171 let notification = event_loop.event_loop.poll().await?;
172 log::trace!("Notification = {notification:?}");
173
174 if let rumqttc::Event::Incoming(incoming) = notification {
175 self.handle_event(incoming).await
176 } else {
177 Ok(vec![])
178 }
179 }
180
181 async fn handle_event(&self, incoming: Incoming) -> Result<Vec<Event>, PollError> {
182 match incoming {
183 Incoming::Publish(publish) => match self.handle_publish(publish).await {
184 Err(HandleError::Warning(err)) => {
185 log::warn!("{err}");
189 Ok(vec![])
190 }
191 Err(HandleError::Fatal(e)) => Err(e.into()),
192 Ok(events) => Ok(events),
193 },
194 Incoming::ConnAck(_) => {
195 self.start().await?;
198 Ok(vec![Event::Connected])
199 }
200 _ => Ok(vec![]),
201 }
202 }
203
204 async fn handle_publish(&self, publish: Publish) -> Result<Vec<Event>, HandleError> {
208 let PublishResponse {
209 events,
210 topics_to_subscribe,
211 topics_to_unsubscribe,
212 } = self.handle_publish_sync(publish)?;
213
214 for topic in topics_to_subscribe {
215 log::trace!("Subscribe to {topic}");
216 self.mqtt_client.subscribe(topic, QoS::AtLeastOnce).await?;
217 }
218 for topic in topics_to_unsubscribe {
219 log::trace!("Unsubscribe from {topic}");
220 self.mqtt_client.unsubscribe(topic).await?;
221 }
222
223 Ok(events)
224 }
225
226 fn handle_publish_sync(&self, publish: Publish) -> Result<PublishResponse, HandleError> {
232 let base_topic = format!("{}/", self.base_topic);
233 let payload = str::from_utf8(&publish.payload)
234 .map_err(|e| format!("Payload not valid UTF-8: {e}"))?;
235 let subtopic = publish
236 .topic
237 .strip_prefix(&base_topic)
238 .ok_or_else(|| format!("Publish with unexpected topic: {publish:?}"))?;
239
240 let devices = &mut *self.devices.lock().unwrap();
244 let devices = Arc::make_mut(devices);
245
246 let early_property_values = &mut *self.early_property_values.lock().unwrap();
247
248 let mut topics_to_subscribe: Vec<String> = vec![];
251 let mut topics_to_unsubscribe: Vec<String> = vec![];
252
253 let parts = subtopic.split('/').collect::<Vec<&str>>();
254 let events = match parts.as_slice() {
255 [device_id, "$homie"] => {
256 if !devices.contains_key(*device_id) {
257 log::trace!("Homie device '{device_id}' version '{payload}'");
258 devices.insert((*device_id).to_owned(), Device::new(device_id, payload));
259 topics_to_subscribe.push(format!("{}/{}/+", self.base_topic, device_id));
260 topics_to_subscribe.push(format!("{}/{}/$fw/+", self.base_topic, device_id));
261 topics_to_subscribe.push(format!("{}/{}/$stats/+", self.base_topic, device_id));
262 vec![Event::DeviceUpdated {
263 device_id: (*device_id).to_owned(),
264 has_required_attributes: false,
265 }]
266 } else {
267 vec![]
268 }
269 }
270 [device_id, "$name"] => {
271 let device = get_mut_device_for(devices, "Got name for", device_id)?;
272 device.name = Some(payload.to_owned());
273 vec![Event::device_updated(device)]
274 }
275 [device_id, "$state"] => {
276 let state = payload.parse()?;
277 let device = get_mut_device_for(devices, "Got state for", device_id)?;
278 device.state = state;
279 vec![Event::device_updated(device)]
280 }
281 [device_id, "$implementation"] => {
282 let device = get_mut_device_for(devices, "Got implementation for", device_id)?;
283 device.implementation = Some(payload.to_owned());
284 vec![Event::device_updated(device)]
285 }
286 [device_id, "$extensions"] => {
287 let device = get_mut_device_for(devices, "Got extensions for", device_id)?;
288 device.extensions = payload
289 .split(',')
290 .map(|part| part.parse())
291 .collect::<Result<Vec<_>, _>>()?;
292 vec![Event::device_updated(device)]
293 }
294 [device_id, "$localip"] => {
295 let device = get_mut_device_for(devices, "Got localip for", device_id)?;
296 device.local_ip = Some(payload.to_owned());
297 vec![Event::device_updated(device)]
298 }
299 [device_id, "$mac"] => {
300 let device = get_mut_device_for(devices, "Got mac for", device_id)?;
301 device.mac = Some(payload.to_owned());
302 vec![Event::device_updated(device)]
303 }
304 [device_id, "$fw", "name"] => {
305 let device = get_mut_device_for(devices, "Got fw/name for", device_id)?;
306 device.firmware_name = Some(payload.to_owned());
307 vec![Event::device_updated(device)]
308 }
309 [device_id, "$fw", "version"] => {
310 let device = get_mut_device_for(devices, "Got fw/version for", device_id)?;
311 device.firmware_version = Some(payload.to_owned());
312 vec![Event::device_updated(device)]
313 }
314 [_device_id, "$stats"] => {
315 vec![]
318 }
319 [device_id, "$stats", "interval"] => {
320 let interval = payload.parse()?;
321 let device = get_mut_device_for(devices, "Got stats/interval for", device_id)?;
322 device.stats_interval = Some(Duration::from_secs(interval));
323 vec![Event::device_updated(device)]
324 }
325 [device_id, "$stats", "uptime"] => {
326 let uptime = payload.parse()?;
327 let device = get_mut_device_for(devices, "Got stats/uptime for", device_id)?;
328 device.stats_uptime = Some(Duration::from_secs(uptime));
329 vec![Event::device_updated(device)]
330 }
331 [device_id, "$stats", "signal"] => {
332 let signal = payload.parse()?;
333 let device = get_mut_device_for(devices, "Got stats/signal for", device_id)?;
334 device.stats_signal = Some(signal);
335 vec![Event::device_updated(device)]
336 }
337 [device_id, "$stats", "cputemp"] => {
338 let cputemp = payload.parse()?;
339 let device = get_mut_device_for(devices, "Got stats/cputemp for", device_id)?;
340 device.stats_cputemp = Some(cputemp);
341 vec![Event::device_updated(device)]
342 }
343 [device_id, "$stats", "cpuload"] => {
344 let cpuload = payload.parse()?;
345 let device = get_mut_device_for(devices, "Got stats/cpuload for", device_id)?;
346 device.stats_cpuload = Some(cpuload);
347 vec![Event::device_updated(device)]
348 }
349 [device_id, "$stats", "battery"] => {
350 let battery = payload.parse()?;
351 let device = get_mut_device_for(devices, "Got stats/battery for", device_id)?;
352 device.stats_battery = Some(battery);
353 vec![Event::device_updated(device)]
354 }
355 [device_id, "$stats", "freeheap"] => {
356 let freeheap = payload.parse()?;
357 let device = get_mut_device_for(devices, "Got stats/freeheap for", device_id)?;
358 device.stats_freeheap = Some(freeheap);
359 vec![Event::device_updated(device)]
360 }
361 [device_id, "$stats", "supply"] => {
362 let supply = payload.parse()?;
363 let device = get_mut_device_for(devices, "Got stats/supply for", device_id)?;
364 device.stats_supply = Some(supply);
365 vec![Event::device_updated(device)]
366 }
367 [device_id, "$nodes"] => {
368 let nodes: Vec<_> = payload.split(',').collect();
369 let device = get_mut_device_for(devices, "Got nodes for", device_id)?;
370
371 device.nodes.retain(|node_id, node| {
373 let kept = nodes.contains(&node_id.as_ref());
374 if !kept {
375 let node_topic = format!("{}/{}/{}/+", self.base_topic, device_id, node_id);
377 topics_to_unsubscribe.push(node_topic);
378 for property_id in node.properties.keys() {
379 let topic = format!(
380 "{}/{}/{}/{}/+",
381 self.base_topic, device_id, node_id, property_id
382 );
383 topics_to_unsubscribe.push(topic);
384 }
385 }
386 kept
387 });
388
389 for node_id in nodes {
391 if !device.nodes.contains_key(node_id) {
392 device.add_node(Node::new(node_id));
393 let topic = format!("{}/{}/{}/+", self.base_topic, device_id, node_id);
394 topics_to_subscribe.push(topic);
395 }
396 }
397
398 vec![Event::device_updated(device)]
399 }
400 [device_id, node_id, "$name"] => {
401 let node = get_mut_node_for(devices, "Got node name for", device_id, node_id)?;
402 node.name = Some(payload.to_owned());
403 vec![Event::node_updated(device_id, node)]
404 }
405 [device_id, node_id, "$type"] => {
406 let node = get_mut_node_for(devices, "Got node type for", device_id, node_id)?;
407 node.node_type = Some(payload.to_owned());
408 vec![Event::node_updated(device_id, node)]
409 }
410 [device_id, node_id, "$properties"] => {
411 let properties: Vec<_> = payload.split(',').collect();
412 let node = get_mut_node_for(devices, "Got properties for", device_id, node_id)?;
413
414 node.properties.retain(|property_id, _| {
416 let kept = properties.contains(&property_id.as_ref());
417 if !kept {
418 let topic = format!(
420 "{}/{}/{}/{}/+",
421 self.base_topic, device_id, node_id, property_id
422 );
423 topics_to_unsubscribe.push(topic);
424 }
425 kept
426 });
427
428 let mut events = vec![Event::node_updated(device_id, node)];
429
430 for property_id in properties {
432 if !node.properties.contains_key(property_id) {
433 let mut new_prop = Property::new(property_id);
434
435 let key = format!("{device_id}/{node_id}/{property_id}");
436 new_prop.value = early_property_values.remove(&key);
437
438 if let Some(value) = new_prop.value.clone() {
439 events.push(Event::PropertyValueChanged {
440 device_id: device_id.to_string(),
441 node_id: node_id.to_string(),
442 property_id: property_id.to_string(),
443 value,
444 fresh: false,
445 });
446 }
447
448 node.add_property(new_prop);
449 let topic = format!(
450 "{}/{}/{}/{}/+",
451 self.base_topic, device_id, node_id, property_id
452 );
453 topics_to_subscribe.push(topic);
454 }
455 }
456
457 events
458 }
459 [device_id, node_id, property_id, "$name"] => {
460 let property = get_mut_property_for(
461 devices,
462 "Got property name for",
463 device_id,
464 node_id,
465 property_id,
466 )?;
467 property.name = Some(payload.to_owned());
468 vec![Event::property_updated(device_id, node_id, property)]
469 }
470 [device_id, node_id, property_id, "$datatype"] => {
471 let datatype = payload.parse()?;
472 let property = get_mut_property_for(
473 devices,
474 "Got property datatype for",
475 device_id,
476 node_id,
477 property_id,
478 )?;
479 property.datatype = Some(datatype);
480 vec![Event::property_updated(device_id, node_id, property)]
481 }
482 [device_id, node_id, property_id, "$unit"] => {
483 let property = get_mut_property_for(
484 devices,
485 "Got property unit for",
486 device_id,
487 node_id,
488 property_id,
489 )?;
490 property.unit = Some(payload.to_owned());
491 vec![Event::property_updated(device_id, node_id, property)]
492 }
493 [device_id, node_id, property_id, "$format"] => {
494 let property = get_mut_property_for(
495 devices,
496 "Got property format for",
497 device_id,
498 node_id,
499 property_id,
500 )?;
501 property.format = Some(payload.to_owned());
502 vec![Event::property_updated(device_id, node_id, property)]
503 }
504 [device_id, node_id, property_id, "$settable"] => {
505 let settable = payload
506 .parse()
507 .map_err(|_| format!("Invalid boolean '{payload}' for $settable."))?;
508 let property = get_mut_property_for(
509 devices,
510 "Got property settable for",
511 device_id,
512 node_id,
513 property_id,
514 )?;
515 property.settable = settable;
516 vec![Event::property_updated(device_id, node_id, property)]
517 }
518 [device_id, node_id, property_id, "$retained"] => {
519 let retained = payload
520 .parse()
521 .map_err(|_| format!("Invalid boolean '{payload}' for $retained."))?;
522 let property = get_mut_property_for(
523 devices,
524 "Got property retained for",
525 device_id,
526 node_id,
527 property_id,
528 )?;
529 property.retained = retained;
530 vec![Event::property_updated(device_id, node_id, property)]
531 }
532 [device_id, node_id, property_id]
533 if !device_id.starts_with('$')
534 && !node_id.starts_with('$')
535 && !property_id.starts_with('$') =>
536 {
537 match get_mut_property_for(
538 devices,
539 "Got property value for",
540 device_id,
541 node_id,
542 property_id,
543 ) {
544 Ok(property) => {
545 property.value = Some(payload.to_owned());
546 vec![Event::property_value(
547 device_id,
548 node_id,
549 property,
550 !publish.retain,
551 )]
552 }
553
554 Err(_) if publish.retain => {
555 early_property_values.insert(subtopic.to_owned(), payload.to_owned());
559
560 vec![]
561 }
562
563 Err(e) => return Err(e.into()),
564 }
565 }
566 [_device_id, _node_id, _property_id, "set"] => {
567 vec![]
570 }
571 _ => {
572 log::warn!("Unexpected subtopic {subtopic} = {payload}");
573 vec![]
574 }
575 };
576
577 Ok(PublishResponse {
578 events,
579 topics_to_subscribe,
580 topics_to_unsubscribe,
581 })
582 }
583
584 async fn start(&self) -> Result<(), ClientError> {
586 *self.devices.lock().unwrap() = Arc::new(HashMap::new());
588
589 let topic = format!("{}/+/$homie", self.base_topic);
590 log::trace!("Subscribe to {topic}");
591 self.mqtt_client.subscribe(topic, QoS::AtLeastOnce).await
592 }
593
594 pub async fn set(
597 &self,
598 device_id: &str,
599 node_id: &str,
600 property_id: &str,
601 value: impl Value,
602 ) -> Result<(), ClientError> {
603 let topic = format!(
604 "{}/{}/{}/{}/set",
605 self.base_topic, device_id, node_id, property_id
606 );
607 self.mqtt_client
608 .publish(topic, QoS::AtLeastOnce, false, value.to_string())
609 .await
610 }
611
612 pub async fn disconnect(&self) -> Result<(), ClientError> {
614 self.mqtt_client.disconnect().await
615 }
616}
617
618fn get_mut_device_for<'a>(
619 devices: &'a mut HashMap<String, Device>,
620 err_prefix: &str,
621 device_id: &str,
622) -> Result<&'a mut Device, String> {
623 devices
624 .get_mut(device_id)
625 .ok_or_else(|| format!("{err_prefix} unknown device '{device_id}'"))
626}
627
628fn get_mut_node_for<'a>(
629 devices: &'a mut HashMap<String, Device>,
630 err_prefix: &str,
631 device_id: &str,
632 node_id: &str,
633) -> Result<&'a mut Node, String> {
634 let device = get_mut_device_for(devices, err_prefix, device_id)?;
635 device
636 .nodes
637 .get_mut(node_id)
638 .ok_or_else(|| format!("{err_prefix} unknown node '{device_id}/{node_id}'"))
639}
640
641fn get_mut_property_for<'a>(
642 devices: &'a mut HashMap<String, Device>,
643 err_prefix: &str,
644 device_id: &str,
645 node_id: &str,
646 property_id: &str,
647) -> Result<&'a mut Property, String> {
648 let node = get_mut_node_for(devices, err_prefix, device_id, node_id)?;
649 node.properties.get_mut(property_id).ok_or_else(|| {
650 format!("{err_prefix} unknown property '{device_id}/{node_id}/{property_id}'")
651 })
652}
653
/// Internal error produced while handling a single publish.
#[derive(Error, Debug)]
enum HandleError {
    /// A problem with one message. `handle_event` logs it with `log::warn!` and reports no events.
    #[error("{0}")]
    Warning(String),
    /// An MQTT client error. `handle_event` returns it to the caller as a `PollError`.
    #[error("{0}")]
    Fatal(#[from] ClientError),
}
661
662impl From<String> for HandleError {
663 fn from(s: String) -> Self {
664 HandleError::Warning(s)
665 }
666}
667
668impl From<ParseStateError> for HandleError {
669 fn from(e: ParseStateError) -> Self {
670 HandleError::Warning(e.to_string())
671 }
672}
673
674impl From<ParseDatatypeError> for HandleError {
675 fn from(e: ParseDatatypeError) -> Self {
676 HandleError::Warning(e.to_string())
677 }
678}
679
680impl From<ParseExtensionError> for HandleError {
681 fn from(e: ParseExtensionError) -> Self {
682 HandleError::Warning(e.to_string())
683 }
684}
685
686impl From<ParseIntError> for HandleError {
687 fn from(e: ParseIntError) -> Self {
688 HandleError::Warning(format!("Invalid integer: {e}"))
689 }
690}
691
692impl From<ParseFloatError> for HandleError {
693 fn from(e: ParseFloatError) -> Self {
694 HandleError::Warning(format!("Invalid float: {e}"))
695 }
696}
697
698#[cfg(test)]
699mod tests {
700 use super::*;
701 use flume::Receiver;
702 use rumqttc::{ConnAck, Packet, Request, Subscribe};
703
704 fn make_test_controller() -> (HomieController, Receiver<Request>) {
705 let (requests_tx, requests_rx) = flume::unbounded();
706 let mqtt_client = AsyncClient::from_senders(requests_tx);
707 let controller = HomieController {
708 base_topic: "base_topic".to_owned(),
709 mqtt_client,
710 devices: Mutex::new(Arc::new(HashMap::new())),
711 early_property_values: Mutex::new(HashMap::new()),
712 };
713 (controller, requests_rx)
714 }
715
716 fn expect_subscriptions(requests_rx: &Receiver<Request>, subscription_topics: &[&str]) {
717 let requests: Vec<_> = subscription_topics
718 .iter()
719 .map(|_| requests_rx.try_recv().unwrap())
720 .collect();
721
722 for topic in subscription_topics {
723 let expected = Request::Subscribe(Subscribe::new(*topic, QoS::AtLeastOnce));
724 assert!(requests.contains(&expected));
725 }
726 }
727
728 async fn connect(controller: &HomieController) -> Result<Vec<Event>, PollError> {
729 controller
730 .handle_event(Packet::ConnAck(ConnAck::new(
731 rumqttc::ConnectReturnCode::Success,
732 false,
733 )))
734 .await
735 }
736
737 async fn publish(
738 controller: &HomieController,
739 topic: &str,
740 payload: &str,
741 ) -> Result<Vec<Event>, PollError> {
742 controller
743 .handle_event(Packet::Publish(Publish::new(
744 topic,
745 QoS::AtLeastOnce,
746 payload,
747 )))
748 .await
749 }
750
751 async fn publish_retained(
752 controller: &HomieController,
753 topic: &str,
754 payload: &str,
755 ) -> Result<Vec<Event>, PollError> {
756 let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
757
758 publish.retain = true;
759
760 controller.handle_event(Packet::Publish(publish)).await
761 }
762
763 fn property_set(properties: Vec<Property>) -> HashMap<String, Property> {
764 properties
765 .into_iter()
766 .map(|property| (property.id.clone(), property))
767 .collect()
768 }
769
770 fn node_set(nodes: Vec<Node>) -> HashMap<String, Node> {
771 nodes
772 .into_iter()
773 .map(|node| (node.id.clone(), node))
774 .collect()
775 }
776
777 #[tokio::test]
778 async fn subscribes_to_things() -> Result<(), Box<dyn std::error::Error>> {
779 let (controller, requests_rx) = make_test_controller();
780
781 connect(&controller).await?;
783 expect_subscriptions(&requests_rx, &["base_topic/+/$homie"]);
784
785 publish(&controller, "base_topic/device_id/$homie", "4.0").await?;
787 expect_subscriptions(
788 &requests_rx,
789 &[
790 "base_topic/device_id/+",
791 "base_topic/device_id/$fw/+",
792 "base_topic/device_id/$stats/+",
793 ],
794 );
795
796 publish(&controller, "base_topic/device_id/$nodes", "node_id").await?;
798 expect_subscriptions(&requests_rx, &["base_topic/device_id/node_id/+"]);
799
800 publish(
802 &controller,
803 "base_topic/device_id/node_id/$properties",
804 "property_id",
805 )
806 .await?;
807 expect_subscriptions(
808 &requests_rx,
809 &["base_topic/device_id/node_id/property_id/+"],
810 );
811
812 assert!(requests_rx.is_empty());
814
815 Ok(())
816 }
817
818 #[tokio::test]
819 async fn retained_payloads_before_properties() -> Result<(), Box<dyn std::error::Error>> {
820 let (controller, _requests_rx) = make_test_controller();
821
822 connect(&controller).await?;
824
825 publish_retained(&controller, "base_topic/device_id/$homie", "4.0").await?;
827
828 publish_retained(
830 &controller,
831 "base_topic/device_id/$nodes",
832 "node_id,second_node",
833 )
834 .await?;
835
836 publish_retained(
838 &controller,
839 "base_topic/device_id/node_id/property_id",
840 "HELLO WORLD",
841 )
842 .await?;
843
844 publish_retained(
846 &controller,
847 "base_topic/device_id/node_id/$properties",
848 "property_id",
849 )
850 .await?;
851
852 publish_retained(
853 &controller,
854 "base_topic/device_id/second_node/property_id",
855 "hello again",
856 )
857 .await?;
858
859 publish_retained(
861 &controller,
862 "base_topic/device_id/second_node/$properties",
863 "property_id",
864 )
865 .await?;
866
867 assert_eq!(
868 controller
869 .devices()
870 .get("device_id")
871 .unwrap()
872 .nodes
873 .get("node_id")
874 .unwrap()
875 .properties
876 .get("property_id")
877 .unwrap()
878 .value
879 .as_deref(),
880 Some("HELLO WORLD")
881 );
882
883 assert_eq!(
884 controller
885 .devices()
886 .get("device_id")
887 .unwrap()
888 .nodes
889 .get("second_node")
890 .unwrap()
891 .properties
892 .get("property_id")
893 .unwrap()
894 .value
895 .as_deref(),
896 Some("hello again")
897 );
898
899 Ok(())
900 }
901
902 #[tokio::test]
903 async fn emits_appropriate_events() -> Result<(), Box<dyn std::error::Error>> {
904 let (controller, _requests_rx) = make_test_controller();
905
906 assert_eq!(connect(&controller).await?, vec![Event::Connected]);
908
909 assert_eq!(
911 publish(&controller, "base_topic/device_id/$homie", "4.0").await?,
912 vec![Event::DeviceUpdated {
913 device_id: "device_id".to_owned(),
914 has_required_attributes: false
915 }]
916 );
917 assert_eq!(
918 publish(&controller, "base_topic/device_id/$name", "Device name").await?,
919 vec![Event::DeviceUpdated {
920 device_id: "device_id".to_owned(),
921 has_required_attributes: false
922 }]
923 );
924 assert_eq!(
925 publish(&controller, "base_topic/device_id/$state", "ready").await?,
926 vec![Event::DeviceUpdated {
927 device_id: "device_id".to_owned(),
928 has_required_attributes: true
929 }]
930 );
931 let mut expected_device = Device::new("device_id", "4.0");
932 expected_device.state = State::Ready;
933 expected_device.name = Some("Device name".to_owned());
934 assert_eq!(
935 controller.devices().get("device_id").unwrap().to_owned(),
936 expected_device
937 );
938
939 assert_eq!(
941 publish(&controller, "base_topic/device_id/$nodes", "node_id").await?,
942 vec![Event::DeviceUpdated {
943 device_id: "device_id".to_owned(),
944 has_required_attributes: false
945 }]
946 );
947 assert_eq!(
948 publish(
949 &controller,
950 "base_topic/device_id/node_id/$name",
951 "Node name"
952 )
953 .await?,
954 vec![Event::NodeUpdated {
955 device_id: "device_id".to_owned(),
956 node_id: "node_id".to_owned(),
957 has_required_attributes: false
958 }]
959 );
960 assert_eq!(
961 publish(
962 &controller,
963 "base_topic/device_id/node_id/$type",
964 "Node type"
965 )
966 .await?,
967 vec![Event::NodeUpdated {
968 device_id: "device_id".to_owned(),
969 node_id: "node_id".to_owned(),
970 has_required_attributes: false
971 }]
972 );
973
974 assert_eq!(
976 publish(
977 &controller,
978 "base_topic/device_id/node_id/$properties",
979 "property_id"
980 )
981 .await?,
982 vec![Event::NodeUpdated {
983 device_id: "device_id".to_owned(),
984 node_id: "node_id".to_owned(),
985 has_required_attributes: false
986 }]
987 );
988 assert_eq!(
989 publish(
990 &controller,
991 "base_topic/device_id/node_id/property_id/$name",
992 "Property name"
993 )
994 .await?,
995 vec![Event::PropertyUpdated {
996 device_id: "device_id".to_owned(),
997 node_id: "node_id".to_owned(),
998 property_id: "property_id".to_owned(),
999 has_required_attributes: false
1000 }]
1001 );
1002 assert_eq!(
1003 publish(
1004 &controller,
1005 "base_topic/device_id/node_id/property_id/$datatype",
1006 "integer"
1007 )
1008 .await?,
1009 vec![Event::PropertyUpdated {
1010 device_id: "device_id".to_owned(),
1011 node_id: "node_id".to_owned(),
1012 property_id: "property_id".to_owned(),
1013 has_required_attributes: true
1014 }]
1015 );
1016
1017 Ok(())
1018 }
1019
1020 #[tokio::test]
1021 async fn constructs_device_tree() -> Result<(), Box<dyn std::error::Error>> {
1022 let (controller, _requests_rx) = make_test_controller();
1023
1024 connect(&controller).await?;
1027 publish(&controller, "base_topic/device_id/$homie", "4.0").await?;
1028 publish(&controller, "base_topic/device_id/$name", "Device name").await?;
1029 publish(&controller, "base_topic/device_id/$state", "ready").await?;
1030 publish(&controller, "base_topic/device_id/$nodes", "node_id").await?;
1031
1032 publish(
1033 &controller,
1034 "base_topic/device_id/node_id/$name",
1035 "Node name",
1036 )
1037 .await?;
1038 publish(
1039 &controller,
1040 "base_topic/device_id/node_id/$type",
1041 "Node type",
1042 )
1043 .await?;
1044 publish(
1045 &controller,
1046 "base_topic/device_id/node_id/$properties",
1047 "property_id",
1048 )
1049 .await?;
1050
1051 publish(
1052 &controller,
1053 "base_topic/device_id/node_id/property_id/$name",
1054 "Property name",
1055 )
1056 .await?;
1057 publish(
1058 &controller,
1059 "base_topic/device_id/node_id/property_id/$datatype",
1060 "integer",
1061 )
1062 .await?;
1063
1064 let expected_property = Property {
1065 name: Some("Property name".to_owned()),
1066 datatype: Some(Datatype::Integer),
1067 ..Property::new("property_id")
1068 };
1069 let expected_node = Node {
1070 name: Some("Node name".to_owned()),
1071 node_type: Some("Node type".to_owned()),
1072 properties: property_set(vec![expected_property]),
1073 ..Node::new("node_id")
1074 };
1075 let expected_device = Device {
1076 name: Some("Device name".to_owned()),
1077 state: State::Ready,
1078 nodes: node_set(vec![expected_node]),
1079 ..Device::new("device_id", "4.0")
1080 };
1081
1082 assert_eq!(
1083 controller.devices().get("device_id").unwrap().to_owned(),
1084 expected_device
1085 );
1086
1087 Ok(())
1088 }
1089}