commonware_runtime/tokio/
telemetry.rs

1//! Utilities for collecting and reporting telemetry data.
2
3use super::{
4    tracing::{export, Config},
5    Context,
6};
7use crate::{Metrics, Spawner};
8use axum::{
9    body::Body,
10    http::{header, Response, StatusCode},
11    routing::get,
12    serve, Extension, Router,
13};
14use std::net::SocketAddr;
15use tokio::net::TcpListener;
16use tracing::Level;
17use tracing_subscriber::{layer::SubscriberExt, Registry};
18
19/// Initialize telemetry with the given configuration.
20pub fn init(context: Context, level: Level, metrics: Option<SocketAddr>, traces: Option<Config>) {
21    // Create fmt layer for logging
22    let fmt_layer = tracing_subscriber::fmt::layer()
23        .json()
24        .with_line_number(true)
25        .with_thread_ids(true)
26        .with_file(true)
27        .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
28
29    // Create a filter layer to set the maximum level to INFO
30    let filter = tracing_subscriber::EnvFilter::new(level.to_string());
31
32    // Expose metrics over HTTP
33    if let Some(cfg) = metrics {
34        context
35            .with_label("metrics")
36            .spawn(move |context| async move {
37                // Create a tokio listener for the metrics server.
38                //
39                // We explicitly avoid using a runtime `Listener` because
40                // it will track bandwidth used for metrics and apply a policy
41                // for read/write timeouts fit for a p2p network.
42                let listener = TcpListener::bind(cfg)
43                    .await
44                    .expect("Failed to bind metrics server");
45
46                // Create a router for the metrics server
47                let app = Router::new()
48                    .route(
49                        "/metrics",
50                        get(|Extension(ctx): Extension<Context>| async move {
51                            Response::builder()
52                                .status(StatusCode::OK)
53                                .header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
54                                .body(Body::from(ctx.encode()))
55                                .expect("Failed to create response")
56                        }),
57                    )
58                    .layer(Extension(context));
59
60                // Serve the metrics over HTTP.
61                //
62                // `serve` will spawn its own tasks using `tokio::spawn` (and there is no way to specify
63                // it to do otherwise). These tasks will not be tracked like metrics spawned using `Spawner`.
64                serve(listener, app.into_make_service())
65                    .await
66                    .expect("Could not serve metrics");
67            });
68    }
69
70    // Combine layers into a single subscriber
71    if let Some(cfg) = traces {
72        // Initialize tracing
73        let tracer = export(cfg).expect("Failed to initialize tracer");
74
75        // Create OpenTelemetry layer for tracing
76        let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer);
77
78        // Set the global subscriber
79        let subscriber = Registry::default()
80            .with(filter)
81            .with(fmt_layer)
82            .with(telemetry_layer);
83        tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber");
84    } else {
85        // Set the global subscriber
86        let subscriber = Registry::default().with(filter).with(fmt_layer);
87        tracing::subscriber::set_global_default(subscriber).expect("Failed to set subscriber");
88    };
89}