weatherlink_tools/
http_client.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::{anyhow, bail, Context, Result};
5use tokio::sync::{broadcast, Mutex};
6use tokio::{task, time};
7use tracing::{debug, warn};
8
9use crate::{rate_limit, send};
10
11pub fn init_http_recv(
12    weatherlink_http_poll_secs: u32,
13    weatherlink_host: &str,
14    tx: broadcast::Sender<send::Conditions>,
15    http_client: Arc<Mutex<WeatherlinkClient>>,
16) -> task::JoinHandle<()> {
17    let weatherlink_host_cpy = weatherlink_host.to_string();
18    task::spawn(async move {
19        loop {
20            let current_conditions;
21            {
22                current_conditions = http_client
23                    .lock()
24                    .await
25                    .get_current_conditions(&weatherlink_host_cpy)
26                    .await;
27            }
28            match current_conditions {
29                Ok(current_conditions) => {
30                    if let Err(e) = send::parse_and_send(&current_conditions, &tx).await {
31                        warn!("Failed to send polled conditions: {}", e);
32                    }
33                }
34                Err(e) => {
35                    warn!("Failed to fetch current conditions: {}", e);
36                }
37            }
38            time::sleep(Duration::from_secs(weatherlink_http_poll_secs as u64)).await;
39        }
40    })
41}
42
43pub struct WeatherlinkClient {
44    timeout: Duration,
45    rate_limit: rate_limit::RateLimit,
46}
47
48impl WeatherlinkClient {
49    pub fn new(timeout: Duration, wait_between_queries: Duration) -> Result<WeatherlinkClient> {
50        Ok(WeatherlinkClient {
51            timeout,
52            rate_limit: rate_limit::RateLimit::new(wait_between_queries),
53        })
54    }
55
56    /// Requests that the weatherlink start sending out broadcasts for the next 24 hours.
57    pub async fn start_udp_broadcasts(&mut self, weatherlink_host: &str) -> Result<()> {
58        let url = format!("http://{}/v1/real_time?duration=86400", weatherlink_host);
59        let response = self.get(url.clone()).await.map_err(|err| anyhow!(err))?;
60        if !response.status().is_success() {
61            bail!(
62                "Failed to get '{}': {} {:?}",
63                url,
64                response.status(),
65                response.status()
66            );
67        }
68        extract_json_data(response)
69            .await
70            .map(|data| debug!("Broadcast started: {}", data))
71    }
72
73    /// Queries the current_conditions endpoint, returns a string containing the JSON conditions section.
74    /// Returns an error if the connection fails or times out, or if the response contains an error.
75    pub async fn get_current_conditions(&mut self, weatherlink_host: &str) -> Result<String> {
76        let url = format!("http://{}/v1/current_conditions", weatherlink_host);
77        let response = self.get(url.clone()).await.map_err(|err| anyhow!(err))?;
78        if !response.status().is_success() {
79            bail!(
80                "Failed to get '{}': {} {:?}",
81                url,
82                response.status(),
83                response.status()
84            );
85        }
86        extract_json_data(response).await
87    }
88
89    /// Records that a (non-http) request against the weatherlink has occurred elsewhere.
90    /// This ensures that upcoming http requests don't happen too soon.
91    pub fn record_request(&mut self) {
92        self.rate_limit.record();
93    }
94
95    async fn get(&mut self, url: String) -> Result<reqwest::Response> {
96        debug!("get {}...", url);
97        self.rate_limit.check().await;
98
99        let response = time::timeout(self.timeout, reqwest::get(&url))
100            .await
101            .with_context(|| format!("Timed out on HTTP query for {}", url))?
102            .with_context(|| format!("Failed to query url {}", url))?;
103        // Note: unable to log body here - body can only be read once
104        debug!("get {} result: {:?}", url, response);
105
106        self.rate_limit.record();
107        Ok(response)
108    }
109}
110
111/// Typical response: {"data": {...}, "error": null}
112/// We want to return string like: "{...}", or return the error if any
113async fn extract_json_data(response: reqwest::Response) -> Result<String> {
114    let response_body = response.bytes().await.map_err(|err| anyhow!(err))?;
115    let json: serde_json::Value = serde_json::from_reader(&*response_body)?;
116    let json_error = &json["error"];
117    if !json_error.is_null() {
118        bail!("WeatherLink http response contains error: {}", json_error);
119    }
120    // Serialize data back to a json string
121    serde_json::to_string(&json["data"]).map_err(|err| anyhow!(err))
122}