tether_agent/agent/mod.rs
1use anyhow::anyhow;
2use log::{debug, error, info, trace, warn};
3use rmp_serde::to_vec_named;
4use rumqttc::tokio_rustls::rustls::ClientConfig;
5use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
6use serde::{Deserialize, Serialize};
7use std::sync::{Arc, Mutex};
8use std::{sync::mpsc, thread, time::Duration};
9use uuid::Uuid;
10
11pub mod builder;
12
13pub use builder::*;
14
15use crate::definitions::receiver_def_builder::ChannelReceiverDefBuilder;
16use crate::definitions::sender_def_builder::ChannelSenderDefBuilder;
17use crate::definitions::ChannelDefBuilder;
18use crate::definitions::{ChannelDef, ChannelReceiverDef, ChannelSenderDef};
19use crate::receiver::ChannelReceiver;
20use crate::sender::ChannelSender;
21use crate::tether_compliant_topic::{TetherCompliantTopic, TetherOrCustomTopic};
22
23const TIMEOUT_SECONDS: u64 = 3;
24
25/**
26A Tether Agent struct encapsulates everything required to set up a single
27"Agent" as part of your Tether-based system. The only thing absolutely required is
28a "role" - everything else is optional and sensible defaults will be used when
29not explicitly specified.
30
31By default, the Agent will connect (automatically) to an MQTT Broker on localhost:1883
32
33It will **not** have an ID, and therefore publishing/subscribing topics will not append anything
34this into the topic string when ChannelSender and ChannelReceiver instances are created using
35this Tether Agent instance, unless explicitly provided on creation.
36
37Note that you should typically not construct a new TetherAgent instance yourself; rather
38use the provided TetherAgentBuilder to specify any options you might need, and call
39.build to get a well-configured TetherAgent.
40*/
41pub struct TetherAgent {
42 role: String,
43 id: Option<String>,
44 host: String,
45 port: u16,
46 protocol: String,
47 username: String,
48 password: String,
49 base_path: String,
50 mqtt_client_id: Option<String>,
51 pub(crate) client: Option<Client>,
52 message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
53 pub message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
54 is_connected: Arc<Mutex<bool>>,
55 auto_connect_enabled: bool,
56}
57
58impl<'a, 'de> TetherAgent {
59 /// The simplest way to create a ChannelSender.
60 ///
61 /// You provide only a Channel Name;
62 /// configuration derived from your Tether Agent instance is used to construct
63 /// the appropriate publishing topics.
64 pub fn create_sender<T: Serialize>(&self, name: &str) -> ChannelSender<T> {
65 ChannelSender::new(ChannelSenderDefBuilder::new(name).build(self))
66 }
67
68 /// Create a ChannelSender instance using a ChannelSenderDefinition already constructed
69 /// elsewhere.
70 pub fn create_sender_with_def<T: Serialize>(
71 &self,
72 definition: ChannelSenderDef,
73 ) -> ChannelSender<T> {
74 ChannelSender::new(definition)
75 }
76
77 /// The simplest way to create a Channel Receiver.
78 ///
79 /// You provide only a Channel Name;
80 /// configuration derived from your Tether Agent instance is used to construct
81 /// the appropriate subscribing topics.
82 ///
83 /// The actual subscription is also initiated automatically.
84 pub fn create_receiver<T: Deserialize<'de>>(
85 &'a self,
86 name: &str,
87 ) -> anyhow::Result<ChannelReceiver<'de, T>> {
88 ChannelReceiver::new(self, ChannelReceiverDefBuilder::new(name).build(self))
89 }
90
91 /// Create a ChannelReceiver instance using a ChannelReceiverDefinition already constructed
92 /// elsewhere.
93 pub fn create_receiver_with_def<T: Deserialize<'a>>(
94 &'a self,
95 definition: ChannelReceiverDef,
96 ) -> anyhow::Result<ChannelReceiver<'a, T>> {
97 ChannelReceiver::new(self, definition)
98 }
99
100 pub fn is_connected(&self) -> bool {
101 self.client.is_some()
102 }
103
104 pub fn auto_connect_enabled(&self) -> bool {
105 self.auto_connect_enabled
106 }
107
108 pub fn role(&self) -> &str {
109 &self.role
110 }
111
112 pub fn id(&self) -> Option<&str> {
113 self.id.as_deref()
114 }
115
116 /// Returns the Agent Role, ID (group), Broker URI
117 pub fn description(&self) -> (String, String, String) {
118 (
119 String::from(&self.role),
120 match &self.id {
121 Some(id) => String::from(id),
122 None => String::from("any"),
123 },
124 self.broker_uri(),
125 )
126 }
127
128 /// Get the underlying MQTT Client directly, immutable.
129 /// WARNING: This allows you to do non-Tether-compliant things!
130 pub fn client(&self) -> Option<&Client> {
131 self.client.as_ref()
132 }
133
134 /// Get the underlying MQTT Client directly, mutably.
135 /// WARNING: This allows you to do non-Tether-compliant things!
136 ///
137 /// Can be useful for subscribing to a topic directly, for example,
138 /// without knowing the message type (as would be the case with a Tether Channel).
139 pub fn client_mut(&mut self) -> Option<&mut Client> {
140 self.client.as_mut()
141 }
142
143 /// Return the URI (protocol, IP address, port, path) that
144 /// was used to connect to the MQTT broker
145 pub fn broker_uri(&self) -> String {
146 format!(
147 "{}://{}:{}{}",
148 &self.protocol, self.host, self.port, self.base_path
149 )
150 }
151
152 /// Change the role, even if it was set before. Be careful _when_ you call this,
153 /// as it could affect any new Channel Senders/Receivers created after that point.
154 pub fn set_role(&mut self, role: &str) {
155 self.role = role.into();
156 }
157
158 /// Change the ID, even if it was set (or left empty) before.
159 /// Be careful _when_ you call this,
160 /// as it could affect any new Channel Senders/Receivers created after that point.
161 pub fn set_id(&mut self, id: &str) {
162 self.id = Some(id.into());
163 }
164
165 /// Use this function yourself **only if you explicitly disallowed auto connection**.
166 /// Otherwise, this function is called automatically as part of the `.build` process.
167 ///
168 /// This function spawns a separate thread for polling the MQTT broker. Any events
169 /// and messages are relayed via mpsc channels internally; for example, you will call
170 /// `.check_messages()` to see if any messages were received and are waiting to be parsed.
171 pub fn connect(&mut self) -> anyhow::Result<()> {
172 info!(
173 "Make new connection to the MQTT server at {}://{}:{}...",
174 self.protocol, self.host, self.port
175 );
176
177 let mqtt_client_id = self
178 .mqtt_client_id
179 .clone()
180 .unwrap_or(Uuid::new_v4().to_string());
181
182 debug!("Using MQTT Client ID \"{}\"", mqtt_client_id);
183
184 let mut mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &self.host, self.port)
185 .set_credentials(&self.username, &self.password)
186 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
187 .to_owned();
188
189 match self.protocol.as_str() {
190 "mqtts" => {
191 // Use rustls-native-certs to load root certificates from the operating system.
192 let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
193 root_cert_store.add_parsable_certificates(
194 rustls_native_certs::load_native_certs()
195 .expect("could not load platform certs"),
196 );
197
198 let client_config = ClientConfig::builder()
199 .with_root_certificates(root_cert_store)
200 .with_no_client_auth();
201 mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
202 }
203 "wss" => {
204 // If using websocket protocol, rumqttc does NOT automatically add protocol and port
205 // into the URL!
206 let full_host = format!(
207 "{}://{}:{}{}",
208 self.protocol, self.host, self.port, self.base_path
209 );
210 debug!("WSS using full host URL: {}", &full_host);
211 mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) // here, port is ignored anyway
212 .set_credentials(&self.username, &self.password)
213 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
214 .to_owned();
215
216 // Use rustls-native-certs to load root certificates from the operating system.
217 let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
218 root_cert_store.add_parsable_certificates(
219 rustls_native_certs::load_native_certs()
220 .expect("could not load platform certs"),
221 );
222
223 let client_config = ClientConfig::builder()
224 .with_root_certificates(root_cert_store)
225 .with_no_client_auth();
226 mqtt_options.set_transport(Transport::wss_with_config(client_config.into()));
227 }
228 "ws" => {
229 // If using websocket protocol, rumqttc does NOT automatically add protocol and port
230 // into the URL!
231 let full_host = format!(
232 "{}://{}:{}{}",
233 self.protocol, self.host, self.port, self.base_path
234 );
235 debug!("WS using full host URL: {}", &full_host);
236
237 mqtt_options = MqttOptions::new(mqtt_client_id.clone(), &full_host, self.port) // here, port is ignored anyway
238 .set_credentials(&self.username, &self.password)
239 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
240 .to_owned();
241
242 mqtt_options.set_transport(Transport::Ws);
243 }
244 _ => {}
245 };
246
247 // Create the client connection
248 let (client, mut connection) = Client::new(mqtt_options, 10);
249
250 let message_tx = self.message_sender.clone();
251
252 let connection_state = Arc::clone(&self.is_connected);
253
254 thread::spawn(move || {
255 for event in connection.iter() {
256 match event {
257 Ok(e) => {
258 match e {
259 Event::Incoming(incoming) => match incoming {
260 Packet::ConnAck(_) => {
261 info!("(Connected) ConnAck received!");
262 let mut is_c =
263 connection_state.lock().expect("failed to lock mutex");
264 *is_c = true;
265 }
266 Packet::Publish(p) => {
267 debug!("Incoming Publish packet (message received), {:?}", &p);
268 let topic = p.topic;
269 let payload: Vec<u8> = p.payload.into();
270 match TetherCompliantTopic::try_from(topic.as_str()) {
271 Ok(t) => {
272 message_tx
273 .send((TetherOrCustomTopic::Tether(t), payload))
274 .expect(
275 "failed to push message from thread; three-part-topic OK",
276 );
277 }
278 Err(e) => {
279 warn!(
280 "Could not parse Three Part Topic from \"{}\": {}",
281 &topic, e
282 );
283 message_tx
284 .send((TetherOrCustomTopic::Custom(topic), payload))
285 .expect("failed to push message from thread; custom topic");
286 }
287 }
288 }
289 _ => debug!("Ignore all others for now, {:?}", incoming),
290 },
291 Event::Outgoing(outgoing) => {
292 debug!("Ignore outgoing events, for now, {:?}", outgoing)
293 }
294 }
295 }
296 Err(e) => {
297 error!("Connection Error: {:?}", e);
298 std::thread::sleep(Duration::from_secs(1));
299 // connection_status_tx
300 // .send(Err(anyhow!("MQTT Connection error")))
301 // .expect("failed to push error message from thread");
302 }
303 }
304 }
305 });
306
307 let mut is_ready = false;
308
309 while !is_ready {
310 debug!("Check whether connected...");
311 std::thread::sleep(Duration::from_millis(1));
312 trace!("Is ready? {}", is_ready);
313 let get_state = *self.is_connected.lock().expect("failed to lock mutex");
314 if get_state {
315 info!("Connection status confirmed");
316 is_ready = true;
317 } else {
318 debug!("Not connected yet...");
319 }
320 }
321
322 self.client = Some(client);
323
324 Ok(())
325 }
326
327 /// If a message is waiting to be parsed by your application,
328 /// this function will return Topic, Message, i.e. `(TetherOrCustomTopic, Message)`
329 ///
330 /// Messages received on topics that are not parseable as Tether Three Part Topics will be returned with
331 /// the complete Topic string instead
332 pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Vec<u8>)> {
333 // if let Ok(e) = self.connection_status_receiver.try_recv() {
334 // panic!("check_messages received error: {}", e);
335 // }
336 if let Ok(message) = self.message_receiver.try_recv() {
337 debug!("Message ready on queue");
338 Some(message)
339 } else {
340 None
341 }
342 }
343
344 /// Typically called via the Channel Sender itself.
345 ///
346 /// This function serializes the data (using Serde/MessagePack) automatically before publishing.
347 ///
348 /// Given a Channel Sender and serializeable data payload, publishes a message
349 /// using an appropriate topic and with the QOS specified in the Channel Definition.
350 ///
351 /// Note that this function requires that the data payload be the same type <T> as
352 /// the Channel Sender, so it will return an Error if the types do not match.
353 pub fn send<T: Serialize>(
354 &self,
355 channel_sender: &ChannelSender<T>,
356 data: &T,
357 ) -> anyhow::Result<()> {
358 match to_vec_named(&data) {
359 Ok(payload) => self.send_raw(channel_sender.definition(), Some(&payload)),
360 Err(e) => {
361 error!("Failed to encode: {e:?}");
362 Err(e.into())
363 }
364 }
365 }
366
367 /// Typically called via the Channel Sender itself.
368 ///
369 /// Unlike .send, this function does NOT serialize the data before publishing. It therefore
370 /// does no type checking of the payload.
371 ///
372 /// Given a Channel Sender and a raw (u8 buffer) payload, publishes a message
373 /// using an appropriate topic and with the QOS specified in the Channel Definition
374 pub fn send_raw(
375 &self,
376 channel_definition: &ChannelSenderDef,
377 payload: Option<&[u8]>,
378 ) -> anyhow::Result<()> {
379 let topic = channel_definition.generated_topic();
380 let qos = channel_definition.qos();
381
382 if let Some(client) = &self.client {
383 let res = client
384 .publish(
385 topic,
386 qos,
387 channel_definition.retain(),
388 payload.unwrap_or_default(),
389 )
390 .map_err(anyhow::Error::msg);
391 debug!("Published OK");
392 res
393 } else {
394 Err(anyhow!("Client not ready for publish"))
395 }
396 }
397
398 pub fn send_empty(&self, channel_definition: &ChannelSenderDef) -> anyhow::Result<()> {
399 self.send_raw(channel_definition, None)
400 }
401
402 /// Publish an already-encoded payload using a provided
403 /// **full topic string** - no need for passing a ChannelSender or
404 /// ChannelSenderDefinition reference.
405 ///
406 /// **WARNING:** This is a back door to using MQTT directly, without any
407 /// guarrantees of correctedness in a Tether-based system!
408 pub fn publish_raw(
409 &self,
410 topic: &str,
411 payload: &[u8],
412 qos: Option<i32>,
413 retained: Option<bool>,
414 ) -> anyhow::Result<()> {
415 let qos = match qos.unwrap_or(1) {
416 0 => QoS::AtMostOnce,
417 1 => QoS::AtLeastOnce,
418 2 => QoS::ExactlyOnce,
419 _ => QoS::AtMostOnce,
420 };
421 if let Some(client) = &self.client {
422 client
423 .publish(topic, qos, retained.unwrap_or_default(), payload)
424 .map_err(anyhow::Error::msg)
425 } else {
426 Err(anyhow!("Client not ready for publish"))
427 }
428 }
429}
430
431// impl From<u8> for rumqttc::QoS {
432// fn from(value: u8) -> Self {
433// match value {
434// 0 => rumqttc::QoS::AtMostOnce,
435// 1 => rumqttc::QoS::AtLeastOnce,
436// 2 => rumqttc::QoS::ExactlyOnce,
437// _ => rumqttc::QoS::AtMostOnce,
438// }
439// }
440// }