commonware_runtime/tokio/telemetry.rs
1//! Utilities for collecting and reporting telemetry data.
2
3use super::{
4 tracing::{export, Config},
5 Context,
6};
7use crate::{Metrics, Spawner};
8use axum::{
9 body::Body,
10 http::{header, Response, StatusCode},
11 routing::get,
12 serve, Extension, Router,
13};
14use cfg_if::cfg_if;
15use std::net::SocketAddr;
16use tokio::net::TcpListener;
17use tracing::Level;
18use tracing_subscriber::{layer::SubscriberExt, Layer, Registry};
19
20/// Logging configuration.
21pub struct Logging {
22 /// The level of logging to use.
23 pub level: Level,
24
25 /// Whether to log in JSON format.
26 ///
27 /// This is useful for structured logging in server-based environments.
28 /// If you are running things locally, it is recommended to use
29 /// `json = false` to get a human-readable format.
30 pub json: bool,
31}
32
33/// Initialize telemetry with the given configuration.
34///
35/// If `metrics` is provided, starts serving metrics at the given address at `/metrics`.
36/// If `traces` is provided, enables OpenTelemetry trace export.
37pub fn init(
38 context: Context,
39 logging: Logging,
40 metrics: Option<SocketAddr>,
41 traces: Option<Config>,
42) {
43 // Create a filter layer to set the maximum level to INFO
44 let filter = tracing_subscriber::EnvFilter::new(logging.level.to_string());
45
46 // Create fmt layer for logging
47 let log_layer = tracing_subscriber::fmt::layer()
48 .with_line_number(true)
49 .with_thread_ids(true)
50 .with_file(true)
51 .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
52
53 // Set the format to JSON (if specified)
54 let log_layer = if logging.json {
55 log_layer.json().boxed()
56 } else {
57 log_layer.compact().boxed()
58 };
59
60 // Create OpenTelemetry layer for tracing
61 let trace_layer = traces.map(|cfg| {
62 let tracer = export(cfg).expect("Failed to initialize tracer");
63 tracing_opentelemetry::layer().with_tracer(tracer)
64 });
65
66 // Create tracing registry.
67 cfg_if! {
68 if #[cfg(feature = "tokio-console")] {
69 let console_layer = console_subscriber::spawn();
70 let registry = Registry::default()
71 .with(filter)
72 .with(log_layer)
73 .with(trace_layer)
74 .with(console_layer);
75 } else {
76 let registry = Registry::default()
77 .with(filter)
78 .with(log_layer)
79 .with(trace_layer);
80 }
81 }
82
83 // Set the global subscriber
84 tracing::subscriber::set_global_default(registry).expect("Failed to set subscriber");
85
86 // Expose metrics over HTTP
87 if let Some(cfg) = metrics {
88 context
89 .with_label("metrics")
90 .spawn(move |context| async move {
91 // Create a tokio listener for the metrics server.
92 //
93 // We explicitly avoid using a runtime `Listener` because
94 // it will track bandwidth used for metrics and apply a policy
95 // for read/write timeouts fit for a p2p network.
96 let listener = TcpListener::bind(cfg)
97 .await
98 .expect("Failed to bind metrics server");
99
100 // Create a router for the metrics server
101 let app = Router::new()
102 .route(
103 "/metrics",
104 get(|Extension(ctx): Extension<Context>| async move {
105 Response::builder()
106 .status(StatusCode::OK)
107 .header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
108 .body(Body::from(ctx.encode()))
109 .expect("Failed to create response")
110 }),
111 )
112 .layer(Extension(context));
113
114 // Serve the metrics over HTTP.
115 //
116 // `serve` will spawn its own tasks using `tokio::spawn` (and there is no way to specify
117 // it to do otherwise). These tasks will not be tracked like metrics spawned using `Spawner`.
118 serve(listener, app.into_make_service())
119 .await
120 .expect("Could not serve metrics");
121 });
122 }
123}