1use std::cell::RefCell;
25use std::collections::HashMap;
26use std::fs::File;
27use std::io::{Error, ErrorKind};
28use std::net::{SocketAddrV4, UdpSocket};
29use std::path::{Path, PathBuf};
30use std::thread;
31use std::time::Duration;
32
33use chrono::Utc;
34use log;
35use serde::{Deserialize, Serialize};
36use serde_json::Value;
37use serde_yaml;
38use serialport::{self, SerialPort};
39
40pub mod messages;
41use messages::*;
42pub use messages::{Measurement, SensorData};
43
44#[cfg(test)]
45pub mod tests;
46
47pub type Result<T> = std::result::Result<T, Error>;
49
50const BUFFER_SIZE: usize = 1024;
51const TIMEOUT: Duration = Duration::from_secs(4);
52
53#[derive(Debug, Deserialize, Serialize)]
68pub struct DhtLoggerConfig {
69 pub port: PathBuf,
70 pub baud: u32,
71 pub logger_config: HashMap<String, Value>,
72}
73
74impl DhtLoggerConfig {
75 pub fn load_yaml(config_file: &Path) -> DhtLoggerConfig {
77 let config_file = File::open(config_file).unwrap();
78 match serde_yaml::from_reader(config_file) {
79 Ok(dht_logger) => dht_logger,
80 Err(_) => panic!("YAML parse error in DHT logger config."),
81 }
82 }
83}
84
85pub struct DhtLogger {
92 port: RefCell<Box<dyn SerialPort>>,
93 verbose: bool,
94 udp_addrs: Vec<SocketAddrV4>,
95 udp_socket: Option<UdpSocket>,
96}
97
98impl DhtLogger {
99 pub fn new(port: Box<dyn SerialPort>, logger_config: HashMap<String, Value>) -> DhtLogger {
105 let verbose = if let Some(verbose) = logger_config.get("verbose") {
106 if let Value::Bool(verbose) = verbose {
107 *verbose
108 } else {
109 panic!("logger.verbose must be boolean, got value: {}", verbose)
110 }
111 } else {
112 false
113 };
114
115 let default = Value::Array(Vec::new());
116 let udp_addrs: Vec<SocketAddrV4> = logger_config
117 .get("udp")
118 .unwrap_or(&default)
119 .as_array()
120 .expect("logger.udp must be a list")
121 .iter()
122 .map(|addr| {
123 addr.as_str().expect(&format!(
124 "UDP addresses must be strings, got value: {}",
125 addr
126 ))
127 })
128 .map(|addr| {
129 addr.parse()
130 .expect(&format!("Failed to parse IP:PORT, got value: {}", addr))
131 })
132 .collect();
133
134 let udp_socket = match udp_addrs.len() {
135 0 => None,
136 _ => Some(UdpSocket::bind("0.0.0.0:0").unwrap()),
137 };
138
139 DhtLogger {
140 port: RefCell::new(port),
141 verbose,
142 udp_addrs,
143 udp_socket,
144 }
145 }
146
147 pub fn from_config(config: &DhtLoggerConfig) -> DhtLogger {
149 let port = serialport::new(config.port.to_str().unwrap(), config.baud)
150 .timeout(TIMEOUT)
151 .open()
152 .expect(&format!(
153 "Failed to open port: {}",
154 config.port.to_str().unwrap()
155 ));
156
157 log::trace!("Data bits: {:?}", port.data_bits());
159 log::trace!("Flow control: {:?}", port.flow_control());
160 log::trace!("Parity: {:?}", port.parity());
161 log::trace!("Stop bits: {:?}", port.stop_bits());
162 log::trace!("Timeout: {:?}", port.timeout());
163
164 DhtLogger::new(port, config.logger_config.to_owned())
165 }
166
167 pub fn port(&self) -> Option<PathBuf> {
169 match self.port.borrow().name() {
170 Some(name) => Some(Path::new(&name).to_path_buf()),
171 None => None,
172 }
173 }
174
175 pub fn read_sensor(&self) -> Result<DhtSensors> {
178 let mut buffer: [u8; BUFFER_SIZE] = [0; BUFFER_SIZE];
179 let n_bytes = self.port.borrow_mut().read(&mut buffer)?;
180 let buffer = &buffer[..n_bytes];
181 if let Ok(buffer) = std::str::from_utf8(&buffer) {
182 log::trace!("got bytes: {}", buffer);
183 }
184 let timestamp = Utc::now();
185 let raw = match serde_json::from_slice::<Value>(&buffer)? {
186 Value::Object(map) => map,
187 _ => {
188 return Err(Error::new(
189 ErrorKind::InvalidData,
190 "DHT logger data must be a JSON mapping",
191 ))
192 }
193 };
194
195 let mut sensors = HashMap::new();
196 for (key, value) in raw.iter() {
197 let value = if let Value::Object(map) = value {
198 map
199 } else {
200 panic!("Sensor value must be a JSON mapping, got value: {}", value);
201 };
202
203 let measurement = if let Some(error) = value.get("e") {
204 let error = if let Value::String(error) = error {
205 error
206 } else {
207 panic!("Error value must be a string, got value: {}", error);
208 };
209 Measurement::new(None, Some(error))
210 } else {
211 let raw: DhtDataRaw = serde_json::from_value(Value::Object(value.clone()))?;
212 Measurement::new(Some(SensorData::from(raw)), None)
213 };
214
215 if let Some(error) = measurement.get_error() {
216 log::warn!("Error reading '{}' sensor: {}", key, error);
217 continue;
218 }
219
220 let data = measurement.get_data().unwrap();
221 sensors.insert(String::from(key), data);
222 }
223
224 Ok(DhtSensors {
225 timestamp,
226 data: sensors,
227 })
228 }
229
230 pub fn wait_for_sensor(&self, retries: u32) -> Result<DhtSensors> {
235 let mut retry: u32 = 0;
236 loop {
237 match self.read_sensor() {
238 Ok(measurement) => {
239 return Ok(measurement);
240 }
241 Err(err) => {
242 retry += 1;
243 log::trace!("{}", err);
244 if retry == retries {
245 return Err(err);
246 }
247 thread::sleep(Duration::from_millis(100));
248 }
249 }
250 }
251 }
252
253 pub fn log_measurement(&self, measurement: DhtSensors) -> Result<()> {
256 let data_pretty = serde_json::to_string_pretty(&measurement)?;
258 let data_pretty = format!("Received measurement:\n{}", data_pretty);
259 if self.verbose {
260 log::info!("{}", data_pretty);
261 } else {
262 log::debug!("{}", data_pretty);
263 }
264
265 if let Some(udp_socket) = &self.udp_socket {
267 let data_json = serde_json::to_vec(&DhtSensorsSerde::from(measurement))?;
268 log::trace!("{}", std::str::from_utf8(data_json.as_slice()).unwrap());
269 for addr in self.udp_addrs.iter() {
270 let bytes_sent = udp_socket.send_to(data_json.as_slice(), addr)?;
271 log::trace!("Sent {} bytes to UDP addr: {:?}", bytes_sent, addr);
272 }
273 }
274
275 Ok(())
276 }
277
278 pub fn read_sensor_and_log_data(&self, retries: u32) {
283 let measurement = match self.wait_for_sensor(retries) {
284 Ok(data) => data,
285 Err(_) => return,
286 };
287
288 if let Err(err) = self.log_measurement(measurement) {
289 log::warn!("{}", err);
290 }
291 }
292}