use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use prometheus::Registry;
use tokio::task::JoinHandle;
use dr_metrix_core::error::Result;
use dr_metrix_core::MetricsCollector;
use crate::http_metrics::HttpMetrics;
use crate::layer::HttpMetricsLayer;
/// Owns a Prometheus [`Registry`], the shared [`HttpMetrics`], and the
/// handles of the collector loops started through `add_collector`.
pub struct PrometheusMetrics {
/// Registry that collectors and the HTTP metrics are registered with.
pub registry: Registry,
/// HTTP metrics shared with every layer returned by `http_layer`.
pub http_metrics: Arc<HttpMetrics>,
// Name and join handle of each running collector loop; drained and aborted by `shutdown`.
handles: std::sync::Mutex<Vec<(&'static str, JoinHandle<()>)>>,
// Number of successful `add_collector` calls; only ever incremented.
collector_count: AtomicUsize,
}
impl PrometheusMetrics {
    /// Starts a [`PrometheusMetricsBuilder`] for the given metric namespace.
    ///
    /// The process collector is disabled unless
    /// [`PrometheusMetricsBuilder::with_process_collector`] is called.
    pub fn builder(namespace: &str) -> PrometheusMetricsBuilder {
        PrometheusMetricsBuilder {
            namespace: namespace.to_string(),
            process_collector: false,
        }
    }

    /// Returns an [`HttpMetricsLayer`] that shares this instance's [`HttpMetrics`].
    pub fn http_layer(&self) -> HttpMetricsLayer {
        HttpMetricsLayer::new(self.http_metrics.clone())
    }

    /// Registers `collector` with the registry and starts its collect loop.
    ///
    /// The handle returned by `spawn_collect_loop(interval)` is stored together
    /// with the collector's name so that [`shutdown`](Self::shutdown) can abort
    /// it later, and the collector count is incremented.
    ///
    /// # Errors
    ///
    /// Returns the error from `collector.register`. In that case no loop is
    /// started and the collector count is left unchanged.
    pub fn add_collector<C: MetricsCollector>(
        &self,
        collector: C,
        interval: Duration,
    ) -> Result<()> {
        let name = collector.name();
        collector.register(&self.registry)?;
        let arc = Arc::new(collector);
        let handle = arc.spawn_collect_loop(interval);
        // A poisoned lock only means another thread panicked while holding it;
        // the Vec itself is still structurally valid, so keep using it instead
        // of propagating the panic.
        self.handles
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .push((name, handle));
        self.collector_count.fetch_add(1, Ordering::Relaxed);
        tracing::info!(collector = name, ?interval, "collector registered and running");
        Ok(())
    }

    /// Returns how many collectors have been added via
    /// [`add_collector`](Self::add_collector).
    ///
    /// The value is not reset by [`shutdown`](Self::shutdown).
    pub fn collector_count(&self) -> usize {
        self.collector_count.load(Ordering::Relaxed)
    }

    /// Aborts every tracked collector loop and forgets its handle.
    ///
    /// Calling this again is a no-op until new collectors are added. This is
    /// also invoked from `Drop`, so it must not panic: a poisoned handle lock
    /// is recovered rather than unwrapped, because a panic raised while the
    /// thread is already unwinding would abort the process.
    pub fn shutdown(&self) {
        let mut handles = self
            .handles
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        for (name, handle) in handles.drain(..) {
            tracing::info!(collector = name, "shutting down collector");
            handle.abort();
        }
    }
}
impl Drop for PrometheusMetrics {
fn drop(&mut self) {
self.shutdown();
}
}
/// Builder for [`PrometheusMetrics`], created by `PrometheusMetrics::builder`.
pub struct PrometheusMetricsBuilder {
// Namespace passed to `HttpMetrics::new` when building.
namespace: String,
// Whether `build` registers the Prometheus process collector; off by default.
process_collector: bool,
}
impl PrometheusMetricsBuilder {
    /// Enables registration of the Prometheus process collector for the
    /// current process when [`build`](Self::build) runs.
    pub fn with_process_collector(mut self) -> Self {
        self.process_collector = true;
        self
    }

    /// Creates a fresh [`Registry`], registers the configured metrics with it
    /// and returns the assembled [`PrometheusMetrics`].
    ///
    /// The returned instance starts with no collectors and a collector count
    /// of zero.
    ///
    /// # Errors
    ///
    /// Returns an error if the process collector cannot be registered (mapped
    /// to `MetricsError::Prometheus`), if `HttpMetrics::new` fails, or if the
    /// HTTP metrics cannot be registered with the registry.
    pub fn build(self) -> Result<PrometheusMetrics> {
        let registry = Registry::new();
        if self.process_collector {
            let pc = prometheus::process_collector::ProcessCollector::for_self();
            registry
                .register(Box::new(pc))
                .map_err(dr_metrix_core::MetricsError::Prometheus)?;
            tracing::info!("process collector registered");
        }
        let http_metrics = Arc::new(HttpMetrics::new(&self.namespace)?);
        // Register against the registry we are about to hand out, by reference,
        // so the same registry can still be moved into the result below.
        http_metrics.register(&registry)?;
        Ok(PrometheusMetrics {
            registry,
            http_metrics,
            handles: std::sync::Mutex::new(Vec::new()),
            collector_count: AtomicUsize::new(0),
        })
    }
}