tether_agent/agent/
mod.rs

1use anyhow::anyhow;
2use log::{debug, error, info, trace, warn};
3use rmp_serde::to_vec_named;
4use rumqttc::tokio_rustls::rustls::ClientConfig;
5use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
6use serde::{Deserialize, Serialize};
7use std::sync::{Arc, Mutex};
8use std::{sync::mpsc, thread, time::Duration};
9
10pub mod builder;
11
12pub use builder::*;
13
14use crate::definitions::receiver_def_builder::ChannelReceiverDefBuilder;
15use crate::definitions::sender_def_builder::ChannelSenderDefBuilder;
16use crate::definitions::ChannelDefBuilder;
17use crate::definitions::{ChannelDef, ChannelReceiverDef, ChannelSenderDef};
18use crate::receiver::ChannelReceiver;
19use crate::sender::ChannelSender;
20use crate::tether_compliant_topic::{TetherCompliantTopic, TetherOrCustomTopic};
21
/// Number of seconds handed to `MqttOptions::set_keep_alive` whenever
/// `TetherAgent::connect` builds its MQTT options.
///
/// NOTE(review): despite the name, nothing in this module uses this value as a
/// connection timeout; it is only the keep-alive interval.
const TIMEOUT_SECONDS: u64 = 3;
23
/// The settings a [`TetherAgent`] keeps about itself and about the MQTT broker
/// it connects to.
#[derive(Clone, Debug)]
pub struct AgentConfig {
    /// The Agent's role. Can be replaced later via `TetherAgent::set_role`.
    pub role: String,
    /// Optional Agent ID. `TetherAgent::set_id` stores `Some(id)` here.
    pub id: Option<String>,
    /// Broker host, passed to `MqttOptions::new` when connecting.
    pub host: String,
    /// Broker port, passed to `MqttOptions::new` when connecting.
    pub port: u16,
    /// Protocol name. `TetherAgent::connect` gives special treatment to
    /// `"mqtts"`, `"wss"` and `"ws"`; any other value keeps the transport that
    /// `MqttOptions::new` sets up by default.
    pub protocol: String,
    /// Username handed to `MqttOptions::set_credentials`.
    pub username: String,
    /// Password handed to `MqttOptions::set_credentials`.
    pub password: String,
    /// Path appended after `host:port` when the broker URI is formatted
    /// (`TetherAgent::broker_uri`, and the full URL used for `ws`/`wss`).
    pub url_base_path: String,
    /// Client ID passed to `MqttOptions::new` when connecting.
    pub mqtt_client_id: String,
    /// Not read anywhere in this module.
    /// NOTE(review): presumably consulted by the builder to decide whether
    /// `.build` should call `TetherAgent::connect` automatically - confirm there.
    pub auto_connect_enabled: bool,
}
37
/**
A Tether Agent struct encapsulates everything required to set up a single
"Agent" as part of your Tether-based system. The only thing absolutely required is
a "role" - everything else is optional and sensible defaults will be used when
not explicitly specified.

By default, the Agent will connect (automatically) to an MQTT Broker on localhost:1883

It will **not** have an ID by default, and therefore no ID will be appended to the
publishing/subscribing topic strings when ChannelSender and ChannelReceiver instances
are created using this Tether Agent instance, unless an ID is explicitly provided
on creation.

Note that you should typically not construct a new TetherAgent instance yourself; rather
use the provided TetherAgentBuilder to specify any options you might need, and call
.build to get a well-configured TetherAgent.
*/
pub struct TetherAgent {
    /// Role, ID and broker settings; read when channels are created and when connecting.
    config: AgentConfig,
    /// The MQTT client. `connect` stores it here once a ConnAck has been observed.
    pub(crate) client: Option<Client>,
    /// Sending half of the internal message queue; `connect` clones it into the
    /// polling thread, which pushes every incoming Publish packet through it.
    message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
    /// Receiving half of the internal message queue; drained by `check_messages`.
    pub message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
    /// Flag shared with the polling thread, which sets it to `true` when a
    /// ConnAck packet arrives. `connect` polls it while waiting for the broker.
    is_connected: Arc<Mutex<bool>>,
}
61
62impl<'a, 'de> TetherAgent {
63    /// The simplest way to create a ChannelSender.
64    ///
65    /// You provide only a Channel Name;
66    /// configuration derived from your Tether Agent instance is used to construct
67    /// the appropriate publishing topics.
68    pub fn create_sender<T: Serialize>(&self, name: &str) -> ChannelSender<T> {
69        ChannelSender::new(ChannelSenderDefBuilder::new(name).build(&self.config))
70    }
71
72    /// Create a ChannelSender instance using a ChannelSenderDefinition already constructed
73    /// elsewhere.
74    pub fn create_sender_with_def<T: Serialize>(
75        &self,
76        definition: ChannelSenderDef,
77    ) -> ChannelSender<T> {
78        ChannelSender::new(definition)
79    }
80
81    /// The simplest way to create a Channel Receiver.
82    ///
83    /// You provide only a Channel Name;
84    /// configuration derived from your Tether Agent instance is used to construct
85    /// the appropriate subscribing topics.
86    ///
87    /// The actual subscription is also initiated automatically.
88    pub fn create_receiver<T: Deserialize<'de>>(
89        &'a self,
90        name: &str,
91    ) -> anyhow::Result<ChannelReceiver<'de, T>> {
92        ChannelReceiver::new(
93            self,
94            ChannelReceiverDefBuilder::new(name).build(&self.config),
95        )
96    }
97
98    /// Create a ChannelReceiver instance using a ChannelReceiverDefinition already constructed
99    /// elsewhere.
100    pub fn create_receiver_with_def<T: Deserialize<'a>>(
101        &'a self,
102        definition: ChannelReceiverDef,
103    ) -> anyhow::Result<ChannelReceiver<'a, T>> {
104        ChannelReceiver::new(self, definition)
105    }
106
107    pub fn is_connected(&self) -> bool {
108        self.client.is_some()
109    }
110
111    pub fn config(&self) -> &AgentConfig {
112        &self.config
113    }
114
115    // pub fn auto_connect_enabled(&self) -> bool {
116    //     self.auto_connect_enabled
117    // }
118
119    // pub fn role(&self) -> &str {
120    //     &self.role
121    // }
122
123    // pub fn id(&self) -> Option<&str> {
124    //     self.id.as_deref()
125    // }
126
127    // /// Returns the Agent Role, ID (group), Broker URI
128    // pub fn description(&self) -> (String, String, String) {
129    //     (
130    //         String::from(&self.role),
131    //         match &self.id {
132    //             Some(id) => String::from(id),
133    //             None => String::from("any"),
134    //         },
135    //         self.broker_uri(),
136    //     )
137    // }
138
139    /// Get the underlying MQTT Client directly, immutable.
140    /// WARNING: This allows you to do non-Tether-compliant things!
141    pub fn client(&self) -> Option<&Client> {
142        self.client.as_ref()
143    }
144
145    /// Get the underlying MQTT Client directly, mutably.
146    /// WARNING: This allows you to do non-Tether-compliant things!
147    ///
148    /// Can be useful for subscribing to a topic directly, for example,
149    /// without knowing the message type (as would be the case with a Tether Channel).
150    pub fn client_mut(&mut self) -> Option<&mut Client> {
151        self.client.as_mut()
152    }
153
154    /// Return the URI (protocol, IP address, port, path) that
155    /// was used to connect to the MQTT broker
156    pub fn broker_uri(&self) -> String {
157        format!(
158            "{}://{}:{}{}",
159            &self.config.protocol, self.config.host, self.config.port, self.config.url_base_path
160        )
161    }
162
163    /// Change the role, even if it was set before. Be careful _when_ you call this,
164    /// as it could affect any new Channel Senders/Receivers created after that point.
165    pub fn set_role(&mut self, role: &str) {
166        self.config.role = role.into();
167    }
168
169    /// Change the ID, even if it was set (or left empty) before.
170    /// Be careful _when_ you call this,
171    /// as it could affect any new Channel Senders/Receivers created after that point.
172    pub fn set_id(&mut self, id: &str) {
173        self.config.id = Some(id.into());
174    }
175
176    /// Use this function yourself **only if you explicitly disallowed auto connection**.
177    /// Otherwise, this function is called automatically as part of the `.build` process.
178    ///
179    /// This function spawns a separate thread for polling the MQTT broker. Any events
180    /// and messages are relayed via mpsc channels internally; for example, you will call
181    /// `.check_messages()` to see if any messages were received and are waiting to be parsed.
182    pub fn connect(&mut self) -> anyhow::Result<()> {
183        info!(
184            "Make new connection to the MQTT server at {}://{}:{}...",
185            self.config.protocol, self.config.host, self.config.port
186        );
187
188        let mqtt_client_id = self.config.mqtt_client_id.clone();
189
190        debug!("Using MQTT Client ID \"{}\"", mqtt_client_id);
191
192        let mut mqtt_options =
193            MqttOptions::new(mqtt_client_id.clone(), &self.config.host, self.config.port)
194                .set_credentials(&self.config.username, &self.config.password)
195                .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
196                .to_owned();
197
198        match self.config.protocol.as_str() {
199            "mqtts" => {
200                // Use rustls-native-certs to load root certificates from the operating system.
201                let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
202                root_cert_store.add_parsable_certificates(
203                    rustls_native_certs::load_native_certs()
204                        .expect("could not load platform certs"),
205                );
206
207                let client_config = ClientConfig::builder()
208                    .with_root_certificates(root_cert_store)
209                    .with_no_client_auth();
210                mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
211            }
212            "wss" => {
213                // If using websocket protocol, rumqttc does NOT automatically add protocol and port
214                // into the URL!
215                let full_host = format!(
216                    "{}://{}:{}{}",
217                    self.config.protocol,
218                    self.config.host,
219                    self.config.port,
220                    self.config.url_base_path
221                );
222                debug!("WSS using full host URL: {}", &full_host);
223                mqtt_options = MqttOptions::new(
224                    mqtt_client_id.clone(),
225                    &full_host,
226                    self.config.port,
227                ) // here, port is ignored anyway
228                .set_credentials(&self.config.username, &self.config.password)
229                .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
230                .to_owned();
231
232                // Use rustls-native-certs to load root certificates from the operating system.
233                let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
234                root_cert_store.add_parsable_certificates(
235                    rustls_native_certs::load_native_certs()
236                        .expect("could not load platform certs"),
237                );
238
239                let client_config = ClientConfig::builder()
240                    .with_root_certificates(root_cert_store)
241                    .with_no_client_auth();
242                mqtt_options.set_transport(Transport::wss_with_config(client_config.into()));
243            }
244            "ws" => {
245                // If using websocket protocol, rumqttc does NOT automatically add protocol and port
246                // into the URL!
247                let full_host = format!(
248                    "{}://{}:{}{}",
249                    self.config.protocol,
250                    self.config.host,
251                    self.config.port,
252                    self.config.url_base_path
253                );
254                debug!("WS using full host URL: {}", &full_host);
255
256                mqtt_options = MqttOptions::new(
257                    mqtt_client_id.clone(),
258                    &full_host,
259                    self.config.port,
260                ) // here, port is ignored anyway
261                .set_credentials(&self.config.username, &self.config.password)
262                .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
263                .to_owned();
264
265                mqtt_options.set_transport(Transport::Ws);
266            }
267            _ => {}
268        };
269
270        // Create the client connection
271        let (client, mut connection) = Client::new(mqtt_options, 10);
272
273        let message_tx = self.message_sender.clone();
274
275        let connection_state = Arc::clone(&self.is_connected);
276
277        thread::spawn(move || {
278            for event in connection.iter() {
279                match event {
280                    Ok(e) => {
281                        match e {
282                            Event::Incoming(incoming) => match incoming {
283                                Packet::ConnAck(_) => {
284                                    info!("(Connected) ConnAck received!");
285                                    let mut is_c =
286                                        connection_state.lock().expect("failed to lock mutex");
287                                    *is_c = true;
288                                }
289                                Packet::Publish(p) => {
290                                    debug!("Incoming Publish packet (message received), {:?}", &p);
291                                    let topic = p.topic;
292                                    let payload: Vec<u8> = p.payload.into();
293                                    match TetherCompliantTopic::try_from(topic.as_str()) {
294                                        Ok(t) => {
295                                            message_tx
296                                            .send((TetherOrCustomTopic::Tether(t), payload))
297                                            .expect(
298                                            "failed to push message from thread; three-part-topic OK",
299                                        );
300                                        }
301                                        Err(e) => {
302                                            warn!(
303                                                "Could not parse Three Part Topic from \"{}\": {}",
304                                                &topic, e
305                                            );
306                                            message_tx
307                                        .send((TetherOrCustomTopic::Custom(topic), payload))
308                                        .expect("failed to push message from thread; custom topic");
309                                        }
310                                    }
311                                }
312                                _ => debug!("Ignore all others for now, {:?}", incoming),
313                            },
314                            Event::Outgoing(outgoing) => {
315                                debug!("Ignore outgoing events, for now, {:?}", outgoing)
316                            }
317                        }
318                    }
319                    Err(e) => {
320                        error!("Connection Error: {:?}", e);
321                        std::thread::sleep(Duration::from_secs(1));
322                        // connection_status_tx
323                        //     .send(Err(anyhow!("MQTT Connection error")))
324                        //     .expect("failed to push error message from thread");
325                    }
326                }
327            }
328        });
329
330        let mut is_ready = false;
331
332        while !is_ready {
333            debug!("Check whether connected...");
334            std::thread::sleep(Duration::from_millis(1));
335            trace!("Is ready? {}", is_ready);
336            let get_state = *self.is_connected.lock().expect("failed to lock mutex");
337            if get_state {
338                info!("Connection status confirmed");
339                is_ready = true;
340            } else {
341                debug!("Not connected yet...");
342            }
343        }
344
345        self.client = Some(client);
346
347        Ok(())
348    }
349
350    /// If a message is waiting to be parsed by your application,
351    /// this function will return Topic, Message, i.e. `(TetherOrCustomTopic, Message)`
352    ///
353    /// Messages received on topics that are not parseable as Tether Three Part Topics will be returned with
354    /// the complete Topic string instead
355    pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Vec<u8>)> {
356        // if let Ok(e) = self.connection_status_receiver.try_recv() {
357        //     panic!("check_messages received error: {}", e);
358        // }
359        if let Ok(message) = self.message_receiver.try_recv() {
360            debug!("Message ready on queue");
361            Some(message)
362        } else {
363            None
364        }
365    }
366
367    /// Typically called via the Channel Sender itself.
368    ///
369    /// This function serializes the data (using Serde/MessagePack) automatically before publishing.
370    ///
371    /// Given a Channel Sender and serializeable data payload, publishes a message
372    /// using an appropriate topic and with the QOS specified in the Channel Definition.
373    ///
374    /// Note that this function requires that the data payload be the same type <T> as
375    /// the Channel Sender, so it will return an Error if the types do not match.
376    pub fn send<T: Serialize>(
377        &self,
378        channel_sender: &ChannelSender<T>,
379        data: &T,
380    ) -> anyhow::Result<()> {
381        match to_vec_named(&data) {
382            Ok(payload) => self.send_raw(channel_sender.definition(), Some(&payload)),
383            Err(e) => {
384                error!("Failed to encode: {e:?}");
385                Err(e.into())
386            }
387        }
388    }
389
390    /// Typically called via the Channel Sender itself.
391    ///
392    /// Unlike .send, this function does NOT serialize the data before publishing. It therefore
393    /// does no type checking of the payload.
394    ///
395    /// Given a Channel Sender and a raw (u8 buffer) payload, publishes a message
396    /// using an appropriate topic and with the QOS specified in the Channel Definition
397    pub fn send_raw(
398        &self,
399        channel_definition: &ChannelSenderDef,
400        payload: Option<&[u8]>,
401    ) -> anyhow::Result<()> {
402        let topic = channel_definition.generated_topic();
403        let qos = channel_definition.qos();
404
405        if let Some(client) = &self.client {
406            let res = client
407                .publish(
408                    topic,
409                    qos,
410                    channel_definition.retain(),
411                    payload.unwrap_or_default(),
412                )
413                .map_err(anyhow::Error::msg);
414            debug!("Published OK");
415            res
416        } else {
417            Err(anyhow!("Client not ready for publish"))
418        }
419    }
420
421    pub fn send_empty(&self, channel_definition: &ChannelSenderDef) -> anyhow::Result<()> {
422        self.send_raw(channel_definition, None)
423    }
424
425    /// Publish an already-encoded payload using a provided
426    /// **full topic string** - no need for passing a ChannelSender or
427    /// ChannelSenderDefinition reference.
428    ///
429    /// **WARNING:** This is a back door to using MQTT directly, without any
430    /// guarrantees of correctedness in a Tether-based system!
431    pub fn publish_raw(
432        &self,
433        topic: &str,
434        payload: &[u8],
435        qos: Option<i32>,
436        retained: Option<bool>,
437    ) -> anyhow::Result<()> {
438        let qos = match qos.unwrap_or(1) {
439            0 => QoS::AtMostOnce,
440            1 => QoS::AtLeastOnce,
441            2 => QoS::ExactlyOnce,
442            _ => QoS::AtMostOnce,
443        };
444        if let Some(client) = &self.client {
445            client
446                .publish(topic, qos, retained.unwrap_or_default(), payload)
447                .map_err(anyhow::Error::msg)
448        } else {
449            Err(anyhow!("Client not ready for publish"))
450        }
451    }
452}
453
454// impl From<u8> for rumqttc::QoS {
455//     fn from(value: u8) -> Self {
456//         match value {
457//             0 => rumqttc::QoS::AtMostOnce,
458//             1 => rumqttc::QoS::AtLeastOnce,
459//             2 => rumqttc::QoS::ExactlyOnce,
460//             _ => rumqttc::QoS::AtMostOnce,
461//         }
462//     }
463// }