// deconz_exporter/lib.rs

1use std::{collections::HashMap, error::Error};
2
3use prometheus::{
4    core::Collector, opts, register_gauge_vec_with_registry, GaugeVec, Registry, TextEncoder,
5};
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use url::Url;
9
10#[macro_use]
11extern crate lazy_static;
12
13#[cfg(not(test))]
14use log::{debug, info, warn};
15
16#[cfg(test)]
17use std::{println as debug, println as warn, println as info};
18
19lazy_static! {
20    /// Global prometheus registry for all metrics
21    static ref REGISTRY: Registry = Registry::new_custom(Some("deconz".into()), None).expect("Failed to create registry");
22}
23
/// deCONZ gateway config
///
/// Deserialized by [`gateway`] from the JSON body of the `<host>/api/<username>/config` REST endpoint. Every field is
/// required: a response that lacks any of them fails to deserialize.
#[derive(Serialize, Deserialize, Debug)]
pub struct Gateway {
    pub apiversion: String,
    pub bridgeid: String,
    pub devicename: String,
    pub dhcp: bool,
    pub gateway: String,
    pub ipaddress: String,
    pub linkbutton: bool,
    pub mac: String,
    pub modelid: String,
    pub name: String,
    pub swversion: String,
    /// Port of the websocket endpoint; [`websocket`] uses it to build the event stream URL.
    pub websocketport: u16,
    pub zigbeechannel: u8,
}
41
/// Sensor config
///
/// Present only for "ZHA{Humidity, Pressure, Switch, Temperature}", null for "Configuration tool".
///
/// All four fields are required when the object is present, so a config object that carries only a subset of them
/// fails to deserialize.
#[derive(Clone, Default, Serialize, Deserialize, Debug)]
pub struct SensorConfig {
    /// Battery reading; exported as the `battery` gauge by [`process`].
    pub battery: f64,
    pub offset: f64,
    pub on: bool,
    pub reachable: bool,
}
52
/// Sensor info
///
/// Carries the human readable attributes of a sensor. [`process`] caches these per sensor id so that later events
/// without attributes can still be labelled.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Sensor {
    /// Sensor configuration, `None` when absent or null in the payload.
    #[serde(default)]
    pub config: Option<SensorConfig>,
    pub etag: Option<String>,
    pub lastannounced: Option<String>,
    pub lastseen: Option<String>,
    /// Exported as the `manufacturername` label.
    pub manufacturername: String,
    /// Exported as the `modelid` label.
    pub modelid: String,
    /// Exported as the `name` label.
    pub name: String,
    /// Raw state attributes; defaults to an empty map when missing.
    #[serde(default)]
    pub state: HashMap<String, Value>,
    /// Exported as the `swversion` label; an empty string is used when missing.
    pub swversion: Option<String>,
    /// The JSON `type` field (`type` is a reserved word in Rust). Exported as the `type` label.
    #[serde(rename = "type")]
    pub tipe: String,
    pub uniqueid: String,
    /// Never (de)serialized. Serves as the borrowed fallback value for absent labels in `labels()`.
    #[serde(skip)]
    dummy: String,
}
73
/// Mutable state shared across all processed events.
pub struct State {
    /// Last known attributes of each sensor, keyed by the event `id` they arrived with.
    sensors: HashMap<String, Sensor>,
    /// Registered gauges, keyed by the fully qualified name of their first descriptor (e.g. `battery`).
    metrics: HashMap<String, GaugeVec>,
}
78
/// Websocket event from deCONZ for Conbee2
///
/// Field reference:
/// <https://dresden-elektronik.github.io/deconz-rest-doc/endpoints/websocket/#message-fields>
#[derive(Serialize, Deserialize, Debug)]
pub struct Event {
    /// "event" - the message holds an event.
    #[serde(rename = "t")]
    pub type_: String,
    /// One of added | changed | deleted | scene-called
    #[serde(rename = "e")]
    pub event: String,
    /// Resource is one of groups | lights | scenes | sensors
    #[serde(rename = "r")]
    pub resource: String,
    /// The id of the resource to which the message relates
    pub id: String,
    /// The uniqueid of the resource to which the message relates
    pub uniqueid: String,
    /// The group id of the resource to which the message relates.
    pub gid: Option<String>,
    /// The scene id of the resource to which the message relates.
    pub scid: Option<String>,
    /// Depending on the `websocketnotifyall` setting: a map containing all or only the changed config attributes of a
    /// sensor resource.  Only for changed events.
    #[serde(default)]
    pub config: Option<SensorConfig>,
    /// The (new) name of a resource. Only for changed events.
    pub name: Option<String>,
    /// Depending on the `websocketnotifyall` setting: a map containing all or only the changed state attributes of a
    /// group, light, or sensor resource.  Only for changed events.
    #[serde(default)]
    pub state: HashMap<String, Value>,
    /// The full group resource.  Only for added events of a group resource.
    #[serde(default)]
    pub group: HashMap<String, Value>,
    /// The full light resource.  Only for added events of a light resource.
    #[serde(default)]
    pub light: HashMap<String, Value>,
    /// The full sensor resource.  Only for added events of a sensor resource.
    #[serde(default)]
    pub sensor: HashMap<String, Value>,
    /// Undocumented, but present in API responses. When set on a changed event, [`process`] caches it as the
    /// sensor's attributes.
    pub attr: Option<Sensor>,
}
123
/// Callback function executed for every update event
///
/// [`stream`] calls it once per successfully decoded [`Event`]; a returned error is logged and the stream continues.
pub type Callback = fn(&mut Event, &mut State) -> Result<(), Box<dyn Error>>;
126
127/// Read gateway config from deCONZ REST API
128pub fn gateway(host: &Url, username: &str) -> Result<Gateway, reqwest::Error> {
129    let u = format!("{}/api/{}/config", host, username);
130    info!("Connecting to API gateway at {u}");
131    reqwest::blocking::get(u)?.json()
132}
133
134/// Discover websocket port from gateway config
135pub fn websocket(host: &Url, username: &str) -> Result<Url, Box<dyn Error>> {
136    let gw = gateway(host, username)?;
137    let mut host = host.clone();
138
139    host.set_scheme("ws").unwrap();
140    host.set_port(Some(gw.websocketport)).unwrap();
141
142    info!("Discovered websocket port at {}", host);
143    Ok(host)
144}
145
146/// Run listener for websocket events.
147pub fn run(host: &Url, username: &str) -> Result<(), Box<dyn Error>> {
148    let mut state = State::with_metrics();
149    let socket = websocket(host, username)?;
150    stream(&socket, &mut state, process)
151}
152
153/// Run a callback for each event received over websocket.
154//
155// NOTE: A stream of Events would have been much neater than a callback, but Rust makes that API significantly more
156// painful to implement.  Revisit this later.
157//
158pub fn stream(url: &Url, state: &mut State, callback: Callback) -> Result<(), Box<dyn Error>> {
159    info!("🔌 Start listening for websocket events at {url}");
160
161    let (mut socket, _) = tungstenite::client::connect(url)?;
162    loop {
163        match serde_json::from_str::<Event>(socket.read_message()?.to_text()?) {
164            Ok(mut event) => {
165                // Failing to process a single event is alright, and this process should just continue. Non recoverable
166                // errors should bubble up so that the whole stream can be reestablished.
167                if let Err(err) = callback(&mut event, state) {
168                    warn!("Failed to handle event: `{:?}`: {:?}", event, err)
169                }
170            }
171            Err(err) => {
172                warn!("Failed to serialize, ignoring message: {:?}", err)
173            }
174        }
175    }
176}
177
178/// Process events that can be handled and throw away everything else with a warning log.
179///
180/// The events structure is a bit messy and not in a good shape. See documentation of `Event` for details.
181///
182/// Events with `attrs` are used to get human readable labels and stored in a static map for future lookup, when state
183/// updates arrive without these attributes.
184pub fn process(e: &mut Event, state: &mut State) -> Result<(), Box<dyn Error>> {
185    debug!("Received event for {}", e.id);
186
187    // Sensor attributes contains human friendly names and labels. Store them now for future events with no attributes.
188    if let Some(attr) = &e.attr {
189        if e.type_ == "event" && e.event == "changed" {
190            debug!("Updating attrs for {}", e.id);
191            state.sensors.insert(e.id.to_string(), attr.clone());
192            return Ok(());
193        }
194    }
195
196    // State often has 2 keys, `lastupdated` and another one that is the actual data. Handle those, ignore the rest
197    if e.type_ == "event" && e.event == "changed" && !e.state.is_empty() {
198        if let Some(sensor) = state.sensors.get(&e.id) {
199            for (k, v) in &e.state {
200                if k == "lastupdated" {
201                    continue;
202                }
203
204                if let Some(gauge) = state.metrics.get(k.as_str()) {
205                    if let Some(val) = v.as_f64() {
206                        debug!("Updating metric ID:{}, {k}:{v}", e.id);
207                        gauge.with(&sensor.labels(true)).set(val);
208                    }
209                } else {
210                    debug!("Ignoring metric ID:{}, {k}:{v}", e.id);
211                }
212                return Ok(());
213            }
214        } else {
215            warn!("Ignoring event update for unknown sensor {}: {:?}", e.id, e)
216        }
217
218        return Ok(());
219    }
220
221    // Config change should be pretty much identical to state change
222    if let Some(config) = &e.config {
223        if e.type_ == "event" && e.event == "changed" {
224            debug!("Updating metric ID:{}, battery:{}", e.id, config.battery);
225
226            let s = state.sensors.get(&e.id).unwrap().clone();
227            let gauge = state.metrics.get("battery").unwrap();
228
229            gauge.with(&s.labels(false)).set(config.battery);
230            return Ok(());
231        }
232    }
233
234    warn!("Ignoring unknown event {:?}", e);
235
236    Ok(())
237}
238
239/// Export prometheus metrics as a string
240pub fn metrics() -> String {
241    let encoder = TextEncoder::new();
242    let metric_families = REGISTRY.gather();
243    encoder.encode_to_string(&metric_families).unwrap()
244}
245
246impl State {
247    fn with_metrics() -> Self {
248        let mut s = State {
249            metrics: Default::default(),
250            sensors: Default::default(),
251        };
252
253        let metrics = vec![
254            register_gauge_vec_with_registry!(
255                opts!("battery", "Battery level of sensors"),
256                &["manufacturername", "modelid", "name", "swversion"],
257                REGISTRY
258            )
259            .unwrap(),
260            register_gauge_vec_with_registry!(
261                opts!("humidity", "Humidity level"),
262                &["manufacturername", "modelid", "name", "swversion", "type"],
263                REGISTRY,
264            )
265            .unwrap(),
266            register_gauge_vec_with_registry!(
267                opts!("pressure", "Pressure level"),
268                &["manufacturername", "modelid", "name", "swversion", "type"],
269                REGISTRY
270            )
271            .unwrap(),
272            register_gauge_vec_with_registry!(
273                opts!("temperature", "Temperature level"),
274                &["manufacturername", "modelid", "name", "swversion", "type"],
275                REGISTRY,
276            )
277            .unwrap(),
278        ];
279
280        for gauge in metrics {
281            s.metrics
282                .insert(gauge.desc()[0].fq_name.clone(), gauge.clone());
283        }
284
285        s
286    }
287}
288
289impl Sensor {
290    /// Convert sensor into prometheus labels
291    fn labels(&self, tipe: bool) -> HashMap<&str, &str> {
292        vec![
293            ("manufacturername", &self.manufacturername),
294            ("modelid", &self.modelid),
295            ("name", &self.name),
296            ("swversion", self.swversion.as_ref().unwrap_or(&self.dummy)),
297            if tipe {
298                ("type", &self.tipe)
299            } else {
300                ("", &self.dummy)
301            },
302        ]
303        .into_iter()
304        .filter(|(name, _)| !name.is_empty())
305        .map(|(name, value)| (name, value.as_str()))
306        .collect()
307    }
308}
309
#[cfg(test)]
mod test {
    use super::*;

    /// Reads the config of a live gateway; ignored by default since it needs that gateway to be reachable.
    #[test]
    #[ignore]
    fn read_config() {
        let host = Url::parse("http://nyx.jabid.in:4501").unwrap();

        let cfg = match gateway(&host, "381412B455") {
            Ok(cfg) => cfg,
            Err(e) => panic!("Failed to read gateway config from home assistant: {}", e),
        };

        assert_eq!(cfg.apiversion, "1.16.0");
        assert_eq!(cfg.bridgeid, "00212EFFFF07D25D");
    }

    /// Replays the recorded events, one JSON document per line, and checks that metrics get exported.
    #[test]
    fn test_process() {
        let events = include_str!("../events.json");
        let mut state = State::with_metrics();

        for line in events.lines() {
            if line.trim().is_empty() {
                continue;
            }

            let mut parsed = match serde_json::from_str::<Event>(line) {
                Ok(parsed) => parsed,
                Err(err) => panic!("Failed to parse event {}: {}", &line, err),
            };

            if let Err(err) = process(&mut parsed, &mut state) {
                panic!("Failed to process event {:?}: {}", &parsed, err);
            }
        }

        // Now that all the data is handled, make sure metrics are present.
        let exported = metrics();
        let m: Vec<_> = exported
            .lines()
            .filter(|line| !line.starts_with('#'))
            .collect();

        dbg!(&m);

        assert!(m.len() > 10, "Too few metrics exported")
    }
}
357}