Skip to main content

turn_server/
prometheus.rs

1use std::sync::LazyLock;
2
3use anyhow::Result;
4use axum::{
5    Router,
6    http::{StatusCode, header::CONTENT_TYPE},
7    response::IntoResponse,
8    routing::get,
9};
10
11use prometheus::{
12    Encoder, IntCounter, IntGauge, TextEncoder, register_int_counter, register_int_gauge,
13};
14
15use tokio::net::TcpListener;
16
17use crate::{
18    config::Config,
19    server::transport::Transport,
20    statistics::{Counts, Number, Stats},
21};
22
// Shorthand around `register_int_counter!`. Spelling out the metric name and
// help text at every call site would make rustfmt break each registration
// over many lines, so both strings are derived here from three short parts.
//
// Expands to the `Result` returned by `register_int_counter!`.
macro_rules! counter {
    ($scope:expr, $action:expr, $unit:expr) => {{
        // Metric name: `<scope>_<action>_<unit>`.
        let name = format!("{}_{}_{}", $scope, $action, $unit);
        // Human-readable help line attached to the metric.
        let help = format!("The {} amount of {} {}", $scope, $unit, $action);

        register_int_counter!(name, help)
    }};
}
34
35pub static METRICS: LazyLock<Metrics> = LazyLock::new(Metrics::default);
36
37impl Number for IntCounter {
38    fn add(&self, value: usize) {
39        self.inc_by(value as u64);
40    }
41
42    fn get(&self) -> usize {
43        IntCounter::get(self) as usize
44    }
45}
46
47impl Counts<IntCounter> {
48    fn new(prefix: &str) -> Result<Self> {
49        Ok(Self {
50            received_bytes: counter!(prefix, "received", "bytes")?,
51            send_bytes: counter!(prefix, "sent", "bytes")?,
52            received_pkts: counter!(prefix, "received", "packets")?,
53            send_pkts: counter!(prefix, "sent", "packets")?,
54            error_pkts: counter!(prefix, "error", "packets")?,
55        })
56    }
57}
58
59/// Summarized metrics data for Global/TCP/UDP.
60pub struct Metrics {
61    pub allocated: IntGauge,
62    pub total: Counts<IntCounter>,
63    pub tcp: Counts<IntCounter>,
64    pub udp: Counts<IntCounter>,
65}
66
67impl Default for Metrics {
68    fn default() -> Self {
69        Self::new().expect("Unable to initialize Prometheus metrics data!")
70    }
71}
72
73impl Metrics {
74    fn new() -> Result<Self> {
75        Ok(Self {
76            total: Counts::new("total")?,
77            tcp: Counts::new("tcp")?,
78            udp: Counts::new("udp")?,
79            allocated: register_int_gauge!(
80                "allocated",
81                "The number of allocated ports, count = 16383"
82            )?,
83        })
84    }
85
86    pub fn add(&self, transport: Transport, payload: &Stats) {
87        self.total.add(payload);
88
89        if transport == Transport::Tcp {
90            self.tcp.add(payload);
91        } else {
92            self.udp.add(payload);
93        }
94    }
95}
96
97/// Generate prometheus metrics data that externally needs to be exposed to
98/// the `/metrics` route.
99fn generate_metrics(buf: &mut Vec<u8>) -> Result<()> {
100    TextEncoder::new().encode(&prometheus::gather(), buf)?;
101
102    Ok(())
103}
104
105pub async fn start_server(config: Config) -> Result<()> {
106    if let Some(config) = config.prometheus {
107        let mut metrics_bytes = Vec::with_capacity(4096);
108
109        let app = Router::new().route(
110            "/metrics",
111            get(|| async move {
112                metrics_bytes.clear();
113
114                if generate_metrics(&mut metrics_bytes).is_err() {
115                    StatusCode::INTERNAL_SERVER_ERROR.into_response()
116                } else {
117                    ([(CONTENT_TYPE, "text/plain")], metrics_bytes).into_response()
118                }
119            }),
120        );
121
122        #[cfg(feature = "ssl")]
123        if let Some(ssl) = &config.ssl {
124            let server = axum_server::bind_rustls(
125                config.listen,
126                axum_server::tls_rustls::RustlsConfig::from_pem_chain_file(
127                    ssl.certificate_chain.clone(),
128                    ssl.private_key.clone(),
129                )
130                .await?,
131            );
132
133            log::info!("prometheus server listening={:?}", config.listen);
134
135            server.serve(app.into_make_service()).await?;
136            return Ok(());
137        }
138
139        {
140            let listener = TcpListener::bind(config.listen).await?;
141
142            log::info!("prometheus server listening={:?}", config.listen);
143
144            axum::serve(listener, app).await?;
145        }
146    } else {
147        std::future::pending().await
148    };
149
150    Ok(())
151}