use std::sync::Arc;
use std::time::Duration;
use reqwest::Client;
use time::OffsetDateTime;
use tracing::{debug, info, warn};
use crate::config::PrometheusConfig;
use crate::state::AppState;
/// Handle used to launch the background task that pushes metrics to a
/// Prometheus push gateway.
pub struct PrometheusPusher {
/// Shared application state; this file reads its `config`, `collector` and
/// `store` members when starting the task and rendering metrics.
state: Arc<AppState>,
}
impl PrometheusPusher {
    /// Creates a pusher that operates on the given shared application state.
    pub fn new(state: Arc<AppState>) -> Self {
        Self { state }
    }

    /// Spawns the background push task if pushing is configured.
    ///
    /// Returns without spawning anything when the Prometheus section of the
    /// configuration is disabled, or when no non-empty push gateway URL is
    /// set. Otherwise a detached tokio task is spawned (its `JoinHandle` is
    /// discarded); the task ends via the stop receiver obtained from the
    /// collector.
    pub async fn start(&self) {
        // Clone the Prometheus section inside a block so the config read guard
        // is released before anything else happens.
        let prometheus_config = {
            let config = self.state.config.read().await;
            config.prometheus.clone()
        };

        if !prometheus_config.enabled {
            // The previous message claimed the endpoint was enabled, which is
            // the opposite of what this branch means.
            debug!("Prometheus metrics are disabled; push gateway task not started");
            return;
        }

        let push_gateway = match &prometheus_config.push_gateway {
            Some(url) if !url.is_empty() => url.clone(),
            _ => {
                debug!("Prometheus push gateway URL not configured");
                return;
            }
        };

        info!(
            "Starting Prometheus pusher to {} (interval: {}s)",
            push_gateway, prometheus_config.push_interval
        );

        let state = Arc::clone(&self.state);
        let stop_rx = self.state.collector.subscribe_stop();
        tokio::spawn(async move {
            run_prometheus_pusher(state, prometheus_config, push_gateway, stop_rx).await;
        });
    }
}
async fn run_prometheus_pusher(
state: Arc<AppState>,
config: PrometheusConfig,
push_gateway: String,
mut stop_rx: tokio::sync::watch::Receiver<bool>,
) {
let client = match Client::builder().timeout(Duration::from_secs(30)).build() {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to create HTTP client for Prometheus pusher: {e}");
return;
}
};
let push_interval = Duration::from_secs(config.push_interval);
let mut interval = tokio::time::interval(push_interval);
let job_name = "aranet-service";
loop {
tokio::select! {
_ = interval.tick() => {
let metrics = generate_metrics(&state).await;
if let Err(e) = push_metrics(&client, &push_gateway, job_name, &metrics).await {
warn!("Failed to push metrics to Prometheus: {}", e);
} else {
debug!("Pushed metrics to Prometheus push gateway");
}
}
_ = stop_rx.changed() => {
if *stop_rx.borrow() {
info!("Prometheus pusher received stop signal");
break;
}
}
}
}
info!("Prometheus pusher stopped");
}
/// Renders the current collector status, per-device poll counters and latest
/// sensor readings as a text document with `# HELP` / `# TYPE` headers
/// followed by one sample line per device.
///
/// Sections are only emitted when there is data for them: the uptime gauge
/// requires a known start time, the poll counters require at least one device
/// stat, and the reading gauges require at least one device with a stored
/// reading. Store lookup failures are skipped silently (best effort), so a
/// failing store yields fewer samples rather than an error.
async fn generate_metrics(state: &AppState) -> String {
    let mut output = String::with_capacity(4096);

    // Collector status.
    let running = state.collector.is_running();
    let uptime = state.collector.started_at().map(|s| {
        let now = OffsetDateTime::now_utc();
        // Clamp to zero so a start time in the future never yields a
        // negative uptime.
        (now - s).whole_seconds().max(0)
    });

    output.push_str("# HELP aranet_collector_running Whether the collector is running\n");
    output.push_str("# TYPE aranet_collector_running gauge\n");
    output.push_str(&format!(
        "aranet_collector_running {}\n",
        u8::from(running)
    ));

    if let Some(uptime_secs) = uptime {
        output.push_str("# HELP aranet_collector_uptime_seconds Collector uptime\n");
        output.push_str("# TYPE aranet_collector_uptime_seconds gauge\n");
        output.push_str(&format!(
            "aranet_collector_uptime_seconds {}\n",
            uptime_secs
        ));
    }

    // Per-device poll counters.
    let device_stats = state.collector.device_stats.read().await;
    if !device_stats.is_empty() {
        // Build each device's label set once and reuse it for both counters.
        let labelled: Vec<_> = device_stats
            .iter()
            .map(|stat| {
                let alias = stat.alias.as_deref().unwrap_or(&stat.device_id);
                (format_device_labels(alias, &stat.device_id), stat)
            })
            .collect();

        output.push_str("# HELP aranet_device_poll_success_total Successful polls\n");
        output.push_str("# TYPE aranet_device_poll_success_total counter\n");
        for (labels, stat) in &labelled {
            output.push_str(&format!(
                "aranet_device_poll_success_total{{{}}} {}\n",
                labels, stat.success_count
            ));
        }

        output.push_str("# HELP aranet_device_poll_failure_total Failed polls\n");
        output.push_str("# TYPE aranet_device_poll_failure_total counter\n");
        for (labels, stat) in &labelled {
            output.push_str(&format!(
                "aranet_device_poll_failure_total{{{}}} {}\n",
                labels, stat.failure_count
            ));
        }
    }
    // Release the stats guard before touching the store.
    drop(device_stats);

    // Latest reading per device; the store lock is held only inside this block.
    let device_readings: Vec<_> = {
        let store = state.store.lock().await;
        let devices = store.list_devices().unwrap_or_default();
        devices
            .into_iter()
            .filter_map(|device| {
                store
                    .get_latest_reading(&device.id)
                    .ok()
                    .flatten()
                    .map(|reading| (device, reading))
            })
            .collect()
    };

    if !device_readings.is_empty() {
        // The label set is identical for all five gauges of a device, so it
        // is formatted (and escaped) once instead of once per metric family.
        let labelled: Vec<_> = device_readings
            .iter()
            .map(|(device, reading)| {
                let device_name = device.name.as_deref().unwrap_or(&device.id);
                (format_device_labels(device_name, &device.id), reading)
            })
            .collect();

        output.push_str("# HELP aranet_co2_ppm CO2 concentration in ppm\n");
        output.push_str("# TYPE aranet_co2_ppm gauge\n");
        for (labels, reading) in &labelled {
            output.push_str(&format!("aranet_co2_ppm{{{}}} {}\n", labels, reading.co2));
        }

        output.push_str("# HELP aranet_temperature_celsius Temperature\n");
        output.push_str("# TYPE aranet_temperature_celsius gauge\n");
        for (labels, reading) in &labelled {
            output.push_str(&format!(
                "aranet_temperature_celsius{{{}}} {:.2}\n",
                labels, reading.temperature
            ));
        }

        output.push_str("# HELP aranet_humidity_percent Relative humidity\n");
        output.push_str("# TYPE aranet_humidity_percent gauge\n");
        for (labels, reading) in &labelled {
            output.push_str(&format!(
                "aranet_humidity_percent{{{}}} {}\n",
                labels, reading.humidity
            ));
        }

        output.push_str("# HELP aranet_pressure_hpa Atmospheric pressure\n");
        output.push_str("# TYPE aranet_pressure_hpa gauge\n");
        for (labels, reading) in &labelled {
            output.push_str(&format!(
                "aranet_pressure_hpa{{{}}} {:.2}\n",
                labels, reading.pressure
            ));
        }

        output.push_str("# HELP aranet_battery_percent Battery level\n");
        output.push_str("# TYPE aranet_battery_percent gauge\n");
        for (labels, reading) in &labelled {
            output.push_str(&format!(
                "aranet_battery_percent{{{}}} {}\n",
                labels, reading.battery
            ));
        }
    }

    output
}

/// Formats the `device="…",address="…"` label pair shared by every per-device
/// sample, escaping both values.
fn format_device_labels(name: &str, address: &str) -> String {
    format!(
        "device=\"{}\",address=\"{}\"",
        escape_label_value(name),
        escape_label_value(address)
    )
}
/// Sends `metrics` to the push gateway with an HTTP POST to
/// `<push_gateway>/metrics/job/<job_name>`.
///
/// Trailing slashes on `push_gateway` are stripped before the path is
/// appended.
///
/// # Errors
///
/// * [`PushError::Request`] when the request could not be sent.
/// * [`PushError::Response`] when the gateway replies with a non-success
///   status; the response body is included, or is empty if it could not be
///   read.
async fn push_metrics(
    client: &Client,
    push_gateway: &str,
    job_name: &str,
    metrics: &str,
) -> Result<(), PushError> {
    let base = push_gateway.trim_end_matches('/');
    let url = format!("{base}/metrics/job/{job_name}");

    let request = client
        .post(&url)
        .header("Content-Type", "text/plain; version=0.0.4")
        .body(metrics.to_owned());
    let response = request.send().await.map_err(PushError::Request)?;

    let status = response.status();
    if status.is_success() {
        return Ok(());
    }

    // Reading the error body is best effort; fall back to an empty string.
    let body = response.text().await.unwrap_or_default();
    Err(PushError::Response {
        status: status.as_u16(),
        body,
    })
}
/// Escapes a string for use inside a double-quoted label value.
///
/// A backslash becomes `\\`, a double quote becomes `\"`, and a line feed
/// becomes the two characters `\n`; every other character is copied as is.
fn escape_label_value(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Errors produced while pushing metrics to the push gateway.
#[derive(Debug, thiserror::Error)]
pub enum PushError {
/// The HTTP request itself failed; wraps the underlying `reqwest` error.
#[error("Request failed: {0}")]
Request(#[from] reqwest::Error),
/// The gateway answered with a non-success HTTP status. `status` is the
/// numeric status code and `body` is the response text (empty when the body
/// could not be read).
#[error("Push gateway returned error {status}: {body}")]
Response { status: u16, body: String },
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that plain text passes through unchanged and that quotes,
    /// backslashes and line feeds are escaped.
    #[test]
    fn test_escape_label_value() {
        let cases = [
            ("hello", "hello"),
            ("hello\"world", "hello\\\"world"),
            ("hello\\world", "hello\\\\world"),
            ("hello\nworld", "hello\\nworld"),
        ];

        for &(input, expected) in &cases {
            assert_eq!(escape_label_value(input), expected);
        }
    }
}