1use axum::routing::get;
2use axum::{http::StatusCode, response::IntoResponse, Router};
3use hyper::{Request, Response};
4use lazy_static::lazy_static;
5use prometheus::{
6 exponential_buckets, gather, register_counter, register_gauge, register_histogram, Counter,
7 Encoder, Gauge, Histogram, TextEncoder,
8};
9use std::collections::HashMap;
10use std::time::Instant;
11use std::{
12 sync::{Arc, RwLock},
13 task::{Context, Poll},
14};
15use tonic::body::BoxBody;
16use tower::{Layer, Service};
17
18lazy_static! {
19 static ref METRICS_DATA: Arc<RwLock<HashMap<String, MetricsType>>> =
20 Arc::new(RwLock::new(HashMap::new()));
21}
22
23#[derive(Debug, Clone)]
24pub enum MetricsType {
25 Histogram(Option<Histogram>),
26 Gauge(Option<Gauge>),
27 Counter(Option<Counter>),
28}
29
30fn register_metrics(
31 metrics_type: &MetricsType,
32 key: &str,
33 help_info: &str,
34 histogram_buckets: Option<Vec<f64>>,
35) -> Option<MetricsType> {
36 match metrics_type {
37 MetricsType::Histogram(_) => {
38 match register_histogram!(
39 key,
40 help_info,
41 histogram_buckets.unwrap_or(exponential_buckets(0.0001, 10.0, 8).unwrap()),
42 ) {
43 Ok(h) => {
44 info!("register histogram: {key} success");
45 Some(MetricsType::Histogram(Some(h)))
46 }
47 Err(e) => {
48 warn!("register histogram: {key} failed: {e}");
49 None
50 }
51 }
52 }
53 MetricsType::Gauge(_) => match register_gauge!(key, help_info,) {
54 Ok(g) => {
55 info!("register gauge: {key} success");
56 Some(MetricsType::Gauge(Some(g)))
57 }
58 Err(e) => {
59 warn!("register gauge: {key} failed: {e}");
60 None
61 }
62 },
63 MetricsType::Counter(_) => match register_counter!(key, help_info,) {
64 Ok(c) => {
65 info!("register counter: {key} success");
66 Some(MetricsType::Counter(Some(c)))
67 }
68 Err(e) => {
69 warn!("register counter: {key} failed: {e}");
70 None
71 }
72 },
73 }
74}
75
76fn is_same(m1: &MetricsType, m2: &MetricsType) -> bool {
77 matches!(
78 (m1, m2),
79 (MetricsType::Histogram(_), MetricsType::Histogram(_))
80 | (MetricsType::Gauge(_), MetricsType::Gauge(_))
81 | (MetricsType::Counter(_), MetricsType::Counter(_))
82 )
83}
84
/// Checks whether `key` is acceptable as a metric name for this module.
///
/// A valid key is non-empty, starts with an ASCII letter or `_`, and
/// continues with ASCII letters, ASCII digits or `_` only.
///
/// The first character is held to the same character set as the rest of the
/// key (minus digits), so names such as `-abc`, ` abc` or `é_x` are rejected
/// instead of slipping through because only "not a digit" was checked.
fn is_key_valid(key: &str) -> bool {
    let mut chars = key.chars();
    match chars.next() {
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {
            chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
        }
        // Empty key, or a leading digit / symbol / non-ASCII character.
        _ => false,
    }
}
95
96pub fn get_metrics(
97 metrics_type: &MetricsType,
98 key: &str,
99 help_info: &str,
100 histogram_buckets: Option<Vec<f64>>,
101) -> Option<MetricsType> {
102 if !is_key_valid(key) {
103 warn!("get_metrics: {key} failed: key invalid");
104 return None;
105 }
106 match METRICS_DATA.write() {
107 Ok(mut write) => {
108 if !write.contains_key(key) {
109 if let Some(metrics) =
110 register_metrics(metrics_type, key, help_info, histogram_buckets)
111 {
112 write.insert(key.to_string(), metrics);
113 } else {
114 return None;
115 }
116 }
117 }
118 Err(e) => {
119 warn!("get_metrics: {key} failed: {e}");
120 return None;
121 }
122 };
123 match METRICS_DATA.read() {
124 Ok(read) => match read.get(key).cloned() {
125 Some(m) => {
126 if is_same(metrics_type, &m) {
127 Some(m)
128 } else {
129 warn!("get_metrics: {key} failed, type:{metrics_type:?}, type not same");
130 None
131 }
132 }
133 None => {
134 warn!("get_metrics: {key} failed, type:{metrics_type:?}, not found");
135 None
136 }
137 },
138 Err(e) => {
139 warn!("get_metrics: {key} failed: {e}");
140 None
141 }
142 }
143}
144
145macro_rules! impl_metrics {
147 ($func_name:ident, $enum_type:ident, $method_name:ident, $data_type:ident) => {
148 pub fn $func_name(
149 metrics_type: MetricsType,
150 key: String,
151 help_info: String,
152 histogram_buckets: Option<Vec<f64>>,
153 data: $data_type,
154 ) {
155 if let Some(MetricsType::$enum_type(Some(g))) =
156 get_metrics(&metrics_type, &key, &help_info, histogram_buckets)
157 {
158 g.$method_name(data);
159 }
160 }
161 };
162}
163
164impl_metrics!(gauge_set, Gauge, set, f64);
165impl_metrics!(gauge_add, Gauge, add, f64);
166impl_metrics!(gauge_sub, Gauge, sub, f64);
167impl_metrics!(histogram_observe, Histogram, observe, f64);
168impl_metrics!(counter_inc_by, Counter, inc_by, f64);
169
/// Builds the metric key for an RPC as `<client_name>_to_<method_name>`.
fn rpc_info_to_key(client_name: &str, method_name: &str) -> String {
    format!("{client_name}_to_{method_name}")
}
174
/// Layer configuration holding the histogram bucket bounds that are handed
/// to every service it wraps.
#[derive(Clone, Debug)]
pub struct MiddlewareLayer {
    /// Bucket bounds copied into each wrapped service.
    buckets: Vec<f64>,
}

impl MiddlewareLayer {
    /// Creates a layer that will use `buckets` for its latency histograms.
    pub fn new(buckets: Vec<f64>) -> Self {
        Self { buckets }
    }
}
185
186impl<S> Layer<S> for MiddlewareLayer {
187 type Service = RpcMetricsService<S>;
188
189 fn layer(&self, service: S) -> Self::Service {
190 RpcMetricsService {
191 inner: service,
192 buckets: self.buckets.clone(),
193 }
194 }
195}
196
/// Service wrapper pairing an inner service with the histogram bucket
/// bounds used when recording its request latencies.
#[derive(Clone, Debug)]
pub struct RpcMetricsService<S> {
    /// The wrapped service that actually handles requests.
    inner: S,
    /// Bucket bounds passed along when a latency histogram is observed.
    buckets: Vec<f64>,
}
202
203impl<S> Service<Request<BoxBody>> for RpcMetricsService<S>
204where
205 S: Service<Request<BoxBody>, Response = Response<BoxBody>> + Clone + Send + 'static,
206 S::Future: Send + 'static,
207{
208 type Response = S::Response;
209 type Error = S::Error;
210 type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
211
212 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
213 self.inner.poll_ready(cx)
214 }
215
216 fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
217 let clone = self.inner.clone();
218 let mut inner = std::mem::replace(&mut self.inner, clone);
219
220 let client_name = req
222 .headers()
223 .get("client-name")
224 .map(|v| v.to_str().unwrap());
225 let uri_string = req.uri().to_string();
226 let method_name = uri_string.rsplit_once('/').map(|c| c.1);
227
228 if let (Some(client_name), Some(method_name)) = (client_name, method_name) {
229 let key = rpc_info_to_key(client_name, method_name);
230 let bucket = Some(self.buckets.clone());
231 Box::pin(async move {
232 let started = Instant::now();
233 let response = inner.call(req).await?;
234 let elapsed = started.elapsed().as_secs_f64() * 1000f64;
235 histogram_observe(
236 MetricsType::Histogram(None),
237 key,
238 "request latencies in milliseconds(ms)".to_string(),
239 bucket,
240 elapsed,
241 );
242 Ok(response)
243 })
244 } else {
245 Box::pin(async move {
246 let response = inner.call(req).await?;
247 Ok(response)
248 })
249 }
250 }
251}
252
253pub async fn run_metrics_exporter(
255 port: u16,
256) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
257 let app = Router::new()
258 .route("/metrics", get(exporter))
259 .fallback(handler_404);
260 let listener = tokio::net::TcpListener::bind(format!("[::]:{}", port)).await?;
261 axum::serve(listener, app).await?;
262 info!("exporting metrics to http://[::]:{}/metrics", port);
263
264 Ok(())
265}
266
267async fn exporter() -> impl IntoResponse {
268 let mut buffer = vec![];
269 let encoder = TextEncoder::new();
270 let metric_families = gather();
271 let _ = encoder.encode(&metric_families, &mut buffer);
272
273 (StatusCode::OK, String::from_utf8(buffer).unwrap())
274}
275
/// Fallback handler for unknown paths: replies with `404 Not Found` and a
/// plain-text body listing a metrics path per service (network, consensus,
/// executor, storage, controller, crypto).
async fn handler_404() -> impl IntoResponse {
    (
        StatusCode::NOT_FOUND,
        // The literal spans several source lines, so each entry ends with
        // both an escaped `\n` and the real line break that follows it.
        // NOTE(review): whitespace inside the literal is part of the response
        // body — confirm it against the original file before reformatting.
        "
default:\n
/60000/metrics for network\n
/60001/metrics for consensus\n
/60002/metrics for executor\n
/60003/metrics for storage\n
/60004/metrics for controller\n
/60005/metrics for crypto\n
 "
        .to_string(),
    )
}