#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
const BASE_URL: &str = "https://api.openweathermap.org/data/2.5";
const ENV_KEY: &str = "OPENWEATHERMAP_API_KEY";
fn apply_auth(
config: &reflow_actor::ActorConfig,
mut builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
let credential = config
.get_config_or_env(ENV_KEY)
.ok_or_else(|| anyhow::anyhow!("Missing env var: {}", ENV_KEY))?;
builder = builder.query(&[("appid", &credential)]);
Ok(builder)
}
#[actor(
OpenweathermapReadWeatherActor,
inports::<100>(q, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_weather(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/weather".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /weather failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadForecastActor,
inports::<100>(q, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_forecast(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/forecast".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /forecast failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadCurrentWeatherActor,
inports::<100>(q, id, lat, lon, zip, units, lang, mode, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_current_weather(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/weather".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("zip") {
query_pairs.push(("zip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("units") {
query_pairs.push(("units", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("mode") {
query_pairs.push(("mode", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /weather failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadForecast5dayActor,
inports::<100>(q, id, lat, lon, zip, cnt, units, lang, mode, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_forecast_5day(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/forecast".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("zip") {
query_pairs.push(("zip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("cnt") {
query_pairs.push(("cnt", super::message_to_str(val)));
}
if let Some(val) = inputs.get("units") {
query_pairs.push(("units", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("mode") {
query_pairs.push(("mode", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /forecast failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadOneCallActor,
inports::<100>(lat, lon, exclude, units, lang, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_one_call(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/onecall".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("exclude") {
query_pairs.push(("exclude", super::message_to_str(val)));
}
if let Some(val) = inputs.get("units") {
query_pairs.push(("units", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /onecall failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadHistoricalWeatherActor,
inports::<100>(lat, lon, dt, units, lang, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_historical_weather(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/onecall/timemachine".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("dt") {
query_pairs.push(("dt", super::message_to_str(val)));
}
if let Some(val) = inputs.get("units") {
query_pairs.push(("units", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /onecall/timemachine failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadDailyForecast16dayActor,
inports::<100>(q, id, lat, lon, zip, cnt, units, lang, mode, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_daily_forecast_16day(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/forecast/daily".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("zip") {
query_pairs.push(("zip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("cnt") {
query_pairs.push(("cnt", super::message_to_str(val)));
}
if let Some(val) = inputs.get("units") {
query_pairs.push(("units", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("mode") {
query_pairs.push(("mode", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /forecast/daily failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadHourlyForecast4dayActor,
inports::<100>(q, id, lat, lon, cnt, units, lang, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_hourly_forecast_4day(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/forecast/hourly".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("cnt") {
query_pairs.push(("cnt", super::message_to_str(val)));
}
if let Some(val) = inputs.get("units") {
query_pairs.push(("units", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /forecast/hourly failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadClimaticForecast30dayActor,
inports::<100>(q, id, lat, lon, cnt, units, lang, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_climatic_forecast_30day(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/forecast/climate".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("cnt") {
query_pairs.push(("cnt", super::message_to_str(val)));
}
if let Some(val) = inputs.get("units") {
query_pairs.push(("units", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /forecast/climate failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadAirPollutionCurrentActor,
inports::<100>(lat, lon, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_air_pollution_current(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/air_pollution".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /air_pollution failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadAirPollutionForecastActor,
inports::<100>(lat, lon, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_air_pollution_forecast(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/air_pollution/forecast".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /air_pollution/forecast failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadAirPollutionHistoricalActor,
inports::<100>(lat, lon, start, end, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_air_pollution_historical(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/air_pollution/history".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("start") {
query_pairs.push(("start", super::message_to_str(val)));
}
if let Some(val) = inputs.get("end") {
query_pairs.push(("end", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /air_pollution/history failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadGeocodingDirectActor,
inports::<100>(q, limit, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_geocoding_direct(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/geo/1.0/direct".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /geo/1.0/direct failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadGeocodingReverseActor,
inports::<100>(lat, lon, limit, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_geocoding_reverse(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/geo/1.0/reverse".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("limit") {
query_pairs.push(("limit", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /geo/1.0/reverse failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadGeocodingZipActor,
inports::<100>(zip, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_geocoding_zip(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/geo/1.0/zip".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("zip") {
query_pairs.push(("zip", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /geo/1.0/zip failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadWeatherMapTileActor,
inports::<100>(layer, z, x, y, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_weather_map_tile(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/map/{layer}/{z}/{x}/{y}".to_string();
if let Some(val) = inputs.get("layer") {
endpoint = endpoint.replace("{{layer}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("z") {
endpoint = endpoint.replace("{{z}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("x") {
endpoint = endpoint.replace("{{x}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("y") {
endpoint = endpoint.replace("{{y}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(
format!("GET /map/{{layer}}/{{z}}/{{x}}/{{y}} failed: {}", e).into(),
),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadStatisticalWeatherActor,
inports::<100>(q, id, lat, lon, month, day, type_, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_statistical_weather(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/stat/1.0/history".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("month") {
query_pairs.push(("month", super::message_to_str(val)));
}
if let Some(val) = inputs.get("day") {
query_pairs.push(("day", super::message_to_str(val)));
}
if let Some(val) = inputs.get("type_") {
query_pairs.push(("type", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /stat/1.0/history failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadHistoryWeatherActor,
inports::<100>(q, id, lat, lon, start, end, cnt, type_, units, lang, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_history_weather(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/history/city".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if let Some(val) = inputs.get("id") {
query_pairs.push(("id", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("start") {
query_pairs.push(("start", super::message_to_str(val)));
}
if let Some(val) = inputs.get("end") {
query_pairs.push(("end", super::message_to_str(val)));
}
if let Some(val) = inputs.get("cnt") {
query_pairs.push(("cnt", super::message_to_str(val)));
}
if let Some(val) = inputs.get("type_") {
query_pairs.push(("type", super::message_to_str(val)));
}
if let Some(val) = inputs.get("units") {
query_pairs.push(("units", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lang") {
query_pairs.push(("lang", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /history/city failed: {}", e).into()),
);
}
}
Ok(output)
}
#[actor(
OpenweathermapReadSolarIrradianceActor,
inports::<100>(lat, lon, date, tz, step, appid),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn openweathermap_read_solar_irradiance(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/energy/1.0/solar/interval".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("lat") {
query_pairs.push(("lat", super::message_to_str(val)));
}
if let Some(val) = inputs.get("lon") {
query_pairs.push(("lon", super::message_to_str(val)));
}
if let Some(val) = inputs.get("date") {
query_pairs.push(("date", super::message_to_str(val)));
}
if let Some(val) = inputs.get("tz") {
query_pairs.push(("tz", super::message_to_str(val)));
}
if let Some(val) = inputs.get("step") {
query_pairs.push(("step", super::message_to_str(val)));
}
if let Some(val) = inputs.get("appid") {
query_pairs.push(("appid", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /energy/1.0/solar/interval failed: {}", e).into()),
);
}
}
Ok(output)
}