use clap::Parser;
use nws_exporter::client::{ClientError, NwsClient};
use nws_exporter::http::RequestContext;
use nws_exporter::metrics::ForecastMetrics;
use reqwest::Client;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::process;
use std::sync::Arc;
use std::time::Duration;
use tokio::signal::unix::{self, SignalKind};
use tracing::{Instrument, Level};
const DEFAULT_LOG_LEVEL: Level = Level::INFO;
const DEFAULT_BIND_ADDR: ([u8; 4], u16) = ([0, 0, 0, 0], 9782);
const DEFAULT_REFERSH_SECS: u64 = 300;
const DEFAULT_TIMEOUT_MILLIS: u64 = 5000;
const DEFAULT_API_URL: &str = "https://api.weather.gov/";
#[derive(Debug, Parser)]
#[clap(name = "nws_exporter", version = clap::crate_version!())]
struct NwsExporterApplication {
#[clap(required = true)]
station: Vec<String>,
#[clap(long, default_value_t = DEFAULT_API_URL.into())]
api_url: String,
#[clap(long, default_value_t = DEFAULT_LOG_LEVEL)]
log_level: Level,
#[clap(long, default_value_t = DEFAULT_REFERSH_SECS)]
refresh_secs: u64,
#[clap(long, default_value_t = DEFAULT_TIMEOUT_MILLIS)]
timeout_millis: u64,
#[clap(long, default_value_t = DEFAULT_BIND_ADDR.into())]
bind: SocketAddr,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let opts = NwsExporterApplication::parse();
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(opts.log_level)
.finish(),
)
.expect("failed to set tracing subscriber");
let timeout = Duration::from_millis(opts.timeout_millis);
let http_client = Client::builder().timeout(timeout).build().unwrap_or_else(|e| {
tracing::error!(message = "unable to initialize HTTP client", error = %e);
process::exit(1)
});
let client = NwsClient::new(http_client, &opts.api_url).unwrap_or_else(|e| {
tracing::error!(message = "unable to initialize NWS client", error = %e);
process::exit(1)
});
let registry = prometheus::default_registry().clone();
let metrics = ForecastMetrics::new(®istry);
let update = UpdateTask::new(opts.station, metrics, client, Duration::from_secs(opts.refresh_secs));
if let Err(e) = update.initialize().await {
tracing::error!(message = "failed to fetch initial station information", error = %e);
process::exit(1);
}
tokio::spawn(update.run());
let context = Arc::new(RequestContext::new(registry));
let handler = nws_exporter::http::text_metrics(context);
let (sock, server) = warp::serve(handler)
.try_bind_with_graceful_shutdown(opts.bind, async {
tokio::select! {
_ = sigterm() => {}
_ = sigint() => {}
}
})
.unwrap_or_else(|e| {
tracing::error!(message = "error binding to address", address = %opts.bind, error = %e);
process::exit(1)
});
tracing::info!(message = "server started", address = %sock);
server.await;
tracing::info!("server shutdown");
Ok(())
}
async fn sigterm() -> io::Result<()> {
unix::signal(SignalKind::terminate())?.recv().await;
Ok(())
}
async fn sigint() -> io::Result<()> {
unix::signal(SignalKind::interrupt())?.recv().await;
Ok(())
}
struct UpdateTask {
stations: Vec<String>,
metrics: ForecastMetrics,
client: NwsClient,
interval: Duration,
}
impl UpdateTask {
fn new(stations: Vec<String>, metrics: ForecastMetrics, client: NwsClient, interval: Duration) -> Self {
Self {
stations,
metrics,
client,
interval,
}
}
async fn initialize(&self) -> Result<(), ClientError> {
for id in self.stations.iter() {
let station = self
.client
.station(id)
.instrument(tracing::span!(Level::DEBUG, "nws_station"))
.await?;
self.metrics.station(&station);
}
Ok(())
}
async fn run(self) -> ! {
let mut interval = tokio::time::interval(self.interval);
loop {
let _ = interval.tick().await;
for id in self.stations.iter() {
match self
.client
.observation(id)
.instrument(tracing::span!(Level::DEBUG, "nws_observation"))
.await
{
Ok(obs) => {
self.metrics.observation(&obs);
tracing::info!(message = "fetched new forecast", station_id = %id, observation = %obs.id);
}
Err(e) => {
tracing::error!(message = "failed to fetch forecast", station_id = %id, error = %e);
}
}
}
}
}
}