use log::{debug, error, info};
use metrics::{describe_gauge, describe_histogram, gauge, histogram};
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask;
use std::iter::Iterator;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{SystemTime, SystemTimeError};
use tokio::sync::Mutex;
use tokio::time::{interval, Duration, Interval, MissedTickBehavior};
use openweathermap_client::models::CurrentWeather;
use openweathermap_client::{Client, Query};
use crate::metric_metadata::*;
use crate::ExporterConfig;
use crate::ExporterError;
/// Polls OpenWeatherMap for the configured queries and publishes the readings
/// as Prometheus metrics.
pub struct Exporter {
/// Exporter settings: listen address, poll interval, rate limit, OpenWeatherMap options.
config: ExporterConfig,
/// OpenWeatherMap API client used to fetch the current weather for each query.
client: Client,
/// Interval ticked before every API call to pace outgoing requests.
rate_limiter: Arc<Mutex<Interval>>,
}
impl Exporter {
    /// Builds an exporter from `config`.
    ///
    /// The configuration is validated first; the rate limiter and the
    /// OpenWeatherMap client are then derived from it.
    ///
    /// # Errors
    ///
    /// Returns an [`ExporterError`] when `config.validate()` fails or when the
    /// client cannot be created from `config.owm`.
    ///
    /// NOTE(review): this creates a `tokio::time::interval`, which presumably
    /// needs a running Tokio runtime with the timer enabled — confirm that
    /// callers construct the exporter inside one.
    pub fn new(config: ExporterConfig) -> Result<Exporter, ExporterError> {
        config.validate()?;
        // Spread the allowed calls evenly over one minute. For very large
        // limits the period rounds down to 0 ms, and `interval` panics on a
        // zero period, so never go below one millisecond.
        let rate_limit_duration_millis =
            ((60000. / (config.max_calls_per_minute as f32)).round() as u64).max(1);
        let mut rate_limiter = interval(Duration::from_millis(rate_limit_duration_millis));
        rate_limiter.set_missed_tick_behavior(MissedTickBehavior::Delay);
        Ok(Exporter {
            client: Client::new(config.owm.clone())?,
            config,
            rate_limiter: Arc::new(Mutex::new(rate_limiter)),
        })
    }

    /// Starts the exporter: logs the configuration, installs the Prometheus
    /// HTTP exporter and then polls the weather API forever.
    ///
    /// # Errors
    ///
    /// Returns an [`ExporterError`] only when the Prometheus exporter cannot
    /// be installed; once the reading loop has started this never returns.
    pub async fn run(&mut self) -> Result<(), ExporterError> {
        info!("Starting");
        info!("config={:?}", self.config);
        self.init_prometheus_exporter()?;
        self.reading_loop().await
    }

    /// Endlessly fetches the weather for every configured query, updating the
    /// metrics after each call, and then waits for the next poll tick.
    async fn reading_loop(&self) -> ! {
        info!("Starting reading loop");
        let mut poll_interval = interval(self.config.poll_interval);
        poll_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        // The first tick of a fresh interval completes immediately; resetting
        // makes the wait after the first round of queries a full period.
        poll_interval.reset();
        loop {
            for query in self.config.query_iterator() {
                self.slow_down().await;
                info!("Getting weather for {:?}", query);
                // `Instant` is monotonic, so the measurement cannot fail or be
                // skewed by wall-clock adjustments the way a `SystemTime`
                // difference can.
                let start = std::time::Instant::now();
                let reading = self.client.fetch_weather(query).await;
                // Wrapped in `Ok` to match the `Result` that
                // `update_call_time_metrics` expects.
                update_call_time_metrics(&Ok(start.elapsed()));
                match reading {
                    Ok(reading) => self.update_metrics_for_successful_query(query, &reading),
                    Err(e) => {
                        self.update_metrics_for_failed_query(query);
                        error!("Error reading weather for {:?}. Error: {:?}", query, e)
                    }
                };
            }
            poll_interval.tick().await;
        }
    }

    /// Waits for the next tick of the shared rate limiter before an API call.
    async fn slow_down(&self) {
        self.rate_limiter.lock().await.tick().await;
    }

    /// Installs the Prometheus recorder with an HTTP listener on the
    /// configured address and port, then registers the metric descriptions.
    ///
    /// # Errors
    ///
    /// Returns an [`ExporterError`] when installing the exporter fails.
    fn init_prometheus_exporter(&self) -> Result<(), ExporterError> {
        let listen_address = self.config.listen.address;
        let port = self.config.listen.port;
        let builder = PrometheusBuilder::new();
        // NOTE(review): the idle timeout equals the poll interval, so a series
        // that is refreshed once per poll cycle may briefly expire when a
        // cycle takes longer than the interval — confirm this is intended.
        builder
            .idle_timeout(MetricKindMask::ALL, Some(self.config.poll_interval))
            .with_http_listener(SocketAddr::new(listen_address, port))
            .install()?;
        info!("Listening on {:?}:{:?}", listen_address, port);
        self.describe_call_metrics();
        self.describe_current_weather_metrics();
        Ok(())
    }

    /// Marks `query` as successful and writes every value of `reading`.
    fn update_metrics_for_successful_query(&self, query: &dyn Query, reading: &CurrentWeather) {
        debug!(
            "updating metrics for successful query {:?} with reading {:?}",
            query, reading
        );
        self.update_query_success_metrics(query, true);
        let labels = labels_for(query, reading);
        self.write_reading_values(reading, &labels);
    }

    /// Marks `query` as failed; no weather values are written.
    fn update_metrics_for_failed_query(&self, query: &dyn Query) {
        debug!("updating metrics for failed query {:?}", query);
        self.update_query_success_metrics(query, false);
    }

    /// Sets the query-success gauge for `query` to `1` on success, `0` on failure.
    fn update_query_success_metrics(&self, query: &dyn Query, success: bool) {
        let labels = labels_for_query(query);
        gauge!(OWM_QUERY_SUCCESS.name(), if success { 1. } else { 0. }, &labels);
    }

    /// Registers the description of the API call-time histogram.
    fn describe_call_metrics(&self) {
        describe_histogram!(OWM_API_CALL_TIME_HIST.name(), OWM_API_CALL_TIME_HIST.description());
    }

    /// Registers the description of every weather gauge, using the
    /// unit-dependent metadata for the configured units.
    fn describe_current_weather_metrics(&self) {
        let units = &self.config.owm.units;
        // A fixed array is enough here; no heap-allocated `Vec` is needed.
        for metric in [
            OWM_QUERY_SUCCESS,
            OWM_TIMESTAMP_SECONDS,
            owm_temperature(units),
            owm_temperature_feels_like(units),
            OWM_PRESSURE,
            OWM_HUMIDITY_PERCENT,
            owm_wind_speed(units),
            OWM_WIND_DIRECTION,
            owm_wind_gust(units),
            OWM_CLOUDINESS_PERCENT,
            OWM_VISIBILITY,
            OWM_RAIN_1H,
            OWM_RAIN_3H,
            OWM_SNOW_1H,
            OWM_SNOW_3H,
        ] {
            describe_gauge!(metric.name(), metric.description());
        }
    }

    /// Writes one gauge per value in `reading`, all carrying `labels`.
    ///
    /// Optional values (wind gust, rain and snow volumes) are only written
    /// when the reading contains them.
    fn write_reading_values(&self, reading: &CurrentWeather, labels: &Vec<(&'static str, String)>) {
        let units = &self.config.owm.units;
        gauge!(OWM_TIMESTAMP_SECONDS.name(), reading.dt as f64, labels);
        gauge!(owm_temperature(units).name(), reading.main.temp, labels);
        gauge!(
            owm_temperature_feels_like(units).name(),
            reading.main.feels_like,
            labels
        );
        gauge!(OWM_PRESSURE.name(), reading.main.pressure, labels);
        gauge!(OWM_HUMIDITY_PERCENT.name(), reading.main.humidity, labels);
        gauge!(owm_wind_speed(units).name(), reading.wind.speed, labels);
        gauge!(OWM_WIND_DIRECTION.name(), reading.wind.deg, labels);
        if let Some(gust) = reading.wind.gust {
            gauge!(owm_wind_gust(units).name(), gust, labels);
        }
        gauge!(OWM_CLOUDINESS_PERCENT.name(), reading.clouds.cloudiness, labels);
        gauge!(OWM_VISIBILITY.name(), reading.visibility as f64, labels);
        if let Some(pv) = &reading.rain {
            if let Some(mm) = pv.one_hour {
                gauge!(OWM_RAIN_1H.name(), mm, labels);
            }
            if let Some(mm) = pv.three_hour {
                gauge!(OWM_RAIN_3H.name(), mm, labels);
            }
        }
        if let Some(pv) = &reading.snow {
            if let Some(mm) = pv.one_hour {
                gauge!(OWM_SNOW_1H.name(), mm, labels);
            }
            if let Some(mm) = pv.three_hour {
                gauge!(OWM_SNOW_3H.name(), mm, labels);
            }
        }
    }
}
/// Records the duration of one API call, in whole milliseconds, in the
/// call-time histogram.
///
/// A failed duration measurement is ignored, so no sample is recorded for it.
fn update_call_time_metrics(call_duration: &Result<Duration, SystemTimeError>) {
    match call_duration {
        Ok(elapsed) => {
            let millis = elapsed.as_millis() as f64;
            histogram!(OWM_API_CALL_TIME_HIST.name(), millis);
        }
        Err(_) => {}
    }
}
/// Builds the labels that identify `query`: its query parameters followed by
/// its display name, when it has one.
fn labels_for_query(query: &dyn Query) -> Vec<(&'static str, String)> {
    let mut query_labels = query.query_params();
    add_display_name(query, &mut query_labels);
    query_labels
}
/// Appends a `display_name` label to `labels` when `query` has a display
/// name; otherwise leaves `labels` untouched.
fn add_display_name(query: &dyn Query, labels: &mut Vec<(&'static str, String)>) {
    if let Some(display_name) = query.get_display_name() {
        let entry = ("display_name", display_name.to_string());
        labels.push(entry);
    }
}
/// Builds the labels for the gauges of a successful `reading`.
///
/// The labels are, in order: `location` (the query's display name, falling
/// back to the name reported in the reading), the query parameters, the
/// optional `display_name`, and the id, coordinates and name of the reading.
fn labels_for(query: &dyn Query, reading: &CurrentWeather) -> Vec<(&'static str, String)> {
    // Prefer the user-chosen display name over the name returned by the API.
    let location = if let Some(name) = query.get_display_name() {
        name
    } else {
        &reading.name
    };
    let mut labels = vec![("location", location.to_string())];
    labels.extend(query.query_params());
    add_display_name(query, &mut labels);
    labels.extend([
        ("reading_id", reading.id.to_string()),
        ("reading_lat", reading.coord.lat.to_string()),
        ("reading_lon", reading.coord.lon.to_string()),
        ("reading_name", reading.name.to_string()),
    ]);
    labels
}