weatherlink_tools/
http_client.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use anyhow::{anyhow, bail, Context, Result};
5use tokio::sync::{broadcast, Mutex};
6use tokio::{task, time};
7use tracing::{debug, warn};
8
9use crate::{rate_limit, send};
10
11pub fn init_http_recv(
12 weatherlink_http_poll_secs: u32,
13 weatherlink_host: &str,
14 tx: broadcast::Sender<send::Conditions>,
15 http_client: Arc<Mutex<WeatherlinkClient>>,
16) -> task::JoinHandle<()> {
17 let weatherlink_host_cpy = weatherlink_host.to_string();
18 task::spawn(async move {
19 loop {
20 let current_conditions;
21 {
22 current_conditions = http_client
23 .lock()
24 .await
25 .get_current_conditions(&weatherlink_host_cpy)
26 .await;
27 }
28 match current_conditions {
29 Ok(current_conditions) => {
30 if let Err(e) = send::parse_and_send(¤t_conditions, &tx).await {
31 warn!("Failed to send polled conditions: {}", e);
32 }
33 }
34 Err(e) => {
35 warn!("Failed to fetch current conditions: {}", e);
36 }
37 }
38 time::sleep(Duration::from_secs(weatherlink_http_poll_secs as u64)).await;
39 }
40 })
41}
42
43pub struct WeatherlinkClient {
44 timeout: Duration,
45 rate_limit: rate_limit::RateLimit,
46}
47
48impl WeatherlinkClient {
49 pub fn new(timeout: Duration, wait_between_queries: Duration) -> Result<WeatherlinkClient> {
50 Ok(WeatherlinkClient {
51 timeout,
52 rate_limit: rate_limit::RateLimit::new(wait_between_queries),
53 })
54 }
55
56 pub async fn start_udp_broadcasts(&mut self, weatherlink_host: &str) -> Result<()> {
58 let url = format!("http://{}/v1/real_time?duration=86400", weatherlink_host);
59 let response = self.get(url.clone()).await.map_err(|err| anyhow!(err))?;
60 if !response.status().is_success() {
61 bail!(
62 "Failed to get '{}': {} {:?}",
63 url,
64 response.status(),
65 response.status()
66 );
67 }
68 extract_json_data(response)
69 .await
70 .map(|data| debug!("Broadcast started: {}", data))
71 }
72
73 pub async fn get_current_conditions(&mut self, weatherlink_host: &str) -> Result<String> {
76 let url = format!("http://{}/v1/current_conditions", weatherlink_host);
77 let response = self.get(url.clone()).await.map_err(|err| anyhow!(err))?;
78 if !response.status().is_success() {
79 bail!(
80 "Failed to get '{}': {} {:?}",
81 url,
82 response.status(),
83 response.status()
84 );
85 }
86 extract_json_data(response).await
87 }
88
89 pub fn record_request(&mut self) {
92 self.rate_limit.record();
93 }
94
95 async fn get(&mut self, url: String) -> Result<reqwest::Response> {
96 debug!("get {}...", url);
97 self.rate_limit.check().await;
98
99 let response = time::timeout(self.timeout, reqwest::get(&url))
100 .await
101 .with_context(|| format!("Timed out on HTTP query for {}", url))?
102 .with_context(|| format!("Failed to query url {}", url))?;
103 debug!("get {} result: {:?}", url, response);
105
106 self.rate_limit.record();
107 Ok(response)
108 }
109}
110
111async fn extract_json_data(response: reqwest::Response) -> Result<String> {
114 let response_body = response.bytes().await.map_err(|err| anyhow!(err))?;
115 let json: serde_json::Value = serde_json::from_reader(&*response_body)?;
116 let json_error = &json["error"];
117 if !json_error.is_null() {
118 bail!("WeatherLink http response contains error: {}", json_error);
119 }
120 serde_json::to_string(&json["data"]).map_err(|err| anyhow!(err))
122}