tether_agent/agent/
mod.rs

1use ::anyhow::anyhow;
2use log::{debug, error, info, trace, warn};
3use rmp_serde::to_vec_named;
4use rumqttc::tokio_rustls::rustls::ClientConfig;
5use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
6use serde::Serialize;
7use std::sync::{Arc, Mutex};
8use std::{sync::mpsc, thread, time::Duration};
9use uuid::Uuid;
10
11use crate::{
12    three_part_topic::{TetherOrCustomTopic, ThreePartTopic},
13    PlugDefinition, PlugDefinitionCommon,
14};
15
16const TIMEOUT_SECONDS: u64 = 3;
17const DEFAULT_USERNAME: &str = "tether";
18const DEFAULT_PASSWORD: &str = "sp_ceB0ss!";
19
20pub struct TetherAgent {
21    role: String,
22    id: String,
23    host: String,
24    port: u16,
25    protocol: String,
26    username: String,
27    password: String,
28    base_path: String,
29    mqtt_client_id: Option<String>,
30    pub(crate) client: Option<Client>,
31    message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
32    message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
33    is_connected: Arc<Mutex<bool>>,
34}
35
36#[derive(Clone)]
37pub struct TetherAgentOptionsBuilder {
38    role: String,
39    id: Option<String>,
40    protocol: Option<String>,
41    host: Option<String>,
42    port: Option<u16>,
43    username: Option<String>,
44    password: Option<String>,
45    base_path: Option<String>,
46    auto_connect: bool,
47    mqtt_client_id: Option<String>,
48}
49
50impl TetherAgentOptionsBuilder {
51    /// Initialise Tether Options struct with default options; call other methods to customise.
52    /// Call `build()` to get the actual TetherAgent instance (and connect automatically, by default)
53    pub fn new(role: &str) -> Self {
54        TetherAgentOptionsBuilder {
55            role: String::from(role),
56            id: None,
57            protocol: None,
58            host: None,
59            port: None,
60            username: None,
61            password: None,
62            base_path: None,
63            auto_connect: true,
64            mqtt_client_id: None,
65        }
66    }
67
68    /// Optionally sets the **Tether ID**, as used in auto-generating topics such as `myRole/myID/myPlug` _not_ the MQTT Client ID.
69    /// Provide Some(value) to override or None to use the default `any` (when publishing) or `+` when subscribing.
70    pub fn id(mut self, id: Option<&str>) -> Self {
71        self.id = id.map(|x| x.into());
72        self
73    }
74
75    /// Provide Some(value) to override or None to use default
76    pub fn protocol(mut self, protocol: Option<&str>) -> Self {
77        self.protocol = protocol.map(|x| x.into());
78        self
79    }
80
81    /// Optionally set the **MQTT Client ID** used when connecting to the MQTT broker. This is _not_ the same as the **Tether ID**
82    /// used for auto-generating topics.
83    ///
84    /// By default we use a UUID for this value, in order to avoid hard-to-debug issues where Tether Agent instances share
85    /// the same Client ID and therefore events/messages are not handled properly by all instances.
86    pub fn mqtt_client_id(mut self, client_id: Option<&str>) -> Self {
87        self.mqtt_client_id = client_id.map(|x| x.into());
88        self
89    }
90
91    /// Provide Some(value) to override or None to use default
92    pub fn host(mut self, host: Option<&str>) -> Self {
93        self.host = host.map(|x| x.into());
94        self
95    }
96
97    pub fn port(mut self, port: Option<u16>) -> Self {
98        self.port = port;
99        self
100    }
101
102    /// Provide Some(value) to override or None to use default
103    pub fn username(mut self, username: Option<&str>) -> Self {
104        self.username = username.map(|x| x.into());
105        self
106    }
107
108    /// Provide Some(value) to override or None to use default
109    pub fn password(mut self, password: Option<&str>) -> Self {
110        self.password = password.map(|x| x.into());
111        self
112    }
113
114    /// Provide Some(value) to override or None to use default
115    pub fn base_path(mut self, base_path: Option<&str>) -> Self {
116        self.base_path = base_path.map(|x| x.into());
117        self
118    }
119
120    pub fn auto_connect(mut self, should_auto_connect: bool) -> Self {
121        self.auto_connect = should_auto_connect;
122        self
123    }
124
125    pub fn build(self) -> anyhow::Result<TetherAgent> {
126        let protocol = self.protocol.clone().unwrap_or("mqtt".into());
127        let host = self.host.clone().unwrap_or("localhost".into());
128        let port = self.port.unwrap_or(1883);
129        let username = self.username.unwrap_or(DEFAULT_USERNAME.into());
130        let password = self.password.unwrap_or(DEFAULT_PASSWORD.into());
131        let base_path = self.base_path.unwrap_or("/".into());
132
133        debug!(
134            "final build uses options protocol = {}, host = {}, port = {}",
135            protocol, host, port
136        );
137
138        let (message_sender, message_receiver) = mpsc::channel::<(TetherOrCustomTopic, Vec<u8>)>();
139
140        let mut agent = TetherAgent {
141            role: self.role.clone(),
142            id: self.id.clone().unwrap_or("any".into()),
143            host,
144            port,
145            username,
146            password,
147            protocol,
148            base_path,
149            client: None,
150            message_sender,
151            message_receiver,
152            mqtt_client_id: self.mqtt_client_id,
153            is_connected: Arc::new(Mutex::new(false)),
154        };
155
156        if self.auto_connect {
157            match agent.connect() {
158                Ok(()) => Ok(agent),
159                Err(e) => Err(e),
160            }
161        } else {
162            warn!("Auto-connect disabled; you must call .connect explicitly");
163            Ok(agent)
164        }
165    }
166}
167
168impl TetherAgent {
169    pub fn is_connected(&self) -> bool {
170        self.client.is_some()
171    }
172
173    pub fn role(&self) -> &str {
174        &self.role
175    }
176
177    pub fn id(&self) -> &str {
178        &self.id
179    }
180
181    /// Returns the Agent Role, ID (group), Broker URI
182    pub fn description(&self) -> (String, String, String) {
183        (
184            String::from(&self.role),
185            String::from(&self.id),
186            self.broker_uri(),
187        )
188    }
189
190    /// Return the URI (protocol, IP address, port, path) that
191    /// was used to connect to the MQTT broker
192    pub fn broker_uri(&self) -> String {
193        format!(
194            "{}://{}:{}{}",
195            &self.protocol, self.host, self.port, self.base_path
196        )
197    }
198
199    pub fn set_role(&mut self, role: &str) {
200        self.role = role.into();
201    }
202
203    pub fn set_id(&mut self, id: &str) {
204        self.id = id.into();
205    }
206
207    /// Self must be mutable in order to create and assign new Client (with Connection)
208    pub fn connect(&mut self) -> anyhow::Result<()> {
209        info!(
210            "Make new connection to the MQTT server at {}://{}:{}...",
211            self.protocol, self.host, self.port
212        );
213
214        let mqtt_client_id = self
215            .mqtt_client_id
216            .clone()
217            .unwrap_or(Uuid::new_v4().to_string());
218
219        debug!("Using MQTT Client ID \"{}\"", mqtt_client_id);
220
221        let mut mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &self.host, self.port)
222            .set_credentials(&self.username, &self.password)
223            .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
224            .to_owned();
225
226        match self.protocol.as_str() {
227            "mqtts" => {
228                // Use rustls-native-certs to load root certificates from the operating system.
229                let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
230                root_cert_store.add_parsable_certificates(
231                    rustls_native_certs::load_native_certs()
232                        .expect("could not load platform certs"),
233                );
234
235                let client_config = ClientConfig::builder()
236                    .with_root_certificates(root_cert_store)
237                    .with_no_client_auth();
238                mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
239            }
240            "wss" => {
241                // If using websocket protocol, rumqttc does NOT automatically add protocol and port
242                // into the URL!
243                let full_host = format!(
244                    "{}://{}:{}{}",
245                    self.protocol, self.host, self.port, self.base_path
246                );
247                debug!("WSS using full host URL: {}", &full_host);
248                mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) // here, port is ignored anyway
249                    .set_credentials(&self.username, &self.password)
250                    .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
251                    .to_owned();
252
253                // Use rustls-native-certs to load root certificates from the operating system.
254                let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
255                root_cert_store.add_parsable_certificates(
256                    rustls_native_certs::load_native_certs()
257                        .expect("could not load platform certs"),
258                );
259
260                let client_config = ClientConfig::builder()
261                    .with_root_certificates(root_cert_store)
262                    .with_no_client_auth();
263                mqtt_options.set_transport(Transport::wss_with_config(client_config.into()));
264            }
265            "ws" => {
266                // If using websocket protocol, rumqttc does NOT automatically add protocol and port
267                // into the URL!
268                let full_host = format!(
269                    "{}://{}:{}{}",
270                    self.protocol, self.host, self.port, self.base_path
271                );
272                debug!("WS using full host URL: {}", &full_host);
273
274                mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) // here, port is ignored anyway
275                    .set_credentials(&self.username, &self.password)
276                    .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
277                    .to_owned();
278
279                mqtt_options.set_transport(Transport::Ws);
280            }
281            _ => {}
282        };
283
284        // Create the client connection
285        let (client, mut connection) = Client::new(mqtt_options, 10);
286
287        let message_tx = self.message_sender.clone();
288
289        let connection_state = Arc::clone(&self.is_connected);
290
291        thread::spawn(move || {
292            for event in connection.iter() {
293                match event {
294                    Ok(e) => match e {
295                        Event::Incoming(incoming) => match incoming {
296                            Packet::ConnAck(_) => {
297                                info!("(Connected) ConnAck received!");
298                                let mut is_c =
299                                    connection_state.lock().expect("failed to lock mutex");
300                                *is_c = true;
301                            }
302                            Packet::Publish(p) => {
303                                debug!("Incoming Publish packet (message received), {:?}", &p);
304                                let topic = p.topic;
305                                let payload: Vec<u8> = p.payload.into();
306                                if let Ok(t) = ThreePartTopic::try_from(topic.as_str()) {
307                                    message_tx
308                                        .send((TetherOrCustomTopic::Tether(t), payload))
309                                        .expect(
310                                        "failed to push message from thread; three-part-topic OK",
311                                    );
312                                } else {
313                                    warn!("Could not parse Three Part Topic from \"{}\"", &topic);
314                                    message_tx
315                                        .send((TetherOrCustomTopic::Custom(topic), payload))
316                                        .expect("failed to push message from thread; custom topic");
317                                }
318                            }
319                            _ => debug!("Ignore all others for now, {:?}", incoming),
320                        },
321                        Event::Outgoing(outgoing) => {
322                            debug!("Ignore outgoing events, for now, {:?}", outgoing)
323                        }
324                    },
325                    Err(e) => {
326                        error!("Connection Error: {:?}", e);
327                        std::thread::sleep(Duration::from_secs(1));
328                        // connection_status_tx
329                        //     .send(Err(anyhow!("MQTT Connection error")))
330                        //     .expect("failed to push error message from thread");
331                    }
332                }
333            }
334        });
335
336        let mut is_ready = false;
337
338        while !is_ready {
339            debug!("Check whether connected...");
340            std::thread::sleep(Duration::from_millis(1));
341            trace!("Is ready? {}", is_ready);
342            let get_state = *self.is_connected.lock().expect("failed to lock mutex");
343            if get_state {
344                info!("Connection status confirmed");
345                is_ready = true;
346            } else {
347                debug!("Not connected yet...");
348            }
349        }
350
351        self.client = Some(client);
352
353        Ok(())
354    }
355
356    /// If a message is waiting return ThreePartTopic, Message (String, Message)
357    /// Messages received on topics that are not parseable as Tether Three Part Topics will be returned with
358    /// the complete Topic string instead
359    pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Vec<u8>)> {
360        // if let Ok(e) = self.connection_status_receiver.try_recv() {
361        //     panic!("check_messages received error: {}", e);
362        // }
363        if let Ok(message) = self.message_receiver.try_recv() {
364            debug!("Message ready on queue");
365            Some(message)
366        } else {
367            None
368        }
369    }
370
371    /// Given a plug definition and a raw (u8 buffer) payload, generate a message
372    /// on an appropriate topic and with the QOS specified in the Plug Definition
373    pub fn publish(
374        &self,
375        plug_definition: &PlugDefinition,
376        payload: Option<&[u8]>,
377    ) -> anyhow::Result<()> {
378        match plug_definition {
379            PlugDefinition::InputPlug(_) => {
380                panic!("You cannot publish using an Input Plug")
381            }
382            PlugDefinition::OutputPlug(output_plug_definition) => {
383                let topic = output_plug_definition.topic_str();
384                let qos = match output_plug_definition.qos() {
385                    0 => QoS::AtMostOnce,
386                    1 => QoS::AtLeastOnce,
387                    2 => QoS::ExactlyOnce,
388                    _ => QoS::AtMostOnce,
389                };
390
391                if let Some(client) = &self.client {
392                    let res = client
393                        .publish(
394                            topic,
395                            qos,
396                            output_plug_definition.retain(),
397                            payload.unwrap_or_default(),
398                        )
399                        .map_err(anyhow::Error::msg);
400                    debug!("Published OK");
401                    res
402                } else {
403                    Err(anyhow!("Client not ready for publish"))
404                }
405            }
406        }
407    }
408
409    /// Similar to `publish` but serializes the data automatically before sending
410    pub fn encode_and_publish<T: Serialize>(
411        &self,
412        plug_definition: &PlugDefinition,
413        data: T,
414    ) -> anyhow::Result<()> {
415        match to_vec_named(&data) {
416            Ok(payload) => self.publish(plug_definition, Some(&payload)),
417            Err(e) => {
418                error!("Failed to encode: {e:?}");
419                Err(e.into())
420            }
421        }
422    }
423
424    pub fn publish_raw(
425        &self,
426        topic: &str,
427        payload: &[u8],
428        qos: Option<i32>,
429        retained: Option<bool>,
430    ) -> anyhow::Result<()> {
431        let qos = match qos.unwrap_or(1) {
432            0 => QoS::AtMostOnce,
433            1 => QoS::AtLeastOnce,
434            2 => QoS::ExactlyOnce,
435            _ => QoS::AtMostOnce,
436        };
437        if let Some(client) = &self.client {
438            client
439                .publish(topic, qos, retained.unwrap_or_default(), payload)
440                .map_err(anyhow::Error::msg)
441        } else {
442            Err(anyhow!("Client not ready for publish"))
443        }
444    }
445}