use serde::{Deserialize, Serialize};
use std::fs::File;
use reqwest::Response;
use url::{ParseError, Url};
use crate::errors::ConnectorError;
use std::path::Path;
use std::fs;
use polars::prelude::*;
use crate::location::Point;
use std::fmt;
/// Root URL that `build_url` joins every query fragment onto.
const BASE_URL: &str = "https://api.meteomatics.com";
/// A time range with an optional step, rendered into query strings through
/// its `Display` implementation.
#[derive(Debug)]
pub struct TimeSeries {
    /// First instant of the range (UTC).
    pub start: chrono::DateTime<chrono::Utc>,
    /// Last instant of the range (UTC).
    pub end: chrono::DateTime<chrono::Utc>,
    /// Step between two consecutive instants, if any.
    pub timedelta: Option<chrono::Duration>,
}
/// Formats the series as `<start>--<end>:<step>`, with both instants in
/// RFC 3339 notation.
///
/// When `timedelta` is `None` the `:<step>` suffix is left out and only
/// `<start>--<end>` is written. Previously this case panicked because the
/// optional step was unwrapped unconditionally.
impl fmt::Display for TimeSeries {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{}--{}", self.start.to_rfc3339(), self.end.to_rfc3339())?;
        match &self.timedelta {
            Some(step) => write!(f, ":{}", step),
            // No step configured: the plain range is the complete output.
            None => Ok(()),
        }
    }
}
/// Top-level JSON document of a user statistics response.
#[derive(Debug, Deserialize, Serialize)]
pub struct UStatsResponse {
    /// Free-text message that accompanies the statistics.
    pub message: String,
    /// Payload stored under the `"user statistics"` key.
    #[serde(rename = "user statistics")]
    pub stats: UserStats,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct UserStats{
pub username: String,
#[serde(rename(serialize = "requests total", deserialize = "requests total"))]
pub total: Limit,
#[serde(rename(serialize = "requests since last UTC midnight", deserialize = "requests since last UTC midnight"))]
pub since_midnight: Limit,
#[serde(rename(serialize = "requests since HH:00:00", deserialize = "requests since HH:00:00"))]
pub since_0: Limit,
#[serde(rename(serialize = "requests in the last 60 seconds", deserialize = "requests in the last 60 seconds"))]
pub since_60s: Limit,
#[serde(rename(serialize = "requests in parallel", deserialize = "requests in parallel"))]
pub parallel: Limit,
#[serde(rename(serialize = "historic request option", deserialize = "historic request option"))]
pub hist: String,
#[serde(rename(serialize = "area request option", deserialize = "area request option"))]
pub area: bool,
#[serde(rename(serialize = "model set", deserialize = "model set"))]
pub models: Vec<String>,
#[serde(rename(serialize = "error message", deserialize = "error message"))]
pub error: String,
#[serde(rename(serialize = "contact emails", deserialize = "contact emails"))]
pub contact: Vec<String>,
}
/// Usage counter together with its soft and hard limits.
#[derive(Debug, Deserialize, Serialize)]
pub struct Limit {
    /// Value of `"used"`.
    pub used: u32,
    /// Value of `"soft limit"`.
    #[serde(rename = "soft limit")]
    pub soft_lim: u32,
    /// Value of `"hard limit"`.
    #[serde(rename = "hard limit")]
    pub hard_lim: u32,
}
/// Decodes the JSON body of `response` into a [`UStatsResponse`].
///
/// # Errors
///
/// Any error raised while reading or decoding the body is converted into a
/// `ConnectorError` by `?`.
pub async fn extract_user_statistics(
    response: Response,
) -> std::result::Result<UStatsResponse, ConnectorError> {
    Ok(response.json::<UStatsResponse>().await?)
}
pub async fn write_file(response: Response, file_name: &String) -> std::result::Result<(), ConnectorError> {
let body = response.bytes().await?;
let mut content = std::io::Cursor::new(body);
let mut file = File::create(file_name)?;
std::io::copy(&mut content, &mut file)?;
Ok(())
}
/// Makes sure the parent directory of `file_name` exists, creating it (and
/// any missing ancestors) when necessary.
///
/// A path without a parent component (for example an empty string or a
/// filesystem root) has no directory to create and yields `Ok(())`.
/// Previously that case panicked on an unconditional `unwrap`.
///
/// # Errors
///
/// Returns `ConnectorError::FileIOError` when the directory cannot be
/// created.
pub async fn create_path(file_name: &String) -> std::result::Result<(), ConnectorError> {
    match Path::new(file_name).parent() {
        // Nothing above this path that could be created.
        None => Ok(()),
        // Directory already present: nothing to do.
        Some(dir) if dir.is_dir() => Ok(()),
        Some(dir) => fs::create_dir_all(dir).map_err(|_| ConnectorError::FileIOError),
    }
}
pub async fn df_add_latlon(df_in: polars::frame::DataFrame, point: &Point) ->
std::result::Result<polars::frame::DataFrame, polars::error::PolarsError> {
use polars::prelude::*;
let n = df_in.height();
let lat = vec![point.lat; n];
let lon = vec![point.lon; n];
let df_tmp = df!("lat" => &lat, "lon" => &lon)?;
let df_out: DataFrame = df_tmp.hstack(df_in.get_columns())?;
Ok(df_out)
}
pub async fn df_add_postal(df_in: polars::frame::DataFrame, postal: &str) ->
std::result::Result<polars::frame::DataFrame, polars::error::PolarsError> {
let n = df_in.height();
let col_zipcode = vec![postal; n];
let df_tmp = df!("station_id" => col_zipcode)?;
let df_out: DataFrame = df_tmp.hstack(df_in.get_columns())?;
Ok(df_out)
}
/// Reads the body of `response` as semicolon-separated CSV with a header
/// row and parses it into a `DataFrame`.
///
/// The schema is inferred from the first 100 rows, dates are not parsed,
/// and parser errors are not ignored.
///
/// # Errors
///
/// Returns a `PolarsError` when the body cannot be read (the transport
/// error is wrapped in a `std::io::Error` instead of panicking) or when the
/// CSV cannot be parsed.
pub async fn parse_response_to_df(
    response: Response,
) -> std::result::Result<polars::frame::DataFrame, polars::error::PolarsError> {
    use polars::prelude::*;
    // A failed download is a recoverable condition; surface it to the caller
    // through the existing error type rather than aborting the task.
    let body = response
        .text()
        .await
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
    let file = std::io::Cursor::new(&body);
    let dataframe = polars::io::csv::CsvReader::new(file)
        .infer_schema(Some(100))
        .with_delimiter(b';')
        .has_header(true)
        .with_parse_dates(false)
        .with_ignore_parser_errors(false)
        .finish()?;
    Ok(dataframe)
}
/// Reads the body of a grid `response` as semicolon-separated CSV and
/// parses it into a `DataFrame`.
///
/// The reader is configured to skip two rows (`with_skip_rows(2)`) and to
/// expect a header row. The schema is inferred from the first 100 rows,
/// dates are not parsed, and parser errors are not ignored.
///
/// # Errors
///
/// Returns a `PolarsError` when the body cannot be read (the transport
/// error is wrapped in a `std::io::Error` instead of panicking) or when the
/// CSV cannot be parsed.
pub async fn parse_grid_response_to_df(
    response: Response,
) -> std::result::Result<polars::frame::DataFrame, polars::error::PolarsError> {
    use polars::prelude::*;
    // A failed download is a recoverable condition; surface it to the caller
    // through the existing error type rather than aborting the task.
    let body = response
        .text()
        .await
        .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
    let file = std::io::Cursor::new(&body);
    let dataframe = polars::io::csv::CsvReader::new(file)
        .infer_schema(Some(100))
        .with_delimiter(b';')
        .has_header(true)
        .with_skip_rows(2)
        .with_parse_dates(false)
        .with_ignore_parser_errors(false)
        .finish()?;
    Ok(dataframe)
}
/// Assembles the query part for a time series request:
/// `<time_series>/<parameters joined by ','>/<coords_str>/<format>`.
///
/// When `optionals` is `Some`, its entries are joined with `&` and appended
/// after a `?`.
pub async fn build_ts_query_specs(
    time_series: &TimeSeries,
    parameters: &[String],
    coords_str: &str,
    optionals: &Option<Vec<String>>,
    format: &str,
) -> String {
    let path = format!(
        "{}/{}/{}/{}",
        time_series,
        parameters.join(","),
        coords_str,
        format
    );
    match optionals {
        Some(extras) => format!("{}?{}", path, extras.join("&")),
        None => path,
    }
}
/// Assembles the query part for a grid request at a single instant:
/// `<timestamp as RFC 3339>/<parameter>/<coords_str>/<format>`.
///
/// When `optionals` is `Some`, its entries are joined with `&` and appended
/// after a `?`.
pub async fn build_grid_query_specs(
    timestamp: &chrono::DateTime<chrono::Utc>,
    parameter: &String,
    coords_str: &str,
    optionals: &Option<Vec<String>>,
    format: &str,
) -> String {
    let path = format!(
        "{}/{}/{}/{}",
        timestamp.to_rfc3339(),
        parameter,
        coords_str,
        format
    );
    match optionals {
        Some(extras) => format!("{}?{}", path, extras.join("&")),
        None => path,
    }
}
/// Assembles the query part for a grid time series request:
/// `<time_series>/<parameter>/<coords_str>/<format>`.
///
/// When `optionals` is `Some`, its entries are joined with `&` and appended
/// after a `?`.
pub async fn build_grid_ts_query_specs(
    time_series: &TimeSeries,
    parameter: &String,
    coords_str: &str,
    format: &str,
    optionals: &Option<Vec<String>>,
) -> String {
    let path = format!("{}/{}/{}/{}", time_series, parameter, coords_str, format);
    match optionals {
        Some(extras) => format!("{}?{}", path, extras.join("&")),
        None => path,
    }
}
/// Assembles the `get_lightning_list` query string.
///
/// Only the start and end of `time_series` are used (as RFC 3339 strings);
/// `coords_str` is passed as the `bounding_box` value and the format is
/// fixed to `csv`.
pub async fn build_grid_ts_lightning_query_specs(
    time_series: &TimeSeries,
    coords_str: &str,
) -> String {
    let from = time_series.start.to_rfc3339();
    let until = time_series.end.to_rfc3339();
    format!(
        "get_lightning_list?time_range={}--{}&bounding_box={}&format=csv",
        from, until, coords_str
    )
}
/// Assembles the query part for a route request:
/// `<dates>/<parameters>/<coords_str>/csv?route=true`.
///
/// The three inputs are inserted verbatim; the format is fixed to `csv`.
pub async fn build_route_query_specs(dates: &str, parameters: &str, coords_str: &str) -> String {
    let mut specs = [dates, parameters, coords_str].join("/");
    specs.push_str("/csv?route=true");
    specs
}
/// Resolves `url_fragment` against `BASE_URL` and returns the resulting URL.
///
/// # Errors
///
/// Returns the `ParseError` produced when the fragment cannot be joined
/// onto the base URL.
///
/// # Panics
///
/// Panics if `BASE_URL` itself does not parse; it is a compile-time
/// constant, so that would be a programming error.
pub async fn build_url(url_fragment: &str) -> std::result::Result<Url, ParseError> {
    Url::parse(BASE_URL)
        .expect("Base URL is known to be valid")
        .join(url_fragment)
}
/// Renders every point through its `Display` implementation and joins the
/// results with `+`.
pub async fn points_to_str(coords: &[Point]) -> String {
    let rendered: Vec<String> = coords.iter().map(|point| point.to_string()).collect();
    rendered.join("+")
}
#[cfg(test)]
mod tests {
    use chrono::prelude::*;
    use chrono::Duration;
    use crate::location::{Point, BBox};
    use std::path::Path;
    use std::fs;
    use serde_json;
    use crate::util::{UStatsResponse, TimeSeries};

    /// `create_path` must create the missing parent directory of a file path.
    #[tokio::test]
    async fn check_path_creation_nonexistent() {
        let file_name: String = String::from("tests/netcdfs/my_netcdf.nc");
        crate::util::create_path(&file_name).await.unwrap();
        let dir: &Path = Path::new(&file_name).parent().unwrap();
        assert!(dir.is_dir());
        // Remove the directory again so the test leaves no artefacts behind.
        fs::remove_dir_all(dir).unwrap();
    }

    /// Points are rendered as `lat,lon` and joined with `+`.
    #[tokio::test]
    async fn check_locations_string() {
        let p1: Point = Point { lat: 52.520551, lon: 13.461804 };
        let p2: Point = Point { lat: -52.520551, lon: 13.461804 };
        let coords: Vec<Point> = vec![p1, p2];
        let coord_str = crate::util::points_to_str(&coords).await;
        assert_eq!("52.520551,13.461804+-52.520551,13.461804", coord_str);
    }

    /// The time series part of the query keeps whatever sub-second precision
    /// the start date carries (none, microseconds, nanoseconds).
    #[tokio::test]
    async fn check_ts_query_specs_string() {
        let parameters: Vec<String> = vec![String::from("t_2m:C")];
        let p1: Point = Point { lat: 52.520551, lon: 13.461804 };
        let coords: Vec<Point> = vec![p1];
        let coord_str = crate::util::points_to_str(&coords).await;

        // Whole seconds.
        let start_date = Utc.ymd(2022, 5, 17).and_hms(12, 00, 00);
        let time_series = TimeSeries {
            start: start_date,
            end: start_date + Duration::days(1),
            timedelta: Option::from(Duration::hours(1)),
        };
        let query_s = crate::util::build_ts_query_specs(
            &time_series, &parameters, &coord_str, &None, "csv"
        ).await;
        assert_eq!(
            "2022-05-17T12:00:00+00:00--2022-05-18T12:00:00+00:00:PT3600S/t_2m:C/52.520551,13.461804/csv",
            query_s
        );

        // Microsecond precision.
        let start_date = Utc.ymd(2022, 5, 17).and_hms_micro(12, 00, 00, 453_829);
        let time_series = TimeSeries {
            start: start_date,
            end: start_date + Duration::days(1),
            timedelta: Option::from(Duration::hours(1)),
        };
        let query_ms = crate::util::build_ts_query_specs(
            &time_series, &parameters, &coord_str, &None, "csv"
        ).await;
        assert_eq!(
            "2022-05-17T12:00:00.453829+00:00--2022-05-18T12:00:00.453829+00:00:PT3600S/t_2m:C/52.520551,13.461804/csv",
            query_ms
        );

        // Nanosecond precision.
        let start_date = Utc.ymd(2022, 5, 17).and_hms_nano(12, 00, 00, 453_829_123);
        let time_series = TimeSeries {
            start: start_date,
            end: start_date + Duration::days(1),
            timedelta: Option::from(Duration::hours(1)),
        };
        let query_ns = crate::util::build_ts_query_specs(
            &time_series, &parameters, &coord_str, &None, "csv"
        ).await;
        assert_eq!(
            "2022-05-17T12:00:00.453829123+00:00--2022-05-18T12:00:00.453829123+00:00:PT3600S/t_2m:C/52.520551,13.461804/csv",
            query_ns
        );
    }

    /// A bounding box is rendered as `lat_max,lon_min_lat_min,lon_max:lat_res,lon_res`.
    #[tokio::test]
    async fn check_grid_string() {
        let bbox: BBox = BBox {
            lat_min: -90.0,
            lat_max: 90.0,
            lon_min: -180.0,
            lon_max: 180.0,
            lat_res: 5.0,
            lon_res: 5.0,
        };
        let coord_str = format!("{}", bbox);
        assert_eq!("90,-180_-90,180:5,5", coord_str);
    }

    /// A recorded user statistics document deserializes into `UStatsResponse`.
    #[tokio::test]
    async fn check_deserialization() {
        // The JSON document is split into fragments only to keep lines short;
        // the fragments are concatenated without separators.
        let s1 = r#"{"message" : "In case the limits don't match your understanding of the contr"#;
        let s2 = r#"act, please contact us (api@meteomatics.com). For other inquiries please wri"#;
        let s3 = r#"te to (support@meteomatics.com). Soft or hard limit values of 0 mean that th"#;
        let s4 = r#"e corresponding limit is not set.", "user statistics" : {"username" : "rusty"#;
        let s5 = r#"thecrab", "requests total" : {"used" : 4280, "soft limit" : 0, "hard limi"#;
        let s6 = r#"t" : 0}, "requests since last UTC midnight" : {"used" : 85, "soft limit" : 1"#;
        let s7 = r#"00000, "hard limit" : 0}, "requests since HH:00:00" : {"used" : 85, "soft lim"#;
        let s8 = r#"it" : 10000, "hard limit" : 0}, "requests in the last 60 seconds" : {"used" :"#;
        let s9 = r#" 0, "soft limit" : 0, "hard limit" : 6000}, "requests in parallel" : {"used" "#;
        let s10 = r#": 0, "soft limit" : 20, "hard limit" : 500}, "historic request option" : "19"#;
        let s11 = r#"00-01-01T00:00:00Z--2100-01-01T00:00:00Z", "area request option" : true, "mo"#;
        let s12 = r#"del set" : ["all_minus_euro1k"], "error message" : "", "contact emails" : [""#;
        let s13 = r#"rustythecrab@meteomatics.com"]}}"#;
        let s = [s1, s2, s3, s4, s5, s6, s7, s8, s9, s10, s11, s12, s13].concat();
        let json: UStatsResponse = serde_json::from_str(&s).unwrap();
        assert_eq!(
            json.message,
            "In case the limits don't match your understanding of the contract, \
            please contact us (api@meteomatics.com). For other inquiries please \
            write to (support@meteomatics.com). Soft or hard limit values of 0 \
            mean that the corresponding limit is not set."
        );
        assert_eq!(json.stats.username, "rustythecrab");
        assert_eq!(json.stats.total.used, 4280);
        assert_eq!(json.stats.total.soft_lim, 0);
        assert_eq!(json.stats.total.hard_lim, 0);
        assert!(json.stats.area);
        assert_eq!(json.stats.models[0], "all_minus_euro1k");
        assert_eq!(json.stats.contact[0], "rustythecrab@meteomatics.com");
    }
}