cloud_util/
metrics.rs

1use axum::routing::get;
2use axum::{http::StatusCode, response::IntoResponse, Router};
3use hyper::{Request, Response};
4use lazy_static::lazy_static;
5use prometheus::{
6    exponential_buckets, gather, register_counter, register_gauge, register_histogram, Counter,
7    Encoder, Gauge, Histogram, TextEncoder,
8};
9use std::collections::HashMap;
10use std::time::Instant;
11use std::{
12    sync::{Arc, RwLock},
13    task::{Context, Poll},
14};
15use tonic::body::BoxBody;
16use tower::{Layer, Service};
17
// Lazily-initialised, module-level cache of the metrics created through this
// module, keyed by metric name. Access is guarded by an `RwLock`; in this file
// it is only touched by `get_metrics`.
lazy_static! {
    static ref METRICS_DATA: Arc<RwLock<HashMap<String, MetricsType>>> =
        Arc::new(RwLock::new(HashMap::new()));
}
22
/// Kind of metric handled by this module, optionally carrying the registered
/// collector handle.
///
/// Callers select a kind by passing a variant that holds `None`
/// (e.g. `MetricsType::Histogram(None)`); `register_metrics` produces variants
/// that hold `Some(handle)`.
#[derive(Debug, Clone)]
pub enum MetricsType {
    /// A `prometheus::Histogram` handle, if registered.
    Histogram(Option<Histogram>),
    /// A `prometheus::Gauge` handle, if registered.
    Gauge(Option<Gauge>),
    /// A `prometheus::Counter` handle, if registered.
    Counter(Option<Counter>),
}
29
30fn register_metrics(
31    metrics_type: &MetricsType,
32    key: &str,
33    help_info: &str,
34    histogram_buckets: Option<Vec<f64>>,
35) -> Option<MetricsType> {
36    match metrics_type {
37        MetricsType::Histogram(_) => {
38            match register_histogram!(
39                key,
40                help_info,
41                histogram_buckets.unwrap_or(exponential_buckets(0.0001, 10.0, 8).unwrap()),
42            ) {
43                Ok(h) => {
44                    info!("register histogram: {key} success");
45                    Some(MetricsType::Histogram(Some(h)))
46                }
47                Err(e) => {
48                    warn!("register histogram: {key} failed: {e}");
49                    None
50                }
51            }
52        }
53        MetricsType::Gauge(_) => match register_gauge!(key, help_info,) {
54            Ok(g) => {
55                info!("register gauge: {key} success");
56                Some(MetricsType::Gauge(Some(g)))
57            }
58            Err(e) => {
59                warn!("register gauge: {key} failed: {e}");
60                None
61            }
62        },
63        MetricsType::Counter(_) => match register_counter!(key, help_info,) {
64            Ok(c) => {
65                info!("register counter: {key} success");
66                Some(MetricsType::Counter(Some(c)))
67            }
68            Err(e) => {
69                warn!("register counter: {key} failed: {e}");
70                None
71            }
72        },
73    }
74}
75
76fn is_same(m1: &MetricsType, m2: &MetricsType) -> bool {
77    matches!(
78        (m1, m2),
79        (MetricsType::Histogram(_), MetricsType::Histogram(_))
80            | (MetricsType::Gauge(_), MetricsType::Gauge(_))
81            | (MetricsType::Counter(_), MetricsType::Counter(_))
82    )
83}
84
/// Returns `true` when `key` is acceptable as a metric name for this module.
///
/// A valid key is non-empty, starts with an ASCII letter or `_`, and consists
/// only of ASCII letters, ASCII digits and `_` after that.
fn is_key_valid(key: &str) -> bool {
    let mut chars = key.chars();
    match chars.next() {
        // The leading character gets the same alphabet check as the rest of the
        // key, minus digits; previously anything that was not a digit slipped
        // through (e.g. "-abc" or a non-ASCII letter).
        Some(first) if first.is_ascii_alphabetic() || first == '_' => {
            chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
        }
        _ => false,
    }
}
95
96pub fn get_metrics(
97    metrics_type: &MetricsType,
98    key: &str,
99    help_info: &str,
100    histogram_buckets: Option<Vec<f64>>,
101) -> Option<MetricsType> {
102    if !is_key_valid(key) {
103        warn!("get_metrics: {key} failed: key invalid");
104        return None;
105    }
106    match METRICS_DATA.write() {
107        Ok(mut write) => {
108            if !write.contains_key(key) {
109                if let Some(metrics) =
110                    register_metrics(metrics_type, key, help_info, histogram_buckets)
111                {
112                    write.insert(key.to_string(), metrics);
113                } else {
114                    return None;
115                }
116            }
117        }
118        Err(e) => {
119            warn!("get_metrics: {key} failed: {e}");
120            return None;
121        }
122    };
123    match METRICS_DATA.read() {
124        Ok(read) => match read.get(key).cloned() {
125            Some(m) => {
126                if is_same(metrics_type, &m) {
127                    Some(m)
128                } else {
129                    warn!("get_metrics: {key} failed, type:{metrics_type:?}, type not same");
130                    None
131                }
132            }
133            None => {
134                warn!("get_metrics: {key} failed, type:{metrics_type:?}, not found");
135                None
136            }
137        },
138        Err(e) => {
139            warn!("get_metrics: {key} failed: {e}");
140            None
141        }
142    }
143}
144
// TODO: match any number of method parameter
//
// Generates a public function `$func_name` that fetches (or registers) the
// metric named `key` via `get_metrics` and, when the result is the
// `MetricsType::$enum_type` variant holding a handle, calls
// `$method_name(data)` on that handle. Any other outcome is silently ignored
// here; `get_metrics` logs its own failures.
macro_rules! impl_metrics {
    ($func_name:ident, $enum_type:ident, $method_name:ident, $data_type:ident) => {
        pub fn $func_name(
            metrics_type: MetricsType,
            key: String,
            help_info: String,
            histogram_buckets: Option<Vec<f64>>,
            data: $data_type,
        ) {
            match get_metrics(&metrics_type, &key, &help_info, histogram_buckets) {
                Some(MetricsType::$enum_type(Some(metric))) => {
                    metric.$method_name(data);
                }
                _ => {}
            }
        }
    };
}
163
// Public helpers generated by `impl_metrics!`. Each takes
// `(metrics_type, key, help_info, histogram_buckets, data)` and applies one
// operation to the metric named `key`:
// `gauge_set` calls `set` on a gauge.
impl_metrics!(gauge_set, Gauge, set, f64);
// `gauge_add` calls `add` on a gauge.
impl_metrics!(gauge_add, Gauge, add, f64);
// `gauge_sub` calls `sub` on a gauge.
impl_metrics!(gauge_sub, Gauge, sub, f64);
// `histogram_observe` calls `observe` on a histogram.
impl_metrics!(histogram_observe, Histogram, observe, f64);
// `counter_inc_by` calls `inc_by` on a counter.
impl_metrics!(counter_inc_by, Counter, inc_by, f64);
169
170// grpc call metrics
/// Builds the metric key for an RPC as `<client_name>_to_<method_name>`.
fn rpc_info_to_key(client_name: &str, method_name: &str) -> String {
    format!("{client_name}_to_{method_name}")
}
174
/// Tower layer that wraps a service in [`RpcMetricsService`].
#[derive(Debug, Clone)]
pub struct MiddlewareLayer {
    /// Histogram bucket bounds copied into every service produced by this layer.
    buckets: Vec<f64>,
}
179
180impl MiddlewareLayer {
181    pub fn new(buckets: Vec<f64>) -> Self {
182        MiddlewareLayer { buckets }
183    }
184}
185
186impl<S> Layer<S> for MiddlewareLayer {
187    type Service = RpcMetricsService<S>;
188
189    fn layer(&self, service: S) -> Self::Service {
190        RpcMetricsService {
191            inner: service,
192            buckets: self.buckets.clone(),
193        }
194    }
195}
196
/// Service wrapper that records request latency for the wrapped service.
#[derive(Debug, Clone)]
pub struct RpcMetricsService<S> {
    /// The wrapped service that actually handles requests.
    inner: S,
    /// Bucket bounds passed along when the latency histogram is looked up.
    buckets: Vec<f64>,
}
202
203impl<S> Service<Request<BoxBody>> for RpcMetricsService<S>
204where
205    S: Service<Request<BoxBody>, Response = Response<BoxBody>> + Clone + Send + 'static,
206    S::Future: Send + 'static,
207{
208    type Response = S::Response;
209    type Error = S::Error;
210    type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
211
212    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
213        self.inner.poll_ready(cx)
214    }
215
216    fn call(&mut self, req: Request<BoxBody>) -> Self::Future {
217        let clone = self.inner.clone();
218        let mut inner = std::mem::replace(&mut self.inner, clone);
219
220        // parse client_name and method_name from request
221        let client_name = req
222            .headers()
223            .get("client-name")
224            .map(|v| v.to_str().unwrap());
225        let uri_string = req.uri().to_string();
226        let method_name = uri_string.rsplit_once('/').map(|c| c.1);
227
228        if let (Some(client_name), Some(method_name)) = (client_name, method_name) {
229            let key = rpc_info_to_key(client_name, method_name);
230            let bucket = Some(self.buckets.clone());
231            Box::pin(async move {
232                let started = Instant::now();
233                let response = inner.call(req).await?;
234                let elapsed = started.elapsed().as_secs_f64() * 1000f64;
235                histogram_observe(
236                    MetricsType::Histogram(None),
237                    key,
238                    "request latencies in milliseconds(ms)".to_string(),
239                    bucket,
240                    elapsed,
241                );
242                Ok(response)
243            })
244        } else {
245            Box::pin(async move {
246                let response = inner.call(req).await?;
247                Ok(response)
248            })
249        }
250    }
251}
252
253// exporter
254pub async fn run_metrics_exporter(
255    port: u16,
256) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
257    let app = Router::new()
258        .route("/metrics", get(exporter))
259        .fallback(handler_404);
260    let listener = tokio::net::TcpListener::bind(format!("[::]:{}", port)).await?;
261    axum::serve(listener, app).await?;
262    info!("exporting metrics to http://[::]:{}/metrics", port);
263
264    Ok(())
265}
266
267async fn exporter() -> impl IntoResponse {
268    let mut buffer = vec![];
269    let encoder = TextEncoder::new();
270    let metric_families = gather();
271    let _ = encoder.encode(&metric_families, &mut buffer);
272
273    (StatusCode::OK, String::from_utf8(buffer).unwrap())
274}
275
276async fn handler_404() -> impl IntoResponse {
277    (
278        StatusCode::NOT_FOUND,
279        "
280default:\n
281/60000/metrics for network\n
282/60001/metrics for consensus\n
283/60002/metrics for executor\n
284/60003/metrics for storage\n
285/60004/metrics for controller\n
286/60005/metrics for crypto\n
287        "
288        .to_string(),
289    )
290}