tether_agent/agent/
mod.rs

1use anyhow::anyhow;
2use log::{debug, error, info, trace, warn};
3use rmp_serde::to_vec_named;
4use rumqttc::tokio_rustls::rustls::ClientConfig;
5use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
6use serde::{Deserialize, Serialize};
7use std::sync::{Arc, Mutex};
8use std::{sync::mpsc, thread, time::Duration};
9use uuid::Uuid;
10
11pub mod builder;
12
13pub use builder::*;
14
15use crate::definitions::receiver_def_builder::ChannelReceiverDefBuilder;
16use crate::definitions::sender_def_builder::ChannelSenderDefBuilder;
17use crate::definitions::ChannelDefBuilder;
18use crate::definitions::{ChannelDef, ChannelReceiverDef, ChannelSenderDef};
19use crate::receiver::ChannelReceiver;
20use crate::sender::ChannelSender;
21use crate::tether_compliant_topic::{TetherCompliantTopic, TetherOrCustomTopic};
22
/// Keep-alive interval, in seconds, handed to the MQTT client options in
/// `TetherAgent::connect`.
///
/// Despite the name, this is not used as a connection timeout anywhere in this file.
const TIMEOUT_SECONDS: u64 = 3;
24
/**
A Tether Agent struct encapsulates everything required to set up a single
"Agent" as part of your Tether-based system. The only thing absolutely required is
a "role" - everything else is optional and sensible defaults will be used when
not explicitly specified.

By default, the Agent will connect (automatically) to an MQTT Broker on localhost:1883

It will **not** have an ID by default, and therefore nothing is appended for the ID
to the publishing/subscribing topic strings when ChannelSender and ChannelReceiver
instances are created using this Tether Agent instance, unless an ID is explicitly
provided on creation.

Note that you should typically not construct a new TetherAgent instance yourself; rather
use the provided TetherAgentBuilder to specify any options you might need, and call
.build to get a well-configured TetherAgent.
*/
pub struct TetherAgent {
    /// Role of this agent; exposed via `role()` and replaceable via `set_role()`.
    role: String,
    /// Optional agent ID; `description()` reports "any" when it is unset.
    id: Option<String>,
    /// Broker host name or address used by `connect()`.
    host: String,
    /// Broker port used by `connect()`.
    port: u16,
    /// URI scheme of the broker. `connect()` treats "mqtts", "wss" and "ws" specially;
    /// any other value leaves the MQTT client options on their default transport.
    protocol: String,
    /// User name passed to the broker as part of the connection credentials.
    username: String,
    /// Password passed to the broker as part of the connection credentials.
    password: String,
    /// Path appended after the port in the broker URI (part of the websocket URL).
    base_path: String,
    /// Explicit MQTT client ID; `connect()` generates a random UUID when this is `None`.
    mqtt_client_id: Option<String>,
    /// The underlying MQTT client; set by `connect()` once the connection is confirmed.
    pub(crate) client: Option<Client>,
    /// Sending half of the incoming-message queue; cloned into the polling thread.
    message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
    /// Receiving half of the incoming-message queue; read by `check_messages()`.
    pub message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
    /// Shared flag set to `true` by the polling thread when a ConnAck is received.
    is_connected: Arc<Mutex<bool>>,
    /// Whether automatic connection was requested; exposed via `auto_connect_enabled()`.
    /// NOTE(review): presumably consulted by the builder - confirm in builder.rs.
    auto_connect_enabled: bool,
}
57
58impl<'a, 'de> TetherAgent {
59    /// The simplest way to create a ChannelSender.
60    ///
61    /// You provide only a Channel Name;
62    /// configuration derived from your Tether Agent instance is used to construct
63    /// the appropriate publishing topics.
64    pub fn create_sender<T: Serialize>(&self, name: &str) -> ChannelSender<T> {
65        ChannelSender::new(ChannelSenderDefBuilder::new(name).build(self))
66    }
67
68    /// Create a ChannelSender instance using a ChannelSenderDefinition already constructed
69    /// elsewhere.
70    pub fn create_sender_with_def<T: Serialize>(
71        &self,
72        definition: ChannelSenderDef,
73    ) -> ChannelSender<T> {
74        ChannelSender::new(definition)
75    }
76
77    /// The simplest way to create a Channel Receiver.
78    ///
79    /// You provide only a Channel Name;
80    /// configuration derived from your Tether Agent instance is used to construct
81    /// the appropriate subscribing topics.
82    ///
83    /// The actual subscription is also initiated automatically.
84    pub fn create_receiver<T: Deserialize<'de>>(
85        &'a self,
86        name: &str,
87    ) -> anyhow::Result<ChannelReceiver<'de, T>> {
88        ChannelReceiver::new(self, ChannelReceiverDefBuilder::new(name).build(self))
89    }
90
91    /// Create a ChannelReceiver instance using a ChannelReceiverDefinition already constructed
92    /// elsewhere.
93    pub fn create_receiver_with_def<T: Deserialize<'a>>(
94        &'a self,
95        definition: ChannelReceiverDef,
96    ) -> anyhow::Result<ChannelReceiver<'a, T>> {
97        ChannelReceiver::new(self, definition)
98    }
99
100    pub fn is_connected(&self) -> bool {
101        self.client.is_some()
102    }
103
104    pub fn auto_connect_enabled(&self) -> bool {
105        self.auto_connect_enabled
106    }
107
108    pub fn role(&self) -> &str {
109        &self.role
110    }
111
112    pub fn id(&self) -> Option<&str> {
113        self.id.as_deref()
114    }
115
116    /// Returns the Agent Role, ID (group), Broker URI
117    pub fn description(&self) -> (String, String, String) {
118        (
119            String::from(&self.role),
120            match &self.id {
121                Some(id) => String::from(id),
122                None => String::from("any"),
123            },
124            self.broker_uri(),
125        )
126    }
127
128    /// Get the underlying MQTT Client directly, immutable.
129    /// WARNING: This allows you to do non-Tether-compliant things!
130    pub fn client(&self) -> Option<&Client> {
131        self.client.as_ref()
132    }
133
134    /// Get the underlying MQTT Client directly, mutably.
135    /// WARNING: This allows you to do non-Tether-compliant things!
136    ///
137    /// Can be useful for subscribing to a topic directly, for example,
138    /// without knowing the message type (as would be the case with a Tether Channel).
139    pub fn client_mut(&mut self) -> Option<&mut Client> {
140        self.client.as_mut()
141    }
142
143    /// Return the URI (protocol, IP address, port, path) that
144    /// was used to connect to the MQTT broker
145    pub fn broker_uri(&self) -> String {
146        format!(
147            "{}://{}:{}{}",
148            &self.protocol, self.host, self.port, self.base_path
149        )
150    }
151
152    /// Change the role, even if it was set before. Be careful _when_ you call this,
153    /// as it could affect any new Channel Senders/Receivers created after that point.
154    pub fn set_role(&mut self, role: &str) {
155        self.role = role.into();
156    }
157
158    /// Change the ID, even if it was set (or left empty) before.
159    /// Be careful _when_ you call this,
160    /// as it could affect any new Channel Senders/Receivers created after that point.
161    pub fn set_id(&mut self, id: &str) {
162        self.id = Some(id.into());
163    }
164
165    /// Use this function yourself **only if you explicitly disallowed auto connection**.
166    /// Otherwise, this function is called automatically as part of the `.build` process.
167    ///
168    /// This function spawns a separate thread for polling the MQTT broker. Any events
169    /// and messages are relayed via mpsc channels internally; for example, you will call
170    /// `.check_messages()` to see if any messages were received and are waiting to be parsed.
171    pub fn connect(&mut self) -> anyhow::Result<()> {
172        info!(
173            "Make new connection to the MQTT server at {}://{}:{}...",
174            self.protocol, self.host, self.port
175        );
176
177        let mqtt_client_id = self
178            .mqtt_client_id
179            .clone()
180            .unwrap_or(Uuid::new_v4().to_string());
181
182        debug!("Using MQTT Client ID \"{}\"", mqtt_client_id);
183
184        let mut mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &self.host, self.port)
185            .set_credentials(&self.username, &self.password)
186            .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
187            .to_owned();
188
189        match self.protocol.as_str() {
190            "mqtts" => {
191                // Use rustls-native-certs to load root certificates from the operating system.
192                let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
193                root_cert_store.add_parsable_certificates(
194                    rustls_native_certs::load_native_certs()
195                        .expect("could not load platform certs"),
196                );
197
198                let client_config = ClientConfig::builder()
199                    .with_root_certificates(root_cert_store)
200                    .with_no_client_auth();
201                mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
202            }
203            "wss" => {
204                // If using websocket protocol, rumqttc does NOT automatically add protocol and port
205                // into the URL!
206                let full_host = format!(
207                    "{}://{}:{}{}",
208                    self.protocol, self.host, self.port, self.base_path
209                );
210                debug!("WSS using full host URL: {}", &full_host);
211                mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) // here, port is ignored anyway
212                    .set_credentials(&self.username, &self.password)
213                    .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
214                    .to_owned();
215
216                // Use rustls-native-certs to load root certificates from the operating system.
217                let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
218                root_cert_store.add_parsable_certificates(
219                    rustls_native_certs::load_native_certs()
220                        .expect("could not load platform certs"),
221                );
222
223                let client_config = ClientConfig::builder()
224                    .with_root_certificates(root_cert_store)
225                    .with_no_client_auth();
226                mqtt_options.set_transport(Transport::wss_with_config(client_config.into()));
227            }
228            "ws" => {
229                // If using websocket protocol, rumqttc does NOT automatically add protocol and port
230                // into the URL!
231                let full_host = format!(
232                    "{}://{}:{}{}",
233                    self.protocol, self.host, self.port, self.base_path
234                );
235                debug!("WS using full host URL: {}", &full_host);
236
237                mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) // here, port is ignored anyway
238                    .set_credentials(&self.username, &self.password)
239                    .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
240                    .to_owned();
241
242                mqtt_options.set_transport(Transport::Ws);
243            }
244            _ => {}
245        };
246
247        // Create the client connection
248        let (client, mut connection) = Client::new(mqtt_options, 10);
249
250        let message_tx = self.message_sender.clone();
251
252        let connection_state = Arc::clone(&self.is_connected);
253
254        thread::spawn(move || {
255            for event in connection.iter() {
256                match event {
257                    Ok(e) => {
258                        match e {
259                            Event::Incoming(incoming) => match incoming {
260                                Packet::ConnAck(_) => {
261                                    info!("(Connected) ConnAck received!");
262                                    let mut is_c =
263                                        connection_state.lock().expect("failed to lock mutex");
264                                    *is_c = true;
265                                }
266                                Packet::Publish(p) => {
267                                    debug!("Incoming Publish packet (message received), {:?}", &p);
268                                    let topic = p.topic;
269                                    let payload: Vec<u8> = p.payload.into();
270                                    match TetherCompliantTopic::try_from(topic.as_str()) {
271                                        Ok(t) => {
272                                            message_tx
273                                            .send((TetherOrCustomTopic::Tether(t), payload))
274                                            .expect(
275                                            "failed to push message from thread; three-part-topic OK",
276                                        );
277                                        }
278                                        Err(e) => {
279                                            warn!(
280                                                "Could not parse Three Part Topic from \"{}\": {}",
281                                                &topic, e
282                                            );
283                                            message_tx
284                                        .send((TetherOrCustomTopic::Custom(topic), payload))
285                                        .expect("failed to push message from thread; custom topic");
286                                        }
287                                    }
288                                }
289                                _ => debug!("Ignore all others for now, {:?}", incoming),
290                            },
291                            Event::Outgoing(outgoing) => {
292                                debug!("Ignore outgoing events, for now, {:?}", outgoing)
293                            }
294                        }
295                    }
296                    Err(e) => {
297                        error!("Connection Error: {:?}", e);
298                        std::thread::sleep(Duration::from_secs(1));
299                        // connection_status_tx
300                        //     .send(Err(anyhow!("MQTT Connection error")))
301                        //     .expect("failed to push error message from thread");
302                    }
303                }
304            }
305        });
306
307        let mut is_ready = false;
308
309        while !is_ready {
310            debug!("Check whether connected...");
311            std::thread::sleep(Duration::from_millis(1));
312            trace!("Is ready? {}", is_ready);
313            let get_state = *self.is_connected.lock().expect("failed to lock mutex");
314            if get_state {
315                info!("Connection status confirmed");
316                is_ready = true;
317            } else {
318                debug!("Not connected yet...");
319            }
320        }
321
322        self.client = Some(client);
323
324        Ok(())
325    }
326
327    /// If a message is waiting to be parsed by your application,
328    /// this function will return Topic, Message, i.e. `(TetherOrCustomTopic, Message)`
329    ///
330    /// Messages received on topics that are not parseable as Tether Three Part Topics will be returned with
331    /// the complete Topic string instead
332    pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Vec<u8>)> {
333        // if let Ok(e) = self.connection_status_receiver.try_recv() {
334        //     panic!("check_messages received error: {}", e);
335        // }
336        if let Ok(message) = self.message_receiver.try_recv() {
337            debug!("Message ready on queue");
338            Some(message)
339        } else {
340            None
341        }
342    }
343
344    /// Typically called via the Channel Sender itself.
345    ///
346    /// This function serializes the data (using Serde/MessagePack) automatically before publishing.
347    ///
348    /// Given a Channel Sender and serializeable data payload, publishes a message
349    /// using an appropriate topic and with the QOS specified in the Channel Definition.
350    ///
351    /// Note that this function requires that the data payload be the same type <T> as
352    /// the Channel Sender, so it will return an Error if the types do not match.
353    pub fn send<T: Serialize>(
354        &self,
355        channel_sender: &ChannelSender<T>,
356        data: &T,
357    ) -> anyhow::Result<()> {
358        match to_vec_named(&data) {
359            Ok(payload) => self.send_raw(channel_sender.definition(), Some(&payload)),
360            Err(e) => {
361                error!("Failed to encode: {e:?}");
362                Err(e.into())
363            }
364        }
365    }
366
367    /// Typically called via the Channel Sender itself.
368    ///
369    /// Unlike .send, this function does NOT serialize the data before publishing. It therefore
370    /// does no type checking of the payload.
371    ///
372    /// Given a Channel Sender and a raw (u8 buffer) payload, publishes a message
373    /// using an appropriate topic and with the QOS specified in the Channel Definition
374    pub fn send_raw(
375        &self,
376        channel_definition: &ChannelSenderDef,
377        payload: Option<&[u8]>,
378    ) -> anyhow::Result<()> {
379        let topic = channel_definition.generated_topic();
380        let qos = channel_definition.qos();
381
382        if let Some(client) = &self.client {
383            let res = client
384                .publish(
385                    topic,
386                    qos,
387                    channel_definition.retain(),
388                    payload.unwrap_or_default(),
389                )
390                .map_err(anyhow::Error::msg);
391            debug!("Published OK");
392            res
393        } else {
394            Err(anyhow!("Client not ready for publish"))
395        }
396    }
397
398    pub fn send_empty(&self, channel_definition: &ChannelSenderDef) -> anyhow::Result<()> {
399        self.send_raw(channel_definition, None)
400    }
401
402    /// Publish an already-encoded payload using a provided
403    /// **full topic string** - no need for passing a ChannelSender or
404    /// ChannelSenderDefinition reference.
405    ///
406    /// **WARNING:** This is a back door to using MQTT directly, without any
407    /// guarrantees of correctedness in a Tether-based system!
408    pub fn publish_raw(
409        &self,
410        topic: &str,
411        payload: &[u8],
412        qos: Option<i32>,
413        retained: Option<bool>,
414    ) -> anyhow::Result<()> {
415        let qos = match qos.unwrap_or(1) {
416            0 => QoS::AtMostOnce,
417            1 => QoS::AtLeastOnce,
418            2 => QoS::ExactlyOnce,
419            _ => QoS::AtMostOnce,
420        };
421        if let Some(client) = &self.client {
422            client
423                .publish(topic, qos, retained.unwrap_or_default(), payload)
424                .map_err(anyhow::Error::msg)
425        } else {
426            Err(anyhow!("Client not ready for publish"))
427        }
428    }
429}
430
431// impl From<u8> for rumqttc::QoS {
432//     fn from(value: u8) -> Self {
433//         match value {
434//             0 => rumqttc::QoS::AtMostOnce,
435//             1 => rumqttc::QoS::AtLeastOnce,
436//             2 => rumqttc::QoS::ExactlyOnce,
437//             _ => rumqttc::QoS::AtMostOnce,
438//         }
439//     }
440// }