commonware_runtime/tokio/telemetry.rs
1//! Utilities for collecting and reporting telemetry data.
2
3use super::{
4 tracing::{export, Config},
5 Context,
6};
7use crate::{Metrics, Spawner};
8use axum::{
9 body::Body,
10 http::{header, Response, StatusCode},
11 routing::get,
12 serve, Extension, Router,
13};
14use std::net::SocketAddr;
15use tokio::net::TcpListener;
16use tracing::Level;
17use tracing_subscriber::{layer::SubscriberExt, Layer, Registry};
18
19/// Logging configuration.
20pub struct Logging {
21 /// The level of logging to use.
22 pub level: Level,
23
24 /// Whether to log in JSON format.
25 ///
26 /// This is useful for structured logging in server-based environments.
27 /// If you are running things locally, it is recommended to use
28 /// `json = false` to get a human-readable format.
29 pub json: bool,
30}
31
32/// Initialize telemetry with the given configuration.
33///
34/// If `metrics` is provided, starts serving metrics at the given address at `/metrics`.
35/// If `traces` is provided, enables OpenTelemetry trace export.
36pub fn init(
37 context: Context,
38 logging: Logging,
39 metrics: Option<SocketAddr>,
40 traces: Option<Config>,
41) {
42 // Create a filter layer to set the maximum level to INFO
43 let filter = tracing_subscriber::EnvFilter::new(logging.level.to_string());
44
45 // Create fmt layer for logging
46 let log_layer = tracing_subscriber::fmt::layer()
47 .with_line_number(true)
48 .with_thread_ids(true)
49 .with_file(true)
50 .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
51
52 // Set the format to JSON (if specified)
53 let log_layer = if logging.json {
54 log_layer.json().boxed()
55 } else {
56 log_layer.compact().boxed()
57 };
58
59 // Create OpenTelemetry layer for tracing
60 let trace_layer = traces.map(|cfg| {
61 let tracer = export(cfg).expect("Failed to initialize tracer");
62 tracing_opentelemetry::layer().with_tracer(tracer)
63 });
64
65 // Set the global subscriber
66 let registry = Registry::default()
67 .with(filter)
68 .with(log_layer)
69 .with(trace_layer);
70 tracing::subscriber::set_global_default(registry).expect("Failed to set subscriber");
71
72 // Expose metrics over HTTP
73 if let Some(cfg) = metrics {
74 context
75 .with_label("metrics")
76 .spawn(move |context| async move {
77 // Create a tokio listener for the metrics server.
78 //
79 // We explicitly avoid using a runtime `Listener` because
80 // it will track bandwidth used for metrics and apply a policy
81 // for read/write timeouts fit for a p2p network.
82 let listener = TcpListener::bind(cfg)
83 .await
84 .expect("Failed to bind metrics server");
85
86 // Create a router for the metrics server
87 let app = Router::new()
88 .route(
89 "/metrics",
90 get(|Extension(ctx): Extension<Context>| async move {
91 Response::builder()
92 .status(StatusCode::OK)
93 .header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
94 .body(Body::from(ctx.encode()))
95 .expect("Failed to create response")
96 }),
97 )
98 .layer(Extension(context));
99
100 // Serve the metrics over HTTP.
101 //
102 // `serve` will spawn its own tasks using `tokio::spawn` (and there is no way to specify
103 // it to do otherwise). These tasks will not be tracked like metrics spawned using `Spawner`.
104 serve(listener, app.into_make_service())
105 .await
106 .expect("Could not serve metrics");
107 });
108 }
109}