tether_agent/agent/
mod.rs

1use anyhow::anyhow;
2use log::{debug, error, info, trace, warn};
3use rmp_serde::to_vec_named;
4use rumqttc::tokio_rustls::rustls::ClientConfig;
5use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
6use serde::Serialize;
7use std::sync::{Arc, Mutex};
8use std::{sync::mpsc, thread, time::Duration};
9use uuid::Uuid;
10
11use crate::ChannelDefinition;
12use crate::{
13    tether_compliant_topic::{TetherCompliantTopic, TetherOrCustomTopic},
14    ChannelDefinitionCommon,
15};
16
17const TIMEOUT_SECONDS: u64 = 3;
18const DEFAULT_USERNAME: &str = "tether";
19const DEFAULT_PASSWORD: &str = "sp_ceB0ss!";
20
21pub struct TetherAgent {
22    role: String,
23    id: Option<String>,
24    host: String,
25    port: u16,
26    protocol: String,
27    username: String,
28    password: String,
29    base_path: String,
30    mqtt_client_id: Option<String>,
31    pub(crate) client: Option<Client>,
32    message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
33    message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
34    is_connected: Arc<Mutex<bool>>,
35    auto_connect_enabled: bool,
36}
37
38#[derive(Clone)]
39pub struct TetherAgentOptionsBuilder {
40    role: String,
41    id: Option<String>,
42    protocol: Option<String>,
43    host: Option<String>,
44    port: Option<u16>,
45    username: Option<String>,
46    password: Option<String>,
47    base_path: Option<String>,
48    auto_connect: bool,
49    mqtt_client_id: Option<String>,
50}
51
52impl TetherAgentOptionsBuilder {
53    /// Initialise Tether Options struct with default options; call other methods to customise.
54    /// Call `build()` to get the actual TetherAgent instance (and connect automatically, by default)
55    pub fn new(role: &str) -> Self {
56        TetherAgentOptionsBuilder {
57            role: String::from(role),
58            id: None,
59            protocol: None,
60            host: None,
61            port: None,
62            username: None,
63            password: None,
64            base_path: None,
65            auto_connect: true,
66            mqtt_client_id: None,
67        }
68    }
69
70    /// Optionally sets the **Tether ID**, as used in auto-generating topics such as `myRole/myID/myChannel` _not_ the MQTT Client ID.
71    /// Provide Some(value) to override or None to use the default `any` (when publishing) or `+` when subscribing.
72    pub fn id(mut self, id: Option<&str>) -> Self {
73        self.id = id.map(|x| x.into());
74        self
75    }
76
77    /// Provide Some(value) to override or None to use default
78    pub fn protocol(mut self, protocol: Option<&str>) -> Self {
79        self.protocol = protocol.map(|x| x.into());
80        self
81    }
82
83    /// Optionally set the **MQTT Client ID** used when connecting to the MQTT broker. This is _not_ the same as the **Tether ID**
84    /// used for auto-generating topics.
85    ///
86    /// By default we use a UUID for this value, in order to avoid hard-to-debug issues where Tether Agent instances share
87    /// the same Client ID and therefore events/messages are not handled properly by all instances.
88    pub fn mqtt_client_id(mut self, client_id: Option<&str>) -> Self {
89        self.mqtt_client_id = client_id.map(|x| x.into());
90        self
91    }
92
93    /// Provide Some(value) to override or None to use default
94    pub fn host(mut self, host: Option<&str>) -> Self {
95        self.host = host.map(|x| x.into());
96        self
97    }
98
99    pub fn port(mut self, port: Option<u16>) -> Self {
100        self.port = port;
101        self
102    }
103
104    /// Provide Some(value) to override or None to use default
105    pub fn username(mut self, username: Option<&str>) -> Self {
106        self.username = username.map(|x| x.into());
107        self
108    }
109
110    /// Provide Some(value) to override or None to use default
111    pub fn password(mut self, password: Option<&str>) -> Self {
112        self.password = password.map(|x| x.into());
113        self
114    }
115
116    /// Provide Some(value) to override or None to use default
117    pub fn base_path(mut self, base_path: Option<&str>) -> Self {
118        self.base_path = base_path.map(|x| x.into());
119        self
120    }
121
122    pub fn auto_connect(mut self, should_auto_connect: bool) -> Self {
123        self.auto_connect = should_auto_connect;
124        self
125    }
126
127    pub fn build(self) -> anyhow::Result<TetherAgent> {
128        let protocol = self.protocol.clone().unwrap_or("mqtt".into());
129        let host = self.host.clone().unwrap_or("localhost".into());
130        let port = self.port.unwrap_or(1883);
131        let username = self.username.unwrap_or(DEFAULT_USERNAME.into());
132        let password = self.password.unwrap_or(DEFAULT_PASSWORD.into());
133        let base_path = self.base_path.unwrap_or("/".into());
134
135        debug!(
136            "final build uses options protocol = {}, host = {}, port = {}",
137            protocol, host, port
138        );
139
140        let (message_sender, message_receiver) = mpsc::channel::<(TetherOrCustomTopic, Vec<u8>)>();
141
142        let mut agent = TetherAgent {
143            role: self.role.clone(),
144            id: self.id,
145            host,
146            port,
147            username,
148            password,
149            protocol,
150            base_path,
151            client: None,
152            message_sender,
153            message_receiver,
154            mqtt_client_id: self.mqtt_client_id,
155            is_connected: Arc::new(Mutex::new(false)),
156            auto_connect_enabled: self.auto_connect,
157        };
158
159        if self.auto_connect {
160            match agent.connect() {
161                Ok(()) => Ok(agent),
162                Err(e) => Err(e),
163            }
164        } else {
165            warn!("Auto-connect disabled; you must call .connect explicitly");
166            Ok(agent)
167        }
168    }
169}
170
171impl TetherAgent {
172    pub fn is_connected(&self) -> bool {
173        self.client.is_some()
174    }
175
176    pub fn auto_connect_enabled(&self) -> bool {
177        self.auto_connect_enabled
178    }
179
180    pub fn role(&self) -> &str {
181        &self.role
182    }
183
184    pub fn id(&self) -> Option<&str> {
185        self.id.as_deref()
186    }
187
188    /// Returns the Agent Role, ID (group), Broker URI
189    pub fn description(&self) -> (String, String, String) {
190        (
191            String::from(&self.role),
192            match &self.id {
193                Some(id) => String::from(id),
194                None => String::from("any"),
195            },
196            self.broker_uri(),
197        )
198    }
199
200    /// Return the URI (protocol, IP address, port, path) that
201    /// was used to connect to the MQTT broker
202    pub fn broker_uri(&self) -> String {
203        format!(
204            "{}://{}:{}{}",
205            &self.protocol, self.host, self.port, self.base_path
206        )
207    }
208
209    pub fn set_role(&mut self, role: &str) {
210        self.role = role.into();
211    }
212
213    pub fn set_id(&mut self, id: &str) {
214        self.id = Some(id.into());
215    }
216
217    /// Self must be mutable in order to create and assign new Client (with Connection)
218    pub fn connect(&mut self) -> anyhow::Result<()> {
219        info!(
220            "Make new connection to the MQTT server at {}://{}:{}...",
221            self.protocol, self.host, self.port
222        );
223
224        let mqtt_client_id = self
225            .mqtt_client_id
226            .clone()
227            .unwrap_or(Uuid::new_v4().to_string());
228
229        debug!("Using MQTT Client ID \"{}\"", mqtt_client_id);
230
231        let mut mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &self.host, self.port)
232            .set_credentials(&self.username, &self.password)
233            .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
234            .to_owned();
235
236        match self.protocol.as_str() {
237            "mqtts" => {
238                // Use rustls-native-certs to load root certificates from the operating system.
239                let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
240                root_cert_store.add_parsable_certificates(
241                    rustls_native_certs::load_native_certs()
242                        .expect("could not load platform certs"),
243                );
244
245                let client_config = ClientConfig::builder()
246                    .with_root_certificates(root_cert_store)
247                    .with_no_client_auth();
248                mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
249            }
250            "wss" => {
251                // If using websocket protocol, rumqttc does NOT automatically add protocol and port
252                // into the URL!
253                let full_host = format!(
254                    "{}://{}:{}{}",
255                    self.protocol, self.host, self.port, self.base_path
256                );
257                debug!("WSS using full host URL: {}", &full_host);
258                mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) // here, port is ignored anyway
259                    .set_credentials(&self.username, &self.password)
260                    .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
261                    .to_owned();
262
263                // Use rustls-native-certs to load root certificates from the operating system.
264                let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
265                root_cert_store.add_parsable_certificates(
266                    rustls_native_certs::load_native_certs()
267                        .expect("could not load platform certs"),
268                );
269
270                let client_config = ClientConfig::builder()
271                    .with_root_certificates(root_cert_store)
272                    .with_no_client_auth();
273                mqtt_options.set_transport(Transport::wss_with_config(client_config.into()));
274            }
275            "ws" => {
276                // If using websocket protocol, rumqttc does NOT automatically add protocol and port
277                // into the URL!
278                let full_host = format!(
279                    "{}://{}:{}{}",
280                    self.protocol, self.host, self.port, self.base_path
281                );
282                debug!("WS using full host URL: {}", &full_host);
283
284                mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) // here, port is ignored anyway
285                    .set_credentials(&self.username, &self.password)
286                    .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
287                    .to_owned();
288
289                mqtt_options.set_transport(Transport::Ws);
290            }
291            _ => {}
292        };
293
294        // Create the client connection
295        let (client, mut connection) = Client::new(mqtt_options, 10);
296
297        let message_tx = self.message_sender.clone();
298
299        let connection_state = Arc::clone(&self.is_connected);
300
301        thread::spawn(move || {
302            for event in connection.iter() {
303                match event {
304                    Ok(e) => {
305                        match e {
306                            Event::Incoming(incoming) => match incoming {
307                                Packet::ConnAck(_) => {
308                                    info!("(Connected) ConnAck received!");
309                                    let mut is_c =
310                                        connection_state.lock().expect("failed to lock mutex");
311                                    *is_c = true;
312                                }
313                                Packet::Publish(p) => {
314                                    debug!("Incoming Publish packet (message received), {:?}", &p);
315                                    let topic = p.topic;
316                                    let payload: Vec<u8> = p.payload.into();
317                                    match TetherCompliantTopic::try_from(topic.as_str()) {
318                                        Ok(t) => {
319                                            message_tx
320                                            .send((TetherOrCustomTopic::Tether(t), payload))
321                                            .expect(
322                                            "failed to push message from thread; three-part-topic OK",
323                                        );
324                                        }
325                                        Err(e) => {
326                                            warn!(
327                                                "Could not parse Three Part Topic from \"{}\": {}",
328                                                &topic, e
329                                            );
330                                            message_tx
331                                        .send((TetherOrCustomTopic::Custom(topic), payload))
332                                        .expect("failed to push message from thread; custom topic");
333                                        }
334                                    }
335                                }
336                                _ => debug!("Ignore all others for now, {:?}", incoming),
337                            },
338                            Event::Outgoing(outgoing) => {
339                                debug!("Ignore outgoing events, for now, {:?}", outgoing)
340                            }
341                        }
342                    }
343                    Err(e) => {
344                        error!("Connection Error: {:?}", e);
345                        std::thread::sleep(Duration::from_secs(1));
346                        // connection_status_tx
347                        //     .send(Err(anyhow!("MQTT Connection error")))
348                        //     .expect("failed to push error message from thread");
349                    }
350                }
351            }
352        });
353
354        let mut is_ready = false;
355
356        while !is_ready {
357            debug!("Check whether connected...");
358            std::thread::sleep(Duration::from_millis(1));
359            trace!("Is ready? {}", is_ready);
360            let get_state = *self.is_connected.lock().expect("failed to lock mutex");
361            if get_state {
362                info!("Connection status confirmed");
363                is_ready = true;
364            } else {
365                debug!("Not connected yet...");
366            }
367        }
368
369        self.client = Some(client);
370
371        Ok(())
372    }
373
374    /// If a message is waiting return ThreePartTopic, Message (String, Message)
375    /// Messages received on topics that are not parseable as Tether Three Part Topics will be returned with
376    /// the complete Topic string instead
377    pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Vec<u8>)> {
378        // if let Ok(e) = self.connection_status_receiver.try_recv() {
379        //     panic!("check_messages received error: {}", e);
380        // }
381        if let Ok(message) = self.message_receiver.try_recv() {
382            debug!("Message ready on queue");
383            Some(message)
384        } else {
385            None
386        }
387    }
388
389    /// Unlike .send, this function does NOT serialize the data before publishing.
390    ///
391    /// Given a channel definition and a raw (u8 buffer) payload, publishes a message
392    /// using an appropriate topic and with the QOS specified in the Channel Definition
393    pub fn send_raw(
394        &self,
395        channel_definition: &ChannelDefinition,
396        payload: Option<&[u8]>,
397    ) -> anyhow::Result<()> {
398        match channel_definition {
399            ChannelDefinition::ChannelReceiver(_) => {
400                panic!("You cannot publish using a Channel Receiver")
401            }
402            ChannelDefinition::ChannelSender(channel_sender_definition) => {
403                let topic = channel_sender_definition.generated_topic();
404                let qos = match channel_sender_definition.qos() {
405                    0 => QoS::AtMostOnce,
406                    1 => QoS::AtLeastOnce,
407                    2 => QoS::ExactlyOnce,
408                    _ => QoS::AtMostOnce,
409                };
410
411                if let Some(client) = &self.client {
412                    let res = client
413                        .publish(
414                            topic,
415                            qos,
416                            channel_sender_definition.retain(),
417                            payload.unwrap_or_default(),
418                        )
419                        .map_err(anyhow::Error::msg);
420                    debug!("Published OK");
421                    res
422                } else {
423                    Err(anyhow!("Client not ready for publish"))
424                }
425            }
426        }
427    }
428
429    /// Serializes the data automatically before publishing.
430    ///
431    /// Given a channel definition and serializeable data payload, publishes a message
432    /// using an appropriate topic and with the QOS specified in the Channel Definition
433    pub fn send<T: Serialize>(
434        &self,
435        channel_definition: &ChannelDefinition,
436        data: T,
437    ) -> anyhow::Result<()> {
438        match to_vec_named(&data) {
439            Ok(payload) => self.send_raw(channel_definition, Some(&payload)),
440            Err(e) => {
441                error!("Failed to encode: {e:?}");
442                Err(e.into())
443            }
444        }
445    }
446
447    pub fn send_empty(&self, channel_definition: &ChannelDefinition) -> anyhow::Result<()> {
448        self.send_raw(channel_definition, None)
449    }
450
451    pub fn publish_raw(
452        &self,
453        topic: &str,
454        payload: &[u8],
455        qos: Option<i32>,
456        retained: Option<bool>,
457    ) -> anyhow::Result<()> {
458        let qos = match qos.unwrap_or(1) {
459            0 => QoS::AtMostOnce,
460            1 => QoS::AtLeastOnce,
461            2 => QoS::ExactlyOnce,
462            _ => QoS::AtMostOnce,
463        };
464        if let Some(client) = &self.client {
465            client
466                .publish(topic, qos, retained.unwrap_or_default(), payload)
467                .map_err(anyhow::Error::msg)
468        } else {
469            Err(anyhow!("Client not ready for publish"))
470        }
471    }
472}