ckb_metrics_service/
lib.rs

1//! The service which handles the metrics data in CKB.
2
3use std::net::SocketAddr;
4
5use http_body_util::Full;
6use hyper::{
7    Error as HyperError, Method, Request, Response, body::Bytes, header::CONTENT_TYPE,
8    service::service_fn,
9};
10use hyper_util::{
11    rt::TokioExecutor,
12    server::{conn::auto, graceful::GracefulShutdown},
13};
14use prometheus::Encoder as _;
15use tokio::net::TcpListener;
16
17use ckb_async_runtime::Handle;
18use ckb_logger::info;
19use ckb_metrics_config::{Config, Exporter, Target};
20use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
21use ckb_util::strings;
22
/// Ensures the metrics service can shut down gracefully.
///
/// Returned by [`init`]. `#[must_use]` makes the compiler warn when the returned
/// guard is silently ignored.
#[must_use]
#[derive(Debug)]
pub enum Guard {
    /// The metrics service is disabled (no exporter was configured).
    Off,
    /// The metrics service is enabled.
    On,
}
31
32/// Initializes the metrics service and lets it run in the background.
33///
34/// Returns [Guard](enum.Guard.html) if succeeded, or an `String` to describes the reason for the failure.
35pub fn init(config: Config, handle: Handle) -> Result<Guard, String> {
36    if config.exporter.is_empty() {
37        let _ignored = ckb_metrics::METRICS_SERVICE_ENABLED.set(false);
38        return Ok(Guard::Off);
39    }
40
41    for (name, exporter) in config.exporter {
42        check_exporter_name(&name)?;
43        run_exporter(exporter, &handle)?;
44    }
45    // The .set() method's return value can indicate whether the value has set or not.
46    // Just ignore its return value
47    // I don't care this because CKB only initializes the ckb-metrics-service once.
48    let _ignored = ckb_metrics::METRICS_SERVICE_ENABLED.set(true);
49
50    Ok(Guard::On)
51}
52
53fn check_exporter_name(name: &str) -> Result<(), String> {
54    strings::check_if_identifier_is_valid(name)
55}
56
57fn run_exporter(exporter: Exporter, handle: &Handle) -> Result<(), String> {
58    let Exporter { target } = exporter;
59    match target {
60        Target::Prometheus { listen_address } => {
61            let addr = listen_address
62                .parse::<SocketAddr>()
63                .map_err(|err| format!("failed to parse listen_address because {err}"))?;
64            let make_svc = service_fn(start_prometheus_service);
65            ckb_logger::info!("Start prometheus exporter at {}", addr);
66            handle.spawn(async move {
67                let listener = TcpListener::bind(&addr).await.unwrap();
68                let server = auto::Builder::new(TokioExecutor::new());
69                let graceful = GracefulShutdown::new();
70                let stop_rx: CancellationToken = new_tokio_exit_rx();
71                loop {
72                    tokio::select! {
73                        conn = listener.accept() => {
74                            let (stream, _) = match conn {
75                                Ok(conn) => conn,
76                                Err(e) => {
77                                    eprintln!("accept error: {}", e);
78                                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
79                                    continue;
80                                }
81                            };
82                            let stream = hyper_util::rt::TokioIo::new(Box::pin(stream));
83                            let conn = server.serve_connection_with_upgrades(stream, make_svc);
84
85                            let conn = graceful.watch(conn.into_owned());
86                            tokio::spawn(async move {
87                                if let Err(err) = conn.await {
88                                    info!("connection error: {}", err);
89                                }
90                            });
91                        },
92                        _ = stop_rx.cancelled() => {
93                            info!("Prometheus server received exit signal; exit now");
94                            break;
95                        }
96                    }
97                }
98                drop(listener);
99                graceful.shutdown().await;
100            });
101        }
102    }
103    Ok(())
104}
105
106async fn start_prometheus_service(
107    req: Request<hyper::body::Incoming>,
108) -> Result<Response<Full<Bytes>>, HyperError> {
109    Ok(match (req.method(), req.uri().path()) {
110        (&Method::GET, "/") => {
111            let mut buffer = vec![];
112            let encoder = prometheus::TextEncoder::new();
113            let metric_families = ckb_metrics::gather();
114            encoder.encode(&metric_families, &mut buffer).unwrap();
115            Response::builder()
116                .status(200)
117                .header(CONTENT_TYPE, encoder.format_type())
118                .body(Full::new(Bytes::from(buffer)))
119        }
120        _ => Response::builder()
121            .status(404)
122            .body(Full::from("Page Not Found")),
123    }
124    .unwrap())
125}