http_app/
prom_metrics_server.rs

1use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
2
3use arc_metrics::{helpers::RegisterableMetric, PromMetricRegistry, RegisterAction};
4use bytes::Bytes;
5use http_body_util::Full;
6use hyper::StatusCode;
7
8use crate::HttpServerHandler;
9
10pub struct PromMetricsServer {
11    init: PromMetricRegistry,
12    rw_lock: std::sync::RwLock<PromMetricRegistry>,
13    async_rw_lock: tokio::sync::RwLock<PromMetricRegistry>,
14    last_write_size: AtomicUsize,
15}
16
17impl PromMetricsServer {
18    pub fn new(reg: PromMetricRegistry) -> Arc<Self> {
19        Arc::new(PromMetricsServer {
20            init: reg,
21            rw_lock: std::sync::RwLock::new(PromMetricRegistry::new()),
22            async_rw_lock: tokio::sync::RwLock::new(PromMetricRegistry::new()),
23            last_write_size: AtomicUsize::new(0),
24        })
25    }
26
27    pub fn register_fn<T: 'static>(&self, metrics: &Arc<T>, register: impl FnOnce(&'static T, &mut RegisterAction<'_>)) {
28        let mut lock = self.rw_lock.write().expect("failed to lock");
29        lock.register_fn(metrics, register);
30    }
31
32    pub async fn register_async_fn<T: 'static>(&self, metrics: &Arc<T>, register: impl FnOnce(&'static T, &mut RegisterAction<'_>)) {
33        let mut lock = self.async_rw_lock.write().await;
34        lock.register_fn(metrics, register);
35    }
36
37    pub fn register<T: RegisterableMetric + 'static>(&self, metrics: &Arc<T>) {
38        let mut lock = self.rw_lock.write().expect("failed to lock");
39        lock.register(metrics);
40    }
41
42    pub async fn register_async<T: RegisterableMetric + 'static>(&self, metrics: &Arc<T>) {
43        let mut lock = self.async_rw_lock.write().await;
44        lock.register(metrics);
45    }
46
47    pub async fn write<T: std::io::Write>(&self, f: &mut T) -> std::io::Result<()> {
48        write!(f, "{}", self.init)?;
49        write!(f, "{}", self.rw_lock.read().expect("failed to read lock"))?;
50        write!(f, "{}", self.async_rw_lock.read().await)?;
51        Ok(())
52    }
53}
54
55impl HttpServerHandler for PromMetricsServer {
56    type Body = Full<Bytes>;
57
58    async fn handle_request(self: Arc<Self>, _source: std::net::IpAddr, _request: hyper::Request<hyper::body::Incoming>) -> hyper::Response<Self::Body> {
59        let mut out = Vec::with_capacity(self.last_write_size.load(Ordering::Relaxed));
60
61        if let Err(error) = self.write(&mut out).await {
62            tracing::error!(?error, "failed to write prom metrics string");
63
64            return hyper::Response::builder()
65                .status(StatusCode::INTERNAL_SERVER_ERROR)
66                .body(Full::new(Bytes::from_static("prom metrics serialization error".as_bytes())))
67                .unwrap();
68        }
69
70        self.last_write_size.store(out.len(), Ordering::Relaxed);
71        hyper::Response::new(Full::new(Bytes::from(out)))
72    }
73}
74