dht_logger/
lib.rs

1//! DHT Logger
2//!
3//! This crate is for logging measurement from a device reading DHT sensors and writing the
4//! measurements over a serial connection. The hardware producing the data does not matter, but it
5//! must be logging data over serial in JSON with fields for temperature, humidity, and heat index.
6//! Here's a pretty version of an example reading:
7//! ```json
8//! {
9//!   "sensor_label": {
10//!     "t": 20.0,
11//!     "h": 50.0,
12//!     "hi": 20.0
13//!   },
14//!   "another_sensor": {
15//!     "error": "some error message"
16//!   }
17//! }
18//! ```
19//!
20//! This code has been tested using
21//! [arduino-dht-logger](https://github.com/domagalski/arduino-dht-logger) as the hardware source
22//! providing data over serial.
23
24use std::cell::RefCell;
25use std::collections::HashMap;
26use std::fs::File;
27use std::io::{Error, ErrorKind};
28use std::net::{SocketAddrV4, UdpSocket};
29use std::path::{Path, PathBuf};
30use std::thread;
31use std::time::Duration;
32
33use chrono::Utc;
34use log;
35use serde::{Deserialize, Serialize};
36use serde_json::Value;
37use serde_yaml;
38use serialport::{self, SerialPort};
39
40pub mod messages;
41use messages::*;
42pub use messages::{Measurement, SensorData};
43
44#[cfg(test)]
45pub mod tests;
46
47/// Contain results with `std::io::Error` as the `Error` implementation.
48pub type Result<T> = std::result::Result<T, Error>;
49
50const BUFFER_SIZE: usize = 1024;
51const TIMEOUT: Duration = Duration::from_secs(4);
52
53/// Configuration of a DHT Logger client.
54///
55/// Example configuration YAML:
56/// ```yaml
57/// # Serial port configuration
58/// port: /dev/ttyUSB0
59/// baud: 115200
60///
61/// # Configure how the sensor data is logged.
62/// logger_config:
63///   # verbose: true tells the logger to
64///   # use log::info! for sensor readings
65///   verbose: true
66/// ```
67#[derive(Debug, Deserialize, Serialize)]
68pub struct DhtLoggerConfig {
69    pub port: PathBuf,
70    pub baud: u32,
71    pub logger_config: HashMap<String, Value>,
72}
73
74impl DhtLoggerConfig {
75    /// Load a YAML config file into a config struct
76    pub fn load_yaml(config_file: &Path) -> DhtLoggerConfig {
77        let config_file = File::open(config_file).unwrap();
78        match serde_yaml::from_reader(config_file) {
79            Ok(dht_logger) => dht_logger,
80            Err(_) => panic!("YAML parse error in DHT logger config."),
81        }
82    }
83}
84
85/// DHT Logger client.
86///
87/// This is for reading data over serial and logging it using various means.
88///
89/// Supported logging methods:
90/// * `verbose`: Log incoming data using `log::info!`
91pub struct DhtLogger {
92    port: RefCell<Box<dyn SerialPort>>,
93    verbose: bool,
94    udp_addrs: Vec<SocketAddrV4>,
95    udp_socket: Option<UdpSocket>,
96}
97
98impl DhtLogger {
99    /// Create a DHT logger from an existing serial port.
100    ///
101    /// Args:
102    /// * `port`: An interface to use as a serial port.
103    /// * `logger_config`: Configure how data is logged. See the `DhtLoggerConfig` documentation.
104    pub fn new(port: Box<dyn SerialPort>, logger_config: HashMap<String, Value>) -> DhtLogger {
105        let verbose = if let Some(verbose) = logger_config.get("verbose") {
106            if let Value::Bool(verbose) = verbose {
107                *verbose
108            } else {
109                panic!("logger.verbose must be boolean, got value: {}", verbose)
110            }
111        } else {
112            false
113        };
114
115        let default = Value::Array(Vec::new());
116        let udp_addrs: Vec<SocketAddrV4> = logger_config
117            .get("udp")
118            .unwrap_or(&default)
119            .as_array()
120            .expect("logger.udp must be a list")
121            .iter()
122            .map(|addr| {
123                addr.as_str().expect(&format!(
124                    "UDP addresses must be strings, got value: {}",
125                    addr
126                ))
127            })
128            .map(|addr| {
129                addr.parse()
130                    .expect(&format!("Failed to parse IP:PORT, got value: {}", addr))
131            })
132            .collect();
133
134        let udp_socket = match udp_addrs.len() {
135            0 => None,
136            _ => Some(UdpSocket::bind("0.0.0.0:0").unwrap()),
137        };
138
139        DhtLogger {
140            port: RefCell::new(port),
141            verbose,
142            udp_addrs,
143            udp_socket,
144        }
145    }
146
147    /// Create a DHT logger from a DhtLoggerConfig.
148    pub fn from_config(config: &DhtLoggerConfig) -> DhtLogger {
149        let port = serialport::new(config.port.to_str().unwrap(), config.baud)
150            .timeout(TIMEOUT)
151            .open()
152            .expect(&format!(
153                "Failed to open port: {}",
154                config.port.to_str().unwrap()
155            ));
156
157        // trace log serial port parameters
158        log::trace!("Data bits: {:?}", port.data_bits());
159        log::trace!("Flow control: {:?}", port.flow_control());
160        log::trace!("Parity: {:?}", port.parity());
161        log::trace!("Stop bits: {:?}", port.stop_bits());
162        log::trace!("Timeout: {:?}", port.timeout());
163
164        DhtLogger::new(port, config.logger_config.to_owned())
165    }
166
167    /// Get the name of the serial port.
168    pub fn port(&self) -> Option<PathBuf> {
169        match self.port.borrow().name() {
170            Some(name) => Some(Path::new(&name).to_path_buf()),
171            None => None,
172        }
173    }
174
175    /// Read sensor data over serial and return it. This blocks until data is readable over the
176    /// serial interface or a timeout occurs.
177    pub fn read_sensor(&self) -> Result<DhtSensors> {
178        let mut buffer: [u8; BUFFER_SIZE] = [0; BUFFER_SIZE];
179        let n_bytes = self.port.borrow_mut().read(&mut buffer)?;
180        let buffer = &buffer[..n_bytes];
181        if let Ok(buffer) = std::str::from_utf8(&buffer) {
182            log::trace!("got bytes: {}", buffer);
183        }
184        let timestamp = Utc::now();
185        let raw = match serde_json::from_slice::<Value>(&buffer)? {
186            Value::Object(map) => map,
187            _ => {
188                return Err(Error::new(
189                    ErrorKind::InvalidData,
190                    "DHT logger data must be a JSON mapping",
191                ))
192            }
193        };
194
195        let mut sensors = HashMap::new();
196        for (key, value) in raw.iter() {
197            let value = if let Value::Object(map) = value {
198                map
199            } else {
200                panic!("Sensor value must be a JSON mapping, got value: {}", value);
201            };
202
203            let measurement = if let Some(error) = value.get("e") {
204                let error = if let Value::String(error) = error {
205                    error
206                } else {
207                    panic!("Error value must be a string, got value: {}", error);
208                };
209                Measurement::new(None, Some(error))
210            } else {
211                let raw: DhtDataRaw = serde_json::from_value(Value::Object(value.clone()))?;
212                Measurement::new(Some(SensorData::from(raw)), None)
213            };
214
215            if let Some(error) = measurement.get_error() {
216                log::warn!("Error reading '{}' sensor: {}", key, error);
217                continue;
218            }
219
220            let data = measurement.get_data().unwrap();
221            sensors.insert(String::from(key), data);
222        }
223
224        Ok(DhtSensors {
225            timestamp,
226            data: sensors,
227        })
228    }
229
230    /// Wait for the sensor to return data for a specified amount of retries. If the number of
231    /// attempts to read data exceed the allowed number of retries, the last error message is
232    /// returned. If an error occurs, this function sleeps for 100s. All sensor read errors are
233    /// logged to `log::trace!` as they arrive.
234    pub fn wait_for_sensor(&self, retries: u32) -> Result<DhtSensors> {
235        let mut retry: u32 = 0;
236        loop {
237            match self.read_sensor() {
238                Ok(measurement) => {
239                    return Ok(measurement);
240                }
241                Err(err) => {
242                    retry += 1;
243                    log::trace!("{}", err);
244                    if retry == retries {
245                        return Err(err);
246                    }
247                    thread::sleep(Duration::from_millis(100));
248                }
249            }
250        }
251    }
252
253    /// Log a measurement to the all of the logging channels
254    /// configured in the logger config for the DHT Logger.
255    pub fn log_measurement(&self, measurement: DhtSensors) -> Result<()> {
256        // Verbose logging
257        let data_pretty = serde_json::to_string_pretty(&measurement)?;
258        let data_pretty = format!("Received measurement:\n{}", data_pretty);
259        if self.verbose {
260            log::info!("{}", data_pretty);
261        } else {
262            log::debug!("{}", data_pretty);
263        }
264
265        // UDP logging
266        if let Some(udp_socket) = &self.udp_socket {
267            let data_json = serde_json::to_vec(&DhtSensorsSerde::from(measurement))?;
268            log::trace!("{}", std::str::from_utf8(data_json.as_slice()).unwrap());
269            for addr in self.udp_addrs.iter() {
270                let bytes_sent = udp_socket.send_to(data_json.as_slice(), addr)?;
271                log::trace!("Sent {} bytes to UDP addr: {:?}", bytes_sent, addr);
272            }
273        }
274
275        Ok(())
276    }
277
278    /// Read data from the DHT sensor serial interface and log data to all logging channels.
279    ///
280    /// Args:
281    /// * `retries`: Number of sensor read retries (see `wait_for_sensor docs) before giving up.
282    pub fn read_sensor_and_log_data(&self, retries: u32) {
283        let measurement = match self.wait_for_sensor(retries) {
284            Ok(data) => data,
285            Err(_) => return,
286        };
287
288        if let Err(err) = self.log_measurement(measurement) {
289            log::warn!("{}", err);
290        }
291    }
292}