drmem_drv_weather_wu/
lib.rs

1use drmem_api::{
2    device,
3    driver::{self, DriverConfig},
4    Error, Result,
5};
6use std::convert::{Infallible, TryFrom};
7use std::{future::Future, pin::Pin, sync::Arc, time::SystemTime};
8use tokio::sync::Mutex;
9use tokio::time::{interval_at, Duration, Instant};
10use tracing::{debug, error, warn, Span};
11use weather_underground as wu;
12
13const DEFAULT_INTERVAL: u64 = 10;
14const MIN_PUBLIC_INTERVAL: u64 = 10;
15
16// This type defines a mini state machine to help us accumulate
17// rainfall. Some weather stations reset their rainfall total at
18// midnight -- even if it's still raining! This state machine tries to
19// recognize those resets to properly maintain its local precip
20// totals.
21
22enum PrecipState {
23    NoRain,
24    Rain {
25        prev: f64,
26        running: f64,
27        time: SystemTime,
28    },
29    Pause {
30        prev: f64,
31    },
32}
33
34impl PrecipState {
35    fn new() -> Self {
36        PrecipState::NoRain
37    }
38
39    // This method updates the state of the data based on new
40    // readings. It returns values to be reported by the driver for
41    // the three precip devices.
42
43    fn update(
44        &mut self,
45        p_rate: f64,
46        p_total: f64,
47        now: SystemTime,
48    ) -> (f64, f64, Option<f64>) {
49        match self {
50            // This state models when it isn't raining. It's the
51            // initial state and it will be re-entered when the
52            // weather station reports no rain.
53            Self::NoRain => {
54                // If there's a non-zero total, we need to switch to
55                // the rain state.
56
57                if p_total > 0.0 {
58                    *self = Self::Rain {
59                        prev: p_total,
60                        running: p_total,
61                        time: now,
62                    };
63                }
64                (p_rate, p_total, None)
65            }
66
67            // This state is active after the 10 hour time between
68            // rainfall has occurred, but the weather station is still
69            // reporting a non-zero precip total.
70            Self::Pause { prev } => {
71                (
72                    p_rate,
73                    // If the weather station resets its total, we can
74                    // go back to the `NoRain` state.
75                    if p_total == 0.0 {
76                        if p_rate == 0.0 {
77                            *self = Self::NoRain;
78                        }
79                        0.0
80                    }
81                    // If more rain is reported, then a new system has
82                    // rolled in. Go back to the `Rain` state, but set
83                    // the currently reported total as the baseline
84                    // with which to subtract future readings.
85                    else if p_total > *prev {
86                        let total = p_total - *prev;
87
88                        *self = Self::Rain {
89                            prev: p_total,
90                            running: total,
91                            time: now,
92                        };
93                        total
94                    }
95                    // The total is less than the previous, but not
96                    // 0. This means we crossed midnight -- resetting
97                    // the total -- but more rain occurred before we
98                    // sampled the data. Go into the `Rain` state.
99                    else if p_total < *prev {
100                        *self = Self::Rain {
101                            prev: p_total,
102                            running: p_total,
103                            time: now,
104                        };
105                        p_total
106                    } else {
107                        0.0
108                    },
109                    None,
110                )
111            }
112
113            // This state is active while it is raining.
114            Self::Rain {
115                prev,
116                running,
117                time,
118            } => {
119                const TIMEOUT: Duration = Duration::from_secs(36_000);
120                let delta = now
121                    .duration_since(*time)
122                    .unwrap_or_else(|_| Duration::from_secs(0));
123
124                // If the weather station reports no rainfall and
125                // reset its total, we emit the total as the value of
126                // the last rainfall.
127
128                if p_rate == 0.0 && delta >= TIMEOUT {
129                    let last_total = *running;
130
131                    *self = Self::Pause { prev: *prev };
132                    (0.0, 0.0, Some(last_total))
133                } else {
134                    // If the total is less than the previous value,
135                    // then we crossed midnight. Just use the reset
136                    // value as the delta.
137
138                    *running += if *prev > p_total {
139                        p_total
140                    } else {
141                        p_total - *prev
142                    };
143
144                    // If the rainfall rate is 0, don't update the
145                    // timeout value.
146
147                    if p_rate > 0.0 {
148                        *time = now;
149                    }
150
151                    // Update the totals in the state.
152
153                    *prev = p_total;
154                    (p_rate, *running, None)
155                }
156            }
157        }
158    }
159}
160
161pub struct Instance {
162    con: reqwest::Client,
163    api_key: String,
164    interval: Duration,
165
166    precip: PrecipState,
167}
168
169pub struct Devices {
170    station: String,
171    units: wu::Unit,
172
173    d_dewpt: driver::ReadOnlyDevice<f64>,
174    d_htidx: driver::ReadOnlyDevice<f64>,
175    d_humidity: driver::ReadOnlyDevice<f64>,
176    d_prec_rate: driver::ReadOnlyDevice<f64>,
177    d_prec_total: driver::ReadOnlyDevice<f64>,
178    d_prec_last_total: driver::ReadOnlyDevice<f64>,
179    d_pressure: driver::ReadOnlyDevice<f64>,
180    d_solrad: driver::ReadOnlyDevice<f64>,
181    d_state: driver::ReadOnlyDevice<bool>,
182    d_temp: driver::ReadOnlyDevice<f64>,
183    d_uv: driver::ReadOnlyDevice<f64>,
184    d_wndchl: driver::ReadOnlyDevice<f64>,
185    d_wnddir: driver::ReadOnlyDevice<f64>,
186    d_wndgst: driver::ReadOnlyDevice<f64>,
187    d_wndspd: driver::ReadOnlyDevice<f64>,
188}
189
190impl Instance {
191    pub const NAME: &'static str = "weather-wu";
192
193    pub const SUMMARY: &'static str =
194        "obtains weather data from Weather Underground";
195
196    pub const DESCRIPTION: &'static str = include_str!("../README.md");
197
198    fn get_cfg_station(cfg: &DriverConfig) -> Result<String> {
199        match cfg.get("station") {
200            Some(toml::value::Value::String(station)) => {
201                Ok(station.to_string())
202            }
203            Some(_) => Err(Error::ConfigError(String::from(
204                "'station' config parameter should be a string",
205            ))),
206            None => Err(Error::ConfigError(String::from(
207                "missing 'station' parameter in config",
208            ))),
209        }
210    }
211
212    fn get_cfg_interval(cfg: &DriverConfig) -> Result<u64> {
213        match cfg.get("interval") {
214            Some(toml::value::Value::Integer(val)) => {
215                Ok(std::cmp::max(*val as u64, 1))
216            }
217            Some(_) => Err(Error::ConfigError(String::from(
218                "'interval' config parameter should be a positive integer",
219            ))),
220            None => Ok(DEFAULT_INTERVAL),
221        }
222    }
223
224    fn get_cfg_key(cfg: &DriverConfig) -> Result<Option<String>> {
225        match cfg.get("key") {
226            Some(toml::value::Value::String(val)) => Ok(Some(val.to_string())),
227            Some(_) => Err(Error::ConfigError(String::from(
228                "'key' config parameter should be a string",
229            ))),
230            None => Ok(None),
231        }
232    }
233
234    async fn get_cfg_key_and_interval(
235        con: &mut reqwest::Client,
236        key: Option<String>,
237        interval: u64,
238    ) -> Result<(String, Duration)> {
239        match key {
240            Some(val) => Ok((val, Duration::from_secs(interval * 60))),
241            None => {
242                if let Ok(api_key) = wu::fetch_api_key(con).await {
243                    Ok((
244                        api_key,
245                        Duration::from_secs(
246                            std::cmp::max(interval, MIN_PUBLIC_INTERVAL) * 60,
247                        ),
248                    ))
249                } else {
250                    error!("couldn't determine public API key");
251                    Err(Error::NotFound)
252                }
253            }
254        }
255    }
256
257    fn get_cfg_units(cfg: &DriverConfig) -> Result<wu::Unit> {
258        match cfg.get("units") {
259            Some(toml::value::Value::String(val)) => match val.as_str() {
260                "metric" => Ok(wu::Unit::Metric),
261                "imperial" => Ok(wu::Unit::English),
262                _ => Err(Error::ConfigError(String::from(
263                    "'units' parameter should be \"imperial\" or \"metric\"",
264                ))),
265            },
266            Some(_) => Err(Error::ConfigError(String::from(
267                "'units' parameter should be a string",
268            ))),
269            None => Ok(wu::Unit::Metric),
270        }
271    }
272
273    // Processes an observation by sending each parameter to the
274    // correct device channel. It also does some sanity checks on the
275    // values.
276
277    async fn handle(
278        &mut self,
279        obs: &wu::Observation,
280        devices: &mut <Instance as driver::API>::DeviceSet,
281    ) {
282        // Retreive all the parameters whose units can change between
283        // English and Metric.
284
285        let (dewpt, htidx, prate, ptotal, press, temp, wndchl, wndgst, wndspd) =
286            if let wu::Unit::Metric = devices.units {
287                if let Some(params) = &obs.metric {
288                    (
289                        params.dewpt,
290                        params.heat_index,
291                        params.precip_rate,
292                        params.precip_total,
293                        params.pressure,
294                        params.temp,
295                        params.wind_chill,
296                        params.wind_gust,
297                        params.wind_speed,
298                    )
299                } else {
300                    panic!("weather data didn't return any metric data")
301                }
302            } else if let Some(params) = &obs.imperial {
303                (
304                    params.dewpt,
305                    params.heat_index,
306                    params.precip_rate,
307                    params.precip_total,
308                    params.pressure,
309                    params.temp,
310                    params.wind_chill,
311                    params.wind_gust,
312                    params.wind_speed,
313                )
314            } else {
315                panic!("weather data didn't return any imperial data")
316            };
317
318        if let Some(dewpt) = dewpt {
319            if (0.0..=200.0).contains(&dewpt) {
320                devices.d_dewpt.report_update(dewpt).await
321            } else {
322                warn!("ignoring bad dew point value: {:.1}", dewpt)
323            }
324        }
325
326        if let Some(htidx) = htidx {
327            if (0.0..=200.0).contains(&htidx) {
328                devices.d_htidx.report_update(htidx).await
329            } else {
330                warn!("ignoring bad heat index value: {:.1}", htidx)
331            }
332        }
333
334        if let (Some(prate), Some(ptotal)) = (prate, ptotal) {
335            let (nrate, ntotal, nlast) =
336                self.precip.update(prate, ptotal, SystemTime::now());
337
338            devices.d_prec_rate.report_update(nrate).await;
339            devices.d_prec_total.report_update(ntotal).await;
340
341            if let Some(last) = nlast {
342                devices.d_prec_last_total.report_update(last).await;
343            }
344        } else {
345            warn!("need both precip fields to update precip calculations")
346        }
347
348        if let Some(press) = press {
349            devices.d_pressure.report_update(press).await
350        }
351
352        if let Some(temp) = temp {
353            devices.d_temp.report_update(temp).await
354        }
355
356        if let Some(wndchl) = wndchl {
357            devices.d_wndchl.report_update(wndchl).await
358        }
359
360        if let Some(wndgst) = wndgst {
361            devices.d_wndgst.report_update(wndgst).await
362        }
363
364        if let Some(wndspd) = wndspd {
365            devices.d_wndspd.report_update(wndspd).await
366        }
367
368        // If solar radiation readings are provided, report them.
369
370        if let Some(sol_rad) = obs.solar_radiation {
371            // On Earth, solar radiation varies between 0 and 1361
372            // W/m^2. (https://en.wikipedia.org/wiki/Solar_irradiance)
373            // We'll round up to 1400 so weather stations with
374            // slightly inaccurate sensors won't be ignored.
375
376            if (0.0..=1400.0).contains(&sol_rad) {
377                devices.d_solrad.report_update(sol_rad).await
378            } else {
379                warn!("ignoring bad solar radiation value: {:.1}", sol_rad)
380            }
381        }
382
383        // If humidity readings are provided, report them.
384
385        if let Some(humidity) = obs.humidity {
386            // Technically the humidity could get to 0%, but it's
387            // doubtful there's a place on earth that gets that low.
388
389            if (0.0..=100.0).contains(&humidity) {
390                devices.d_humidity.report_update(humidity).await
391            } else {
392                warn!("ignoring bad humidity value: {:.1}", humidity)
393            }
394        }
395
396        // If UV readings are provided, report them.
397
398        if let Some(uv) = obs.uv {
399            devices.d_uv.report_update(uv).await
400        }
401
402        // If wind direction readings are provided, report them.
403
404        if let Some(winddir) = obs.winddir {
405            // Make sure the reading is in range.
406
407            if (0.0..=360.0).contains(&winddir) {
408                devices.d_wnddir.report_update(winddir).await
409            } else {
410                warn!("ignoring bad wind direction value: {:.1}", winddir)
411            }
412        }
413    }
414}
415
416impl driver::API for Instance {
417    type DeviceSet = Devices;
418
419    fn register_devices(
420        core: driver::RequestChan,
421        cfg: &DriverConfig,
422        max_history: Option<usize>,
423    ) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>> {
424        let dewpoint_name = "dewpoint".parse::<device::Base>().unwrap();
425        let heat_index_name = "heat-index".parse::<device::Base>().unwrap();
426        let humidity_name = "humidity".parse::<device::Base>().unwrap();
427        let precip_rate_name = "precip-rate".parse::<device::Base>().unwrap();
428        let precip_total_name = "precip-total".parse::<device::Base>().unwrap();
429        let precip_last_total_name =
430            "precip-last-total".parse::<device::Base>().unwrap();
431        let pressure_name = "pressure".parse::<device::Base>().unwrap();
432        let solar_rad_name = "solar-rad".parse::<device::Base>().unwrap();
433        let state_name = "state".parse::<device::Base>().unwrap();
434        let temperature_name = "temperature".parse::<device::Base>().unwrap();
435        let uv_name = "uv".parse::<device::Base>().unwrap();
436        let wind_chill_name = "wind-chill".parse::<device::Base>().unwrap();
437        let wind_dir_name = "wind-dir".parse::<device::Base>().unwrap();
438        let wind_gust_name = "wind-gust".parse::<device::Base>().unwrap();
439        let wind_speed_name = "wind-speed".parse::<device::Base>().unwrap();
440
441        let station = Instance::get_cfg_station(cfg);
442        let units = Instance::get_cfg_units(cfg);
443
444        Box::pin(async move {
445            let station = station?;
446            let units = units?;
447
448            let temp_unit = Some(if let wu::Unit::English = units {
449                "°F"
450            } else {
451                "°C"
452            });
453            let speed_unit = Some(if let wu::Unit::English = units {
454                "mph"
455            } else {
456                "km/h"
457            });
458
459            let d_dewpt = core
460                .add_ro_device(dewpoint_name, temp_unit, max_history)
461                .await?;
462            let d_htidx = core
463                .add_ro_device(heat_index_name, temp_unit, max_history)
464                .await?;
465            let d_humidity = core
466                .add_ro_device(humidity_name, Some("%"), max_history)
467                .await?;
468            let d_prec_rate = core
469                .add_ro_device(
470                    precip_rate_name,
471                    Some(if let wu::Unit::English = units {
472                        "in/hr"
473                    } else {
474                        "mm/hr"
475                    }),
476                    max_history,
477                )
478                .await?;
479
480            let d_prec_total = core
481                .add_ro_device(
482                    precip_total_name,
483                    Some(if let wu::Unit::English = units {
484                        "in"
485                    } else {
486                        "mm"
487                    }),
488                    max_history,
489                )
490                .await?;
491
492            let d_prec_last_total = core
493                .add_ro_device(
494                    precip_last_total_name,
495                    Some(if let wu::Unit::English = units {
496                        "in"
497                    } else {
498                        "mm"
499                    }),
500                    max_history,
501                )
502                .await?;
503
504            let d_pressure = core
505                .add_ro_device(
506                    pressure_name,
507                    Some(if let wu::Unit::English = units {
508                        "inHg"
509                    } else {
510                        "hPa"
511                    }),
512                    max_history,
513                )
514                .await?;
515
516            let d_solrad = core
517                .add_ro_device(solar_rad_name, Some("W/m²"), max_history)
518                .await?;
519            let d_state =
520                core.add_ro_device(state_name, None, max_history).await?;
521            let d_temp = core
522                .add_ro_device(temperature_name, temp_unit, max_history)
523                .await?;
524            let d_uv = core.add_ro_device(uv_name, None, max_history).await?;
525            let d_wndchl = core
526                .add_ro_device(wind_chill_name, temp_unit, max_history)
527                .await?;
528            let d_wnddir = core
529                .add_ro_device(wind_dir_name, Some("°"), max_history)
530                .await?;
531            let d_wndgst = core
532                .add_ro_device(wind_gust_name, speed_unit, max_history)
533                .await?;
534            let d_wndspd = core
535                .add_ro_device(wind_speed_name, speed_unit, max_history)
536                .await?;
537
538            Ok(Devices {
539                station,
540                units,
541                d_dewpt,
542                d_htidx,
543                d_humidity,
544                d_prec_rate,
545                d_prec_total,
546                d_prec_last_total,
547                d_pressure,
548                d_solrad,
549                d_state,
550                d_temp,
551                d_uv,
552                d_wndchl,
553                d_wnddir,
554                d_wndgst,
555                d_wndspd,
556            })
557        })
558    }
559
560    fn create_instance(
561        cfg: &DriverConfig,
562    ) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>> {
563        debug!("reading config parameters");
564
565        let interval = Instance::get_cfg_interval(cfg);
566        let key = Instance::get_cfg_key(cfg);
567
568        Span::current().record("cfg", Instance::get_cfg_station(cfg).unwrap());
569
570        let fut = async move {
571            match wu::create_client(Duration::from_secs(5)) {
572                Ok(mut con) => {
573                    // Validate the driver parameters.
574
575                    let (api_key, interval) =
576                        Instance::get_cfg_key_and_interval(
577                            &mut con, key?, interval?,
578                        )
579                        .await?;
580
581                    // Assemble and return the state of the driver.
582
583                    debug!("instance successfully created");
584
585                    Ok(Box::new(Instance {
586                        con,
587                        api_key,
588                        interval,
589                        precip: PrecipState::new(),
590                    }))
591                }
592                Err(e) => Err(Error::ConfigError(format!(
593                    "couldn't build client connection -- {}",
594                    &e
595                ))),
596            }
597        };
598
599        Box::pin(fut)
600    }
601
602    fn run<'a>(
603        &'a mut self,
604        devices: Arc<Mutex<Self::DeviceSet>>,
605    ) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>> {
606        let fut = async move {
607            let mut devices = devices.lock().await;
608
609            Span::current().record("cfg", devices.station.as_str());
610
611            let mut timer = interval_at(Instant::now(), self.interval);
612
613            // Loop forever.
614
615            loop {
616                debug!("waiting for next poll time");
617
618                // Wait for the next sample time.
619
620                timer.tick().await;
621
622                debug!("fetching next observation");
623
624                let result = wu::fetch_observation(
625                    &self.con,
626                    &self.api_key,
627                    &devices.station,
628                    &devices.units,
629                )
630                .await;
631
632                match result {
633                    Ok(Some(response)) => {
634                        match wu::ObservationResponse::try_from(response) {
635                            Ok(resp) => {
636                                if let Some(obs) = resp.observations {
637                                    if !obs.is_empty() {
638                                        // The API we're using should
639                                        // only return 1 set of
640                                        // observations. If it, for
641                                        // some reason, changes and
642                                        // returns more, log it.
643
644                                        if obs.len() > 1 {
645                                            warn!("ignoring {} extra weather observations", obs.len() - 1);
646                                        }
647                                        devices
648                                            .d_state
649                                            .report_update(true)
650                                            .await;
651                                        self.handle(&obs[0], &mut devices)
652                                            .await;
653                                        continue;
654                                    }
655                                }
656                                warn!("no weather data received")
657                            }
658
659                            Err(e) => {
660                                devices.d_state.report_update(false).await;
661                                panic!("error response from Weather Underground -- {:?}", &e)
662                            }
663                        }
664                    }
665
666                    Ok(None) => {
667                        devices.d_state.report_update(false).await;
668                        panic!("no response from Weather Underground")
669                    }
670
671                    Err(e) => {
672                        devices.d_state.report_update(false).await;
673                        panic!(
674                            "error accessing Weather Underground -- {:?}",
675                            &e
676                        )
677                    }
678                }
679            }
680        };
681
682        Box::pin(fut)
683    }
684}
685
686#[cfg(test)]
687mod tests {
688    use super::PrecipState;
689    use std::time::{Duration, SystemTime, UNIX_EPOCH};
690
691    fn mk_time(secs: u64) -> SystemTime {
692        UNIX_EPOCH.checked_add(Duration::from_secs(secs)).unwrap()
693    }
694
695    #[test]
696    fn test_precip() {
697        // This tests for normal rainfall. It also makes sure the
698        // totals get adjusted after the long enough delay of no rain.
699
700        {
701            let mut s = PrecipState::new();
702
703            // Should start as `NoRain` and, as long as we have no
704            // precip, it should stay that way.
705
706            assert_eq!(s.update(0.0, 0.0, mk_time(0)), (0.0, 0.0, None));
707            assert_eq!(s.update(0.0, 0.0, mk_time(600)), (0.0, 0.0, None));
708
709            // Even if the rainfall rate is non zero, we don't go into
710            // the rain state until the total is nonzero.
711
712            assert_eq!(s.update(0.1, 0.0, mk_time(1200)), (0.1, 0.0, None));
713
714            // With both inputs 0.0, we shouldn't trigger a "last
715            // rainfall" total.
716
717            assert_eq!(s.update(0.0, 0.0, mk_time(1800)), (0.0, 0.0, None));
718
719            // As rain occurs, we should track the total.
720
721            assert_eq!(s.update(0.1, 0.125, mk_time(2400)), (0.1, 0.125, None));
722            assert_eq!(s.update(0.05, 0.25, mk_time(3000)), (0.05, 0.25, None));
723            assert_eq!(s.update(0.7, 0.375, mk_time(3600)), (0.7, 0.375, None));
724
725            // Zero rate shouldn't reset by itself.
726
727            assert_eq!(s.update(0.0, 0.375, mk_time(4200)), (0.0, 0.375, None));
728
729            // Even if both are zeroed, we don't reset the count if
730            // the time from the last rain was less than our timeout.
731
732            assert_eq!(s.update(0.0, 0.0, mk_time(4800)), (0.0, 0.375, None));
733
734            // Now add more and then simulate 10 hours of nothing.
735
736            assert_eq!(s.update(0.1, 0.125, mk_time(5400)), (0.1, 0.5, None));
737            assert_eq!(s.update(0.0, 0.125, mk_time(6000)), (0.0, 0.5, None));
738            assert_eq!(s.update(0.0, 0.125, mk_time(41_399)), (0.0, 0.5, None));
739            assert_eq!(
740                s.update(0.0, 0.125, mk_time(41_400)),
741                (0.0, 0.0, Some(0.5))
742            );
743            assert_eq!(s.update(0.0, 0.125, mk_time(42_001)), (0.0, 0.0, None));
744
745            // Now any new rainfall start new accumulation.
746
747            assert_eq!(s.update(0.1, 0.125, mk_time(40_800)), (0.1, 0.0, None));
748            assert_eq!(
749                s.update(0.1, 0.25, mk_time(41_400)),
750                (0.1, 0.125, None)
751            );
752        }
753
754        // This tests for a possible weird occurrance at midnight.
755
756        {
757            let mut s = PrecipState::new();
758
759            // Reproduce the previous rain, but we'll add a midnight
760            // crossing (which resets the total).
761
762            assert_eq!(s.update(0.1, 0.125, mk_time(0)), (0.1, 0.125, None));
763            assert_eq!(s.update(0.05, 0.25, mk_time(600)), (0.05, 0.25, None));
764            assert_eq!(s.update(0.7, 0.375, mk_time(1200)), (0.7, 0.375, None));
765            assert_eq!(s.update(0.3, 0.125, mk_time(1800)), (0.3, 0.5, None));
766            assert_eq!(s.update(0.3, 0.25, mk_time(2400)), (0.3, 0.625, None));
767
768            // Let's assume that the total got reset just before
769            // reporting it to Weather Underground. In that case, the
770            // total would be zero but the rate would be non-zero.
771
772            assert_eq!(s.update(0.1, 0.0, mk_time(3000)), (0.1, 0.625, None));
773            assert_eq!(s.update(0.0, 0.0, mk_time(3600)), (0.0, 0.625, None));
774            assert_eq!(s.update(0.3, 0.125, mk_time(4200)), (0.3, 0.75, None));
775        }
776    }
777}