prometheus_endpoint/
lib.rs

1// This file is part of Tetcore.
2
3// Copyright (C) 2019-2021 Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18use futures_util::{FutureExt, future::Future};
19pub use prometheus::{
20	self,
21	Registry, Error as PrometheusError, Opts,
22	Histogram, HistogramOpts, HistogramVec,
23	exponential_buckets,
24	core::{
25		GenericGauge as Gauge, GenericCounter as Counter,
26		GenericGaugeVec as GaugeVec, GenericCounterVec as CounterVec,
27		AtomicF64 as F64, AtomicI64 as I64, AtomicU64 as U64,
28	}
29};
30use prometheus::{Encoder, TextEncoder, core::Collector};
31use std::net::SocketAddr;
32
33#[cfg(not(target_os = "unknown"))]
34mod networking;
35mod sourced;
36
37pub use sourced::{SourcedCounter, SourcedGauge, MetricSource, SourcedMetric};
38
39#[cfg(target_os = "unknown")]
40pub use unknown_os::init_prometheus;
41#[cfg(not(target_os = "unknown"))]
42pub use known_os::init_prometheus;
43
44pub fn register<T: Clone + Collector + 'static>(metric: T, registry: &Registry) -> Result<T, PrometheusError> {
45	registry.register(Box::new(metric.clone()))?;
46	Ok(metric)
47}
48
49// On WASM `init_prometheus` becomes a no-op.
50#[cfg(target_os = "unknown")]
51mod unknown_os {
52	use super::*;
53
54	pub enum Error {}
55
56	pub async fn init_prometheus(_: SocketAddr, _registry: Registry) -> Result<(), Error> {
57		Ok(())
58	}
59}
60
61#[cfg(not(target_os = "unknown"))]
62mod known_os {
63	use super::*;
64	use hyper::http::StatusCode;
65	use hyper::{Server, Body, Request, Response, service::{service_fn, make_service_fn}};
66
67	#[derive(Debug, derive_more::Display, derive_more::From)]
68	pub enum Error {
69		/// Hyper internal error.
70		Hyper(hyper::Error),
71		/// Http request error.
72		Http(hyper::http::Error),
73		/// i/o error.
74		Io(std::io::Error),
75		#[display(fmt = "Prometheus port {} already in use.", _0)]
76		PortInUse(SocketAddr)
77	}
78
79	impl std::error::Error for Error {
80		fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
81			match self {
82				Error::Hyper(error) => Some(error),
83				Error::Http(error) => Some(error),
84				Error::Io(error) => Some(error),
85				Error::PortInUse(_) => None
86			}
87		}
88	}
89
90	async fn request_metrics(req: Request<Body>, registry: Registry) -> Result<Response<Body>, Error> {
91		if req.uri().path() == "/metrics" {
92			let metric_families = registry.gather();
93			let mut buffer = vec![];
94			let encoder = TextEncoder::new();
95			encoder.encode(&metric_families, &mut buffer).unwrap();
96
97			Response::builder().status(StatusCode::OK)
98				.header("Content-Type", encoder.format_type())
99				.body(Body::from(buffer))
100				.map_err(Error::Http)
101		} else {
102			Response::builder().status(StatusCode::NOT_FOUND)
103				.body(Body::from("Not found."))
104				.map_err(Error::Http)
105		}
106
107	}
108
109	#[derive(Clone)]
110	pub struct Executor;
111
112	impl<T> hyper::rt::Executor<T> for Executor
113	where
114		T: Future + Send + 'static,
115		T::Output: Send + 'static,
116	{
117		fn execute(&self, future: T) {
118			async_std::task::spawn(future);
119		}
120	}
121
122	/// Initializes the metrics context, and starts an HTTP server
123	/// to serve metrics.
124	pub async fn init_prometheus(prometheus_addr: SocketAddr, registry: Registry) -> Result<(), Error>{
125		use networking::Incoming;
126		let listener = async_std::net::TcpListener::bind(&prometheus_addr)
127			.await
128			.map_err(|_| Error::PortInUse(prometheus_addr))?;
129
130		log::info!("〽️ Prometheus server started at {}", prometheus_addr);
131
132		let service = make_service_fn(move |_| {
133			let registry = registry.clone();
134
135			async move {
136				Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
137					request_metrics(req, registry.clone())
138				}))
139			}
140		});
141
142		let server = Server::builder(Incoming(listener.incoming()))
143			.executor(Executor)
144			.serve(service)
145			.boxed();
146
147		let result = server.await.map_err(Into::into);
148
149		result
150	}
151}