1use drmem_api::{
2 device,
3 driver::{self, DriverConfig},
4 Error, Result,
5};
6use std::convert::{Infallible, TryFrom};
7use std::{future::Future, pin::Pin, sync::Arc, time::SystemTime};
8use tokio::sync::Mutex;
9use tokio::time::{interval_at, Duration, Instant};
10use tracing::{debug, error, warn, Span};
11use weather_underground as wu;
12
// Polling interval, in minutes, used when the configuration has no
// "interval" entry (the value is multiplied by 60 to obtain seconds
// before it is turned into a `Duration`).
const DEFAULT_INTERVAL: u64 = 10;
// Smallest polling interval, in minutes, allowed when no API key is
// configured and the driver falls back to the fetched public key.
const MIN_PUBLIC_INTERVAL: u64 = 10;
15
/// State machine that turns the station's raw precipitation readings
/// into a running total for the current rainfall.
///
/// The station total is treated as a counter that normally only grows;
/// a reading lower than the previous one is interpreted as the counter
/// having been restarted by the station.
#[derive(Debug)]
enum PrecipState {
    /// No rainfall is being tracked; readings are passed through.
    NoRain,

    /// Rainfall is being accumulated.
    Rain {
        /// Station total seen on the previous update.
        prev: f64,
        /// Total accumulated since this rainfall started.
        running: f64,
        /// Time this state was entered, or of the most recent update
        /// that reported a non-zero rate.
        time: SystemTime,
    },

    /// A rainfall was closed out after the rate stayed at zero for the
    /// timeout period. The state waits for the station total to move
    /// again (new rain) or to return to zero.
    Pause {
        /// Station total used as the baseline for the next rainfall.
        prev: f64,
    },
}

impl PrecipState {
    /// Creates the state machine in the `NoRain` state.
    fn new() -> Self {
        PrecipState::NoRain
    }

    /// Feeds one observation into the state machine.
    ///
    /// * `p_rate` - precipitation rate reported by the station.
    /// * `p_total` - precipitation total reported by the station.
    /// * `now` - timestamp of the observation.
    ///
    /// Returns `(rate, total, last_total)`: the rate and running total
    /// to publish, plus `Some(total)` of the rainfall that just ended
    /// on the single update where the `Rain` state times out.
    fn update(
        &mut self,
        p_rate: f64,
        p_total: f64,
        now: SystemTime,
    ) -> (f64, f64, Option<f64>) {
        match self {
            Self::NoRain => {
                // Any accumulation starts a new rainfall; the reading
                // itself is reported unchanged.
                if p_total > 0.0 {
                    *self = Self::Rain {
                        prev: p_total,
                        running: p_total,
                        time: now,
                    };
                }
                (p_rate, p_total, None)
            }

            Self::Pause { prev } => {
                let baseline = *prev;

                let total = if p_total == 0.0 {
                    // The station total is back at zero. With no rain
                    // falling the cycle is over. If rain is falling the
                    // state stays paused, but the baseline must be
                    // cleared: the old value no longer relates to the
                    // station's counter, and keeping it would subtract
                    // stale rainfall from the next reading that exceeds
                    // it.
                    *self = if p_rate == 0.0 {
                        Self::NoRain
                    } else {
                        Self::Pause { prev: 0.0 }
                    };
                    0.0
                } else if p_total > baseline {
                    // The counter grew: only the growth belongs to the
                    // new rainfall.
                    let total = p_total - baseline;

                    *self = Self::Rain {
                        prev: p_total,
                        running: total,
                        time: now,
                    };
                    total
                } else if p_total < baseline {
                    // The counter dropped (restarted) and has already
                    // accumulated again: all of it is new rainfall.
                    *self = Self::Rain {
                        prev: p_total,
                        running: p_total,
                        time: now,
                    };
                    p_total
                } else {
                    // Nothing changed since the rainfall was closed.
                    0.0
                };

                (p_rate, total, None)
            }

            Self::Rain {
                prev,
                running,
                time,
            } => {
                // How long the rate must stay at zero before the
                // rainfall is considered finished (10 hours).
                const TIMEOUT: Duration = Duration::from_secs(36_000);

                // A timestamp earlier than the stored one counts as no
                // elapsed time.
                let idle = now.duration_since(*time).unwrap_or_default();

                if p_rate == 0.0 && idle >= TIMEOUT {
                    let last_total = *running;

                    *self = Self::Pause { prev: *prev };
                    (0.0, 0.0, Some(last_total))
                } else {
                    // A smaller total means the station counter was
                    // restarted, so the whole reading is new rainfall;
                    // otherwise only the growth is added.
                    *running += if *prev > p_total {
                        p_total
                    } else {
                        p_total - *prev
                    };

                    // Only active rain pushes the timeout forward.
                    if p_rate > 0.0 {
                        *time = now;
                    }

                    *prev = p_total;
                    (p_rate, *running, None)
                }
            }
        }
    }
}
160
/// State owned by a running instance of the Weather Underground driver.
pub struct Instance {
    /// HTTP client used for the Weather Underground requests.
    con: reqwest::Client,
    /// API key sent with every observation request.
    api_key: String,
    /// Period of the polling timer.
    interval: Duration,

    /// Precipitation state machine carried from one poll to the next.
    precip: PrecipState,
}
168
/// The set of devices registered by the driver, plus the configuration
/// values needed to query and interpret observations.
pub struct Devices {
    /// Station identifier taken from the "station" config parameter.
    station: String,
    /// Unit system taken from the "units" config parameter.
    units: wu::Unit,

    /// "dewpoint" device.
    d_dewpt: driver::ReadOnlyDevice<f64>,
    /// "heat-index" device.
    d_htidx: driver::ReadOnlyDevice<f64>,
    /// "humidity" device (registered with "%" units).
    d_humidity: driver::ReadOnlyDevice<f64>,
    /// "precip-rate" device.
    d_prec_rate: driver::ReadOnlyDevice<f64>,
    /// "precip-total" device; receives the running total computed by
    /// the precipitation state machine.
    d_prec_total: driver::ReadOnlyDevice<f64>,
    /// "precip-last-total" device; updated only when the state machine
    /// reports the total of a rainfall that just ended.
    d_prec_last_total: driver::ReadOnlyDevice<f64>,
    /// "pressure" device.
    d_pressure: driver::ReadOnlyDevice<f64>,
    /// "solar-rad" device (registered with "W/m²" units).
    d_solrad: driver::ReadOnlyDevice<f64>,
    /// "state" device; set to `true` when an observation was received
    /// and to `false` before the driver panics on a fetch error.
    d_state: driver::ReadOnlyDevice<bool>,
    /// "temperature" device.
    d_temp: driver::ReadOnlyDevice<f64>,
    /// "uv" device (registered without units).
    d_uv: driver::ReadOnlyDevice<f64>,
    /// "wind-chill" device.
    d_wndchl: driver::ReadOnlyDevice<f64>,
    /// "wind-dir" device (registered with "°" units).
    d_wnddir: driver::ReadOnlyDevice<f64>,
    /// "wind-gust" device.
    d_wndgst: driver::ReadOnlyDevice<f64>,
    /// "wind-speed" device.
    d_wndspd: driver::ReadOnlyDevice<f64>,
}
189
190impl Instance {
191 pub const NAME: &'static str = "weather-wu";
192
193 pub const SUMMARY: &'static str =
194 "obtains weather data from Weather Underground";
195
196 pub const DESCRIPTION: &'static str = include_str!("../README.md");
197
198 fn get_cfg_station(cfg: &DriverConfig) -> Result<String> {
199 match cfg.get("station") {
200 Some(toml::value::Value::String(station)) => {
201 Ok(station.to_string())
202 }
203 Some(_) => Err(Error::ConfigError(String::from(
204 "'station' config parameter should be a string",
205 ))),
206 None => Err(Error::ConfigError(String::from(
207 "missing 'station' parameter in config",
208 ))),
209 }
210 }
211
212 fn get_cfg_interval(cfg: &DriverConfig) -> Result<u64> {
213 match cfg.get("interval") {
214 Some(toml::value::Value::Integer(val)) => {
215 Ok(std::cmp::max(*val as u64, 1))
216 }
217 Some(_) => Err(Error::ConfigError(String::from(
218 "'interval' config parameter should be a positive integer",
219 ))),
220 None => Ok(DEFAULT_INTERVAL),
221 }
222 }
223
224 fn get_cfg_key(cfg: &DriverConfig) -> Result<Option<String>> {
225 match cfg.get("key") {
226 Some(toml::value::Value::String(val)) => Ok(Some(val.to_string())),
227 Some(_) => Err(Error::ConfigError(String::from(
228 "'key' config parameter should be a string",
229 ))),
230 None => Ok(None),
231 }
232 }
233
234 async fn get_cfg_key_and_interval(
235 con: &mut reqwest::Client,
236 key: Option<String>,
237 interval: u64,
238 ) -> Result<(String, Duration)> {
239 match key {
240 Some(val) => Ok((val, Duration::from_secs(interval * 60))),
241 None => {
242 if let Ok(api_key) = wu::fetch_api_key(con).await {
243 Ok((
244 api_key,
245 Duration::from_secs(
246 std::cmp::max(interval, MIN_PUBLIC_INTERVAL) * 60,
247 ),
248 ))
249 } else {
250 error!("couldn't determine public API key");
251 Err(Error::NotFound)
252 }
253 }
254 }
255 }
256
257 fn get_cfg_units(cfg: &DriverConfig) -> Result<wu::Unit> {
258 match cfg.get("units") {
259 Some(toml::value::Value::String(val)) => match val.as_str() {
260 "metric" => Ok(wu::Unit::Metric),
261 "imperial" => Ok(wu::Unit::English),
262 _ => Err(Error::ConfigError(String::from(
263 "'units' parameter should be \"imperial\" or \"metric\"",
264 ))),
265 },
266 Some(_) => Err(Error::ConfigError(String::from(
267 "'units' parameter should be a string",
268 ))),
269 None => Ok(wu::Unit::Metric),
270 }
271 }
272
273 async fn handle(
278 &mut self,
279 obs: &wu::Observation,
280 devices: &mut <Instance as driver::API>::DeviceSet,
281 ) {
282 let (dewpt, htidx, prate, ptotal, press, temp, wndchl, wndgst, wndspd) =
286 if let wu::Unit::Metric = devices.units {
287 if let Some(params) = &obs.metric {
288 (
289 params.dewpt,
290 params.heat_index,
291 params.precip_rate,
292 params.precip_total,
293 params.pressure,
294 params.temp,
295 params.wind_chill,
296 params.wind_gust,
297 params.wind_speed,
298 )
299 } else {
300 panic!("weather data didn't return any metric data")
301 }
302 } else if let Some(params) = &obs.imperial {
303 (
304 params.dewpt,
305 params.heat_index,
306 params.precip_rate,
307 params.precip_total,
308 params.pressure,
309 params.temp,
310 params.wind_chill,
311 params.wind_gust,
312 params.wind_speed,
313 )
314 } else {
315 panic!("weather data didn't return any imperial data")
316 };
317
318 if let Some(dewpt) = dewpt {
319 if (0.0..=200.0).contains(&dewpt) {
320 devices.d_dewpt.report_update(dewpt).await
321 } else {
322 warn!("ignoring bad dew point value: {:.1}", dewpt)
323 }
324 }
325
326 if let Some(htidx) = htidx {
327 if (0.0..=200.0).contains(&htidx) {
328 devices.d_htidx.report_update(htidx).await
329 } else {
330 warn!("ignoring bad heat index value: {:.1}", htidx)
331 }
332 }
333
334 if let (Some(prate), Some(ptotal)) = (prate, ptotal) {
335 let (nrate, ntotal, nlast) =
336 self.precip.update(prate, ptotal, SystemTime::now());
337
338 devices.d_prec_rate.report_update(nrate).await;
339 devices.d_prec_total.report_update(ntotal).await;
340
341 if let Some(last) = nlast {
342 devices.d_prec_last_total.report_update(last).await;
343 }
344 } else {
345 warn!("need both precip fields to update precip calculations")
346 }
347
348 if let Some(press) = press {
349 devices.d_pressure.report_update(press).await
350 }
351
352 if let Some(temp) = temp {
353 devices.d_temp.report_update(temp).await
354 }
355
356 if let Some(wndchl) = wndchl {
357 devices.d_wndchl.report_update(wndchl).await
358 }
359
360 if let Some(wndgst) = wndgst {
361 devices.d_wndgst.report_update(wndgst).await
362 }
363
364 if let Some(wndspd) = wndspd {
365 devices.d_wndspd.report_update(wndspd).await
366 }
367
368 if let Some(sol_rad) = obs.solar_radiation {
371 if (0.0..=1400.0).contains(&sol_rad) {
377 devices.d_solrad.report_update(sol_rad).await
378 } else {
379 warn!("ignoring bad solar radiation value: {:.1}", sol_rad)
380 }
381 }
382
383 if let Some(humidity) = obs.humidity {
386 if (0.0..=100.0).contains(&humidity) {
390 devices.d_humidity.report_update(humidity).await
391 } else {
392 warn!("ignoring bad humidity value: {:.1}", humidity)
393 }
394 }
395
396 if let Some(uv) = obs.uv {
399 devices.d_uv.report_update(uv).await
400 }
401
402 if let Some(winddir) = obs.winddir {
405 if (0.0..=360.0).contains(&winddir) {
408 devices.d_wnddir.report_update(winddir).await
409 } else {
410 warn!("ignoring bad wind direction value: {:.1}", winddir)
411 }
412 }
413 }
414}
415
416impl driver::API for Instance {
417 type DeviceSet = Devices;
418
419 fn register_devices(
420 core: driver::RequestChan,
421 cfg: &DriverConfig,
422 max_history: Option<usize>,
423 ) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>> {
424 let dewpoint_name = "dewpoint".parse::<device::Base>().unwrap();
425 let heat_index_name = "heat-index".parse::<device::Base>().unwrap();
426 let humidity_name = "humidity".parse::<device::Base>().unwrap();
427 let precip_rate_name = "precip-rate".parse::<device::Base>().unwrap();
428 let precip_total_name = "precip-total".parse::<device::Base>().unwrap();
429 let precip_last_total_name =
430 "precip-last-total".parse::<device::Base>().unwrap();
431 let pressure_name = "pressure".parse::<device::Base>().unwrap();
432 let solar_rad_name = "solar-rad".parse::<device::Base>().unwrap();
433 let state_name = "state".parse::<device::Base>().unwrap();
434 let temperature_name = "temperature".parse::<device::Base>().unwrap();
435 let uv_name = "uv".parse::<device::Base>().unwrap();
436 let wind_chill_name = "wind-chill".parse::<device::Base>().unwrap();
437 let wind_dir_name = "wind-dir".parse::<device::Base>().unwrap();
438 let wind_gust_name = "wind-gust".parse::<device::Base>().unwrap();
439 let wind_speed_name = "wind-speed".parse::<device::Base>().unwrap();
440
441 let station = Instance::get_cfg_station(cfg);
442 let units = Instance::get_cfg_units(cfg);
443
444 Box::pin(async move {
445 let station = station?;
446 let units = units?;
447
448 let temp_unit = Some(if let wu::Unit::English = units {
449 "°F"
450 } else {
451 "°C"
452 });
453 let speed_unit = Some(if let wu::Unit::English = units {
454 "mph"
455 } else {
456 "km/h"
457 });
458
459 let d_dewpt = core
460 .add_ro_device(dewpoint_name, temp_unit, max_history)
461 .await?;
462 let d_htidx = core
463 .add_ro_device(heat_index_name, temp_unit, max_history)
464 .await?;
465 let d_humidity = core
466 .add_ro_device(humidity_name, Some("%"), max_history)
467 .await?;
468 let d_prec_rate = core
469 .add_ro_device(
470 precip_rate_name,
471 Some(if let wu::Unit::English = units {
472 "in/hr"
473 } else {
474 "mm/hr"
475 }),
476 max_history,
477 )
478 .await?;
479
480 let d_prec_total = core
481 .add_ro_device(
482 precip_total_name,
483 Some(if let wu::Unit::English = units {
484 "in"
485 } else {
486 "mm"
487 }),
488 max_history,
489 )
490 .await?;
491
492 let d_prec_last_total = core
493 .add_ro_device(
494 precip_last_total_name,
495 Some(if let wu::Unit::English = units {
496 "in"
497 } else {
498 "mm"
499 }),
500 max_history,
501 )
502 .await?;
503
504 let d_pressure = core
505 .add_ro_device(
506 pressure_name,
507 Some(if let wu::Unit::English = units {
508 "inHg"
509 } else {
510 "hPa"
511 }),
512 max_history,
513 )
514 .await?;
515
516 let d_solrad = core
517 .add_ro_device(solar_rad_name, Some("W/m²"), max_history)
518 .await?;
519 let d_state =
520 core.add_ro_device(state_name, None, max_history).await?;
521 let d_temp = core
522 .add_ro_device(temperature_name, temp_unit, max_history)
523 .await?;
524 let d_uv = core.add_ro_device(uv_name, None, max_history).await?;
525 let d_wndchl = core
526 .add_ro_device(wind_chill_name, temp_unit, max_history)
527 .await?;
528 let d_wnddir = core
529 .add_ro_device(wind_dir_name, Some("°"), max_history)
530 .await?;
531 let d_wndgst = core
532 .add_ro_device(wind_gust_name, speed_unit, max_history)
533 .await?;
534 let d_wndspd = core
535 .add_ro_device(wind_speed_name, speed_unit, max_history)
536 .await?;
537
538 Ok(Devices {
539 station,
540 units,
541 d_dewpt,
542 d_htidx,
543 d_humidity,
544 d_prec_rate,
545 d_prec_total,
546 d_prec_last_total,
547 d_pressure,
548 d_solrad,
549 d_state,
550 d_temp,
551 d_uv,
552 d_wndchl,
553 d_wnddir,
554 d_wndgst,
555 d_wndspd,
556 })
557 })
558 }
559
560 fn create_instance(
561 cfg: &DriverConfig,
562 ) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>> {
563 debug!("reading config parameters");
564
565 let interval = Instance::get_cfg_interval(cfg);
566 let key = Instance::get_cfg_key(cfg);
567
568 Span::current().record("cfg", Instance::get_cfg_station(cfg).unwrap());
569
570 let fut = async move {
571 match wu::create_client(Duration::from_secs(5)) {
572 Ok(mut con) => {
573 let (api_key, interval) =
576 Instance::get_cfg_key_and_interval(
577 &mut con, key?, interval?,
578 )
579 .await?;
580
581 debug!("instance successfully created");
584
585 Ok(Box::new(Instance {
586 con,
587 api_key,
588 interval,
589 precip: PrecipState::new(),
590 }))
591 }
592 Err(e) => Err(Error::ConfigError(format!(
593 "couldn't build client connection -- {}",
594 &e
595 ))),
596 }
597 };
598
599 Box::pin(fut)
600 }
601
602 fn run<'a>(
603 &'a mut self,
604 devices: Arc<Mutex<Self::DeviceSet>>,
605 ) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>> {
606 let fut = async move {
607 let mut devices = devices.lock().await;
608
609 Span::current().record("cfg", devices.station.as_str());
610
611 let mut timer = interval_at(Instant::now(), self.interval);
612
613 loop {
616 debug!("waiting for next poll time");
617
618 timer.tick().await;
621
622 debug!("fetching next observation");
623
624 let result = wu::fetch_observation(
625 &self.con,
626 &self.api_key,
627 &devices.station,
628 &devices.units,
629 )
630 .await;
631
632 match result {
633 Ok(Some(response)) => {
634 match wu::ObservationResponse::try_from(response) {
635 Ok(resp) => {
636 if let Some(obs) = resp.observations {
637 if !obs.is_empty() {
638 if obs.len() > 1 {
645 warn!("ignoring {} extra weather observations", obs.len() - 1);
646 }
647 devices
648 .d_state
649 .report_update(true)
650 .await;
651 self.handle(&obs[0], &mut devices)
652 .await;
653 continue;
654 }
655 }
656 warn!("no weather data received")
657 }
658
659 Err(e) => {
660 devices.d_state.report_update(false).await;
661 panic!("error response from Weather Underground -- {:?}", &e)
662 }
663 }
664 }
665
666 Ok(None) => {
667 devices.d_state.report_update(false).await;
668 panic!("no response from Weather Underground")
669 }
670
671 Err(e) => {
672 devices.d_state.report_update(false).await;
673 panic!(
674 "error accessing Weather Underground -- {:?}",
675 &e
676 )
677 }
678 }
679 }
680 };
681
682 Box::pin(fut)
683 }
684}
685
#[cfg(test)]
mod tests {
    use super::PrecipState;
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    /// One scripted update: rate, station total, timestamp (seconds
    /// after the epoch) and the tuple `update` is expected to return.
    type Step = (f64, f64, u64, (f64, f64, Option<f64>));

    /// Builds a timestamp `secs` seconds after the Unix epoch.
    fn mk_time(secs: u64) -> SystemTime {
        UNIX_EPOCH.checked_add(Duration::from_secs(secs)).unwrap()
    }

    /// Feeds the steps, in order, to a fresh state machine and checks
    /// every returned tuple.
    fn replay(steps: &[Step]) {
        let mut state = PrecipState::new();

        for (idx, &(rate, total, secs, expected)) in steps.iter().enumerate()
        {
            assert_eq!(
                state.update(rate, total, mk_time(secs)),
                expected,
                "step {}",
                idx
            );
        }
    }

    #[test]
    fn test_precip() {
        // Full cycle: idle, rain, counter restart, timeout, new rain.
        replay(&[
            // No rain yet: readings pass straight through.
            (0.0, 0.0, 0, (0.0, 0.0, None)),
            (0.0, 0.0, 600, (0.0, 0.0, None)),
            // A rate without accumulation doesn't start a rainfall.
            (0.1, 0.0, 1200, (0.1, 0.0, None)),
            (0.0, 0.0, 1800, (0.0, 0.0, None)),
            // Accumulation starts and grows.
            (0.1, 0.125, 2400, (0.1, 0.125, None)),
            (0.05, 0.25, 3000, (0.05, 0.25, None)),
            (0.7, 0.375, 3600, (0.7, 0.375, None)),
            // Rain stops; the running total is held.
            (0.0, 0.375, 4200, (0.0, 0.375, None)),
            // Station total drops to zero; the running total is kept.
            (0.0, 0.0, 4800, (0.0, 0.375, None)),
            // More rain is added on top of the running total.
            (0.1, 0.125, 5400, (0.1, 0.5, None)),
            (0.0, 0.125, 6000, (0.0, 0.5, None)),
            // One second short of the timeout: still accumulating.
            (0.0, 0.125, 41_399, (0.0, 0.5, None)),
            // Timeout reached: the finished total is reported once.
            (0.0, 0.125, 41_400, (0.0, 0.0, Some(0.5))),
            (0.0, 0.125, 42_001, (0.0, 0.0, None)),
            // Rate returns but the station total hasn't moved yet.
            (0.1, 0.125, 40_800, (0.1, 0.0, None)),
            // Only the growth past the old total is counted.
            (0.1, 0.25, 41_400, (0.1, 0.125, None)),
        ]);

        // Rain from the first reading, with counter restarts mid-rain.
        replay(&[
            (0.1, 0.125, 0, (0.1, 0.125, None)),
            (0.05, 0.25, 600, (0.05, 0.25, None)),
            (0.7, 0.375, 1200, (0.7, 0.375, None)),
            (0.3, 0.125, 1800, (0.3, 0.5, None)),
            (0.3, 0.25, 2400, (0.3, 0.625, None)),
            // Station total drops to zero while the rainfall is live.
            (0.1, 0.0, 3000, (0.1, 0.625, None)),
            (0.0, 0.0, 3600, (0.0, 0.625, None)),
            (0.3, 0.125, 4200, (0.3, 0.75, None)),
        ]);
    }
}