1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
use super::{DocumentEvent, WatcherHandle};
use crate::{backend::WatcherCommand, hash_str, WatcherError};
use rumqttc::{AsyncClient, ConnectionError, QoS};
use std::{collections::HashMap, time::Duration};
use tokio::sync::mpsc;
/// Data describing a single MQTT publish message.
///
/// NOTE(review): the field names suggest these mirror the properties of an
/// MQTT PUBLISH packet; the code that builds this value is not part of this
/// section, so confirm the exact semantics against the constructor site.
#[derive(Debug, Clone)]
pub struct MqttPublishEvent {
    /// Topic associated with the message.
    pub topic: String,
    /// Message payload held as a `String`.
    pub payload: String,
    /// Presumably the MQTT duplicate-delivery (DUP) flag — TODO confirm.
    pub duplicate: bool,
    /// Presumably the MQTT retain flag — TODO confirm.
    pub retain: bool,
    /// Quality-of-service level, expressed with rumqttc's [`QoS`] type.
    pub qos: QoS,
}
/// Events describing the state and traffic of an MQTT client.
///
/// NOTE(review): nothing in this section produces or consumes these
/// variants, so the per-variant notes below are derived from the names and
/// payload types only — verify against the code that emits them.
#[derive(Debug)]
pub enum MqttClientEvent {
    /// Presumably emitted once a connection is established.
    Connect,
    /// Presumably emitted when the connection is lost or closed.
    Disconnect,
    /// Presumably emitted when the client is shutting down.
    Stop,
    /// Carries the details of a publish message as an [`MqttPublishEvent`].
    PublishMessage(MqttPublishEvent),
    /// Carries a rumqttc [`ConnectionError`].
    Error(ConnectionError),
}
pub fn run_mqtt_watcher(
mqttoptions: rumqttc::MqttOptions,
config_topic: &str,
channel_size: usize,
) -> Result<(WatcherHandle, mpsc::Receiver<DocumentEvent>), WatcherError> {
let (event_sender, receiver) = mpsc::channel(channel_size);
let (mqtt_client, mut eventloop) = AsyncClient::new(mqttoptions, channel_size);
let (command_sender, mut command_receiver) = mpsc::channel(1);
let config_topic = format!("{}/#", config_topic.trim_end_matches('/'));
let handle = tokio::task::spawn(async move {
// Wait for a start command before we begin
match command_receiver.recv().await {
Some(WatcherCommand::Stop) | None => {
// Exit early if Stop command is received or channel is closed
log::info!("Watcher received stop command before starting or channel closed");
return Ok(());
}
_ => {}
}
let mut hashes: HashMap<String, u64> = HashMap::new();
loop {
tokio::select! {
poll_res = eventloop.poll() => {
match poll_res {
Ok(event) => match event {
rumqttc::Event::Incoming(rumqttc::Packet::Publish(p)) => {
let topic = p.topic;
if p.payload.is_empty() {
// deleted topic
if hashes.remove(&topic).is_some() {
event_sender
.send(DocumentEvent::DocumentRemoved(topic))
.await
.unwrap();
}
} else {
// published new or updated content
let content = match String::from_utf8(p.payload.to_vec()) {
Ok(payload) => payload,
Err(err) => {
log::warn!(
"Cannot parse mqtt payload for topic [{}] to string. Error: {}",
topic,
err
);
continue;
}
};
let new_hash = hash_str(&content);
if let Some(existing_hash) = hashes.get(&topic) {
// File exists: Check if the hash has changed
if existing_hash != &new_hash {
// Content changed: Update the hash and emit `ContentChanged`
hashes.insert(topic.clone(), new_hash);
event_sender
.send(DocumentEvent::ContentChanged(topic, content))
.await
.unwrap();
}
} else {
// File does not exist in `file_hashes`: It's a new file
hashes.insert(topic.clone(), new_hash);
event_sender
.send(DocumentEvent::NewDocument(topic, content))
.await
.unwrap();
}
}
}
rumqttc::Event::Incoming(rumqttc::Incoming::ConnAck(_)) => {
log::debug!("HOMIE: Connected");
// subscribe to config topic
mqtt_client
.subscribe(&config_topic, rumqttc::QoS::ExactlyOnce)
.await?;
}
rumqttc::Event::Outgoing(rumqttc::Outgoing::Disconnect) => {
log::debug!("HOMIE: Connection closed from our side.",);
break;
}
_ => {}
},
Err(err) => {
log::error!("Error connecting mqtt. {:#?}", err);
tokio::time::sleep(Duration::from_secs(5)).await;
}
};
},
// Check for control commands
Some(command) = command_receiver.recv() => {
if let WatcherCommand::Stop = command {
log::info!("Watcher received stop command");
break;
}
}
};
}
log::debug!("Exiting mqtt config watcher eventloop...");
Ok(())
});
Ok((
WatcherHandle {
handle: Some(handle),
command_sender,
},
receiver,
))
}