commonware_runtime/tokio/
telemetry.rs

1//! Utilities for collecting and reporting telemetry data.
2
3use super::{
4    tracing::{export, Config},
5    Context,
6};
7use crate::{Metrics, Spawner};
8use axum::{
9    body::Body,
10    http::{header, Response, StatusCode},
11    routing::get,
12    serve, Extension, Router,
13};
14use std::net::SocketAddr;
15use tokio::net::TcpListener;
16use tracing::Level;
17use tracing_subscriber::{layer::SubscriberExt, Layer, Registry};
18
19/// Logging configuration.
20pub struct Logging {
21    /// The level of logging to use.
22    pub level: Level,
23
24    /// Whether to log in JSON format.
25    ///
26    /// This is useful for structured logging in server-based environments.
27    /// If you are running things locally, it is recommended to use
28    /// `json = false` to get a human-readable format.
29    pub json: bool,
30}
31
32/// Initialize telemetry with the given configuration.
33pub fn init(
34    context: Context,
35    logging: Logging,
36    metrics: Option<SocketAddr>,
37    traces: Option<Config>,
38) {
39    // Create a filter layer to set the maximum level to INFO
40    let filter = tracing_subscriber::EnvFilter::new(logging.level.to_string());
41
42    // Create fmt layer for logging
43    let log_layer = tracing_subscriber::fmt::layer()
44        .with_line_number(true)
45        .with_thread_ids(true)
46        .with_file(true)
47        .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
48
49    // Set the format to JSON (if specified)
50    let log_layer = if logging.json {
51        log_layer.json().boxed()
52    } else {
53        log_layer.pretty().boxed()
54    };
55
56    // Create OpenTelemetry layer for tracing
57    let trace_layer = traces.map(|cfg| {
58        let tracer = export(cfg).expect("Failed to initialize tracer");
59        tracing_opentelemetry::layer().with_tracer(tracer)
60    });
61
62    // Set the global subscriber
63    let registry = Registry::default()
64        .with(filter)
65        .with(log_layer)
66        .with(trace_layer);
67    tracing::subscriber::set_global_default(registry).expect("Failed to set subscriber");
68
69    // Expose metrics over HTTP
70    if let Some(cfg) = metrics {
71        context
72            .with_label("metrics")
73            .spawn(move |context| async move {
74                // Create a tokio listener for the metrics server.
75                //
76                // We explicitly avoid using a runtime `Listener` because
77                // it will track bandwidth used for metrics and apply a policy
78                // for read/write timeouts fit for a p2p network.
79                let listener = TcpListener::bind(cfg)
80                    .await
81                    .expect("Failed to bind metrics server");
82
83                // Create a router for the metrics server
84                let app = Router::new()
85                    .route(
86                        "/metrics",
87                        get(|Extension(ctx): Extension<Context>| async move {
88                            Response::builder()
89                                .status(StatusCode::OK)
90                                .header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
91                                .body(Body::from(ctx.encode()))
92                                .expect("Failed to create response")
93                        }),
94                    )
95                    .layer(Extension(context));
96
97                // Serve the metrics over HTTP.
98                //
99                // `serve` will spawn its own tasks using `tokio::spawn` (and there is no way to specify
100                // it to do otherwise). These tasks will not be tracked like metrics spawned using `Spawner`.
101                serve(listener, app.into_make_service())
102                    .await
103                    .expect("Could not serve metrics");
104            });
105    }
106}