rainmaker_components/mqtt/
linux.rs1#![cfg(target_os = "linux")]
2
3use crate::error::Error;
4use crate::mqtt::base::*;
5
6impl From<&rumqttc::QoS> for QoSLevel {
7 fn from(input: &rumqttc::QoS) -> Self {
8 match input {
9 rumqttc::QoS::AtMostOnce => QoSLevel::AtMostOnce,
10 rumqttc::QoS::AtLeastOnce => QoSLevel::AtLeastOnce,
11 rumqttc::QoS::ExactlyOnce => QoSLevel::ExactlyOnce,
12 }
13 }
14}
15
16impl From<&QoSLevel> for rumqttc::QoS {
17 fn from(value: &QoSLevel) -> Self {
18 match value {
19 QoSLevel::AtMostOnce => rumqttc::QoS::AtMostOnce,
20 QoSLevel::AtLeastOnce => rumqttc::QoS::AtLeastOnce,
21 QoSLevel::ExactlyOnce => rumqttc::QoS::ExactlyOnce,
22 }
23 }
24}
25
26impl From<rumqttc::Publish> for ReceivedMessage {
27 fn from(value: rumqttc::Publish) -> Self {
28 Self {
29 topic: value.topic,
30 payload: value.payload.to_vec(),
31 }
32 }
33}
34
35impl From<rumqttc::Event> for MqttEvent {
36 fn from(value: rumqttc::Event) -> Self {
37 match value {
38 rumqttc::Event::Incoming(e) => match e {
39 rumqttc::Packet::ConnAck(_) => MqttEvent::Connected,
40 rumqttc::Packet::Publish(m) => MqttEvent::Received(m.into()),
41 rumqttc::Packet::Disconnect => MqttEvent::Disconnected,
42 rumqttc::Packet::Connect(_) => MqttEvent::BeforeConnect,
43 rumqttc::Packet::SubAck(_) => Self::Subscribed,
44 rumqttc::Packet::PubAck(_) => Self::Published,
45 _ => MqttEvent::Other,
46 },
47
48 rumqttc::Event::Outgoing(_) => Self::Other,
49 }
50 }
51}
52
53impl MqttClient<rumqttc::Client> {
54 pub fn new(
55 config: &MqttConfiguration,
56 tlscerts: &'static TLSconfiguration,
57 callback: Box<dyn Fn(MqttEvent) + Send>,
58 ) -> Result<Self, Error> {
59 let mut option = rumqttc::MqttOptions::new(config.clientid, config.host, config.port);
60 option.transport();
61
62 option.set_keep_alive(std::time::Duration::from_secs(60));
63
64 option.set_transport(rumqttc::Transport::tls(
65 tlscerts.server_cert.to_vec(),
66 Some((
67 tlscerts.client_cert.to_vec(),
68 rumqttc::Key::RSA(tlscerts.private_key.to_vec()),
69 )),
70 None,
71 ));
72 let (client, mut conn) = rumqttc::Client::new(option, 5);
73 std::thread::spawn(move || {
74 for notification in conn.iter() {
75 match notification {
76 Ok(notif) => callback(notif.into()),
77 Err(e) => log::error!("error while executing callback: {:?}", e),
78 };
79 }
80 });
81
82 Ok(Self { client })
83 }
84
85 pub fn publish(&mut self, topic: &str, qos: &QoSLevel, payload: Vec<u8>) -> u32 {
86 self.client
87 .publish(topic, qos.into(), false, payload)
88 .expect("unable to publish");
89
90 0
92 }
93
94 pub fn subscribe(&mut self, topic: &str, qos: &QoSLevel) -> Result<(), Error> {
95 self.client
96 .subscribe(topic, qos.into())
97 .expect("unable to subscribe");
98
99 Ok(())
100 }
101}