1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use std::{fs::read, thread, collections::HashMap, sync::{Arc, Mutex}};
use rumqtt::{QoS, ConnectError, ReconnectOptions, Receiver, Notification, MqttClient, MqttOptions};
/// Wrapper around a `rumqtt` MQTT client configured for TLS client-certificate
/// authentication, plus a per-topic table of message callbacks.
///
/// Built by `AWSIoTClient::new`; incoming publishes are dispatched to the
/// registered callbacks by the thread spawned in `start_listening`.
pub struct AWSIoTClient {
/// Client handle returned by `MqttClient::start`; used by `subscribe` and `publish`.
pub aws_iot_client: MqttClient,
/// Notification channel returned by `MqttClient::start`; a clone of it is
/// moved into the listener thread.
receiver: Receiver<Notification>,
/// Topic name -> callback table. Shared (via `Arc<Mutex<..>>`) between this
/// struct and the listener thread. Lookups use the exact published topic
/// name as the key.
callback_map: Arc<Mutex<HashMap<String, fn(String)>>>,
}
impl AWSIoTClient {
    /// Builds the MQTT options, starts the `rumqtt` client and returns the wrapper.
    ///
    /// * `client_id` - MQTT client identifier passed to `MqttOptions::new`.
    /// * `ca_path` - path of the CA certificate file.
    /// * `client_cert_path` - path of the client certificate file.
    /// * `client_key_path` - path of the client private-key file.
    /// * `aws_iot_endpoint` - broker host name; the connection uses port 8883.
    ///
    /// The client is configured with a keep-alive of 10 and
    /// `ReconnectOptions::Always(5)` (both presumably in seconds; confirm against
    /// the `rumqtt` documentation).
    ///
    /// # Errors
    ///
    /// Returns a `ConnectError` when any of the three files cannot be read or
    /// when `MqttClient::start` fails.
    pub fn new(
        client_id: &str,
        ca_path: &str,
        client_cert_path: &str,
        client_key_path: &str,
        aws_iot_endpoint: &str,
    ) -> Result<Self, ConnectError> {
        let mqtt_options = MqttOptions::new(client_id, aws_iot_endpoint, 8883)
            .set_ca(read(ca_path)?)
            .set_client_auth(read(client_cert_path)?, read(client_key_path)?)
            .set_keep_alive(10)
            .set_reconnect_opts(ReconnectOptions::Always(5));
        let (aws_iot_client, receiver) = MqttClient::start(mqtt_options)?;
        Ok(AWSIoTClient {
            aws_iot_client,
            receiver,
            callback_map: Arc::new(Mutex::new(HashMap::new())),
        })
    }

    /// Registers `callback` for `topic_name`, replacing any callback already
    /// registered under that exact name.
    ///
    /// Returns the callback that was replaced, if there was one. Registering a
    /// callback does not subscribe to the topic; call `subscribe` for that.
    ///
    /// # Panics
    ///
    /// Panics if the callback-map mutex is poisoned.
    pub fn add_callback(&self, topic_name: String, callback: fn(String)) -> Option<fn(String)> {
        self.callback_map
            .lock()
            .expect("callback map mutex poisoned")
            .insert(topic_name, callback)
    }

    /// Removes the callback registered for `topic_name`.
    ///
    /// Returns the removed callback, or `None` if nothing was registered.
    ///
    /// # Panics
    ///
    /// Panics if the callback-map mutex is poisoned.
    pub fn remove_callback(&self, topic_name: String) -> Option<fn(String)> {
        self.callback_map
            .lock()
            .expect("callback map mutex poisoned")
            .remove(&topic_name)
    }

    /// Spawns a background thread that dispatches incoming publishes to the
    /// callbacks registered with `add_callback`.
    ///
    /// Only `Publish` notifications are handled; every other notification is
    /// ignored. The callback is looked up by the exact published topic name and
    /// receives the payload as a `String`. Payloads that are not valid UTF-8 are
    /// decoded lossily (invalid sequences become U+FFFD) rather than panicking.
    ///
    /// The thread ends once iteration over the notification channel ends. The
    /// join handle is discarded, so the thread is detached.
    pub fn start_listening(&mut self) {
        let callback_map = Arc::clone(&self.callback_map);
        let receiver = self.receiver.clone();
        thread::spawn(move || {
            // No outer `loop`: once the receiver's iterator finishes there is
            // nothing left to wait for, and re-entering the `for` would spin.
            for notification in &receiver {
                let packet = match notification {
                    rumqtt::client::Notification::Publish(packet) => packet,
                    _ => continue,
                };
                // Copy the fn pointer out so the guard is dropped at the end of
                // this statement. The callback then runs without the lock held,
                // so it can neither deadlock on the map nor poison the mutex.
                let callback = callback_map
                    .lock()
                    .expect("callback map mutex poisoned")
                    .get(&packet.topic_name)
                    .copied();
                if let Some(callback) = callback {
                    // A malformed payload must not take down the listener thread.
                    let payload = String::from_utf8(packet.payload.to_vec())
                        .unwrap_or_else(|err| String::from_utf8_lossy(err.as_bytes()).into_owned());
                    callback(payload);
                }
            }
        });
    }

    /// Subscribes to `topic_name` with the given quality of service.
    ///
    /// # Panics
    ///
    /// Panics if the underlying client reports an error for the subscribe request.
    pub fn subscribe(&mut self, topic_name: String, qos: QoS) {
        self.aws_iot_client
            .subscribe(topic_name, qos)
            .expect("MQTT subscribe request failed");
    }

    /// Publishes `payload` to `topic_name` with the given quality of service.
    ///
    /// The third argument to the underlying `publish` call is always `false`
    /// (presumably the MQTT retain flag; confirm against the `rumqtt` API).
    ///
    /// # Panics
    ///
    /// Panics if the underlying client reports an error for the publish request.
    pub fn publish(&mut self, topic_name: String, qos: QoS, payload: &str) {
        self.aws_iot_client
            .publish(topic_name, qos, false, payload)
            .expect("MQTT publish request failed");
    }
}