http_app/
prom_metrics_server.rs1use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
2
3use arc_metrics::{helpers::RegisterableMetric, PromMetricRegistry, RegisterAction};
4use bytes::Bytes;
5use http_body_util::Full;
6use hyper::StatusCode;
7
8use crate::HttpServerHandler;
9
10pub struct PromMetricsServer {
11 init: PromMetricRegistry,
12 rw_lock: std::sync::RwLock<PromMetricRegistry>,
13 async_rw_lock: tokio::sync::RwLock<PromMetricRegistry>,
14 last_write_size: AtomicUsize,
15}
16
17impl PromMetricsServer {
18 pub fn new(reg: PromMetricRegistry) -> Arc<Self> {
19 Arc::new(PromMetricsServer {
20 init: reg,
21 rw_lock: std::sync::RwLock::new(PromMetricRegistry::new()),
22 async_rw_lock: tokio::sync::RwLock::new(PromMetricRegistry::new()),
23 last_write_size: AtomicUsize::new(0),
24 })
25 }
26
27 pub fn register_fn<T: 'static>(&self, metrics: &Arc<T>, register: impl FnOnce(&'static T, &mut RegisterAction<'_>)) {
28 let mut lock = self.rw_lock.write().expect("failed to lock");
29 lock.register_fn(metrics, register);
30 }
31
32 pub async fn register_async_fn<T: 'static>(&self, metrics: &Arc<T>, register: impl FnOnce(&'static T, &mut RegisterAction<'_>)) {
33 let mut lock = self.async_rw_lock.write().await;
34 lock.register_fn(metrics, register);
35 }
36
37 pub fn register<T: RegisterableMetric + 'static>(&self, metrics: &Arc<T>) {
38 let mut lock = self.rw_lock.write().expect("failed to lock");
39 lock.register(metrics);
40 }
41
42 pub async fn register_async<T: RegisterableMetric + 'static>(&self, metrics: &Arc<T>) {
43 let mut lock = self.async_rw_lock.write().await;
44 lock.register(metrics);
45 }
46
47 pub async fn write<T: std::io::Write>(&self, f: &mut T) -> std::io::Result<()> {
48 write!(f, "{}", self.init)?;
49 write!(f, "{}", self.rw_lock.read().expect("failed to read lock"))?;
50 write!(f, "{}", self.async_rw_lock.read().await)?;
51 Ok(())
52 }
53}
54
55impl HttpServerHandler for PromMetricsServer {
56 type Body = Full<Bytes>;
57
58 async fn handle_request(self: Arc<Self>, _source: std::net::IpAddr, _request: hyper::Request<hyper::body::Incoming>) -> hyper::Response<Self::Body> {
59 let mut out = Vec::with_capacity(self.last_write_size.load(Ordering::Relaxed));
60
61 if let Err(error) = self.write(&mut out).await {
62 tracing::error!(?error, "failed to write prom metrics string");
63
64 return hyper::Response::builder()
65 .status(StatusCode::INTERNAL_SERVER_ERROR)
66 .body(Full::new(Bytes::from_static("prom metrics serialization error".as_bytes())))
67 .unwrap();
68 }
69
70 self.last_write_size.store(out.len(), Ordering::Relaxed);
71 hyper::Response::new(Full::new(Bytes::from(out)))
72 }
73}
74