Skip to main content

commonware_runtime/tokio/
telemetry.rs

1//! Utilities for collecting and reporting telemetry data.
2
3use super::{
4    tracing::{export, Config},
5    Context,
6};
7use crate::{Metrics as _, Spawner, Supervisor as _};
8use axum::{
9    body::Body,
10    http::{header, Response, StatusCode},
11    routing::get,
12    serve, Extension, Router,
13};
14use cfg_if::cfg_if;
15use std::{net::SocketAddr, sync::Arc};
16use tokio::net::TcpListener;
17use tracing::Level;
18use tracing_subscriber::{layer::SubscriberExt, Layer, Registry};
19
20/// Logging configuration.
21pub struct Logging {
22    /// The level of logging to use.
23    pub level: Level,
24
25    /// Whether to log in JSON format.
26    ///
27    /// This is useful for structured logging in server-based environments.
28    /// If you are running things locally, it is recommended to use
29    /// `json = false` to get a human-readable format.
30    pub json: bool,
31}
32
33/// Initialize telemetry with the given configuration.
34///
35/// If `metrics` is provided, starts serving metrics at the given address at `/metrics`.
36/// If `traces` is provided, enables OpenTelemetry trace export.
37pub fn init(
38    context: Context,
39    logging: Logging,
40    metrics: Option<SocketAddr>,
41    traces: Option<Config>,
42) {
43    // Create a filter layer to set the maximum level to INFO
44    let filter = tracing_subscriber::EnvFilter::new(logging.level.to_string());
45
46    // Create fmt layer for logging
47    let log_layer = tracing_subscriber::fmt::layer()
48        .with_line_number(true)
49        .with_thread_ids(true)
50        .with_file(true)
51        .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
52
53    // Set the format to JSON (if specified)
54    let log_layer = if logging.json {
55        log_layer.json().boxed()
56    } else {
57        log_layer.compact().boxed()
58    };
59
60    // Create OpenTelemetry layer for tracing
61    let trace_layer = traces.map(|cfg| {
62        let tracer = export(cfg).expect("Failed to initialize tracer");
63        tracing_opentelemetry::layer().with_tracer(tracer)
64    });
65
66    // Create tracing registry.
67    cfg_if! {
68        if #[cfg(feature = "tokio-console")] {
69            let console_layer = console_subscriber::spawn();
70            let registry = Registry::default()
71                .with(filter)
72                .with(log_layer)
73                .with(trace_layer)
74                .with(console_layer);
75        } else {
76            let registry = Registry::default()
77                .with(filter)
78                .with(log_layer)
79                .with(trace_layer);
80        }
81    }
82
83    // Set the global subscriber
84    tracing::subscriber::set_global_default(registry).expect("Failed to set subscriber");
85
86    // Expose metrics over HTTP
87    if let Some(cfg) = metrics {
88        context.child("metrics").spawn(move |context| async move {
89            // Create a tokio listener for the metrics server.
90            //
91            // We explicitly avoid using a runtime `Listener` because
92            // it will track bandwidth used for metrics and apply a policy
93            // for read/write timeouts fit for a p2p network.
94            let listener = TcpListener::bind(cfg)
95                .await
96                .expect("Failed to bind metrics server");
97
98            // Create a router for the metrics server
99            let shared = Arc::new(context);
100            let app = Router::new()
101                .route(
102                    "/metrics",
103                    get(|Extension(ctx): Extension<Arc<Context>>| async move {
104                        Response::builder()
105                            .status(StatusCode::OK)
106                            .header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
107                            .body(Body::from(ctx.encode()))
108                            .expect("Failed to create response")
109                    }),
110                )
111                .layer(Extension(shared));
112
113            // Serve the metrics over HTTP.
114            //
115            // `serve` will spawn its own tasks using `tokio::spawn` (and there is no way to specify
116            // it to do otherwise). These tasks will not be tracked like metrics spawned using `Spawner`.
117            serve(listener, app.into_make_service())
118                .await
119                .expect("Could not serve metrics");
120        });
121    }
122}