use anyhow::Result;
use axum::{Router, routing::get};
use prometheus::{Encoder, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder};
use serde::Deserialize;
use std::collections::HashMap;
use std::fs::File;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use tokio::time::{self, Duration, Instant};
use tokio_listener::Listener;
use crate::{morgue::shutdown_signal, queer::HRT};
/// Version of this crate, embedded at compile time from Cargo's `CARGO_PKG_VERSION`;
/// reported through the `version` label of the `iocaine_version` gauge.
const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Owns the application's Prometheus metrics and, optionally, their on-disk persistence.
#[derive(Default, Clone)]
pub struct TenXProgrammer {
    /// Registry holding every metric exposed on `/metrics`.
    pub registry: MetricRegistry,
    /// File the metrics are persisted to; `None` disables persistence entirely.
    pub persist_path: Option<PathBuf>,
    /// Delay between the periodic writes performed by the persister started in `serve`.
    pub persist_interval: Duration,
    /// Abort handle of the background persister task spawned by `serve`, if any.
    persister_abort: Option<tokio::task::AbortHandle>,
}
/// Cheaply clonable handle to a shared Prometheus [`Registry`] together with an index of
/// the labelled counters that were registered through it.
#[derive(Default, Clone)]
pub struct MetricRegistry {
    /// Shared Prometheus registry that metrics are registered in and gathered from.
    registry: Arc<Registry>,
    /// Counters registered via `register`, keyed by metric name.
    counters: Arc<RwLock<HashMap<String, LabeledIntCounterVec>>>,
}
impl MetricRegistry {
#[must_use]
pub fn gather(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
}
pub fn register(&self, c: LabeledIntCounterVec) -> Result<LabeledIntCounterVec> {
match self.registry.register(Box::new(c.counter.clone())) {
Ok(()) => {
self.counters
.write()
.map_err(|e| {
tracing::error!("failed to lock metrics registry for writing");
anyhow::anyhow!("{e}")
})?
.insert(c.name.clone(), c.clone());
Ok(c)
}
Err(prometheus::Error::AlreadyReg) => {
let counter = self
.counters
.read()
.map_err(|e| {
tracing::error!("failed to lock metrics registry for reading");
anyhow::anyhow!("{e}")
})?
.get(&c.name)
.unwrap()
.clone();
Ok(counter)
}
Err(e) => Err(e.into()),
}
}
}
/// Metrics restored from the persistence file, keyed by metric name.
///
/// Every top-level key of the deserialized document is collected (via `serde(flatten)`)
/// into `metrics`; each value lists the labelled samples recorded for that metric.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct PersistedMetrics {
    #[serde(flatten)]
    pub metrics: HashMap<String, Vec<PersistedMetric>>,
}
/// A single persisted sample: the label set it was recorded under and its value.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct PersistedMetric {
    /// Label name → label value.
    pub labels: HashMap<String, String>,
    /// Recorded value; `LabeledIntCounterVec::set` converts it to `u64` when restoring.
    pub value: f64,
}
impl PersistedMetrics {
    /// Restores every persisted sample stored under `counter`'s name into `counter`.
    ///
    /// Does nothing when no samples were loaded for that name. Samples whose label set
    /// does not match the counter are skipped (`LabeledIntCounterVec::set` logs them).
    pub fn update(&self, counter: &LabeledIntCounterVec) {
        if let Some(samples) = self.metrics.get(&counter.name) {
            tracing::debug!({ metric = counter.name }, "updating persisted metric");
            samples.iter().for_each(|sample| {
                counter.set(&sample.labels, sample.value);
            });
        }
    }
}
/// An [`IntCounterVec`] bundled with its metric name and the ordered label names it was
/// created with.
#[derive(Clone)]
pub struct LabeledIntCounterVec {
    /// The underlying Prometheus counter vector.
    pub counter: IntCounterVec,
    /// Metric name; also the key used by `MetricRegistry` and `PersistedMetrics`.
    pub name: String,
    /// Label names, in the order label values must be supplied to `inc`/`inc_by`.
    pub labels: Vec<String>,
}
impl LabeledIntCounterVec {
    /// Builds a counter vector named `name`, with help text `desc` and the given label names.
    ///
    /// The counter is only constructed here; pass it to `MetricRegistry::register` to expose it.
    ///
    /// # Errors
    ///
    /// Returns the Prometheus error (after logging it) if the counter cannot be created.
    pub fn new<S: AsRef<str>, T: AsRef<str>>(name: S, desc: S, labels: &[T]) -> Result<Self> {
        let metric_name = name.as_ref();
        let label_names: Vec<&str> = labels.iter().map(AsRef::<str>::as_ref).collect();
        let opts = Opts::new(metric_name, desc.as_ref());
        let counter = match IntCounterVec::new(opts, &label_names) {
            Ok(counter) => counter,
            Err(e) => {
                tracing::error!(
                    { name = metric_name, labels = format!("{:?}", label_names) },
                    "unable to create IntCounterVec metric: {e}"
                );
                return Err(e.into());
            }
        };
        Ok(Self {
            counter,
            name: metric_name.to_owned(),
            labels: label_names.into_iter().map(String::from).collect(),
        })
    }

    /// Increments the counter selected by `label_values` by one.
    ///
    /// Returns `None` (after logging) when the number of values differs from the number
    /// of label names, instead of letting Prometheus panic.
    pub fn inc<S: AsRef<str> + std::fmt::Debug>(&self, label_values: &[S]) -> Option<()> {
        self.check_label_count(label_values.len())?;
        self.counter.with_label_values(label_values).inc();
        Some(())
    }

    /// Increments the counter selected by `label_values` by `amount`.
    ///
    /// Returns `None` (after logging) when the number of values differs from the number
    /// of label names, instead of letting Prometheus panic.
    pub fn inc_by<S: AsRef<str> + std::fmt::Debug>(
        &self,
        amount: u64,
        label_values: &[S],
    ) -> Option<()> {
        self.check_label_count(label_values.len())?;
        self.counter.with_label_values(label_values).inc_by(amount);
        Some(())
    }

    /// Yields `Some(())` when `actual` equals the number of label names; otherwise logs
    /// the mismatch and yields `None`.
    fn check_label_count(&self, actual: usize) -> Option<()> {
        let expected = self.labels.len();
        if actual == expected {
            return Some(());
        }
        tracing::error!(
            { name = self.name, expected, actual },
            "number of label values do not match",
        );
        None
    }

    /// Overwrites the counter identified by `labels` (label name → value) with `value`.
    ///
    /// `labels` must hold exactly one entry per label name of this vector. The selected
    /// counter is reset and then incremented by `value` converted to `u64`. Returns `None`
    /// (after logging) when the label set does not match.
    // The `as u64` cast is deliberate: float-to-int `as` casts saturate, so negative
    // values and NaN become 0 and any fractional part is dropped.
    #[allow(clippy::cast_possible_truncation)]
    #[allow(clippy::cast_sign_loss)]
    pub fn set(&self, labels: &HashMap<String, String>, value: f64) -> Option<()> {
        if labels.len() != self.labels.len() {
            tracing::error!(
                { metric = self.name, expected = self.labels.len(), actual = labels.len() },
                "number of label values do not match",
            );
            return None;
        }
        // Order the values like `self.labels`; collecting into `Option` stops at (and
        // logs) the first label that is missing.
        let ordered_values = self
            .labels
            .iter()
            .map(|label| {
                let found = labels.get(label);
                if found.is_none() {
                    tracing::error!(
                        { metric = self.name, name = label },
                        "label not found in persisted metric"
                    );
                }
                found
            })
            .collect::<Option<Vec<&String>>>()?;
        let counter = self.counter.with_label_values(&ordered_values);
        counter.reset();
        counter.inc_by(value as u64);
        Some(())
    }
}
impl TenXProgrammer {
pub fn new(persist_path: &Option<PathBuf>, persist_interval: Duration) -> Result<Self> {
let registry = Registry::new();
let version_opts = Opts::new(
"iocaine_version",
"Version of the running iocaine (in the 'version' label)",
);
let version = IntGaugeVec::new(version_opts, &["version"])?;
version.with_label_values(&[VERSION]).set(1);
registry.register(Box::new(version))?;
let tenx = Self {
registry: MetricRegistry {
registry: Arc::new(registry),
counters: Arc::default(),
},
persist_path: persist_path.clone(),
persist_interval,
persister_abort: None,
};
Ok(tenx)
}
pub fn persist(&self) -> Result<()> {
let Some(persist_path) = &self.persist_path else {
return Ok(());
};
tracing::debug!(
{ persist_path = persist_path.display().to_string() },
"persisting metrics"
);
let mut f = File::create(persist_path)?;
let encoder = HRT::new();
let metrics = self.registry.gather();
Ok(encoder.encode(&metrics, &mut f)?)
}
pub fn load_metrics(&self) -> Result<PersistedMetrics> {
let Some(ref persist_path) = self.persist_path else {
return Ok(PersistedMetrics::default());
};
tracing::debug!(
{ persist_path = persist_path.display().to_string() },
"loading persisted metrics"
);
let Ok(data) = std::fs::read_to_string(persist_path) else {
return Ok(PersistedMetrics::default());
};
Ok(serde_json::from_str(&data)?)
}
pub fn abort(&self) {
if let Some(handle) = &self.persister_abort {
handle.abort();
}
}
pub async fn serve(mut self, listener: Listener) -> Result<()> {
if let Some(ref persist_path) = self.persist_path {
let tenx = self.clone();
let persist_path = persist_path.display().to_string();
let handle = tokio::spawn(async move {
let sleep = time::sleep(self.persist_interval);
tokio::pin!(sleep);
loop {
sleep.as_mut().await;
if let Err(e) = tenx.persist() {
tracing::error!({ persist_path }, "unable to persist metrics: {e}");
}
sleep.as_mut().reset(Instant::now() + self.persist_interval);
}
});
self.persister_abort = Some(handle.abort_handle());
}
let registry = self.registry.clone();
let app = Router::new()
.route(
"/metrics",
get(async move || {
let encoder = TextEncoder::new();
let mut buffer = Vec::<u8>::new();
let metrics = registry.gather();
encoder.encode(&metrics, &mut buffer).unwrap();
let metrics = prometheus::gather();
encoder.encode(&metrics, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}),
)
.layer(tower_http::trace::TraceLayer::new_for_http());
Ok(axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal(Some(self)))
.await?)
}
}