1use std::{collections::HashMap, error::Error};
2
3use prometheus::{
4 core::Collector, opts, register_gauge_vec_with_registry, GaugeVec, Registry, TextEncoder,
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use url::Url;
9
10#[macro_use]
11extern crate lazy_static;
12
13#[cfg(not(test))]
14use log::{debug, info, warn};
15
16#[cfg(test)]
17use std::{println as debug, println as warn, println as info};
18
19lazy_static! {
20 static ref REGISTRY: Registry = Registry::new_custom(Some("deconz".into()), None).expect("Failed to create registry");
22}
23
24#[derive(Serialize, Deserialize, Debug)]
26pub struct Gateway {
27 pub apiversion: String,
28 pub bridgeid: String,
29 pub devicename: String,
30 pub dhcp: bool,
31 pub gateway: String,
32 pub ipaddress: String,
33 pub linkbutton: bool,
34 pub mac: String,
35 pub modelid: String,
36 pub name: String,
37 pub swversion: String,
38 pub websocketport: u16,
39 pub zigbeechannel: u8,
40}
41
42#[derive(Clone, Default, Serialize, Deserialize, Debug)]
46pub struct SensorConfig {
47 pub battery: f64,
48 pub offset: f64,
49 pub on: bool,
50 pub reachable: bool,
51}
52
53#[derive(Clone, Debug, Serialize, Deserialize)]
55pub struct Sensor {
56 #[serde(default)]
57 pub config: Option<SensorConfig>,
58 pub etag: Option<String>,
59 pub lastannounced: Option<String>,
60 pub lastseen: Option<String>,
61 pub manufacturername: String,
62 pub modelid: String,
63 pub name: String,
64 #[serde(default)]
65 pub state: HashMap<String, Value>,
66 pub swversion: Option<String>,
67 #[serde(rename = "type")]
68 pub tipe: String,
69 pub uniqueid: String,
70 #[serde(skip)]
71 dummy: String,
72}
73
74pub struct State {
75 sensors: HashMap<String, Sensor>,
76 metrics: HashMap<String, GaugeVec>,
77}
78
79#[derive(Serialize, Deserialize, Debug)]
83pub struct Event {
84 #[serde(rename = "t")]
86 pub type_: String,
87 #[serde(rename = "e")]
89 pub event: String,
90 #[serde(rename = "r")]
92 pub resource: String,
93 pub id: String,
95 pub uniqueid: String,
97 pub gid: Option<String>,
99 pub scid: Option<String>,
101 #[serde(default)]
104 pub config: Option<SensorConfig>,
105 pub name: Option<String>,
107 #[serde(default)]
110 pub state: HashMap<String, Value>,
111 #[serde(default)]
113 pub group: HashMap<String, Value>,
114 #[serde(default)]
116 pub light: HashMap<String, Value>,
117 #[serde(default)]
119 pub sensor: HashMap<String, Value>,
120 pub attr: Option<Sensor>,
122}
123
124pub type Callback = fn(&mut Event, &mut State) -> Result<(), Box<dyn Error>>;
126
127pub fn gateway(host: &Url, username: &str) -> Result<Gateway, reqwest::Error> {
129 let u = format!("{}/api/{}/config", host, username);
130 info!("Connecting to API gateway at {u}");
131 reqwest::blocking::get(u)?.json()
132}
133
134pub fn websocket(host: &Url, username: &str) -> Result<Url, Box<dyn Error>> {
136 let gw = gateway(host, username)?;
137 let mut host = host.clone();
138
139 host.set_scheme("ws").unwrap();
140 host.set_port(Some(gw.websocketport)).unwrap();
141
142 info!("Discovered websocket port at {}", host);
143 Ok(host)
144}
145
146pub fn run(host: &Url, username: &str) -> Result<(), Box<dyn Error>> {
148 let mut state = State::with_metrics();
149 let socket = websocket(host, username)?;
150 stream(&socket, &mut state, process)
151}
152
153pub fn stream(url: &Url, state: &mut State, callback: Callback) -> Result<(), Box<dyn Error>> {
159 info!("🔌 Start listening for websocket events at {url}");
160
161 let (mut socket, _) = tungstenite::client::connect(url)?;
162 loop {
163 match serde_json::from_str::<Event>(socket.read_message()?.to_text()?) {
164 Ok(mut event) => {
165 if let Err(err) = callback(&mut event, state) {
168 warn!("Failed to handle event: `{:?}`: {:?}", event, err)
169 }
170 }
171 Err(err) => {
172 warn!("Failed to serialize, ignoring message: {:?}", err)
173 }
174 }
175 }
176}
177
178pub fn process(e: &mut Event, state: &mut State) -> Result<(), Box<dyn Error>> {
185 debug!("Received event for {}", e.id);
186
187 if let Some(attr) = &e.attr {
189 if e.type_ == "event" && e.event == "changed" {
190 debug!("Updating attrs for {}", e.id);
191 state.sensors.insert(e.id.to_string(), attr.clone());
192 return Ok(());
193 }
194 }
195
196 if e.type_ == "event" && e.event == "changed" && !e.state.is_empty() {
198 if let Some(sensor) = state.sensors.get(&e.id) {
199 for (k, v) in &e.state {
200 if k == "lastupdated" {
201 continue;
202 }
203
204 if let Some(gauge) = state.metrics.get(k.as_str()) {
205 if let Some(val) = v.as_f64() {
206 debug!("Updating metric ID:{}, {k}:{v}", e.id);
207 gauge.with(&sensor.labels(true)).set(val);
208 }
209 } else {
210 debug!("Ignoring metric ID:{}, {k}:{v}", e.id);
211 }
212 return Ok(());
213 }
214 } else {
215 warn!("Ignoring event update for unknown sensor {}: {:?}", e.id, e)
216 }
217
218 return Ok(());
219 }
220
221 if let Some(config) = &e.config {
223 if e.type_ == "event" && e.event == "changed" {
224 debug!("Updating metric ID:{}, battery:{}", e.id, config.battery);
225
226 let s = state.sensors.get(&e.id).unwrap().clone();
227 let gauge = state.metrics.get("battery").unwrap();
228
229 gauge.with(&s.labels(false)).set(config.battery);
230 return Ok(());
231 }
232 }
233
234 warn!("Ignoring unknown event {:?}", e);
235
236 Ok(())
237}
238
239pub fn metrics() -> String {
241 let encoder = TextEncoder::new();
242 let metric_families = REGISTRY.gather();
243 encoder.encode_to_string(&metric_families).unwrap()
244}
245
246impl State {
247 fn with_metrics() -> Self {
248 let mut s = State {
249 metrics: Default::default(),
250 sensors: Default::default(),
251 };
252
253 let metrics = vec![
254 register_gauge_vec_with_registry!(
255 opts!("battery", "Battery level of sensors"),
256 &["manufacturername", "modelid", "name", "swversion"],
257 REGISTRY
258 )
259 .unwrap(),
260 register_gauge_vec_with_registry!(
261 opts!("humidity", "Humidity level"),
262 &["manufacturername", "modelid", "name", "swversion", "type"],
263 REGISTRY,
264 )
265 .unwrap(),
266 register_gauge_vec_with_registry!(
267 opts!("pressure", "Pressure level"),
268 &["manufacturername", "modelid", "name", "swversion", "type"],
269 REGISTRY
270 )
271 .unwrap(),
272 register_gauge_vec_with_registry!(
273 opts!("temperature", "Temperature level"),
274 &["manufacturername", "modelid", "name", "swversion", "type"],
275 REGISTRY,
276 )
277 .unwrap(),
278 ];
279
280 for gauge in metrics {
281 s.metrics
282 .insert(gauge.desc()[0].fq_name.clone(), gauge.clone());
283 }
284
285 s
286 }
287}
288
289impl Sensor {
290 fn labels(&self, tipe: bool) -> HashMap<&str, &str> {
292 vec![
293 ("manufacturername", &self.manufacturername),
294 ("modelid", &self.modelid),
295 ("name", &self.name),
296 ("swversion", self.swversion.as_ref().unwrap_or(&self.dummy)),
297 if tipe {
298 ("type", &self.tipe)
299 } else {
300 ("", &self.dummy)
301 },
302 ]
303 .into_iter()
304 .filter(|(name, _)| !name.is_empty())
305 .map(|(name, value)| (name, value.as_str()))
306 .collect()
307 }
308}
309
310#[cfg(test)]
311mod test {
312 use super::*;
313
314 #[test]
315 #[ignore]
316 fn read_config() {
317 let resp = gateway(
318 &Url::parse("http://nyx.jabid.in:4501").unwrap(),
319 "381412B455",
320 );
321
322 match resp {
323 Ok(cfg) => {
324 assert_eq!(cfg.apiversion, "1.16.0");
325 assert_eq!(cfg.bridgeid, "00212EFFFF07D25D")
326 }
327 Err(e) => {
328 panic!("Failed to read gateway config from home assistant: {}", e)
329 }
330 }
331 }
332
333 #[test]
334 fn test_process() {
335 let events = include_str!("../events.json");
336 let mut state = State::with_metrics();
337
338 for event in events.lines().filter(|l| !l.trim().is_empty()) {
339 let mut e = serde_json::from_str::<Event>(event)
340 .unwrap_or_else(|err| panic!("Failed to parse event {}: {}", &event, err));
341
342 process(&mut e, &mut state)
343 .unwrap_or_else(|err| panic!("Failed to process event {:?}: {}", &e, err));
344 }
345
346 let m = metrics();
348 let m = m
349 .lines()
350 .filter(|line| !line.starts_with('#'))
351 .collect::<Vec<_>>();
352
353 dbg!(&m);
354
355 assert!(m.len() > 10, "Too few metrics exported")
356 }
357}