rainmaker_components/mqtt/
linux.rs

1#![cfg(target_os = "linux")]
2
3use crate::error::Error;
4use crate::mqtt::base::*;
5
6impl From<&rumqttc::QoS> for QoSLevel {
7    fn from(input: &rumqttc::QoS) -> Self {
8        match input {
9            rumqttc::QoS::AtMostOnce => QoSLevel::AtMostOnce,
10            rumqttc::QoS::AtLeastOnce => QoSLevel::AtLeastOnce,
11            rumqttc::QoS::ExactlyOnce => QoSLevel::ExactlyOnce,
12        }
13    }
14}
15
16impl From<&QoSLevel> for rumqttc::QoS {
17    fn from(value: &QoSLevel) -> Self {
18        match value {
19            QoSLevel::AtMostOnce => rumqttc::QoS::AtMostOnce,
20            QoSLevel::AtLeastOnce => rumqttc::QoS::AtLeastOnce,
21            QoSLevel::ExactlyOnce => rumqttc::QoS::ExactlyOnce,
22        }
23    }
24}
25
26impl From<rumqttc::Publish> for ReceivedMessage {
27    fn from(value: rumqttc::Publish) -> Self {
28        Self {
29            topic: value.topic,
30            payload: value.payload.to_vec(),
31        }
32    }
33}
34
35impl From<rumqttc::Event> for MqttEvent {
36    fn from(value: rumqttc::Event) -> Self {
37        match value {
38            rumqttc::Event::Incoming(e) => match e {
39                rumqttc::Packet::ConnAck(_) => MqttEvent::Connected,
40                rumqttc::Packet::Publish(m) => MqttEvent::Received(m.into()),
41                rumqttc::Packet::Disconnect => MqttEvent::Disconnected,
42                rumqttc::Packet::Connect(_) => MqttEvent::BeforeConnect,
43                rumqttc::Packet::SubAck(_) => Self::Subscribed,
44                rumqttc::Packet::PubAck(_) => Self::Published,
45                _ => MqttEvent::Other,
46            },
47
48            rumqttc::Event::Outgoing(_) => Self::Other,
49        }
50    }
51}
52
53impl MqttClient<rumqttc::Client> {
54    pub fn new(
55        config: &MqttConfiguration,
56        tlscerts: &'static TLSconfiguration,
57        callback: Box<dyn Fn(MqttEvent) + Send>,
58    ) -> Result<Self, Error> {
59        let mut option = rumqttc::MqttOptions::new(config.clientid, config.host, config.port);
60        option.transport();
61
62        option.set_keep_alive(std::time::Duration::from_secs(60));
63
64        option.set_transport(rumqttc::Transport::tls(
65            tlscerts.server_cert.to_vec(),
66            Some((
67                tlscerts.client_cert.to_vec(),
68                rumqttc::Key::RSA(tlscerts.private_key.to_vec()),
69            )),
70            None,
71        ));
72        let (client, mut conn) = rumqttc::Client::new(option, 5);
73        std::thread::spawn(move || {
74            for notification in conn.iter() {
75                match notification {
76                    Ok(notif) => callback(notif.into()),
77                    Err(e) => log::error!("error while executing callback: {:?}", e),
78                };
79            }
80        });
81
82        Ok(Self { client })
83    }
84
85    pub fn publish(&mut self, topic: &str, qos: &QoSLevel, payload: Vec<u8>) -> u32 {
86        self.client
87            .publish(topic, qos.into(), false, payload)
88            .expect("unable to publish");
89
90        // return 0 to signify msg_id returning not supported
91        0
92    }
93
94    pub fn subscribe(&mut self, topic: &str, qos: &QoSLevel) -> Result<(), Error> {
95        self.client
96            .subscribe(topic, qos.into())
97            .expect("unable to subscribe");
98
99        Ok(())
100    }
101}