use std::{collections::HashMap, str::FromStr, time::Duration};
use metrics::{counter, gauge};
use serde::Deserialize;
use tracing::{error, info, trace, warn};
use crate::signals::Shutdown;
#[derive(Debug, Clone, Copy, thiserror::Error)]
pub enum Error {
#[error("Unexpected shutdown")]
EarlyShutdown,
}
#[derive(Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub struct Config {
uri: String,
metrics: Option<Vec<String>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MetricType {
Gauge,
Counter,
Histogram,
Summary,
}
#[derive(Debug)]
enum MetricTypeParseError {
UnknownType,
}
impl FromStr for MetricType {
type Err = MetricTypeParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"counter" => Ok(Self::Counter),
"gauge" => Ok(Self::Gauge),
"histogram" => Ok(Self::Histogram),
"summary" => Ok(Self::Summary),
_ => Err(MetricTypeParseError::UnknownType),
}
}
}
#[derive(Debug)]
pub struct Prometheus {
config: Config,
shutdown: Shutdown,
}
impl Prometheus {
pub(crate) fn new(config: Config, shutdown: Shutdown) -> Self {
Self { config, shutdown }
}
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::too_many_lines)]
pub(crate) async fn run(mut self) -> Result<(), Error> {
info!("Prometheus target metrics scraper running");
let client = reqwest::Client::new();
let server = async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let Ok(resp) = client.get(&self.config.uri).timeout(Duration::from_secs(1)).send().await else {
info!("failed to get Prometheus uri");
continue;
};
let Ok(text) = resp.text().await else {
info!("failed to read Prometheus response");
continue;
};
let mut typemap = HashMap::new();
for line in text.lines() {
if line.starts_with("# HELP") {
continue;
}
if line.starts_with("# TYPE") {
let mut parts = line.split_ascii_whitespace().skip(2);
let name = parts.next().unwrap();
let metric_type = parts.next().unwrap();
let metric_type: MetricType = metric_type.parse().unwrap();
if matches!(metric_type, MetricType::Histogram | MetricType::Summary) {
typemap.insert(format!("{name}_sum"), metric_type);
typemap.insert(format!("{name}_count"), metric_type);
typemap.insert(format!("{name}_bucket"), metric_type);
}
typemap.insert(name.to_owned(), metric_type);
continue;
}
let mut parts = line.split_ascii_whitespace();
let name_and_labels = parts.next().unwrap();
let value = parts.next().unwrap();
let (name, labels) = {
if let Some((name, labels)) = name_and_labels.split_once('{') {
let labels = labels.trim_end_matches('}');
let labels = labels.split(',').map(|label| {
let (label_name, label_value) = label.split_once('=').unwrap();
let label_value = label_value.trim_matches('\"');
(label_name.to_owned(), label_value.to_owned())
});
let labels = labels.collect::<Vec<_>>();
(name, Some(labels))
} else {
(name_and_labels, None)
}
};
let metric_type = typemap.get(name);
let name = name.replace("__", ".");
if let Some(metrics) = &self.config.metrics {
if !metrics.contains(&name) {
continue;
}
}
match metric_type {
Some(MetricType::Gauge) => {
let Ok(value): Result<f64, _> = value.parse() else {
let e = value.parse::<f64>().unwrap_err();
warn!("{e}: {name} = {value}");
continue;
};
trace!("gauge: {name} = {value}");
gauge!(format!("target/{name}"), value, &labels.unwrap_or_default());
}
Some(MetricType::Counter) => {
let Ok(value): Result<f64, _> = value.parse() else {
let e = value.parse::<f64>().unwrap_err();
warn!("{e}: {name} = {value}");
continue;
};
let value = if value < 0.0 {
warn!("Negative counter value unhandled");
continue;
} else {
if value > u64::MAX as f64 {
warn!("Counter value above maximum limit");
continue;
}
value as u64
};
trace!("counter: {name} = {value}");
counter!(format!("target/{name}"), value, &labels.unwrap_or_default());
}
Some(_) | None => {
trace!("unsupported metric type: {name} = {value}");
}
}
}
}
};
tokio::select! {
_res = server => {
error!("server shutdown unexpectedly");
Err(Error::EarlyShutdown)
}
_ = self.shutdown.recv() => {
info!("shutdown signal received");
Ok(())
}
}
}
}