tether_agent/agent/mod.rs
1use anyhow::anyhow;
2use log::{debug, error, info, trace, warn};
3use rmp_serde::to_vec_named;
4use rumqttc::tokio_rustls::rustls::ClientConfig;
5use rumqttc::{Client, Event, MqttOptions, Packet, QoS, Transport};
6use serde::{Deserialize, Serialize};
7use std::sync::{Arc, Mutex};
8use std::{sync::mpsc, thread, time::Duration};
9
10pub mod builder;
11
12pub use builder::*;
13
14use crate::definitions::receiver_def_builder::ChannelReceiverDefBuilder;
15use crate::definitions::sender_def_builder::ChannelSenderDefBuilder;
16use crate::definitions::ChannelDefBuilder;
17use crate::definitions::{ChannelDef, ChannelReceiverDef, ChannelSenderDef};
18use crate::receiver::ChannelReceiver;
19use crate::sender::ChannelSender;
20use crate::tether_compliant_topic::{TetherCompliantTopic, TetherOrCustomTopic};
21
22const TIMEOUT_SECONDS: u64 = 3;
23
24#[derive(Clone, Debug)]
25pub struct AgentConfig {
26 pub role: String,
27 pub id: Option<String>,
28 pub host: String,
29 pub port: u16,
30 pub protocol: String,
31 pub username: String,
32 pub password: String,
33 pub url_base_path: String,
34 pub mqtt_client_id: String,
35 pub auto_connect_enabled: bool,
36}
37
38/**
39A Tether Agent struct encapsulates everything required to set up a single
40"Agent" as part of your Tether-based system. The only thing absolutely required is
41a "role" - everything else is optional and sensible defaults will be used when
42not explicitly specified.
43
44By default, the Agent will connect (automatically) to an MQTT Broker on localhost:1883
45
46It will **not** have an ID, and therefore publishing/subscribing topics will not append anything
47this into the topic string when ChannelSender and ChannelReceiver instances are created using
48this Tether Agent instance, unless explicitly provided on creation.
49
50Note that you should typically not construct a new TetherAgent instance yourself; rather
51use the provided TetherAgentBuilder to specify any options you might need, and call
52.build to get a well-configured TetherAgent.
53*/
54pub struct TetherAgent {
55 config: AgentConfig,
56 pub(crate) client: Option<Client>,
57 message_sender: mpsc::Sender<(TetherOrCustomTopic, Vec<u8>)>,
58 pub message_receiver: mpsc::Receiver<(TetherOrCustomTopic, Vec<u8>)>,
59 is_connected: Arc<Mutex<bool>>,
60}
61
62impl<'a, 'de> TetherAgent {
63 /// The simplest way to create a ChannelSender.
64 ///
65 /// You provide only a Channel Name;
66 /// configuration derived from your Tether Agent instance is used to construct
67 /// the appropriate publishing topics.
68 pub fn create_sender<T: Serialize>(&self, name: &str) -> ChannelSender<T> {
69 ChannelSender::new(ChannelSenderDefBuilder::new(name).build(&self.config))
70 }
71
72 /// Create a ChannelSender instance using a ChannelSenderDefinition already constructed
73 /// elsewhere.
74 pub fn create_sender_with_def<T: Serialize>(
75 &self,
76 definition: ChannelSenderDef,
77 ) -> ChannelSender<T> {
78 ChannelSender::new(definition)
79 }
80
81 /// The simplest way to create a Channel Receiver.
82 ///
83 /// You provide only a Channel Name;
84 /// configuration derived from your Tether Agent instance is used to construct
85 /// the appropriate subscribing topics.
86 ///
87 /// The actual subscription is also initiated automatically.
88 pub fn create_receiver<T: Deserialize<'de>>(
89 &'a self,
90 name: &str,
91 ) -> anyhow::Result<ChannelReceiver<'de, T>> {
92 ChannelReceiver::new(
93 self,
94 ChannelReceiverDefBuilder::new(name).build(&self.config),
95 )
96 }
97
98 /// Create a ChannelReceiver instance using a ChannelReceiverDefinition already constructed
99 /// elsewhere.
100 pub fn create_receiver_with_def<T: Deserialize<'a>>(
101 &'a self,
102 definition: ChannelReceiverDef,
103 ) -> anyhow::Result<ChannelReceiver<'a, T>> {
104 ChannelReceiver::new(self, definition)
105 }
106
107 pub fn is_connected(&self) -> bool {
108 self.client.is_some()
109 }
110
111 pub fn config(&self) -> &AgentConfig {
112 &self.config
113 }
114
115 // pub fn auto_connect_enabled(&self) -> bool {
116 // self.auto_connect_enabled
117 // }
118
119 // pub fn role(&self) -> &str {
120 // &self.role
121 // }
122
123 // pub fn id(&self) -> Option<&str> {
124 // self.id.as_deref()
125 // }
126
127 // /// Returns the Agent Role, ID (group), Broker URI
128 // pub fn description(&self) -> (String, String, String) {
129 // (
130 // String::from(&self.role),
131 // match &self.id {
132 // Some(id) => String::from(id),
133 // None => String::from("any"),
134 // },
135 // self.broker_uri(),
136 // )
137 // }
138
139 /// Get the underlying MQTT Client directly, immutable.
140 /// WARNING: This allows you to do non-Tether-compliant things!
141 pub fn client(&self) -> Option<&Client> {
142 self.client.as_ref()
143 }
144
145 /// Get the underlying MQTT Client directly, mutably.
146 /// WARNING: This allows you to do non-Tether-compliant things!
147 ///
148 /// Can be useful for subscribing to a topic directly, for example,
149 /// without knowing the message type (as would be the case with a Tether Channel).
150 pub fn client_mut(&mut self) -> Option<&mut Client> {
151 self.client.as_mut()
152 }
153
154 /// Return the URI (protocol, IP address, port, path) that
155 /// was used to connect to the MQTT broker
156 pub fn broker_uri(&self) -> String {
157 format!(
158 "{}://{}:{}{}",
159 &self.config.protocol, self.config.host, self.config.port, self.config.url_base_path
160 )
161 }
162
163 /// Change the role, even if it was set before. Be careful _when_ you call this,
164 /// as it could affect any new Channel Senders/Receivers created after that point.
165 pub fn set_role(&mut self, role: &str) {
166 self.config.role = role.into();
167 }
168
169 /// Change the ID, even if it was set (or left empty) before.
170 /// Be careful _when_ you call this,
171 /// as it could affect any new Channel Senders/Receivers created after that point.
172 pub fn set_id(&mut self, id: &str) {
173 self.config.id = Some(id.into());
174 }
175
176 /// Use this function yourself **only if you explicitly disallowed auto connection**.
177 /// Otherwise, this function is called automatically as part of the `.build` process.
178 ///
179 /// This function spawns a separate thread for polling the MQTT broker. Any events
180 /// and messages are relayed via mpsc channels internally; for example, you will call
181 /// `.check_messages()` to see if any messages were received and are waiting to be parsed.
182 pub fn connect(&mut self) -> anyhow::Result<()> {
183 info!(
184 "Make new connection to the MQTT server at {}://{}:{}...",
185 self.config.protocol, self.config.host, self.config.port
186 );
187
188 let mqtt_client_id = self.config.mqtt_client_id.clone();
189
190 debug!("Using MQTT Client ID \"{}\"", mqtt_client_id);
191
192 let mut mqtt_options =
193 MqttOptions::new(mqtt_client_id.clone(), &self.config.host, self.config.port)
194 .set_credentials(&self.config.username, &self.config.password)
195 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
196 .to_owned();
197
198 match self.config.protocol.as_str() {
199 "mqtts" => {
200 // Use rustls-native-certs to load root certificates from the operating system.
201 let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
202 root_cert_store.add_parsable_certificates(
203 rustls_native_certs::load_native_certs()
204 .expect("could not load platform certs"),
205 );
206
207 let client_config = ClientConfig::builder()
208 .with_root_certificates(root_cert_store)
209 .with_no_client_auth();
210 mqtt_options.set_transport(Transport::tls_with_config(client_config.into()));
211 }
212 "wss" => {
213 // If using websocket protocol, rumqttc does NOT automatically add protocol and port
214 // into the URL!
215 let full_host = format!(
216 "{}://{}:{}{}",
217 self.config.protocol,
218 self.config.host,
219 self.config.port,
220 self.config.url_base_path
221 );
222 debug!("WSS using full host URL: {}", &full_host);
223 mqtt_options = MqttOptions::new(
224 mqtt_client_id.clone(),
225 &full_host,
226 self.config.port,
227 ) // here, port is ignored anyway
228 .set_credentials(&self.config.username, &self.config.password)
229 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
230 .to_owned();
231
232 // Use rustls-native-certs to load root certificates from the operating system.
233 let mut root_cert_store = rumqttc::tokio_rustls::rustls::RootCertStore::empty();
234 root_cert_store.add_parsable_certificates(
235 rustls_native_certs::load_native_certs()
236 .expect("could not load platform certs"),
237 );
238
239 let client_config = ClientConfig::builder()
240 .with_root_certificates(root_cert_store)
241 .with_no_client_auth();
242 mqtt_options.set_transport(Transport::wss_with_config(client_config.into()));
243 }
244 "ws" => {
245 // If using websocket protocol, rumqttc does NOT automatically add protocol and port
246 // into the URL!
247 let full_host = format!(
248 "{}://{}:{}{}",
249 self.config.protocol,
250 self.config.host,
251 self.config.port,
252 self.config.url_base_path
253 );
254 debug!("WS using full host URL: {}", &full_host);
255
256 mqtt_options = MqttOptions::new(
257 mqtt_client_id.clone(),
258 &full_host,
259 self.config.port,
260 ) // here, port is ignored anyway
261 .set_credentials(&self.config.username, &self.config.password)
262 .set_keep_alive(Duration::from_secs(TIMEOUT_SECONDS))
263 .to_owned();
264
265 mqtt_options.set_transport(Transport::Ws);
266 }
267 _ => {}
268 };
269
270 // Create the client connection
271 let (client, mut connection) = Client::new(mqtt_options, 10);
272
273 let message_tx = self.message_sender.clone();
274
275 let connection_state = Arc::clone(&self.is_connected);
276
277 thread::spawn(move || {
278 for event in connection.iter() {
279 match event {
280 Ok(e) => {
281 match e {
282 Event::Incoming(incoming) => match incoming {
283 Packet::ConnAck(_) => {
284 info!("(Connected) ConnAck received!");
285 let mut is_c =
286 connection_state.lock().expect("failed to lock mutex");
287 *is_c = true;
288 }
289 Packet::Publish(p) => {
290 debug!("Incoming Publish packet (message received), {:?}", &p);
291 let topic = p.topic;
292 let payload: Vec<u8> = p.payload.into();
293 match TetherCompliantTopic::try_from(topic.as_str()) {
294 Ok(t) => {
295 message_tx
296 .send((TetherOrCustomTopic::Tether(t), payload))
297 .expect(
298 "failed to push message from thread; three-part-topic OK",
299 );
300 }
301 Err(e) => {
302 warn!(
303 "Could not parse Three Part Topic from \"{}\": {}",
304 &topic, e
305 );
306 message_tx
307 .send((TetherOrCustomTopic::Custom(topic), payload))
308 .expect("failed to push message from thread; custom topic");
309 }
310 }
311 }
312 _ => debug!("Ignore all others for now, {:?}", incoming),
313 },
314 Event::Outgoing(outgoing) => {
315 debug!("Ignore outgoing events, for now, {:?}", outgoing)
316 }
317 }
318 }
319 Err(e) => {
320 error!("Connection Error: {:?}", e);
321 std::thread::sleep(Duration::from_secs(1));
322 // connection_status_tx
323 // .send(Err(anyhow!("MQTT Connection error")))
324 // .expect("failed to push error message from thread");
325 }
326 }
327 }
328 });
329
330 let mut is_ready = false;
331
332 while !is_ready {
333 debug!("Check whether connected...");
334 std::thread::sleep(Duration::from_millis(1));
335 trace!("Is ready? {}", is_ready);
336 let get_state = *self.is_connected.lock().expect("failed to lock mutex");
337 if get_state {
338 info!("Connection status confirmed");
339 is_ready = true;
340 } else {
341 debug!("Not connected yet...");
342 }
343 }
344
345 self.client = Some(client);
346
347 Ok(())
348 }
349
350 /// If a message is waiting to be parsed by your application,
351 /// this function will return Topic, Message, i.e. `(TetherOrCustomTopic, Message)`
352 ///
353 /// Messages received on topics that are not parseable as Tether Three Part Topics will be returned with
354 /// the complete Topic string instead
355 pub fn check_messages(&self) -> Option<(TetherOrCustomTopic, Vec<u8>)> {
356 // if let Ok(e) = self.connection_status_receiver.try_recv() {
357 // panic!("check_messages received error: {}", e);
358 // }
359 if let Ok(message) = self.message_receiver.try_recv() {
360 debug!("Message ready on queue");
361 Some(message)
362 } else {
363 None
364 }
365 }
366
367 /// Typically called via the Channel Sender itself.
368 ///
369 /// This function serializes the data (using Serde/MessagePack) automatically before publishing.
370 ///
371 /// Given a Channel Sender and serializeable data payload, publishes a message
372 /// using an appropriate topic and with the QOS specified in the Channel Definition.
373 ///
374 /// Note that this function requires that the data payload be the same type <T> as
375 /// the Channel Sender, so it will return an Error if the types do not match.
376 pub fn send<T: Serialize>(
377 &self,
378 channel_sender: &ChannelSender<T>,
379 data: &T,
380 ) -> anyhow::Result<()> {
381 match to_vec_named(&data) {
382 Ok(payload) => self.send_raw(channel_sender.definition(), Some(&payload)),
383 Err(e) => {
384 error!("Failed to encode: {e:?}");
385 Err(e.into())
386 }
387 }
388 }
389
390 /// Typically called via the Channel Sender itself.
391 ///
392 /// Unlike .send, this function does NOT serialize the data before publishing. It therefore
393 /// does no type checking of the payload.
394 ///
395 /// Given a Channel Sender and a raw (u8 buffer) payload, publishes a message
396 /// using an appropriate topic and with the QOS specified in the Channel Definition
397 pub fn send_raw(
398 &self,
399 channel_definition: &ChannelSenderDef,
400 payload: Option<&[u8]>,
401 ) -> anyhow::Result<()> {
402 let topic = channel_definition.generated_topic();
403 let qos = channel_definition.qos();
404
405 if let Some(client) = &self.client {
406 let res = client
407 .publish(
408 topic,
409 qos,
410 channel_definition.retain(),
411 payload.unwrap_or_default(),
412 )
413 .map_err(anyhow::Error::msg);
414 debug!("Published OK");
415 res
416 } else {
417 Err(anyhow!("Client not ready for publish"))
418 }
419 }
420
421 pub fn send_empty(&self, channel_definition: &ChannelSenderDef) -> anyhow::Result<()> {
422 self.send_raw(channel_definition, None)
423 }
424
425 /// Publish an already-encoded payload using a provided
426 /// **full topic string** - no need for passing a ChannelSender or
427 /// ChannelSenderDefinition reference.
428 ///
429 /// **WARNING:** This is a back door to using MQTT directly, without any
430 /// guarrantees of correctedness in a Tether-based system!
431 pub fn publish_raw(
432 &self,
433 topic: &str,
434 payload: &[u8],
435 qos: Option<i32>,
436 retained: Option<bool>,
437 ) -> anyhow::Result<()> {
438 let qos = match qos.unwrap_or(1) {
439 0 => QoS::AtMostOnce,
440 1 => QoS::AtLeastOnce,
441 2 => QoS::ExactlyOnce,
442 _ => QoS::AtMostOnce,
443 };
444 if let Some(client) = &self.client {
445 client
446 .publish(topic, qos, retained.unwrap_or_default(), payload)
447 .map_err(anyhow::Error::msg)
448 } else {
449 Err(anyhow!("Client not ready for publish"))
450 }
451 }
452}
453
454// impl From<u8> for rumqttc::QoS {
455// fn from(value: u8) -> Self {
456// match value {
457// 0 => rumqttc::QoS::AtMostOnce,
458// 1 => rumqttc::QoS::AtLeastOnce,
459// 2 => rumqttc::QoS::ExactlyOnce,
460// _ => rumqttc::QoS::AtMostOnce,
461// }
462// }
463// }