1use ::anyhow::anyhow;
2use log::{debug, error, info, trace, warn};
3use rmp_serde::to_vec_named;
4use rumqttc::tokio_rustls::rustls::ClientConfig;
5use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
6use serde::Serialize;
7use std::sync::{Arc, Mutex};
8use std::{sync::mpsc, thread, time::Duration};
9use uuid::Uuid;
10
11use crate::{
12 three_part_topic::{TetherOrCustomTopic, ThreePartTopic},
13 PlugDefinition, PlugDefinitionCommon,
14};
15
/// Keep-alive interval, in seconds, applied to the MQTT options built in `TetherAgent::connect`.
const TIMEOUT_SECONDS: u64 = 3;
/// Username used by `TetherAgentOptionsBuilder::build` when none was supplied.
const DEFAULT_USERNAME: &str = "tether";
/// Password used by `TetherAgentOptionsBuilder::build` when none was supplied.
const DEFAULT_PASSWORD: &str = "sp_ceB0ss!";
19
/// An MQTT-backed agent identified by a role and an ID.
///
/// Instances are produced by `TetherAgentOptionsBuilder::build`. Incoming messages are pushed
/// onto an internal channel by a background thread started in `connect` and are read back with
/// `check_messages`.
pub struct TetherAgent {
    /// Role of this agent, as returned by `role()`.
    role: String,
    /// ID of this agent, as returned by `id()`.
    id: String,
    /// Broker host name or address.
    host: String,
    /// Broker port.
    port: u16,
    /// Connection scheme; `connect` gives special treatment to "mqtts", "ws" and "wss".
    protocol: String,
    /// Username sent as part of the MQTT credentials.
    username: String,
    /// Password sent as part of the MQTT credentials.
    password: String,
    /// Path appended after `host:port` when building the broker URI.
    base_path: String,
    /// Explicit MQTT client ID; when `None`, `connect` generates a random UUID.
    mqtt_client_id: Option<String>,
    /// MQTT client handle; `None` until `connect` has completed.
    pub(crate) client: Option<Client>,
    /// Sending half of the incoming-message queue; cloned into the background thread.
    message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
    /// Receiving half of the incoming-message queue; polled by `check_messages`.
    message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
    /// Flag shared with the background thread, which sets it to `true` when a ConnAck arrives.
    is_connected: Arc<Mutex<bool>>,
}
35
/// Builder collecting the options needed to create a `TetherAgent`.
///
/// Only `role` is mandatory; every `Option` field left as `None` is replaced by a default when
/// `build` is called.
#[derive(Clone)]
pub struct TetherAgentOptionsBuilder {
    /// Role of the agent being built.
    role: String,
    /// Agent ID; `build` falls back to "any".
    id: Option<String>,
    /// Connection scheme; `build` falls back to "mqtt".
    protocol: Option<String>,
    /// Broker host; `build` falls back to "localhost".
    host: Option<String>,
    /// Broker port; `build` falls back to 1883.
    port: Option<u16>,
    /// Username; `build` falls back to `DEFAULT_USERNAME`.
    username: Option<String>,
    /// Password; `build` falls back to `DEFAULT_PASSWORD`.
    password: Option<String>,
    /// Base path appended to the broker URI; `build` falls back to "/".
    base_path: Option<String>,
    /// Whether `build` should call `connect` on the new agent (enabled by `new`).
    auto_connect: bool,
    /// Explicit MQTT client ID, handed to the agent unchanged.
    mqtt_client_id: Option<String>,
}
49
50impl TetherAgentOptionsBuilder {
51 pub fn new(role: &str) -> Self {
54 TetherAgentOptionsBuilder {
55 role: String::from(role),
56 id: None,
57 protocol: None,
58 host: None,
59 port: None,
60 username: None,
61 password: None,
62 base_path: None,
63 auto_connect: true,
64 mqtt_client_id: None,
65 }
66 }
67
68 pub fn id(mut self, id: Option<&str>) -> Self {
71 self.id = id.map(|x| x.into());
72 self
73 }
74
75 pub fn protocol(mut self, protocol: Option<&str>) -> Self {
77 self.protocol = protocol.map(|x| x.into());
78 self
79 }
80
81 pub fn mqtt_client_id(mut self, client_id: Option<&str>) -> Self {
87 self.mqtt_client_id = client_id.map(|x| x.into());
88 self
89 }
90
91 pub fn host(mut self, host: Option<&str>) -> Self {
93 self.host = host.map(|x| x.into());
94 self
95 }
96
97 pub fn port(mut self, port: Option<u16>) -> Self {
98 self.port = port;
99 self
100 }
101
102 pub fn username(mut self, username: Option<&str>) -> Self {
104 self.username = username.map(|x| x.into());
105 self
106 }
107
108 pub fn password(mut self, password: Option<&str>) -> Self {
110 self.password = password.map(|x| x.into());
111 self
112 }
113
114 pub fn base_path(mut self, base_path: Option<&str>) -> Self {
116 self.base_path = base_path.map(|x| x.into());
117 self
118 }
119
120 pub fn auto_connect(mut self, should_auto_connect: bool) -> Self {
121 self.auto_connect = should_auto_connect;
122 self
123 }
124
125 pub fn build(self) -> anyhow::Result<TetherAgent> {
126 let protocol = self.protocol.clone().unwrap_or("mqtt".into());
127 let host = self.host.clone().unwrap_or("localhost".into());
128 let port = self.port.unwrap_or(1883);
129 let username = self.username.unwrap_or(DEFAULT_USERNAME.into());
130 let password = self.password.unwrap_or(DEFAULT_PASSWORD.into());
131 let base_path = self.base_path.unwrap_or("/".into());
132
133 debug!(
134 "final build uses options protocol = {}, host = {}, port = {}",
135 protocol, host, port
136 );
137
138 let (message_sender, message_receiver) = mpsc::channel::<(TetherOrCustomTopic, Vec<u8>)>();
139
140 let mut agent = TetherAgent {
141 role: self.role.clone(),
142 id: self.id.clone().unwrap_or("any".into()),
143 host,
144 port,
145 username,
146 password,
147 protocol,
148 base_path,
149 client: None,
150 message_sender,
151 message_receiver,
152 mqtt_client_id: self.mqtt_client_id,
153 is_connected: Arc::new(Mutex::new(false)),
154 };
155
156 if self.auto_connect {
157 match agent.connect() {
158 Ok(()) => Ok(agent),
159 Err(e) => Err(e),
160 }
161 } else {
162 warn!("Auto-connect disabled; you must call .connect explicitly");
163 Ok(agent)
164 }
165 }
166}
167
168impl TetherAgent {
169 pub fn is_connected(&self) -> bool {
170 self.client.is_some()
171 }
172
173 pub fn role(&self) -> &str {
174 &self.role
175 }
176
177 pub fn id(&self) -> &str {
178 &self.id
179 }
180
181 pub fn description(&self) -> (String, String, String) {
183 (
184 String::from(&self.role),
185 String::from(&self.id),
186 self.broker_uri(),
187 )
188 }
189
190 pub fn broker_uri(&self) -> String {
193 format!(
194 "{}://{}:{}{}",
195 &self.protocol, self.host, self.port, self.base_path
196 )
197 }
198
199 pub fn set_role(&mut self, role: &str) {
200 self.role = role.into();
201 }
202
203 pub fn set_id(&mut self, id: &str) {
204 self.id = id.into();
205 }
206
207 pub fn connect(&mut self) -> anyhow::Result<()> {
209 info!(
210 "Make new connection to the MQTT server at {}://{}:{}...",
211 self.protocol, self.host, self.port
212 );
213
214 let mqtt_client_id = self
215 .mqtt_client_id
216 .clone()
217 .unwrap_or(Uuid::new_v4().to_string());
218
219 debug!("Using MQTT Client ID \"{}\"", mqtt_client_id);
220
221 let mut mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &self.host, self.port)
222 .set_credentials(&self.username, &self.password)
223 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
224 .to_owned();
225
226 match self.protocol.as_str() {
227 "mqtts" => {
228 let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
230 root_cert_store.add_parsable_certificates(
231 rustls_native_certs::load_native_certs()
232 .expect("could not load platform certs"),
233 );
234
235 let client_config = ClientConfig::builder()
236 .with_root_certificates(root_cert_store)
237 .with_no_client_auth();
238 mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
239 }
240 "wss" => {
241 let full_host = format!(
244 "{}://{}:{}{}",
245 self.protocol, self.host, self.port, self.base_path
246 );
247 debug!("WSS using full host URL: {}", &full_host);
248 mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) .set_credentials(&self.username, &self.password)
250 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
251 .to_owned();
252
253 let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
255 root_cert_store.add_parsable_certificates(
256 rustls_native_certs::load_native_certs()
257 .expect("could not load platform certs"),
258 );
259
260 let client_config = ClientConfig::builder()
261 .with_root_certificates(root_cert_store)
262 .with_no_client_auth();
263 mqtt_options.set_transport(Transport::wss_with_config(client_config.into()));
264 }
265 "ws" => {
266 let full_host = format!(
269 "{}://{}:{}{}",
270 self.protocol, self.host, self.port, self.base_path
271 );
272 debug!("WS using full host URL: {}", &full_host);
273
274 mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) .set_credentials(&self.username, &self.password)
276 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
277 .to_owned();
278
279 mqtt_options.set_transport(Transport::Ws);
280 }
281 _ => {}
282 };
283
284 let (client, mut connection) = Client::new(mqtt_options, 10);
286
287 let message_tx = self.message_sender.clone();
288
289 let connection_state = Arc::clone(&self.is_connected);
290
291 thread::spawn(move || {
292 for event in connection.iter() {
293 match event {
294 Ok(e) => match e {
295 Event::Incoming(incoming) => match incoming {
296 Packet::ConnAck(_) => {
297 info!("(Connected) ConnAck received!");
298 let mut is_c =
299 connection_state.lock().expect("failed to lock mutex");
300 *is_c = true;
301 }
302 Packet::Publish(p) => {
303 debug!("Incoming Publish packet (message received), {:?}", &p);
304 let topic = p.topic;
305 let payload: Vec<u8> = p.payload.into();
306 if let Ok(t) = ThreePartTopic::try_from(topic.as_str()) {
307 message_tx
308 .send((TetherOrCustomTopic::Tether(t), payload))
309 .expect(
310 "failed to push message from thread; three-part-topic OK",
311 );
312 } else {
313 warn!("Could not parse Three Part Topic from \"{}\"", &topic);
314 message_tx
315 .send((TetherOrCustomTopic::Custom(topic), payload))
316 .expect("failed to push message from thread; custom topic");
317 }
318 }
319 _ => debug!("Ignore all others for now, {:?}", incoming),
320 },
321 Event::Outgoing(outgoing) => {
322 debug!("Ignore outgoing events, for now, {:?}", outgoing)
323 }
324 },
325 Err(e) => {
326 error!("Connection Error: {:?}", e);
327 std::thread::sleep(Duration::from_secs(1));
328 }
332 }
333 }
334 });
335
336 let mut is_ready = false;
337
338 while !is_ready {
339 debug!("Check whether connected...");
340 std::thread::sleep(Duration::from_millis(1));
341 trace!("Is ready? {}", is_ready);
342 let get_state = *self.is_connected.lock().expect("failed to lock mutex");
343 if get_state {
344 info!("Connection status confirmed");
345 is_ready = true;
346 } else {
347 debug!("Not connected yet...");
348 }
349 }
350
351 self.client = Some(client);
352
353 Ok(())
354 }
355
356 pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Vec<u8>)> {
360 if let Ok(message) = self.message_receiver.try_recv() {
364 debug!("Message ready on queue");
365 Some(message)
366 } else {
367 None
368 }
369 }
370
371 pub fn publish(
374 &self,
375 plug_definition: &PlugDefinition,
376 payload: Option<&[u8]>,
377 ) -> anyhow::Result<()> {
378 match plug_definition {
379 PlugDefinition::InputPlug(_) => {
380 panic!("You cannot publish using an Input Plug")
381 }
382 PlugDefinition::OutputPlug(output_plug_definition) => {
383 let topic = output_plug_definition.topic_str();
384 let qos = match output_plug_definition.qos() {
385 0 => QoS::AtMostOnce,
386 1 => QoS::AtLeastOnce,
387 2 => QoS::ExactlyOnce,
388 _ => QoS::AtMostOnce,
389 };
390
391 if let Some(client) = &self.client {
392 let res = client
393 .publish(
394 topic,
395 qos,
396 output_plug_definition.retain(),
397 payload.unwrap_or_default(),
398 )
399 .map_err(anyhow::Error::msg);
400 debug!("Published OK");
401 res
402 } else {
403 Err(anyhow!("Client not ready for publish"))
404 }
405 }
406 }
407 }
408
409 pub fn encode_and_publish<T: Serialize>(
411 &self,
412 plug_definition: &PlugDefinition,
413 data: T,
414 ) -> anyhow::Result<()> {
415 match to_vec_named(&data) {
416 Ok(payload) => self.publish(plug_definition, Some(&payload)),
417 Err(e) => {
418 error!("Failed to encode: {e:?}");
419 Err(e.into())
420 }
421 }
422 }
423
424 pub fn publish_raw(
425 &self,
426 topic: &str,
427 payload: &[u8],
428 qos: Option<i32>,
429 retained: Option<bool>,
430 ) -> anyhow::Result<()> {
431 let qos = match qos.unwrap_or(1) {
432 0 => QoS::AtMostOnce,
433 1 => QoS::AtLeastOnce,
434 2 => QoS::ExactlyOnce,
435 _ => QoS::AtMostOnce,
436 };
437 if let Some(client) = &self.client {
438 client
439 .publish(topic, qos, retained.unwrap_or_default(), payload)
440 .map_err(anyhow::Error::msg)
441 } else {
442 Err(anyhow!("Client not ready for publish"))
443 }
444 }
445}