use clap::Parser;
use nws_exporter::client::{ClientError, WeatherGovClient};
use nws_exporter::http::RequestContext;
use nws_exporter::metrics::ForecastMetrics;
use reqwest::Client;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::process;
use std::sync::Arc;
use std::time::Duration;
use tokio::signal::unix::{self, SignalKind};
use tracing::{Instrument, Level};
const DEFAULT_LOG_LEVEL: Level = Level::INFO;
const DEFAULT_BIND_ADDR: ([u8; 4], u16) = ([0, 0, 0, 0], 9782);
const DEFAULT_REFERSH_SECS: u64 = 300;
const DEFAULT_TIMEOUT_MILLIS: u64 = 5000;
const DEFAULT_API_URL: &str = "https://api.weather.gov/";
#[derive(Debug, Parser)]
#[clap(name = "nws_exporter", version = clap::crate_version!())]
struct NwsExporterApplication {
#[clap(long)]
station: String,
#[clap(long, default_value_t = DEFAULT_API_URL.into())]
api_url: String,
#[clap(long, default_value_t = DEFAULT_LOG_LEVEL)]
log_level: Level,
#[clap(long, default_value_t = DEFAULT_REFERSH_SECS)]
refresh_secs: u64,
#[clap(long, default_value_t = DEFAULT_TIMEOUT_MILLIS)]
timeout_millis: u64,
#[clap(long, default_value_t = DEFAULT_BIND_ADDR.into())]
bind: SocketAddr,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let opts = NwsExporterApplication::parse();
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(opts.log_level)
.finish(),
)
.expect("failed to set tracing subscriber");
let timeout = Duration::from_millis(opts.timeout_millis);
let http_client = Client::builder().timeout(timeout).build().unwrap_or_else(|e| {
tracing::error!(message = "unable to initialize HTTP client", error = %e);
process::exit(1)
});
let client = WeatherGovClient::new(http_client, &opts.api_url);
match client.station(&opts.station).await {
Err(ClientError::InvalidStation(station)) => {
tracing::error!(message = "invalid station provided", station = %station);
process::exit(1)
}
Err(e) => {
tracing::warn!(message = "failed to fetch initial station information", error = %e);
}
Ok(s) => {
tracing::debug!(message = "verified station information", station = ?s);
}
}
let station = opts.station.clone();
let registry = prometheus::default_registry().clone();
let metrics = ForecastMetrics::new(®istry);
let mut interval = tokio::time::interval(Duration::from_secs(opts.refresh_secs));
tokio::spawn(async move {
tracing::info!(message = "forecast polling started", api_url = %opts.api_url, station = %station);
loop {
let _ = interval.tick().await;
match client
.observation(&station)
.instrument(tracing::span!(Level::DEBUG, "nws_observation"))
.await
{
Ok(obs) => {
metrics.observe(&obs);
tracing::info!(message = "fetched new forecast", observation = %obs.id);
}
Err(e) => {
tracing::error!(message = "failed to fetch forecast", error = %e);
}
}
}
});
let context = Arc::new(RequestContext::new(registry));
let handler = nws_exporter::http::text_metrics(context);
let (sock, server) = warp::serve(handler)
.try_bind_with_graceful_shutdown(opts.bind, async {
tokio::select! {
_ = sigterm() => {}
_ = sigint() => {}
}
})
.unwrap_or_else(|e| {
tracing::error!(message = "error binding to address", address = %opts.bind, error = %e);
process::exit(1)
});
tracing::info!(message = "server started", address = %sock);
server.await;
tracing::info!("server shutdown");
Ok(())
}
async fn sigterm() -> io::Result<()> {
unix::signal(SignalKind::terminate())?.recv().await;
Ok(())
}
async fn sigint() -> io::Result<()> {
unix::signal(SignalKind::interrupt())?.recv().await;
Ok(())
}