#![allow(clippy::all, unused_imports, dead_code)]
use crate::{Actor, ActorBehavior, ClientBuilderExt, Message, Port};
use anyhow::{Error, Result};
use reflow_actor::{message::EncodableValue, ActorContext};
use reflow_actor_macro::actor;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::time::Duration;
/// Base URL that the endpoint paths of the actors below are appended to.
const BASE_URL: &str = "https://dataservice.accuweather.com";
/// Key handed to `get_config_or_env` in `apply_auth` to look up the API key.
const ENV_KEY: &str = "ACCUWEATHER_API_KEY";
/// Adds the API key to `builder` as the `apikey` query parameter.
///
/// The key is obtained from `config.get_config_or_env(ENV_KEY)`.
///
/// # Errors
///
/// Returns an error reading `Missing env var: <ENV_KEY>` when that lookup yields nothing.
fn apply_auth(
    config: &reflow_actor::ActorConfig,
    builder: reqwest::RequestBuilder,
) -> Result<reqwest::RequestBuilder> {
    let api_key = match config.get_config_or_env(ENV_KEY) {
        Some(key) => key,
        None => return Err(anyhow::anyhow!("Missing env var: {}", ENV_KEY)),
    };
    Ok(builder.query(&[("apikey", &api_key)]))
}
#[actor(
AccuweatherSearchLocationsActor,
inports::<100>(q),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn accuweather_search_locations(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/locations/v1/cities/search".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("q") {
query_pairs.push(("q", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /locations/v1/cities/search failed: {}", e).into()),
);
}
}
Ok(output)
}
/// Actor for `GET /currentconditions/v1/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `details` is forwarded as a query
/// parameter when present. The API key is appended by `apply_auth`.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadCurrentConditionsActor,
    inports::<100>(locationKey, details),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_current_conditions(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/currentconditions/v1/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("details") {
        query_pairs.push(("details", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!("GET /currentconditions/v1/{{locationKey}} failed: {}", e).into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /locations/v1/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey`, `language` and `details` are
/// forwarded as query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadLocationActor,
    inports::<100>(locationKey, apikey, language, details),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_location(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/locations/v1/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("details") {
        query_pairs.push(("details", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(format!("GET /locations/v1/{{locationKey}} failed: {}", e).into()),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /locations/v1/countries/{regionCode}`.
///
/// Inports: `regionCode` fills the path placeholder; `apikey` and `language` are forwarded as
/// query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherListCountriesActor,
    inports::<100>(regionCode, apikey, language),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_list_countries(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{regionCode}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/locations/v1/countries/{regionCode}".to_string();
    if let Some(val) = inputs.get("regionCode") {
        endpoint = endpoint.replace("{regionCode}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!("GET /locations/v1/countries/{{regionCode}} failed: {}", e).into(),
                ),
            );
        }
    }
    Ok(output)
}
#[actor(
AccuweatherListRegionsActor,
inports::<100>(apikey, language),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn accuweather_list_regions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let endpoint = "/locations/v1/regions".to_string();
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apikey") {
query_pairs.push(("apikey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("language") {
query_pairs.push(("language", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert(
"error".to_string(),
Message::Error(format!("GET /locations/v1/regions failed: {}", e).into()),
);
}
}
Ok(output)
}
/// Actor for `GET /currentconditions/v1/{locationKey}/historical/24`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey`, `language` and `details` are
/// forwarded as query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadCurrentConditionsHistoricalActor,
    inports::<100>(locationKey, apikey, language, details),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_current_conditions_historical(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/currentconditions/v1/{locationKey}/historical/24".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("details") {
        query_pairs.push(("details", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!(
                        "GET /currentconditions/v1/{{locationKey}}/historical/24 failed: {}",
                        e
                    )
                    .into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /forecasts/v1/hourly/1hour/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey`, `language`, `details` and
/// `metric` are forwarded as query parameters when present. The API key from `apply_auth` is
/// always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadForecastHourlyActor,
    inports::<100>(locationKey, apikey, language, details, metric),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_forecast_hourly(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/forecasts/v1/hourly/1hour/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("details") {
        query_pairs.push(("details", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("metric") {
        query_pairs.push(("metric", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!(
                        "GET /forecasts/v1/hourly/1hour/{{locationKey}} failed: {}",
                        e
                    )
                    .into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /forecasts/v1/daily/1day/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey`, `language`, `details` and
/// `metric` are forwarded as query parameters when present. The API key from `apply_auth` is
/// always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadForecastDailyActor,
    inports::<100>(locationKey, apikey, language, details, metric),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_forecast_daily(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/forecasts/v1/daily/1day/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("details") {
        query_pairs.push(("details", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("metric") {
        query_pairs.push(("metric", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!("GET /forecasts/v1/daily/1day/{{locationKey}} failed: {}", e).into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /forecasts/v1/minute/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey` and `language` are forwarded as
/// query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadMinutecastActor,
    inports::<100>(locationKey, apikey, language),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_minutecast(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/forecasts/v1/minute/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!("GET /forecasts/v1/minute/{{locationKey}} failed: {}", e).into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /alerts/v1/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey`, `language` and `details` are
/// forwarded as query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadAlertActor,
    inports::<100>(locationKey, apikey, language, details),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_alert(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/alerts/v1/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("details") {
        query_pairs.push(("details", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(format!("GET /alerts/v1/{{locationKey}} failed: {}", e).into()),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /indices/v1/daily/1day/{locationKey}/{indexId}`.
///
/// Inports: `locationKey` and `indexId` fill the path placeholders; `apikey` and `language`
/// are forwarded as query parameters when present. The API key from `apply_auth` is always
/// added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadLifestyleIndexActor,
    inports::<100>(locationKey, indexId, apikey, language),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_lifestyle_index(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the patterns must be the literal "{locationKey}"
    // and "{indexId}". Doubled braces are only an escape inside `format!` strings; as plain
    // patterns they never matched and the placeholders stayed in the URL. Values are inserted
    // as-is, and a placeholder is left untouched when its input is absent.
    let mut endpoint = "/indices/v1/daily/1day/{locationKey}/{indexId}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    if let Some(val) = inputs.get("indexId") {
        endpoint = endpoint.replace("{indexId}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!(
                        "GET /indices/v1/daily/1day/{{locationKey}}/{{indexId}} failed: {}",
                        e
                    )
                    .into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /indices/v1/daily/5day/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey` and `language` are forwarded as
/// query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadLifestyleIndicesActor,
    inports::<100>(locationKey, apikey, language),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_lifestyle_indices(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/indices/v1/daily/5day/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!("GET /indices/v1/daily/5day/{{locationKey}} failed: {}", e).into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /tropical/v1/gov/storms/{year}/{basinId}`.
///
/// Inports: `year` and `basinId` fill the path placeholders; `apikey` and `language` are
/// forwarded as query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadTropicalStormActor,
    inports::<100>(year, basinId, apikey, language),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_tropical_storm(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the patterns must be the literal "{year}" and
    // "{basinId}". Doubled braces are only an escape inside `format!` strings; as plain
    // patterns they never matched and the placeholders stayed in the URL. Values are inserted
    // as-is, and a placeholder is left untouched when its input is absent.
    let mut endpoint = "/tropical/v1/gov/storms/{year}/{basinId}".to_string();
    if let Some(val) = inputs.get("year") {
        endpoint = endpoint.replace("{year}", &super::message_to_str(val));
    }
    if let Some(val) = inputs.get("basinId") {
        endpoint = endpoint.replace("{basinId}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!(
                        "GET /tropical/v1/gov/storms/{{year}}/{{basinId}} failed: {}",
                        e
                    )
                    .into(),
                ),
            );
        }
    }
    Ok(output)
}
#[actor(
AccuweatherReadTropicalStormPositionsActor,
inports::<100>(year, basinId, stormId, apikey, language, details),
outports::<50>(response, error),
state(MemoryState)
)]
pub async fn accuweather_read_tropical_storm_positions(
context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
let inputs = context.get_payload();
let actor_config = context.get_config();
let mut endpoint = "/tropical/v1/gov/storms/{year}/{basinId}/{stormId}/positions".to_string();
if let Some(val) = inputs.get("year") {
endpoint = endpoint.replace("{{year}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("basinId") {
endpoint = endpoint.replace("{{basinId}}", &super::message_to_str(val));
}
if let Some(val) = inputs.get("stormId") {
endpoint = endpoint.replace("{{stormId}}", &super::message_to_str(val));
}
let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);
let client = reqwest::Client::builder()
.timeout_compat(Duration::from_secs(30))
.build()?;
let mut builder = client.get(&url);
builder = builder.header("Content-Type", "application/json");
builder = apply_auth(actor_config, builder)?;
let mut query_pairs: Vec<(&str, String)> = Vec::new();
if let Some(val) = inputs.get("apikey") {
query_pairs.push(("apikey", super::message_to_str(val)));
}
if let Some(val) = inputs.get("language") {
query_pairs.push(("language", super::message_to_str(val)));
}
if let Some(val) = inputs.get("details") {
query_pairs.push(("details", super::message_to_str(val)));
}
if !query_pairs.is_empty() {
builder = builder.query(&query_pairs);
}
let mut output = HashMap::new();
match builder.send().await {
Ok(resp) => {
let status = resp.status().as_u16();
let headers: HashMap<String, String> = resp
.headers()
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
.collect();
let body_text = resp.text().await.unwrap_or_default();
let body_value: Value =
serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
output.insert(
"response".to_string(),
Message::object(EncodableValue::from(json!({
"status": status,
"headers": headers,
"body": body_value,
}))),
);
}
Err(e) => {
output.insert("error".to_string(), Message::Error(format!("GET /tropical/v1/gov/storms/{{year}}/{{basinId}}/{{stormId}}/positions failed: {}", e).into()));
}
}
Ok(output)
}
/// Actor for `GET /alarms/v1/daily/1day/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey`, `language` and `metric` are
/// forwarded as query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadAlarmActor,
    inports::<100>(locationKey, apikey, language, metric),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_alarm(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/alarms/v1/daily/1day/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("language") {
        query_pairs.push(("language", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("metric") {
        query_pairs.push(("metric", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!("GET /alarms/v1/daily/1day/{{locationKey}} failed: {}", e).into(),
                ),
            );
        }
    }
    Ok(output)
}
/// Actor for `GET /imagery/v1/maps/radsat/{locationKey}`.
///
/// Inports: `locationKey` fills the path placeholder; `apikey` and `resolution` are forwarded
/// as query parameters when present. The API key from `apply_auth` is always added.
///
/// Outports: `response` carries an object with `status`, `headers` and `body` for any HTTP
/// reply (the status code is not inspected); `error` carries a message when sending fails.
///
/// # Errors
///
/// Returns `Err` if the HTTP client cannot be built or `apply_auth` fails.
#[actor(
    AccuweatherReadImageryRadarActor,
    inports::<100>(locationKey, apikey, resolution),
    outports::<50>(response, error),
    state(MemoryState)
)]
pub async fn accuweather_read_imagery_radar(
    context: ActorContext,
) -> Result<HashMap<String, Message>, Error> {
    let inputs = context.get_payload();
    let actor_config = context.get_config();

    // The template uses single braces, so the pattern must be the literal "{locationKey}".
    // Doubled braces are only an escape inside `format!` strings; as a plain pattern they
    // never matched and the placeholder stayed in the URL. The value is inserted as-is, and
    // the placeholder is left untouched when the input is absent.
    let mut endpoint = "/imagery/v1/maps/radsat/{locationKey}".to_string();
    if let Some(val) = inputs.get("locationKey") {
        endpoint = endpoint.replace("{locationKey}", &super::message_to_str(val));
    }
    let url = format!("{}{}", BASE_URL.trim_end_matches('/'), endpoint);

    let client = reqwest::Client::builder()
        .timeout_compat(Duration::from_secs(30))
        .build()?;
    let mut builder = client.get(&url);
    builder = builder.header("Content-Type", "application/json");
    builder = apply_auth(actor_config, builder)?;

    // Optional query parameters are only sent when the matching inport has a value.
    // NOTE(review): `apply_auth` has already added an `apikey` pair, so a supplied `apikey`
    // input is appended as a second one - confirm that this is intended.
    let mut query_pairs: Vec<(&str, String)> = Vec::new();
    if let Some(val) = inputs.get("apikey") {
        query_pairs.push(("apikey", super::message_to_str(val)));
    }
    if let Some(val) = inputs.get("resolution") {
        query_pairs.push(("resolution", super::message_to_str(val)));
    }
    if !query_pairs.is_empty() {
        builder = builder.query(&query_pairs);
    }

    let mut output = HashMap::new();
    match builder.send().await {
        Ok(resp) => {
            let status = resp.status().as_u16();
            // Header values that are not valid visible ASCII are skipped.
            let headers: HashMap<String, String> = resp
                .headers()
                .iter()
                .filter_map(|(k, v)| v.to_str().ok().map(|val| (k.to_string(), val.to_string())))
                .collect();
            let body_text = resp.text().await.unwrap_or_default();
            // Bodies that are not JSON are passed through as a plain string.
            let body_value: Value =
                serde_json::from_str(&body_text).unwrap_or(Value::String(body_text));
            output.insert(
                "response".to_string(),
                Message::object(EncodableValue::from(json!({
                    "status": status,
                    "headers": headers,
                    "body": body_value,
                }))),
            );
        }
        Err(e) => {
            output.insert(
                "error".to_string(),
                Message::Error(
                    format!("GET /imagery/v1/maps/radsat/{{locationKey}} failed: {}", e).into(),
                ),
            );
        }
    }
    Ok(output)
}