1use anyhow::anyhow;
2use log::{debug, error, info, trace, warn};
3use rmp_serde::to_vec_named;
4use rumqttc::tokio_rustls::rustls::ClientConfig;
5use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
6use serde::Serialize;
7use std::sync::{Arc, Mutex};
8use std::{sync::mpsc, thread, time::Duration};
9use uuid::Uuid;
10
11use crate::ChannelDefinition;
12use crate::{
13 tether_compliant_topic::{TetherCompliantTopic, TetherOrCustomTopic},
14 ChannelDefinitionCommon,
15};
16
/// Number of seconds passed to the MQTT client as its keep-alive interval.
const TIMEOUT_SECONDS: u64 = 3;

/// Username used for the broker when the builder was not given one.
const DEFAULT_USERNAME: &str = "tether";

/// Password used for the broker when the builder was not given one.
const DEFAULT_PASSWORD: &str = "sp_ceB0ss!";
20
21pub struct TetherAgent {
22 role: String,
23 id: Option<String>,
24 host: String,
25 port: u16,
26 protocol: String,
27 username: String,
28 password: String,
29 base_path: String,
30 mqtt_client_id: Option<String>,
31 pub(crate) client: Option<Client>,
32 message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
33 message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
34 is_connected: Arc<Mutex<bool>>,
35 auto_connect_enabled: bool,
36}
37
/// Builder collecting the options for a `TetherAgent`.
///
/// Every `Option` field left as `None` is replaced by a default value when
/// `build()` is called.
#[derive(Clone)]
pub struct TetherAgentOptionsBuilder {
    /// Role name of the agent (the only mandatory setting).
    role: String,
    /// Optional agent ID.
    id: Option<String>,
    /// Protocol/scheme override.
    protocol: Option<String>,
    /// Broker host override.
    host: Option<String>,
    /// Broker port override.
    port: Option<u16>,
    /// Username override.
    username: Option<String>,
    /// Password override.
    password: Option<String>,
    /// Base path override.
    base_path: Option<String>,
    /// Whether `build()` should connect to the broker immediately.
    auto_connect: bool,
    /// Explicit MQTT client ID override.
    mqtt_client_id: Option<String>,
}
51
52impl TetherAgentOptionsBuilder {
53 pub fn new(role: &str) -> Self {
56 TetherAgentOptionsBuilder {
57 role: String::from(role),
58 id: None,
59 protocol: None,
60 host: None,
61 port: None,
62 username: None,
63 password: None,
64 base_path: None,
65 auto_connect: true,
66 mqtt_client_id: None,
67 }
68 }
69
70 pub fn id(mut self, id: Option<&str>) -> Self {
73 self.id = id.map(|x| x.into());
74 self
75 }
76
77 pub fn protocol(mut self, protocol: Option<&str>) -> Self {
79 self.protocol = protocol.map(|x| x.into());
80 self
81 }
82
83 pub fn mqtt_client_id(mut self, client_id: Option<&str>) -> Self {
89 self.mqtt_client_id = client_id.map(|x| x.into());
90 self
91 }
92
93 pub fn host(mut self, host: Option<&str>) -> Self {
95 self.host = host.map(|x| x.into());
96 self
97 }
98
99 pub fn port(mut self, port: Option<u16>) -> Self {
100 self.port = port;
101 self
102 }
103
104 pub fn username(mut self, username: Option<&str>) -> Self {
106 self.username = username.map(|x| x.into());
107 self
108 }
109
110 pub fn password(mut self, password: Option<&str>) -> Self {
112 self.password = password.map(|x| x.into());
113 self
114 }
115
116 pub fn base_path(mut self, base_path: Option<&str>) -> Self {
118 self.base_path = base_path.map(|x| x.into());
119 self
120 }
121
122 pub fn auto_connect(mut self, should_auto_connect: bool) -> Self {
123 self.auto_connect = should_auto_connect;
124 self
125 }
126
127 pub fn build(self) -> anyhow::Result<TetherAgent> {
128 let protocol = self.protocol.clone().unwrap_or("mqtt".into());
129 let host = self.host.clone().unwrap_or("localhost".into());
130 let port = self.port.unwrap_or(1883);
131 let username = self.username.unwrap_or(DEFAULT_USERNAME.into());
132 let password = self.password.unwrap_or(DEFAULT_PASSWORD.into());
133 let base_path = self.base_path.unwrap_or("/".into());
134
135 debug!(
136 "final build uses options protocol = {}, host = {}, port = {}",
137 protocol, host, port
138 );
139
140 let (message_sender, message_receiver) = mpsc::channel::<(TetherOrCustomTopic, Vec<u8>)>();
141
142 let mut agent = TetherAgent {
143 role: self.role.clone(),
144 id: self.id,
145 host,
146 port,
147 username,
148 password,
149 protocol,
150 base_path,
151 client: None,
152 message_sender,
153 message_receiver,
154 mqtt_client_id: self.mqtt_client_id,
155 is_connected: Arc::new(Mutex::new(false)),
156 auto_connect_enabled: self.auto_connect,
157 };
158
159 if self.auto_connect {
160 match agent.connect() {
161 Ok(()) => Ok(agent),
162 Err(e) => Err(e),
163 }
164 } else {
165 warn!("Auto-connect disabled; you must call .connect explicitly");
166 Ok(agent)
167 }
168 }
169}
170
171impl TetherAgent {
172 pub fn is_connected(&self) -> bool {
173 self.client.is_some()
174 }
175
176 pub fn auto_connect_enabled(&self) -> bool {
177 self.auto_connect_enabled
178 }
179
180 pub fn role(&self) -> &str {
181 &self.role
182 }
183
184 pub fn id(&self) -> Option<&str> {
185 self.id.as_deref()
186 }
187
188 pub fn description(&self) -> (String, String, String) {
190 (
191 String::from(&self.role),
192 match &self.id {
193 Some(id) => String::from(id),
194 None => String::from("any"),
195 },
196 self.broker_uri(),
197 )
198 }
199
200 pub fn broker_uri(&self) -> String {
203 format!(
204 "{}://{}:{}{}",
205 &self.protocol, self.host, self.port, self.base_path
206 )
207 }
208
209 pub fn set_role(&mut self, role: &str) {
210 self.role = role.into();
211 }
212
213 pub fn set_id(&mut self, id: &str) {
214 self.id = Some(id.into());
215 }
216
217 pub fn connect(&mut self) -> anyhow::Result<()> {
219 info!(
220 "Make new connection to the MQTT server at {}://{}:{}...",
221 self.protocol, self.host, self.port
222 );
223
224 let mqtt_client_id = self
225 .mqtt_client_id
226 .clone()
227 .unwrap_or(Uuid::new_v4().to_string());
228
229 debug!("Using MQTT Client ID \"{}\"", mqtt_client_id);
230
231 let mut mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &self.host, self.port)
232 .set_credentials(&self.username, &self.password)
233 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
234 .to_owned();
235
236 match self.protocol.as_str() {
237 "mqtts" => {
238 let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
240 root_cert_store.add_parsable_certificates(
241 rustls_native_certs::load_native_certs()
242 .expect("could not load platform certs"),
243 );
244
245 let client_config = ClientConfig::builder()
246 .with_root_certificates(root_cert_store)
247 .with_no_client_auth();
248 mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
249 }
250 "wss" => {
251 let full_host = format!(
254 "{}://{}:{}{}",
255 self.protocol, self.host, self.port, self.base_path
256 );
257 debug!("WSS using full host URL: {}", &full_host);
258 mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) .set_credentials(&self.username, &self.password)
260 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
261 .to_owned();
262
263 let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
265 root_cert_store.add_parsable_certificates(
266 rustls_native_certs::load_native_certs()
267 .expect("could not load platform certs"),
268 );
269
270 let client_config = ClientConfig::builder()
271 .with_root_certificates(root_cert_store)
272 .with_no_client_auth();
273 mqtt_options.set_transport(Transport::wss_with_config(client_config.into()));
274 }
275 "ws" => {
276 let full_host = format!(
279 "{}://{}:{}{}",
280 self.protocol, self.host, self.port, self.base_path
281 );
282 debug!("WS using full host URL: {}", &full_host);
283
284 mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) .set_credentials(&self.username, &self.password)
286 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
287 .to_owned();
288
289 mqtt_options.set_transport(Transport::Ws);
290 }
291 _ => {}
292 };
293
294 let (client, mut connection) = Client::new(mqtt_options, 10);
296
297 let message_tx = self.message_sender.clone();
298
299 let connection_state = Arc::clone(&self.is_connected);
300
301 thread::spawn(move || {
302 for event in connection.iter() {
303 match event {
304 Ok(e) => {
305 match e {
306 Event::Incoming(incoming) => match incoming {
307 Packet::ConnAck(_) => {
308 info!("(Connected) ConnAck received!");
309 let mut is_c =
310 connection_state.lock().expect("failed to lock mutex");
311 *is_c = true;
312 }
313 Packet::Publish(p) => {
314 debug!("Incoming Publish packet (message received), {:?}", &p);
315 let topic = p.topic;
316 let payload: Vec<u8> = p.payload.into();
317 match TetherCompliantTopic::try_from(topic.as_str()) {
318 Ok(t) => {
319 message_tx
320 .send((TetherOrCustomTopic::Tether(t), payload))
321 .expect(
322 "failed to push message from thread; three-part-topic OK",
323 );
324 }
325 Err(e) => {
326 warn!(
327 "Could not parse Three Part Topic from \"{}\": {}",
328 &topic, e
329 );
330 message_tx
331 .send((TetherOrCustomTopic::Custom(topic), payload))
332 .expect("failed to push message from thread; custom topic");
333 }
334 }
335 }
336 _ => debug!("Ignore all others for now, {:?}", incoming),
337 },
338 Event::Outgoing(outgoing) => {
339 debug!("Ignore outgoing events, for now, {:?}", outgoing)
340 }
341 }
342 }
343 Err(e) => {
344 error!("Connection Error: {:?}", e);
345 std::thread::sleep(Duration::from_secs(1));
346 }
350 }
351 }
352 });
353
354 let mut is_ready = false;
355
356 while !is_ready {
357 debug!("Check whether connected...");
358 std::thread::sleep(Duration::from_millis(1));
359 trace!("Is ready? {}", is_ready);
360 let get_state = *self.is_connected.lock().expect("failed to lock mutex");
361 if get_state {
362 info!("Connection status confirmed");
363 is_ready = true;
364 } else {
365 debug!("Not connected yet...");
366 }
367 }
368
369 self.client = Some(client);
370
371 Ok(())
372 }
373
374 pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Vec<u8>)> {
378 if let Ok(message) = self.message_receiver.try_recv() {
382 debug!("Message ready on queue");
383 Some(message)
384 } else {
385 None
386 }
387 }
388
389 pub fn send_raw(
394 &self,
395 channel_definition: &ChannelDefinition,
396 payload: Option<&[u8]>,
397 ) -> anyhow::Result<()> {
398 match channel_definition {
399 ChannelDefinition::ChannelReceiver(_) => {
400 panic!("You cannot publish using a Channel Receiver")
401 }
402 ChannelDefinition::ChannelSender(channel_sender_definition) => {
403 let topic = channel_sender_definition.generated_topic();
404 let qos = match channel_sender_definition.qos() {
405 0 => QoS::AtMostOnce,
406 1 => QoS::AtLeastOnce,
407 2 => QoS::ExactlyOnce,
408 _ => QoS::AtMostOnce,
409 };
410
411 if let Some(client) = &self.client {
412 let res = client
413 .publish(
414 topic,
415 qos,
416 channel_sender_definition.retain(),
417 payload.unwrap_or_default(),
418 )
419 .map_err(anyhow::Error::msg);
420 debug!("Published OK");
421 res
422 } else {
423 Err(anyhow!("Client not ready for publish"))
424 }
425 }
426 }
427 }
428
429 pub fn send<T: Serialize>(
434 &self,
435 channel_definition: &ChannelDefinition,
436 data: T,
437 ) -> anyhow::Result<()> {
438 match to_vec_named(&data) {
439 Ok(payload) => self.send_raw(channel_definition, Some(&payload)),
440 Err(e) => {
441 error!("Failed to encode: {e:?}");
442 Err(e.into())
443 }
444 }
445 }
446
447 pub fn send_empty(&self, channel_definition: &ChannelDefinition) -> anyhow::Result<()> {
448 self.send_raw(channel_definition, None)
449 }
450
451 pub fn publish_raw(
452 &self,
453 topic: &str,
454 payload: &[u8],
455 qos: Option<i32>,
456 retained: Option<bool>,
457 ) -> anyhow::Result<()> {
458 let qos = match qos.unwrap_or(1) {
459 0 => QoS::AtMostOnce,
460 1 => QoS::AtLeastOnce,
461 2 => QoS::ExactlyOnce,
462 _ => QoS::AtMostOnce,
463 };
464 if let Some(client) = &self.client {
465 client
466 .publish(topic, qos, retained.unwrap_or_default(), payload)
467 .map_err(anyhow::Error::msg)
468 } else {
469 Err(anyhow!("Client not ready for publish"))
470 }
471 }
472}