turn_server/
prometheus.rs1use std::sync::LazyLock;
2
3use anyhow::Result;
4use axum::{
5 Router,
6 http::{StatusCode, header::CONTENT_TYPE},
7 response::IntoResponse,
8 routing::get,
9};
10
11use prometheus::{
12 Encoder, IntCounter, IntGauge, TextEncoder, register_int_counter, register_int_gauge,
13};
14
15use tokio::net::TcpListener;
16
17use crate::{
18 config::Config,
19 server::transport::Transport,
20 statistics::{Counts, Number, Stats},
21};
22
23macro_rules! counter {
27 ($prefix:expr, $operation:expr, $dst:expr) => {
28 register_int_counter!(
29 format!("{}_{}_{}", $prefix, $operation, $dst),
30 format!("The {} amount of {} {}", $prefix, $dst, $operation)
31 )
32 };
33}
34
35pub static METRICS: LazyLock<Metrics> = LazyLock::new(Metrics::default);
36
37impl Number for IntCounter {
38 fn add(&self, value: usize) {
39 self.inc_by(value as u64);
40 }
41
42 fn get(&self) -> usize {
43 IntCounter::get(self) as usize
44 }
45}
46
47impl Counts<IntCounter> {
48 fn new(prefix: &str) -> Result<Self> {
49 Ok(Self {
50 received_bytes: counter!(prefix, "received", "bytes")?,
51 send_bytes: counter!(prefix, "sent", "bytes")?,
52 received_pkts: counter!(prefix, "received", "packets")?,
53 send_pkts: counter!(prefix, "sent", "packets")?,
54 error_pkts: counter!(prefix, "error", "packets")?,
55 })
56 }
57}
58
59pub struct Metrics {
61 pub allocated: IntGauge,
62 pub total: Counts<IntCounter>,
63 pub tcp: Counts<IntCounter>,
64 pub udp: Counts<IntCounter>,
65}
66
67impl Default for Metrics {
68 fn default() -> Self {
69 Self::new().expect("Unable to initialize Prometheus metrics data!")
70 }
71}
72
73impl Metrics {
74 fn new() -> Result<Self> {
75 Ok(Self {
76 total: Counts::new("total")?,
77 tcp: Counts::new("tcp")?,
78 udp: Counts::new("udp")?,
79 allocated: register_int_gauge!(
80 "allocated",
81 "The number of allocated ports, count = 16383"
82 )?,
83 })
84 }
85
86 pub fn add(&self, transport: Transport, payload: &Stats) {
87 self.total.add(payload);
88
89 if transport == Transport::Tcp {
90 self.tcp.add(payload);
91 } else {
92 self.udp.add(payload);
93 }
94 }
95}
96
97fn generate_metrics(buf: &mut Vec<u8>) -> Result<()> {
100 TextEncoder::new().encode(&prometheus::gather(), buf)?;
101
102 Ok(())
103}
104
105pub async fn start_server(config: Config) -> Result<()> {
106 if let Some(config) = config.prometheus {
107 let mut metrics_bytes = Vec::with_capacity(4096);
108
109 let app = Router::new().route(
110 "/metrics",
111 get(|| async move {
112 metrics_bytes.clear();
113
114 if generate_metrics(&mut metrics_bytes).is_err() {
115 StatusCode::INTERNAL_SERVER_ERROR.into_response()
116 } else {
117 ([(CONTENT_TYPE, "text/plain")], metrics_bytes).into_response()
118 }
119 }),
120 );
121
122 #[cfg(feature = "ssl")]
123 if let Some(ssl) = &config.ssl {
124 let server = axum_server::bind_rustls(
125 config.listen,
126 axum_server::tls_rustls::RustlsConfig::from_pem_chain_file(
127 ssl.certificate_chain.clone(),
128 ssl.private_key.clone(),
129 )
130 .await?,
131 );
132
133 log::info!("prometheus server listening={:?}", config.listen);
134
135 server.serve(app.into_make_service()).await?;
136 return Ok(());
137 }
138
139 {
140 let listener = TcpListener::bind(config.listen).await?;
141
142 log::info!("prometheus server listening={:?}", config.listen);
143
144 axum::serve(listener, app).await?;
145 }
146 } else {
147 std::future::pending().await
148 };
149
150 Ok(())
151}