prometheus_endpoint/
lib.rs1use futures_util::{FutureExt, future::Future};
19pub use prometheus::{
20 self,
21 Registry, Error as PrometheusError, Opts,
22 Histogram, HistogramOpts, HistogramVec,
23 exponential_buckets,
24 core::{
25 GenericGauge as Gauge, GenericCounter as Counter,
26 GenericGaugeVec as GaugeVec, GenericCounterVec as CounterVec,
27 AtomicF64 as F64, AtomicI64 as I64, AtomicU64 as U64,
28 }
29};
30use prometheus::{Encoder, TextEncoder, core::Collector};
31use std::net::SocketAddr;
32
33#[cfg(not(target_os = "unknown"))]
34mod networking;
35mod sourced;
36
37pub use sourced::{SourcedCounter, SourcedGauge, MetricSource, SourcedMetric};
38
39#[cfg(target_os = "unknown")]
40pub use unknown_os::init_prometheus;
41#[cfg(not(target_os = "unknown"))]
42pub use known_os::init_prometheus;
43
44pub fn register<T: Clone + Collector + 'static>(metric: T, registry: &Registry) -> Result<T, PrometheusError> {
45 registry.register(Box::new(metric.clone()))?;
46 Ok(metric)
47}
48
#[cfg(target_os = "unknown")]
mod unknown_os {
    use super::*;

    /// Error type returned by [`init_prometheus`] when `target_os` is `"unknown"`.
    ///
    /// The enum has no variants, so no value of this type can ever be constructed.
    /// It still implements `Debug`, `Display` and `std::error::Error` so that code
    /// handling the result of `init_prometheus` compiles the same way on every target.
    #[derive(Debug)]
    pub enum Error {}

    impl std::fmt::Display for Error {
        fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            // There is no variant to format; the empty match proves this is unreachable.
            match *self {}
        }
    }

    impl std::error::Error for Error {}

    /// Stand-in for the Prometheus server on this target.
    ///
    /// Ignores both arguments and immediately returns `Ok(())`.
    pub async fn init_prometheus(_: SocketAddr, _registry: Registry) -> Result<(), Error> {
        Ok(())
    }
}
60
61#[cfg(not(target_os = "unknown"))]
62mod known_os {
63 use super::*;
64 use hyper::http::StatusCode;
65 use hyper::{Server, Body, Request, Response, service::{service_fn, make_service_fn}};
66
67 #[derive(Debug, derive_more::Display, derive_more::From)]
68 pub enum Error {
69 Hyper(hyper::Error),
71 Http(hyper::http::Error),
73 Io(std::io::Error),
75 #[display(fmt = "Prometheus port {} already in use.", _0)]
76 PortInUse(SocketAddr)
77 }
78
79 impl std::error::Error for Error {
80 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
81 match self {
82 Error::Hyper(error) => Some(error),
83 Error::Http(error) => Some(error),
84 Error::Io(error) => Some(error),
85 Error::PortInUse(_) => None
86 }
87 }
88 }
89
90 async fn request_metrics(req: Request<Body>, registry: Registry) -> Result<Response<Body>, Error> {
91 if req.uri().path() == "/metrics" {
92 let metric_families = registry.gather();
93 let mut buffer = vec![];
94 let encoder = TextEncoder::new();
95 encoder.encode(&metric_families, &mut buffer).unwrap();
96
97 Response::builder().status(StatusCode::OK)
98 .header("Content-Type", encoder.format_type())
99 .body(Body::from(buffer))
100 .map_err(Error::Http)
101 } else {
102 Response::builder().status(StatusCode::NOT_FOUND)
103 .body(Body::from("Not found."))
104 .map_err(Error::Http)
105 }
106
107 }
108
109 #[derive(Clone)]
110 pub struct Executor;
111
112 impl<T> hyper::rt::Executor<T> for Executor
113 where
114 T: Future + Send + 'static,
115 T::Output: Send + 'static,
116 {
117 fn execute(&self, future: T) {
118 async_std::task::spawn(future);
119 }
120 }
121
122 pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error>{
125 use networking::Incoming;
126 let listener = async_std::net::TcpListener::bind(&prometheus_addr)
127 .await
128 .map_err(|_| Error::PortInUse(prometheus_addr))?;
129
130 log::info!("〽️ Prometheus server started at {}", prometheus_addr);
131
132 let service = make_service_fn(move |_| {
133 let registry = registry.clone();
134
135 async move {
136 Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
137 request_metrics(req, registry.clone())
138 }))
139 }
140 });
141
142 let server = Server::builder(Incoming(listener.incoming()))
143 .executor(Executor)
144 .serve(service)
145 .boxed();
146
147 let result = server.await.map_err(Into::into);
148
149 result
150 }
151}