ckb_metrics_service/
lib.rs1use std::net::SocketAddr;
4
5use http_body_util::Full;
6use hyper::{
7 Error as HyperError, Method, Request, Response, body::Bytes, header::CONTENT_TYPE,
8 service::service_fn,
9};
10use hyper_util::{
11 rt::TokioExecutor,
12 server::{conn::auto, graceful::GracefulShutdown},
13};
14use prometheus::Encoder as _;
15use tokio::net::TcpListener;
16
17use ckb_async_runtime::Handle;
18use ckb_logger::info;
19use ckb_metrics_config::{Config, Exporter, Target};
20use ckb_stop_handler::{CancellationToken, new_tokio_exit_rx};
21use ckb_util::strings;
22
23#[must_use]
25pub enum Guard {
26 Off,
28 On,
30}
31
32pub fn init(config: Config, handle: Handle) -> Result<Guard, String> {
36 if config.exporter.is_empty() {
37 let _ignored = ckb_metrics::METRICS_SERVICE_ENABLED.set(false);
38 return Ok(Guard::Off);
39 }
40
41 for (name, exporter) in config.exporter {
42 check_exporter_name(&name)?;
43 run_exporter(exporter, &handle)?;
44 }
45 let _ignored = ckb_metrics::METRICS_SERVICE_ENABLED.set(true);
49
50 Ok(Guard::On)
51}
52
53fn check_exporter_name(name: &str) -> Result<(), String> {
54 strings::check_if_identifier_is_valid(name)
55}
56
57fn run_exporter(exporter: Exporter, handle: &Handle) -> Result<(), String> {
58 let Exporter { target } = exporter;
59 match target {
60 Target::Prometheus { listen_address } => {
61 let addr = listen_address
62 .parse::<SocketAddr>()
63 .map_err(|err| format!("failed to parse listen_address because {err}"))?;
64 let make_svc = service_fn(start_prometheus_service);
65 ckb_logger::info!("Start prometheus exporter at {}", addr);
66 handle.spawn(async move {
67 let listener = TcpListener::bind(&addr).await.unwrap();
68 let server = auto::Builder::new(TokioExecutor::new());
69 let graceful = GracefulShutdown::new();
70 let stop_rx: CancellationToken = new_tokio_exit_rx();
71 loop {
72 tokio::select! {
73 conn = listener.accept() => {
74 let (stream, _) = match conn {
75 Ok(conn) => conn,
76 Err(e) => {
77 eprintln!("accept error: {}", e);
78 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
79 continue;
80 }
81 };
82 let stream = hyper_util::rt::TokioIo::new(Box::pin(stream));
83 let conn = server.serve_connection_with_upgrades(stream, make_svc);
84
85 let conn = graceful.watch(conn.into_owned());
86 tokio::spawn(async move {
87 if let Err(err) = conn.await {
88 info!("connection error: {}", err);
89 }
90 });
91 },
92 _ = stop_rx.cancelled() => {
93 info!("Prometheus server received exit signal; exit now");
94 break;
95 }
96 }
97 }
98 drop(listener);
99 graceful.shutdown().await;
100 });
101 }
102 }
103 Ok(())
104}
105
106async fn start_prometheus_service(
107 req: Request<hyper::body::Incoming>,
108) -> Result<Response<Full<Bytes>>, HyperError> {
109 Ok(match (req.method(), req.uri().path()) {
110 (&Method::GET, "/") => {
111 let mut buffer = vec![];
112 let encoder = prometheus::TextEncoder::new();
113 let metric_families = ckb_metrics::gather();
114 encoder.encode(&metric_families, &mut buffer).unwrap();
115 Response::builder()
116 .status(200)
117 .header(CONTENT_TYPE, encoder.format_type())
118 .body(Full::new(Bytes::from(buffer)))
119 }
120 _ => Response::builder()
121 .status(404)
122 .body(Full::from("Page Not Found")),
123 }
124 .unwrap())
125}