bambulab_mqtt/
lib.rs

1use serde_json::Value;
2use log::info;
3
4use nanoid::nanoid;
5//use rumqttc::{Client, Connection, Event, Incoming, LastWill, MqttOptions, Outgoing, Packet, QoS, Transport};
6use rumqttc_dev_patched::{Client, Connection, Event, Incoming, LastWill, MqttOptions, Outgoing, Packet, QoS, Transport};
7use native_tls;
8
9pub mod request_command;
10
11use request_command::RequestCommand;
12
13pub struct BambulabClient {
14    client: Client,
15    connection: Connection,
16    report_topic: String,
17    request_topic: String,
18}
19
20// sequence_idの抽出
21fn extract_sequence_id(payload: &str) -> Option<String>{
22    let json: serde_json::Value = serde_json::from_str(payload).unwrap();
23    if let Value::Object(map) = json {
24        for (_key, inner) in map {
25            if let Some(sequence_id) = inner.get("sequence_id") {
26                if let Some(sequence_id_str) = sequence_id.as_str() {
27                    return Some(sequence_id_str.to_string());
28                }
29            }
30        }
31    }
32    None
33}
34
35fn check_sequence_id(payload: &str, sequence_id: &String) -> bool {
36    match extract_sequence_id(payload) {
37        Some(get_sequence_id) => {
38            if *sequence_id == get_sequence_id {
39                return true;
40            } else {
41                return false;
42            }
43        },
44        None => {
45            return false;
46        }
47    }
48}
49
50impl BambulabClient {
51    pub fn new(host: String, password: String, serial: String) -> Self{
52        let connector = native_tls::TlsConnector::builder()
53                                    .danger_accept_invalid_certs(true)
54                                    .danger_accept_invalid_hostnames(true)
55                                    .build().unwrap();
56        let mut mqttoptions: MqttOptions = MqttOptions::new(nanoid!(), host, 8883);
57        mqttoptions.set_credentials("bblp", password);
58        mqttoptions.set_keep_alive(std::time::Duration::from_secs(5));
59        mqttoptions.set_transport(Transport::tls_with_config(connector.into()));
60
61        let report_topic = format!("device/{}/report", &serial);
62        let request_topic= format!("device/{}/request", &serial);
63
64        let (client, connection) = Client::new(mqttoptions, 10);
65
66        BambulabClient { 
67            report_topic,
68            request_topic,
69            client,
70            connection,
71        }
72    }
73
74
75    fn wait_request(&mut self, sequence_id:&String) -> String{
76        for (i, notification) in self.connection.iter().enumerate() {
77            match notification {
78                Ok(Event::Incoming(packet)) => {
79                    info!("Incoming {:?}",packet);
80                    // もし Publish パケットなら、トピックとペイロードを取り出してみる
81                    if let Packet::Publish(p) = packet {
82                        info!("    → Publish received on '{}': {}", p.topic, String::from_utf8_lossy(&p.payload));
83                        if check_sequence_id(&String::from_utf8_lossy(&p.payload), &sequence_id) {
84                            info!("    → Sequence ID matched: {}", sequence_id);
85                            //break;
86                            return String::from_utf8_lossy(&p.payload).to_string();
87                        }
88                    }
89                }
90                Ok(notif) => {
91                    info!("{i}. Notification = {notif:?}");
92                }
93                Err(error) => {
94                    info!("{i}. Notification = {error:?}");
95                }
96            }
97        }
98        "".to_string()
99    }
100
101    pub fn request(&mut self, cmd: &RequestCommand) -> Result<String, serde_json::Error> {
102        match cmd.to_payload() {
103            Ok(payload) => {
104                info!("Publishing over MQTT:\n{}", payload);
105                let sequence_id = cmd.get_sequence_id().unwrap();
106                self.client.subscribe(&self.report_topic, QoS::AtMostOnce).unwrap();
107                self.client.publish(&self.request_topic, QoS::AtLeastOnce, true, payload).unwrap();
108                let res = self.wait_request(&sequence_id);
109                self.client.unsubscribe(&self.report_topic).unwrap();
110                Ok(res)
111            }
112            Err(e) =>{
113                Err(e)
114            }
115        }
116    }
117
118    pub fn disconnect(&mut self) {
119        self.client.disconnect().unwrap();
120    }
121
122}
123