1use serde_json::Value;
2use log::info;
3
4use nanoid::nanoid;
5use rumqttc_dev_patched::{Client, Connection, Event, Incoming, LastWill, MqttOptions, Outgoing, Packet, QoS, Transport};
7use native_tls;
8
9pub mod request_command;
10
11use request_command::RequestCommand;
12
13pub struct BambulabClient {
14 client: Client,
15 connection: Connection,
16 report_topic: String,
17 request_topic: String,
18}
19
20fn extract_sequence_id(payload: &str) -> Option<String>{
22 let json: serde_json::Value = serde_json::from_str(payload).unwrap();
23 if let Value::Object(map) = json {
24 for (_key, inner) in map {
25 if let Some(sequence_id) = inner.get("sequence_id") {
26 if let Some(sequence_id_str) = sequence_id.as_str() {
27 return Some(sequence_id_str.to_string());
28 }
29 }
30 }
31 }
32 None
33}
34
35fn check_sequence_id(payload: &str, sequence_id: &String) -> bool {
36 match extract_sequence_id(payload) {
37 Some(get_sequence_id) => {
38 if *sequence_id == get_sequence_id {
39 return true;
40 } else {
41 return false;
42 }
43 },
44 None => {
45 return false;
46 }
47 }
48}
49
50impl BambulabClient {
51 pub fn new(host: String, password: String, serial: String) -> Self{
52 let connector = native_tls::TlsConnector::builder()
53 .danger_accept_invalid_certs(true)
54 .danger_accept_invalid_hostnames(true)
55 .build().unwrap();
56 let mut mqttoptions: MqttOptions = MqttOptions::new(nanoid!(), host, 8883);
57 mqttoptions.set_credentials("bblp", password);
58 mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
59 mqttoptions.set_transport(Transport::tls_with_config(connector.into()));
60
61 let report_topic = format!("device/{}/report", &serial);
62 let request_topic= format!("device/{}/request", &serial);
63
64 let (client, connection) = Client::new(mqttoptions, 10);
65
66 BambulabClient {
67 report_topic,
68 request_topic,
69 client,
70 connection,
71 }
72 }
73
74
75 fn wait_request(&mut self, sequence_id:&String) -> String{
76 for (i, notification) in self.connection.iter().enumerate() {
77 match notification {
78 Ok(Event::Incoming(packet)) => {
79 info!("Incoming {:?}",packet);
80 if let Packet::Publish(p) = packet {
82 info!(" → Publish received on '{}': {}", p.topic, String::from_utf8_lossy(&p.payload));
83 if check_sequence_id(&String::from_utf8_lossy(&p.payload), &sequence_id) {
84 info!(" → Sequence ID matched: {}", sequence_id);
85 return String::from_utf8_lossy(&p.payload).to_string();
87 }
88 }
89 }
90 Ok(notif) => {
91 info!("{i}. Notification = {notif:?}");
92 }
93 Err(error) => {
94 info!("{i}. Notification = {error:?}");
95 }
96 }
97 }
98 "".to_string()
99 }
100
101 pub fn request(&mut self, cmd: &RequestCommand) -> Result<String, serde_json::Error> {
102 match cmd.to_payload() {
103 Ok(payload) => {
104 info!("Publishing over MQTT:\n{}", payload);
105 let sequence_id = cmd.get_sequence_id().unwrap();
106 self.client.subscribe(&self.report_topic, QoS::AtMostOnce).unwrap();
107 self.client.publish(&self.request_topic, QoS::AtLeastOnce, true, payload).unwrap();
108 let res = self.wait_request(&sequence_id);
109 self.client.unsubscribe(&self.report_topic).unwrap();
110 Ok(res)
111 }
112 Err(e) =>{
113 Err(e)
114 }
115 }
116 }
117
118 pub fn disconnect(&mut self) {
119 self.client.disconnect().unwrap();
120 }
121
122}
123